Compare commits
No commits in common. "384be969b7631ed7d37b909fb7f8b8c36b8bf408" and "7c87a578d3fc3f7bce6da28ccc49508a098a48a4" have entirely different histories.
384be969b7
...
7c87a578d3
|
@ -4,5 +4,5 @@
|
||||||
|
|
||||||
- error: { lhs: return, rhs: pure }
|
- error: { lhs: return, rhs: pure }
|
||||||
|
|
||||||
- ignore: {name: ["Avoid lambda using `infix`", "Use lambda-case"]}
|
- ignore: {name: "Avoid lambda using `infix`"}
|
||||||
|
|
||||||
|
|
|
@ -4,16 +4,14 @@ NodeID ::= INTEGER (0..115792089237316195423570985008687907853269984665640564039
|
||||||
|
|
||||||
Domain ::= VisibleString
|
Domain ::= VisibleString
|
||||||
|
|
||||||
Partnum ::= INTEGER (0..150)
|
|
||||||
|
|
||||||
Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
|
Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
|
||||||
|
|
||||||
Request ::= SEQUENCE {
|
Request ::= SEQUENCE {
|
||||||
action Action,
|
action Action,
|
||||||
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
|
requestID INTEGER,
|
||||||
sender NodeState,
|
sender NodeState,
|
||||||
part Partnum, -- part number of this message, starts at 1
|
parts INTEGER (0..150), -- number of message parts
|
||||||
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this reuest
|
part INTEGER (0..150), -- part number of this message, starts at 1
|
||||||
actionPayload CHOICE {
|
actionPayload CHOICE {
|
||||||
queryIDRequestPayload QueryIDRequestPayload,
|
queryIDRequestPayload QueryIDRequestPayload,
|
||||||
joinRequestPayload JoinRequestPayload,
|
joinRequestPayload JoinRequestPayload,
|
||||||
|
@ -27,11 +25,10 @@ Request ::= SEQUENCE {
|
||||||
-- request and response instead of explicit flag
|
-- request and response instead of explicit flag
|
||||||
|
|
||||||
Response ::= SEQUENCE {
|
Response ::= SEQUENCE {
|
||||||
-- requestID of the request responding to
|
responseTo INTEGER,
|
||||||
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
|
|
||||||
senderID NodeID,
|
senderID NodeID,
|
||||||
part Partnum,
|
parts INTEGER (0..150),
|
||||||
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response
|
part INTEGER (0..150),
|
||||||
action Action,
|
action Action,
|
||||||
actionPayload CHOICE {
|
actionPayload CHOICE {
|
||||||
queryIDResponsePayload QueryIDResponsePayload,
|
queryIDResponsePayload QueryIDResponsePayload,
|
||||||
|
@ -47,7 +44,7 @@ NodeState ::= SEQUENCE {
|
||||||
domain Domain,
|
domain Domain,
|
||||||
ipAddr OCTET STRING (SIZE(16)),
|
ipAddr OCTET STRING (SIZE(16)),
|
||||||
dhtPort INTEGER,
|
dhtPort INTEGER,
|
||||||
servicePort INTEGER,
|
apPort INTEGER,
|
||||||
vServerID INTEGER (0..255)
|
vServerID INTEGER (0..255)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,8 +59,8 @@ NodeCache ::= SEQUENCE OF CacheEntry
|
||||||
JoinRequestPayload ::= NULL
|
JoinRequestPayload ::= NULL
|
||||||
|
|
||||||
JoinResponsePayload ::= SEQUENCE {
|
JoinResponsePayload ::= SEQUENCE {
|
||||||
successors SEQUENCE OF NodeState,
|
successors SEQUENCE OF NodeID,
|
||||||
predecessors SEQUENCE OF NodeState,
|
predecessors SEQUENCE OF NodeID,
|
||||||
cache NodeCache
|
cache NodeCache
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,14 +79,14 @@ QueryIDResponsePayload ::= SEQUENCE {
|
||||||
StabiliseRequestPayload ::= NULL
|
StabiliseRequestPayload ::= NULL
|
||||||
|
|
||||||
StabiliseResponsePayload ::= SEQUENCE {
|
StabiliseResponsePayload ::= SEQUENCE {
|
||||||
successors SEQUENCE OF NodeState,
|
successors SEQUENCE OF NodeID,
|
||||||
predecessors SEQUENCE OF NodeState
|
predecessors SEQUENCE OF NodeID
|
||||||
-- ToDo: transfer of handled key data, if newly responsible for it
|
-- ToDo: transfer of handled key data, if newly responsible for it
|
||||||
}
|
}
|
||||||
|
|
||||||
LeaveRequestPayload ::= SEQUENCE {
|
LeaveRequestPayload ::= SEQUENCE {
|
||||||
successors SEQUENCE OF NodeState,
|
successors SEQUENCE OF NodeID,
|
||||||
predecessors SEQUENCE OF NodeState
|
predecessors SEQUENCE OF NodeID
|
||||||
-- ToDo: transfer of own data to newly responsible node
|
-- ToDo: transfer of own data to newly responsible node
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -46,7 +46,7 @@ category: Network
|
||||||
extra-source-files: CHANGELOG.md
|
extra-source-files: CHANGELOG.md
|
||||||
|
|
||||||
common deps
|
common deps
|
||||||
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random
|
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute
|
||||||
ghc-options: -Wall
|
ghc-options: -Wall
|
||||||
|
|
||||||
|
|
||||||
|
@ -55,7 +55,7 @@ library
|
||||||
import: deps
|
import: deps
|
||||||
|
|
||||||
-- Modules exported by the library.
|
-- Modules exported by the library.
|
||||||
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes
|
exposed-modules: Hash2Pub.FediChord, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding
|
||||||
|
|
||||||
-- Modules included in this library but not exported.
|
-- Modules included in this library but not exported.
|
||||||
other-modules: Hash2Pub.Utils
|
other-modules: Hash2Pub.Utils
|
||||||
|
|
36
app/Main.hs
36
app/Main.hs
|
@ -1,12 +1,7 @@
|
||||||
module Main where
|
module Main where
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import Control.Concurrent.Async
|
import Data.IP (IPv6, toHostAddress6)
|
||||||
import Control.Concurrent.STM
|
|
||||||
import Control.Concurrent.STM.TVar
|
|
||||||
import Control.Exception
|
|
||||||
import Data.Either
|
|
||||||
import Data.IP (IPv6, toHostAddress6)
|
|
||||||
import System.Environment
|
import System.Environment
|
||||||
|
|
||||||
import Hash2Pub.FediChord
|
import Hash2Pub.FediChord
|
||||||
|
@ -16,39 +11,22 @@ main = do
|
||||||
-- ToDo: parse and pass config
|
-- ToDo: parse and pass config
|
||||||
-- probably use `tomland` for that
|
-- probably use `tomland` for that
|
||||||
conf <- readConfig
|
conf <- readConfig
|
||||||
-- TODO: first initialise 'RealNode', then the vservers
|
|
||||||
-- ToDo: load persisted caches, bootstrapping nodes …
|
-- ToDo: load persisted caches, bootstrapping nodes …
|
||||||
(serverSock, thisNode) <- fediChordInit conf
|
(serverSock, thisNode) <- fediChordInit conf
|
||||||
|
print thisNode
|
||||||
|
print serverSock
|
||||||
-- currently no masking is necessary, as there is nothing to clean up
|
-- currently no masking is necessary, as there is nothing to clean up
|
||||||
cacheWriterThread <- forkIO $ cacheWriter thisNode
|
cacheWriterThread <- forkIO $ cacheWriter thisNode
|
||||||
-- try joining the DHT using one of the provided bootstrapping nodes
|
-- idea: list of bootstrapping nodes, try joining within a timeout
|
||||||
joinedState <- tryBootstrapJoining thisNode
|
-- stop main thread from terminating during development
|
||||||
either (\err -> do
|
getChar
|
||||||
-- handle unsuccessful join
|
|
||||||
|
|
||||||
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
|
||||||
print =<< readTVarIO thisNode
|
|
||||||
-- launch thread attempting to join on new cache entries
|
|
||||||
_ <- forkIO $ joinOnNewEntriesThread thisNode
|
|
||||||
wait =<< async (fediMainThreads serverSock thisNode)
|
|
||||||
)
|
|
||||||
(\joinedNS -> do
|
|
||||||
-- launch main eventloop with successfully joined state
|
|
||||||
putStrLn "successful join"
|
|
||||||
wait =<< async (fediMainThreads serverSock thisNode)
|
|
||||||
)
|
|
||||||
joinedState
|
|
||||||
pure ()
|
pure ()
|
||||||
|
|
||||||
|
|
||||||
readConfig :: IO FediChordConf
|
readConfig :: IO FediChordConf
|
||||||
readConfig = do
|
readConfig = do
|
||||||
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs
|
confDomainString : ipString : portString : _ <- getArgs
|
||||||
pure $ FediChordConf {
|
pure $ FediChordConf {
|
||||||
confDomain = confDomainString
|
confDomain = confDomainString
|
||||||
, confIP = toHostAddress6 . read $ ipString
|
, confIP = toHostAddress6 . read $ ipString
|
||||||
, confDhtPort = read portString
|
, confDhtPort = read portString
|
||||||
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
|
|
||||||
--, confStabiliseInterval = 60
|
|
||||||
, confBootstrapSamplingInterval = 180
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,8 +16,8 @@ import qualified Data.Set as Set
|
||||||
import Data.Time.Clock.POSIX ()
|
import Data.Time.Clock.POSIX ()
|
||||||
import Safe
|
import Safe
|
||||||
|
|
||||||
import Hash2Pub.FediChordTypes
|
import Hash2Pub.DHTProtocol
|
||||||
import Hash2Pub.ProtocolTypes
|
import Hash2Pub.FediChord
|
||||||
import Hash2Pub.Utils
|
import Hash2Pub.Utils
|
||||||
|
|
||||||
import Debug.Trace
|
import Debug.Trace
|
||||||
|
@ -77,9 +77,6 @@ chunkLength numParts totalSize = ceiling $ (realToFrac totalSize :: Double) / re
|
||||||
-- The number of parts per message is limited to 150 for DOS protection reasons.
|
-- The number of parts per message is limited to 150 for DOS protection reasons.
|
||||||
-- The returned byte strings might exceed the desired maximum length, as only the payload (and not all of them)
|
-- The returned byte strings might exceed the desired maximum length, as only the payload (and not all of them)
|
||||||
-- can be split into multiple parts.
|
-- can be split into multiple parts.
|
||||||
--
|
|
||||||
-- The return type is a Map from part number to encoded part, to be able to acknowledge
|
|
||||||
-- an encoded part without having to decode its number.
|
|
||||||
serialiseMessage :: Int -- maximum message size in bytes
|
serialiseMessage :: Int -- maximum message size in bytes
|
||||||
-> FediChordMessage -- mesage to be serialised in preparation for sending
|
-> FediChordMessage -- mesage to be serialised in preparation for sending
|
||||||
-> Map.Map Integer BS.ByteString -- list of ASN.1 DER encoded messages together representing
|
-> Map.Map Integer BS.ByteString -- list of ASN.1 DER encoded messages together representing
|
||||||
|
@ -103,11 +100,11 @@ serialiseMessage maxBytesLength msg =
|
||||||
modifyMessage i (partNum, pl) pls = (partNum, msg {
|
modifyMessage i (partNum, pl) pls = (partNum, msg {
|
||||||
part = partNum
|
part = partNum
|
||||||
, payload = Just pl
|
, payload = Just pl
|
||||||
, isFinalPart = partNum == fromIntegral i
|
, parts = fromIntegral i
|
||||||
}):pls
|
}):pls
|
||||||
-- part starts at 1
|
-- part starts at 1
|
||||||
payloadParts :: Int -> Maybe [(Integer, ActionPayload)]
|
payloadParts :: Int -> Maybe [(Integer, ActionPayload)]
|
||||||
payloadParts i = zip [(part msg)..] . splitPayload i <$> actionPayload
|
payloadParts i = zip [1..] . splitPayload i <$> actionPayload
|
||||||
actionPayload = payload msg
|
actionPayload = payload msg
|
||||||
encodedMsgs i = Map.map encodeMsg $ messageParts i
|
encodedMsgs i = Map.map encodeMsg $ messageParts i
|
||||||
maxMsgLength = maximum . fmap BS.length . Map.elems
|
maxMsgLength = maximum . fmap BS.length . Map.elems
|
||||||
|
@ -130,20 +127,20 @@ encodePayload LeaveResponsePayload = [Null]
|
||||||
encodePayload payload'@LeaveRequestPayload{} =
|
encodePayload payload'@LeaveRequestPayload{} =
|
||||||
Start Sequence
|
Start Sequence
|
||||||
: Start Sequence
|
: Start Sequence
|
||||||
: concatMap encodeNodeState (leaveSuccessors payload')
|
: fmap (IntVal . getNodeID) (leaveSuccessors payload')
|
||||||
<> [End Sequence
|
<> [End Sequence
|
||||||
, Start Sequence]
|
, Start Sequence]
|
||||||
<> concatMap encodeNodeState (leavePredecessors payload')
|
<> fmap (IntVal . getNodeID) (leavePredecessors payload')
|
||||||
<> [End Sequence
|
<> [End Sequence
|
||||||
, End Sequence]
|
, End Sequence]
|
||||||
-- currently StabiliseResponsePayload and LeaveRequestPayload are equal
|
-- currently StabiliseResponsePayload and LeaveRequestPayload are equal
|
||||||
encodePayload payload'@StabiliseResponsePayload{} =
|
encodePayload payload'@StabiliseResponsePayload{} =
|
||||||
Start Sequence
|
Start Sequence
|
||||||
: Start Sequence
|
: Start Sequence
|
||||||
: concatMap encodeNodeState (stabiliseSuccessors payload')
|
: fmap (IntVal . getNodeID) (stabiliseSuccessors payload')
|
||||||
<> [End Sequence
|
<> [End Sequence
|
||||||
, Start Sequence]
|
, Start Sequence]
|
||||||
<> concatMap encodeNodeState (stabilisePredecessors payload')
|
<> fmap (IntVal . getNodeID) (stabilisePredecessors payload')
|
||||||
<> [End Sequence
|
<> [End Sequence
|
||||||
, End Sequence]
|
, End Sequence]
|
||||||
encodePayload payload'@StabiliseRequestPayload = [Null]
|
encodePayload payload'@StabiliseRequestPayload = [Null]
|
||||||
|
@ -170,10 +167,10 @@ encodePayload payload'@QueryIDRequestPayload{} = [
|
||||||
encodePayload payload'@JoinResponsePayload{} =
|
encodePayload payload'@JoinResponsePayload{} =
|
||||||
Start Sequence
|
Start Sequence
|
||||||
: Start Sequence
|
: Start Sequence
|
||||||
: concatMap encodeNodeState (joinSuccessors payload')
|
: fmap (IntVal . getNodeID) (joinSuccessors payload')
|
||||||
<> [End Sequence
|
<> [End Sequence
|
||||||
, Start Sequence]
|
, Start Sequence]
|
||||||
<> concatMap encodeNodeState (joinPredecessors payload')
|
<> fmap (IntVal . getNodeID) (joinPredecessors payload')
|
||||||
<> [End Sequence
|
<> [End Sequence
|
||||||
, Start Sequence]
|
, Start Sequence]
|
||||||
<> concatMap encodeCacheEntry (joinCache payload')
|
<> concatMap encodeCacheEntry (joinCache payload')
|
||||||
|
@ -186,15 +183,15 @@ encodePayload payload'@PingResponsePayload{} =
|
||||||
: concatMap encodeNodeState (pingNodeStates payload')
|
: concatMap encodeNodeState (pingNodeStates payload')
|
||||||
<> [End Sequence]
|
<> [End Sequence]
|
||||||
|
|
||||||
encodeNodeState :: NodeState a => a -> [ASN1]
|
encodeNodeState :: NodeState -> [ASN1]
|
||||||
encodeNodeState ns = [
|
encodeNodeState ns = [
|
||||||
Start Sequence
|
Start Sequence
|
||||||
, IntVal (getNodeID . getNid $ ns)
|
, IntVal (getNodeID . nid $ ns)
|
||||||
, ASN1String . asn1CharacterString Visible $ getDomain ns
|
, ASN1String . asn1CharacterString Visible $ domain ns
|
||||||
, OctetString (ipAddrAsBS $ getIpAddr ns)
|
, OctetString (ipAddrAsBS $ ipAddr ns)
|
||||||
, IntVal (toInteger . getDhtPort $ ns)
|
, IntVal (toInteger . dhtPort $ ns)
|
||||||
, IntVal (toInteger . getServicePort $ ns)
|
, IntVal (maybe 0 toInteger $ apPort ns)
|
||||||
, IntVal (getVServerID ns)
|
, IntVal (vServerID ns)
|
||||||
, End Sequence
|
, End Sequence
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -216,22 +213,23 @@ encodeQueryResult FORWARD{} = Enumerated 1
|
||||||
encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded
|
encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded
|
||||||
-> [ASN1]
|
-> [ASN1]
|
||||||
encodeMessage
|
encodeMessage
|
||||||
(Request requestID sender part isFinalPart action requestPayload) =
|
(Request requestID sender parts part action requestPayload) =
|
||||||
Start Sequence
|
Start Sequence
|
||||||
: (Enumerated . fromIntegral . fromEnum $ action)
|
: (Enumerated . fromIntegral . fromEnum $ action)
|
||||||
: IntVal requestID
|
: IntVal requestID
|
||||||
: encodeNodeState sender
|
: encodeNodeState sender
|
||||||
<> [IntVal part
|
<> [
|
||||||
, Boolean isFinalPart]
|
IntVal parts
|
||||||
|
, IntVal part ]
|
||||||
<> maybe [] encodePayload requestPayload
|
<> maybe [] encodePayload requestPayload
|
||||||
<> [End Sequence]
|
<> [End Sequence]
|
||||||
encodeMessage
|
encodeMessage
|
||||||
(Response requestID senderID part isFinalPart action responsePayload) = [
|
(Response responseTo senderID parts part action responsePayload) = [
|
||||||
Start Sequence
|
Start Sequence
|
||||||
, IntVal requestID
|
, IntVal responseTo
|
||||||
, IntVal . getNodeID $ senderID
|
, IntVal . getNodeID $ senderID
|
||||||
|
, IntVal parts
|
||||||
, IntVal part
|
, IntVal part
|
||||||
, Boolean isFinalPart
|
|
||||||
, Enumerated . fromIntegral . fromEnum $ action]
|
, Enumerated . fromIntegral . fromEnum $ action]
|
||||||
<> maybe [] encodePayload responsePayload
|
<> maybe [] encodePayload responsePayload
|
||||||
<> [End Sequence]
|
<> [End Sequence]
|
||||||
|
@ -264,8 +262,8 @@ parseRequest :: Action -> ParseASN1 FediChordMessage
|
||||||
parseRequest action = do
|
parseRequest action = do
|
||||||
requestID <- parseInteger
|
requestID <- parseInteger
|
||||||
sender <- parseNodeState
|
sender <- parseNodeState
|
||||||
|
parts <- parseInteger
|
||||||
part <- parseInteger
|
part <- parseInteger
|
||||||
isFinalPart <- parseBool
|
|
||||||
hasPayload <- hasNext
|
hasPayload <- hasNext
|
||||||
payload <- if not hasPayload then pure Nothing else Just <$> case action of
|
payload <- if not hasPayload then pure Nothing else Just <$> case action of
|
||||||
QueryID -> parseQueryIDRequest
|
QueryID -> parseQueryIDRequest
|
||||||
|
@ -274,13 +272,13 @@ parseRequest action = do
|
||||||
Stabilise -> parseStabiliseRequest
|
Stabilise -> parseStabiliseRequest
|
||||||
Ping -> parsePingRequest
|
Ping -> parsePingRequest
|
||||||
|
|
||||||
pure $ Request requestID sender part isFinalPart action payload
|
pure $ Request requestID sender parts part action payload
|
||||||
|
|
||||||
parseResponse :: Integer -> ParseASN1 FediChordMessage
|
parseResponse :: Integer -> ParseASN1 FediChordMessage
|
||||||
parseResponse requestID = do
|
parseResponse responseTo = do
|
||||||
senderID <- fromInteger <$> parseInteger :: ParseASN1 NodeID
|
senderID <- fromInteger <$> parseInteger :: ParseASN1 NodeID
|
||||||
|
parts <- parseInteger
|
||||||
part <- parseInteger
|
part <- parseInteger
|
||||||
isFinalPart <- parseBool
|
|
||||||
action <- parseEnum :: ParseASN1 Action
|
action <- parseEnum :: ParseASN1 Action
|
||||||
hasPayload <- hasNext
|
hasPayload <- hasNext
|
||||||
payload <- if not hasPayload then pure Nothing else Just <$> case action of
|
payload <- if not hasPayload then pure Nothing else Just <$> case action of
|
||||||
|
@ -290,14 +288,7 @@ parseResponse requestID = do
|
||||||
Stabilise -> parseStabiliseResponse
|
Stabilise -> parseStabiliseResponse
|
||||||
Ping -> parsePingResponse
|
Ping -> parsePingResponse
|
||||||
|
|
||||||
pure $ Response requestID senderID part isFinalPart action payload
|
pure $ Response responseTo senderID parts part action payload
|
||||||
|
|
||||||
parseBool :: ParseASN1 Bool
|
|
||||||
parseBool = do
|
|
||||||
i <- getNext
|
|
||||||
case i of
|
|
||||||
Boolean parsed -> pure parsed
|
|
||||||
x -> throwParseError $ "Expected Boolean but got " <> show x
|
|
||||||
|
|
||||||
parseInteger :: ParseASN1 Integer
|
parseInteger :: ParseASN1 Integer
|
||||||
parseInteger = do
|
parseInteger = do
|
||||||
|
@ -334,21 +325,22 @@ parseNull = do
|
||||||
Null -> pure ()
|
Null -> pure ()
|
||||||
x -> throwParseError $ "Expected Null but got " <> show x
|
x -> throwParseError $ "Expected Null but got " <> show x
|
||||||
|
|
||||||
parseNodeState :: ParseASN1 RemoteNodeState
|
parseNodeState :: ParseASN1 NodeState
|
||||||
parseNodeState = onNextContainer Sequence $ do
|
parseNodeState = onNextContainer Sequence $ do
|
||||||
nid' <- fromInteger <$> parseInteger
|
nid' <- fromInteger <$> parseInteger
|
||||||
domain' <- parseString
|
domain' <- parseString
|
||||||
ip' <- bsAsIpAddr <$> parseOctets
|
ip' <- bsAsIpAddr <$> parseOctets
|
||||||
dhtPort' <- fromInteger <$> parseInteger
|
dhtPort' <- fromInteger <$> parseInteger
|
||||||
servicePort' <- fromInteger <$> parseInteger
|
apPort' <- fromInteger <$> parseInteger
|
||||||
vServer' <- parseInteger
|
vServer' <- parseInteger
|
||||||
pure RemoteNodeState {
|
pure NodeState {
|
||||||
nid = nid'
|
nid = nid'
|
||||||
, domain = domain'
|
, domain = domain'
|
||||||
, dhtPort = dhtPort'
|
, dhtPort = dhtPort'
|
||||||
, servicePort = servicePort'
|
, apPort = if apPort' == 0 then Nothing else Just apPort'
|
||||||
, vServerID = vServer'
|
, vServerID = vServer'
|
||||||
, ipAddr = ip'
|
, ipAddr = ip'
|
||||||
|
, internals = Nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -368,8 +360,8 @@ parseJoinRequest = do
|
||||||
|
|
||||||
parseJoinResponse :: ParseASN1 ActionPayload
|
parseJoinResponse :: ParseASN1 ActionPayload
|
||||||
parseJoinResponse = onNextContainer Sequence $ do
|
parseJoinResponse = onNextContainer Sequence $ do
|
||||||
succ' <- onNextContainer Sequence (getMany parseNodeState)
|
succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
|
||||||
pred' <- onNextContainer Sequence (getMany parseNodeState)
|
pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
|
||||||
cache <- parseNodeCache
|
cache <- parseNodeCache
|
||||||
pure $ JoinResponsePayload {
|
pure $ JoinResponsePayload {
|
||||||
joinSuccessors = succ'
|
joinSuccessors = succ'
|
||||||
|
@ -404,8 +396,8 @@ parseStabiliseRequest = do
|
||||||
|
|
||||||
parseStabiliseResponse :: ParseASN1 ActionPayload
|
parseStabiliseResponse :: ParseASN1 ActionPayload
|
||||||
parseStabiliseResponse = onNextContainer Sequence $ do
|
parseStabiliseResponse = onNextContainer Sequence $ do
|
||||||
succ' <- onNextContainer Sequence (getMany parseNodeState)
|
succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
|
||||||
pred' <- onNextContainer Sequence (getMany parseNodeState)
|
pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
|
||||||
pure $ StabiliseResponsePayload {
|
pure $ StabiliseResponsePayload {
|
||||||
stabiliseSuccessors = succ'
|
stabiliseSuccessors = succ'
|
||||||
, stabilisePredecessors = pred'
|
, stabilisePredecessors = pred'
|
||||||
|
@ -413,8 +405,8 @@ parseStabiliseResponse = onNextContainer Sequence $ do
|
||||||
|
|
||||||
parseLeaveRequest :: ParseASN1 ActionPayload
|
parseLeaveRequest :: ParseASN1 ActionPayload
|
||||||
parseLeaveRequest = onNextContainer Sequence $ do
|
parseLeaveRequest = onNextContainer Sequence $ do
|
||||||
succ' <- onNextContainer Sequence (getMany parseNodeState)
|
succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
|
||||||
pred' <- onNextContainer Sequence (getMany parseNodeState)
|
pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
|
||||||
pure $ LeaveRequestPayload {
|
pure $ LeaveRequestPayload {
|
||||||
leaveSuccessors = succ'
|
leaveSuccessors = succ'
|
||||||
, leavePredecessors = pred'
|
, leavePredecessors = pred'
|
||||||
|
|
|
@ -3,146 +3,153 @@ module Hash2Pub.DHTProtocol
|
||||||
, queryLocalCache
|
, queryLocalCache
|
||||||
, addCacheEntry
|
, addCacheEntry
|
||||||
, addCacheEntryPure
|
, addCacheEntryPure
|
||||||
, addNodeAsVerified
|
|
||||||
, addNodeAsVerifiedPure
|
|
||||||
, deleteCacheEntry
|
, deleteCacheEntry
|
||||||
, deserialiseMessage
|
, markCacheEntryAsVerified
|
||||||
, RemoteCacheEntry(..)
|
, RemoteCacheEntry(..)
|
||||||
, toRemoteCacheEntry
|
, toRemoteCacheEntry
|
||||||
, remoteNode
|
, remoteNode_
|
||||||
, Action(..)
|
, Action(..)
|
||||||
, ActionPayload(..)
|
, ActionPayload(..)
|
||||||
, FediChordMessage(..)
|
, FediChordMessage(..)
|
||||||
, maximumParts
|
, maximumParts
|
||||||
, sendQueryIdMessages
|
|
||||||
, requestQueryID
|
|
||||||
, requestJoin
|
|
||||||
, requestPing
|
|
||||||
, requestStabilise
|
|
||||||
, lookupMessage
|
|
||||||
, sendRequestTo
|
|
||||||
, queryIdLookupLoop
|
|
||||||
, queueAddEntries
|
|
||||||
, queueDeleteEntries
|
|
||||||
, queueDeleteEntry
|
|
||||||
, resolve
|
|
||||||
, mkSendSocket
|
|
||||||
, mkServerSocket
|
|
||||||
, handleIncomingRequest
|
|
||||||
, ackRequest
|
|
||||||
, isPossibleSuccessor
|
|
||||||
, isPossiblePredecessor
|
|
||||||
, isJoined
|
|
||||||
, closestCachePredecessors
|
|
||||||
)
|
)
|
||||||
where
|
where
|
||||||
|
|
||||||
import Control.Concurrent.Async
|
import qualified Data.Map as Map
|
||||||
import Control.Concurrent.STM
|
import Data.Maybe (fromMaybe, maybe)
|
||||||
import Control.Concurrent.STM.TBQueue
|
import qualified Data.Set as Set
|
||||||
import Control.Concurrent.STM.TQueue
|
|
||||||
import Control.Concurrent.STM.TVar
|
|
||||||
import Control.Exception
|
|
||||||
import Control.Monad (foldM, forM, forM_)
|
|
||||||
import qualified Data.ByteString as BS
|
|
||||||
import Data.Either (rights)
|
|
||||||
import Data.Foldable (foldl', foldr')
|
|
||||||
import Data.Functor.Identity
|
|
||||||
import Data.IP (IPv6, fromHostAddress6,
|
|
||||||
toHostAddress6)
|
|
||||||
import Data.List (delete, nub, sortBy)
|
|
||||||
import qualified Data.Map as Map
|
|
||||||
import Data.Maybe (fromJust, fromMaybe, isJust,
|
|
||||||
isNothing, mapMaybe, maybe)
|
|
||||||
import qualified Data.Set as Set
|
|
||||||
import Data.Time.Clock.POSIX
|
import Data.Time.Clock.POSIX
|
||||||
import Network.Socket hiding (recv, recvFrom, send,
|
import Network.Socket hiding (recv, recvFrom, send, sendTo)
|
||||||
sendTo)
|
|
||||||
import Network.Socket.ByteString
|
import Network.Socket.ByteString
|
||||||
import Safe
|
|
||||||
import System.Random
|
|
||||||
import System.Timeout
|
|
||||||
|
|
||||||
import Hash2Pub.ASN1Coding
|
import Hash2Pub.FediChord (CacheEntry (..), NodeCache, NodeID,
|
||||||
import Hash2Pub.FediChordTypes (CacheEntry (..),
|
NodeState (..),
|
||||||
CacheEntry (..), HasKeyID (..),
|
cacheGetNodeStateUnvalidated,
|
||||||
LocalNodeState (..),
|
cacheLookup, cacheLookupPred,
|
||||||
LocalNodeStateSTM, NodeCache,
|
cacheLookupSucc, getPredecessors,
|
||||||
NodeID, NodeState (..),
|
getSuccessors, localCompare,
|
||||||
RemoteNodeState (..),
|
putPredecessors, putSuccessors)
|
||||||
RingEntry (..), RingMap (..),
|
|
||||||
addRMapEntry, addRMapEntryWith,
|
|
||||||
cacheGetNodeStateUnvalidated,
|
|
||||||
cacheLookup, cacheLookupPred,
|
|
||||||
cacheLookupSucc, genNodeID,
|
|
||||||
getKeyID, localCompare,
|
|
||||||
rMapFromList, rMapLookupPred,
|
|
||||||
rMapLookupSucc,
|
|
||||||
setPredecessors, setSuccessors)
|
|
||||||
import Hash2Pub.ProtocolTypes
|
|
||||||
|
|
||||||
import Debug.Trace (trace)
|
import Debug.Trace (trace)
|
||||||
|
|
||||||
-- === queries ===
|
-- === queries ===
|
||||||
|
|
||||||
|
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^ return closest nodes from local cache.
|
||||||
|
-- whole cache entry is returned for making
|
||||||
|
-- the entry time stamp available to the
|
||||||
|
-- protocol serialiser
|
||||||
|
| FOUND NodeState -- ^node is the responsible node for queried ID
|
||||||
|
deriving (Show, Eq)
|
||||||
|
|
||||||
-- TODO: evaluate more fine-grained argument passing to allow granular locking
|
-- TODO: evaluate more fine-grained argument passing to allow granular locking
|
||||||
-- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache
|
-- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache
|
||||||
queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse
|
queryLocalCache :: NodeState -> NodeCache -> Int -> NodeID -> QueryResponse
|
||||||
queryLocalCache ownState nCache lBestNodes targetID
|
queryLocalCache ownState nCache lBestNodes targetID
|
||||||
-- as target ID falls between own ID and first predecessor, it is handled by this node
|
-- as target ID falls between own ID and first predecessor, it is handled by this node
|
||||||
-- This only makes sense if the node is part of the DHT by having joined.
|
| (targetID `localCompare` ownID) `elem` [LT, EQ] && not (null preds) && (targetID `localCompare` head preds == GT) = FOUND ownState
|
||||||
-- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'.
|
|
||||||
| isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
|
|
||||||
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
|
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
|
||||||
-- the closest succeeding node (like with the p initiated parallel queries
|
-- the closest succeeding node (like with the p initiated parallel queries
|
||||||
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
|
| otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors
|
||||||
where
|
where
|
||||||
ownID = getNid ownState
|
preds = fromMaybe [] $ getPredecessors ownState
|
||||||
preds = predecessors ownState
|
ownID = nid ownState
|
||||||
|
|
||||||
closestSuccessor :: Set.Set RemoteCacheEntry
|
closestSuccessor :: Set.Set RemoteCacheEntry
|
||||||
closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache
|
closestSuccessor = maybe Set.empty Set.singleton $ toRemoteCacheEntry =<< cacheLookupSucc targetID nCache
|
||||||
|
|
||||||
|
closestPredecessors :: Set.Set RemoteCacheEntry
|
||||||
|
closestPredecessors = closestPredecessor (lBestNodes-1) $ nid ownState
|
||||||
|
closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry
|
||||||
|
closestPredecessor 0 _ = Set.empty
|
||||||
|
closestPredecessor remainingLookups lastID
|
||||||
|
| remainingLookups < 0 = Set.empty
|
||||||
|
| otherwise =
|
||||||
|
let result = cacheLookupPred lastID nCache
|
||||||
|
in
|
||||||
|
case toRemoteCacheEntry =<< result of
|
||||||
|
Nothing -> Set.empty
|
||||||
|
Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestPredecessor (remainingLookups-1) (nid ns)
|
||||||
|
|
||||||
-- | look up the 3 direct predecessor cache entries of a given ID
|
-- === protocol serialisation data types
|
||||||
closestCachePredecessors :: (Integral n)
|
|
||||||
=> n -- ^ number of entries to look up
|
|
||||||
-> NodeID -- ^ target ID to get the predecessors of
|
|
||||||
-> NodeCache -- ^ cache to use for lookup
|
|
||||||
-> Set.Set RemoteCacheEntry
|
|
||||||
closestCachePredecessors 0 _ _ = Set.empty
|
|
||||||
closestCachePredecessors remainingLookups lastID nCache
|
|
||||||
| remainingLookups < 0 = Set.empty
|
|
||||||
| otherwise =
|
|
||||||
let result = cacheLookupPred lastID nCache
|
|
||||||
in
|
|
||||||
case toRemoteCacheEntry <$> result of
|
|
||||||
Nothing -> Set.empty
|
|
||||||
Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestCachePredecessors (remainingLookups-1) (nid ns) nCache
|
|
||||||
|
|
||||||
-- | Determines whether a lookup key is within the responsibility slice of a node,
|
data Action =
|
||||||
-- as it falls between its first predecessor and the node itself.
|
QueryID
|
||||||
-- Looks up the successor of the lookup key on a 'RingMap' representation of the
|
| Join
|
||||||
-- predecessor list with the node itself added. If the result is the same as the node
|
| Leave
|
||||||
-- itself then it falls into the responsibility interval.
|
| Stabilise
|
||||||
isInOwnResponsibilitySlice :: HasKeyID a => a -> LocalNodeState -> Bool
|
| Ping
|
||||||
isInOwnResponsibilitySlice lookupTarget ownNs = (getKeyID <$> rMapLookupSucc (getKeyID lookupTarget) predecessorRMap) == pure (getNid ownNs)
|
deriving (Show, Eq, Enum)
|
||||||
where
|
|
||||||
predecessorList = predecessors ownNs
|
|
||||||
-- add node itself to RingMap representation, to distinguish between
|
|
||||||
-- responsibility of own node and predecessor
|
|
||||||
predecessorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList predecessorList
|
|
||||||
closestPredecessor = headMay predecessorList
|
|
||||||
|
|
||||||
isPossiblePredecessor :: HasKeyID a => a -> LocalNodeState -> Bool
|
data FediChordMessage =
|
||||||
isPossiblePredecessor = isInOwnResponsibilitySlice
|
Request {
|
||||||
|
requestID :: Integer
|
||||||
|
, sender :: NodeState
|
||||||
|
, parts :: Integer
|
||||||
|
, part :: Integer
|
||||||
|
-- ^ part starts at 0
|
||||||
|
, action :: Action
|
||||||
|
, payload :: Maybe ActionPayload
|
||||||
|
}
|
||||||
|
| Response {
|
||||||
|
responseTo :: Integer
|
||||||
|
, senderID :: NodeID
|
||||||
|
, parts :: Integer
|
||||||
|
, part :: Integer
|
||||||
|
, action :: Action
|
||||||
|
, payload :: Maybe ActionPayload
|
||||||
|
} deriving (Show, Eq)
|
||||||
|
|
||||||
isPossibleSuccessor :: HasKeyID a => a -> LocalNodeState -> Bool
|
data ActionPayload =
|
||||||
isPossibleSuccessor lookupTarget ownNs = (getKeyID <$> rMapLookupPred (getKeyID lookupTarget) successorRMap) == pure (getNid ownNs)
|
QueryIDRequestPayload {
|
||||||
where
|
queryTargetID :: NodeID
|
||||||
successorList = successors ownNs
|
, queryLBestNodes :: Integer
|
||||||
successorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList successorList
|
}
|
||||||
closestSuccessor = headMay successorList
|
| JoinRequestPayload
|
||||||
|
| LeaveRequestPayload {
|
||||||
|
leaveSuccessors :: [NodeID]
|
||||||
|
, leavePredecessors :: [NodeID]
|
||||||
|
}
|
||||||
|
| StabiliseRequestPayload
|
||||||
|
| PingRequestPayload
|
||||||
|
| QueryIDResponsePayload {
|
||||||
|
queryResult :: QueryResponse
|
||||||
|
}
|
||||||
|
| JoinResponsePayload {
|
||||||
|
joinSuccessors :: [NodeID]
|
||||||
|
, joinPredecessors :: [NodeID]
|
||||||
|
, joinCache :: [RemoteCacheEntry]
|
||||||
|
}
|
||||||
|
| LeaveResponsePayload
|
||||||
|
| StabiliseResponsePayload {
|
||||||
|
stabiliseSuccessors :: [NodeID]
|
||||||
|
, stabilisePredecessors :: [NodeID]
|
||||||
|
}
|
||||||
|
| PingResponsePayload {
|
||||||
|
pingNodeStates :: [NodeState]
|
||||||
|
}
|
||||||
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
-- | global limit of parts per message used when (de)serialising messages.
|
||||||
|
-- Used to limit the impact of DOS attempts with partial messages.
|
||||||
|
maximumParts :: Num a => a
|
||||||
|
maximumParts = 150
|
||||||
|
|
||||||
|
-- | dedicated data type for cache entries sent to or received from the network,
|
||||||
|
-- as these have to be considered as unvalidated. Also helps with separation of trust.
|
||||||
|
data RemoteCacheEntry = RemoteCacheEntry NodeState POSIXTime
|
||||||
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
instance Ord RemoteCacheEntry where
|
||||||
|
(RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2
|
||||||
|
|
||||||
|
toRemoteCacheEntry :: CacheEntry -> Maybe RemoteCacheEntry
|
||||||
|
toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts
|
||||||
|
toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry
|
||||||
|
toRemoteCacheEntry _ = Nothing
|
||||||
|
|
||||||
|
-- helper function for use in tests
|
||||||
|
remoteNode_ :: RemoteCacheEntry -> NodeState
|
||||||
|
remoteNode_ (RemoteCacheEntry ns _) = ns
|
||||||
|
|
||||||
-- cache operations
|
-- cache operations
|
||||||
|
|
||||||
|
@ -157,18 +164,18 @@ addCacheEntry entry cache = do
|
||||||
|
|
||||||
-- | pure version of 'addCacheEntry' with current time explicitly specified as argument
|
-- | pure version of 'addCacheEntry' with current time explicitly specified as argument
|
||||||
addCacheEntryPure :: POSIXTime -- ^ current time
|
addCacheEntryPure :: POSIXTime -- ^ current time
|
||||||
-> RemoteCacheEntry -- ^ a remote cache entry received from network
|
-> RemoteCacheEntry -- ^ a remote cache entry received from network
|
||||||
-> NodeCache -- ^ node cache to insert to
|
-> NodeCache -- ^ node cache to insert to
|
||||||
-> NodeCache -- ^ new node cache with the element inserted
|
-> NodeCache -- ^ new node cache with the element inserted
|
||||||
addCacheEntryPure now (RemoteCacheEntry ns ts) cache =
|
addCacheEntryPure now (RemoteCacheEntry ns ts) cache =
|
||||||
let
|
let
|
||||||
-- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity
|
-- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity
|
||||||
timestamp' = if ts <= now then ts else now
|
timestamp' = if ts <= now then ts else now
|
||||||
newCache = addRMapEntryWith insertCombineFunction (CacheEntry False ns timestamp') cache
|
newCache = Map.insertWith insertCombineFunction (nid ns) (NodeEntry False ns timestamp') cache
|
||||||
insertCombineFunction newVal@(KeyEntry (CacheEntry newValidationState newNode newTimestamp)) oldVal =
|
insertCombineFunction newVal@(NodeEntry newValidationState newNode newTimestamp) oldVal =
|
||||||
case oldVal of
|
case oldVal of
|
||||||
ProxyEntry n _ -> ProxyEntry n (Just newVal)
|
ProxyEntry n _ -> ProxyEntry n (Just newVal)
|
||||||
KeyEntry (CacheEntry oldValidationState _ oldTimestamp) -> KeyEntry (CacheEntry oldValidationState newNode (max oldTimestamp newTimestamp))
|
NodeEntry oldValidationState _ oldTimestamp -> NodeEntry oldValidationState newNode (max oldTimestamp newTimestamp)
|
||||||
in
|
in
|
||||||
newCache
|
newCache
|
||||||
|
|
||||||
|
@ -176,30 +183,10 @@ addCacheEntryPure now (RemoteCacheEntry ns ts) cache =
|
||||||
deleteCacheEntry :: NodeID -- ^ID of the node to be deleted
|
deleteCacheEntry :: NodeID -- ^ID of the node to be deleted
|
||||||
-> NodeCache -- ^cache to delete from
|
-> NodeCache -- ^cache to delete from
|
||||||
-> NodeCache -- ^cache without the specified element
|
-> NodeCache -- ^cache without the specified element
|
||||||
deleteCacheEntry nid = RingMap . Map.update modifier nid . getRingMap
|
deleteCacheEntry = Map.update modifier
|
||||||
where
|
where
|
||||||
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
|
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
|
||||||
modifier KeyEntry {} = Nothing
|
modifier NodeEntry {} = Nothing
|
||||||
|
|
||||||
|
|
||||||
-- | Add a 'RemoteNodeState' to the node cache marked as verified.
|
|
||||||
-- If an entry already exists, it is replaced by the new verified one.
|
|
||||||
addNodeAsVerified :: RemoteNodeState
|
|
||||||
-> NodeCache
|
|
||||||
-> IO NodeCache
|
|
||||||
addNodeAsVerified node cache = do
|
|
||||||
now <- getPOSIXTime
|
|
||||||
pure $ addNodeAsVerifiedPure now node cache
|
|
||||||
|
|
||||||
|
|
||||||
-- | Pure variant of 'addNodeAsVerified' with current time explicitly specified as an argument
|
|
||||||
addNodeAsVerifiedPure :: POSIXTime
|
|
||||||
-> RemoteNodeState
|
|
||||||
-> NodeCache
|
|
||||||
-> NodeCache
|
|
||||||
addNodeAsVerifiedPure now node = addRMapEntry (CacheEntry True node now)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- | Mark a cache entry as verified after pinging it, possibly bumping its timestamp.
|
-- | Mark a cache entry as verified after pinging it, possibly bumping its timestamp.
|
||||||
markCacheEntryAsVerified :: Maybe POSIXTime -- ^ the (current) timestamp to be
|
markCacheEntryAsVerified :: Maybe POSIXTime -- ^ the (current) timestamp to be
|
||||||
|
@ -207,529 +194,12 @@ markCacheEntryAsVerified :: Maybe POSIXTime -- ^ the (current) timestamp to
|
||||||
-> NodeID -- ^ which node to mark
|
-> NodeID -- ^ which node to mark
|
||||||
-> NodeCache -- ^ current node cache
|
-> NodeCache -- ^ current node cache
|
||||||
-> NodeCache -- ^ new NodeCache with the updated entry
|
-> NodeCache -- ^ new NodeCache with the updated entry
|
||||||
markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . getRingMap
|
markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
|
||||||
where
|
where
|
||||||
adjustFunc (KeyEntry (CacheEntry _ ns ts)) = KeyEntry (CacheEntry True ns $ fromMaybe ts timestamp)
|
adjustFunc (NodeEntry _ ns ts) = NodeEntry True ns $ fromMaybe ts timestamp
|
||||||
adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
|
adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
|
||||||
adjustFunc entry = entry
|
adjustFunc entry = entry
|
||||||
|
|
||||||
|
|
||||||
-- | uses the successor and predecessor list of a node as an indicator for whether a
|
|
||||||
-- node has properly joined the DHT
|
|
||||||
isJoined :: LocalNodeState -> Bool
|
|
||||||
isJoined ns = not . all null $ [successors ns, predecessors ns]
|
|
||||||
|
|
||||||
-- | the size limit to be used when serialising messages for sending
|
|
||||||
sendMessageSize :: Num i => i
|
|
||||||
sendMessageSize = 1200
|
|
||||||
|
|
||||||
-- ====== message send and receive operations ======
|
|
||||||
|
|
||||||
-- encode the response to a request that just signals successful receipt
|
|
||||||
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
|
|
||||||
ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
|
|
||||||
requestID = requestID req
|
|
||||||
, senderID = ownID
|
|
||||||
, part = part req
|
|
||||||
, isFinalPart = False
|
|
||||||
, action = action req
|
|
||||||
, payload = Nothing
|
|
||||||
}
|
|
||||||
ackRequest _ _ = Map.empty
|
|
||||||
|
|
||||||
|
|
||||||
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
|
|
||||||
-- the response to be sent.
|
|
||||||
handleIncomingRequest :: LocalNodeStateSTM -- ^ the handling node
|
|
||||||
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
|
||||||
-> Set.Set FediChordMessage -- ^ all parts of the request to handle
|
|
||||||
-> SockAddr -- ^ source address of the request
|
|
||||||
-> IO ()
|
|
||||||
handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
|
|
||||||
putStrLn $ "handling incoming request: " <> show msgSet
|
|
||||||
ns <- readTVarIO nsSTM
|
|
||||||
-- add nodestate to cache
|
|
||||||
now <- getPOSIXTime
|
|
||||||
case headMay . Set.elems $ msgSet of
|
|
||||||
Nothing -> pure ()
|
|
||||||
Just aPart -> do
|
|
||||||
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
|
|
||||||
-- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
|
|
||||||
maybe (pure ()) (
|
|
||||||
mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
|
|
||||||
)
|
|
||||||
=<< (case action aPart of
|
|
||||||
Ping -> Just <$> respondPing nsSTM msgSet
|
|
||||||
Join -> Just <$> respondJoin nsSTM msgSet
|
|
||||||
-- ToDo: figure out what happens if not joined
|
|
||||||
QueryID -> Just <$> respondQueryID nsSTM msgSet
|
|
||||||
-- only when joined
|
|
||||||
Leave -> if isJoined ns then Just <$> respondLeave nsSTM msgSet else pure Nothing
|
|
||||||
Stabilise -> if isJoined ns then Just <$> respondStabilise nsSTM msgSet else pure Nothing
|
|
||||||
)
|
|
||||||
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
|
|
||||||
|
|
||||||
-- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash
|
|
||||||
-- TODO: test case: mixed message types of parts
|
|
||||||
|
|
||||||
|
|
||||||
-- ....... response sending .......
|
|
||||||
|
|
||||||
-- TODO: could all these respond* functions be in STM instead of IO?
|
|
||||||
|
|
||||||
|
|
||||||
-- | execute a key ID lookup on local cache and respond with the result
|
|
||||||
respondQueryID :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
|
||||||
respondQueryID nsSTM msgSet = do
|
|
||||||
putStrLn "responding to a QueryID request"
|
|
||||||
-- this message cannot be split reasonably, so just
|
|
||||||
-- consider the first payload
|
|
||||||
let
|
|
||||||
aRequestPart = Set.elemAt 0 msgSet
|
|
||||||
senderID = getNid . sender $ aRequestPart
|
|
||||||
senderPayload = foldr' (\msg plAcc ->
|
|
||||||
if isNothing plAcc && isJust (payload msg)
|
|
||||||
then payload msg
|
|
||||||
else plAcc
|
|
||||||
) Nothing msgSet
|
|
||||||
-- return only empty message serialisation if no payload was included in parts
|
|
||||||
maybe (pure Map.empty) (\senderPayload' -> do
|
|
||||||
responseMsg <- atomically $ do
|
|
||||||
nsSnap <- readTVar nsSTM
|
|
||||||
cache <- readTVar $ nodeCacheSTM nsSnap
|
|
||||||
let
|
|
||||||
responsePayload = QueryIDResponsePayload {
|
|
||||||
queryResult = if isJoined nsSnap
|
|
||||||
then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
|
|
||||||
-- if not joined yet, attract responsibility for
|
|
||||||
-- all keys to make bootstrapping possible
|
|
||||||
else FOUND (toRemoteNodeState nsSnap)
|
|
||||||
}
|
|
||||||
queryResponseMsg = Response {
|
|
||||||
requestID = requestID aRequestPart
|
|
||||||
, senderID = getNid nsSnap
|
|
||||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
|
||||||
, isFinalPart = False
|
|
||||||
, action = QueryID
|
|
||||||
, payload = Just responsePayload
|
|
||||||
}
|
|
||||||
pure queryResponseMsg
|
|
||||||
pure $ serialiseMessage sendMessageSize responseMsg
|
|
||||||
) senderPayload
|
|
||||||
|
|
||||||
-- | Respond to a Leave request by removing the leaving node from local data structures
|
|
||||||
-- and confirming with response.
|
|
||||||
-- TODO: copy over key data from leaver and confirm
|
|
||||||
respondLeave :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
|
||||||
respondLeave nsSTM msgSet = do
|
|
||||||
-- combine payload of all parts
|
|
||||||
let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) ->
|
|
||||||
(maybe predAcc (++ predAcc) (leavePredecessors <$> payload msg)
|
|
||||||
,maybe succAcc (++ succAcc) (leaveSuccessors <$> payload msg))
|
|
||||||
)
|
|
||||||
([],[]) msgSet
|
|
||||||
aRequestPart = Set.elemAt 0 msgSet
|
|
||||||
senderID = getNid . sender $ aRequestPart
|
|
||||||
responseMsg <- atomically $ do
|
|
||||||
nsSnap <- readTVar nsSTM
|
|
||||||
-- remove leaving node from successors, predecessors and NodeCache
|
|
||||||
writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry senderID
|
|
||||||
writeTVar nsSTM $
|
|
||||||
-- add predecessors and successors of leaving node to own lists
|
|
||||||
setPredecessors (filter ((/=) senderID . getNid) $ requestPreds <> predecessors nsSnap)
|
|
||||||
. setSuccessors (filter ((/=) senderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap
|
|
||||||
-- TODO: handle handover of key data
|
|
||||||
let leaveResponse = Response {
|
|
||||||
requestID = requestID aRequestPart
|
|
||||||
, senderID = getNid nsSnap
|
|
||||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
|
||||||
, isFinalPart = False
|
|
||||||
, action = Leave
|
|
||||||
, payload = Just LeaveResponsePayload
|
|
||||||
}
|
|
||||||
pure leaveResponse
|
|
||||||
pure $ serialiseMessage sendMessageSize responseMsg
|
|
||||||
|
|
||||||
-- | respond to stabilise requests by returning successor and predecessor list
|
|
||||||
respondStabilise :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
|
||||||
respondStabilise nsSTM msgSet = do
|
|
||||||
nsSnap <- readTVarIO nsSTM
|
|
||||||
let
|
|
||||||
aRequestPart = Set.elemAt 0 msgSet
|
|
||||||
responsePayload = StabiliseResponsePayload {
|
|
||||||
stabiliseSuccessors = successors nsSnap
|
|
||||||
, stabilisePredecessors = predecessors nsSnap
|
|
||||||
}
|
|
||||||
stabiliseResponse = Response {
|
|
||||||
requestID = requestID aRequestPart
|
|
||||||
, senderID = getNid nsSnap
|
|
||||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
|
||||||
, isFinalPart = False
|
|
||||||
, action = Stabilise
|
|
||||||
, payload = Just responsePayload
|
|
||||||
}
|
|
||||||
-- TODO: return service endpoint for copying over key data
|
|
||||||
pure $ serialiseMessage sendMessageSize stabiliseResponse
|
|
||||||
|
|
||||||
|
|
||||||
-- | respond to Ping request by returning all active vserver NodeStates
|
|
||||||
respondPing :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
|
||||||
respondPing nsSTM msgSet = do
|
|
||||||
-- TODO: respond with all active VS when implementing k-choices
|
|
||||||
nsSnap <- readTVarIO nsSTM
|
|
||||||
let
|
|
||||||
aRequestPart = Set.elemAt 0 msgSet
|
|
||||||
responsePayload = PingResponsePayload { pingNodeStates = [ toRemoteNodeState nsSnap ] }
|
|
||||||
pingResponse = Response {
|
|
||||||
requestID = requestID aRequestPart
|
|
||||||
, senderID = getNid nsSnap
|
|
||||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
|
||||||
, isFinalPart = False
|
|
||||||
, action = Ping
|
|
||||||
, payload = Just responsePayload
|
|
||||||
}
|
|
||||||
pure $ serialiseMessage sendMessageSize pingResponse
|
|
||||||
|
|
||||||
-- this modifies node state, so locking and IO seems to be necessary.
|
|
||||||
-- Still try to keep as much code as possible pure
|
|
||||||
respondJoin :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
|
||||||
respondJoin nsSTM msgSet = do
|
|
||||||
-- atomically read and modify the node state according to the parsed request
|
|
||||||
responseMsg <- atomically $ do
|
|
||||||
nsSnap <- readTVar nsSTM
|
|
||||||
cache <- readTVar $ nodeCacheSTM nsSnap
|
|
||||||
let
|
|
||||||
aRequestPart = Set.elemAt 0 msgSet
|
|
||||||
senderNS = sender aRequestPart
|
|
||||||
responsibilityLookup = queryLocalCache nsSnap cache 1 (getNid senderNS)
|
|
||||||
thisNodeResponsible (FOUND _) = True
|
|
||||||
thisNodeResponsible (FORWARD _) = False
|
|
||||||
-- check whether the joining node falls into our responsibility
|
|
||||||
if thisNodeResponsible responsibilityLookup
|
|
||||||
then do
|
|
||||||
-- if yes, adjust own predecessors/ successors and return those in a response
|
|
||||||
let
|
|
||||||
newPreds = senderNS:predecessors nsSnap
|
|
||||||
joinedNS = setPredecessors newPreds nsSnap
|
|
||||||
responsePayload = JoinResponsePayload {
|
|
||||||
joinSuccessors = successors joinedNS
|
|
||||||
, joinPredecessors = predecessors joinedNS
|
|
||||||
, joinCache = toRemoteCache cache
|
|
||||||
}
|
|
||||||
joinResponse = Response {
|
|
||||||
requestID = requestID aRequestPart
|
|
||||||
, senderID = getNid joinedNS
|
|
||||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
|
||||||
, isFinalPart = False
|
|
||||||
, action = Join
|
|
||||||
, payload = Just responsePayload
|
|
||||||
}
|
|
||||||
writeTVar nsSTM joinedNS
|
|
||||||
pure joinResponse
|
|
||||||
-- otherwise respond with empty payload
|
|
||||||
else pure Response {
|
|
||||||
requestID = requestID aRequestPart
|
|
||||||
, senderID = getNid nsSnap
|
|
||||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
|
||||||
, isFinalPart = False
|
|
||||||
, action = Join
|
|
||||||
, payload = Nothing
|
|
||||||
}
|
|
||||||
|
|
||||||
pure $ serialiseMessage sendMessageSize responseMsg
|
|
||||||
-- TODO: notify service layer to copy over data now handled by the new joined node
|
|
||||||
|
|
||||||
-- ....... request sending .......
|
|
||||||
|
|
||||||
-- | send a join request and return the joined 'LocalNodeState' including neighbours
|
|
||||||
requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted
|
|
||||||
-> LocalNodeStateSTM -- ^ joining NodeState
|
|
||||||
-> IO (Either String LocalNodeStateSTM) -- ^ node after join with all its new information
|
|
||||||
requestJoin toJoinOn ownStateSTM =
|
|
||||||
bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
|
|
||||||
-- extract own state for getting request information
|
|
||||||
ownState <- readTVarIO ownStateSTM
|
|
||||||
responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
|
|
||||||
(cacheInsertQ, joinedState) <- atomically $ do
|
|
||||||
stateSnap <- readTVar ownStateSTM
|
|
||||||
let
|
|
||||||
(cacheInsertQ, predAccSet, succAccSet) = foldl'
|
|
||||||
(\(insertQ, predAccSet', succAccSet') msg ->
|
|
||||||
let
|
|
||||||
insertQ' = maybe insertQ (\msgPl ->
|
|
||||||
-- collect list of insertion statements into global cache
|
|
||||||
queueAddEntries (joinCache msgPl) : insertQ
|
|
||||||
) $ payload msg
|
|
||||||
-- collect received predecessors and successors
|
|
||||||
predAccSet'' = maybe predAccSet' (
|
|
||||||
foldr' Set.insert predAccSet' . joinPredecessors
|
|
||||||
) $ payload msg
|
|
||||||
succAccSet'' = maybe succAccSet' (
|
|
||||||
foldr' Set.insert succAccSet' . joinSuccessors
|
|
||||||
) $ payload msg
|
|
||||||
in
|
|
||||||
(insertQ', predAccSet'', succAccSet'')
|
|
||||||
)
|
|
||||||
-- reset predecessors and successors
|
|
||||||
([], Set.empty, Set.empty)
|
|
||||||
responses
|
|
||||||
-- sort, slice and set the accumulated successors and predecessors
|
|
||||||
newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap
|
|
||||||
writeTVar ownStateSTM newState
|
|
||||||
pure (cacheInsertQ, newState)
|
|
||||||
-- execute the cache insertions
|
|
||||||
mapM_ (\f -> f joinedState) cacheInsertQ
|
|
||||||
pure $ if responses == Set.empty
|
|
||||||
then Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
|
||||||
else if null (predecessors joinedState) && null (successors joinedState)
|
|
||||||
then Left "join error: no predecessors or successors"
|
|
||||||
-- successful join
|
|
||||||
else Right ownStateSTM
|
|
||||||
)
|
|
||||||
`catch` (\e -> pure . Left $ displayException (e :: IOException))
|
|
||||||
|
|
||||||
|
|
||||||
-- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
|
|
||||||
requestQueryID :: LocalNodeState -- ^ NodeState of the querying node
|
|
||||||
-> NodeID -- ^ target key ID to look up
|
|
||||||
-> IO RemoteNodeState -- ^ the node responsible for handling that key
|
|
||||||
-- 1. do a local lookup for the l closest nodes
|
|
||||||
-- 2. create l sockets
|
|
||||||
-- 3. send a message async concurrently to all l nodes
|
|
||||||
-- 4. collect the results, insert them into cache
|
|
||||||
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
|
|
||||||
-- TODO: deal with lookup failures
|
|
||||||
requestQueryID ns targetID = do
|
|
||||||
firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns
|
|
||||||
-- TODO: make maxAttempts configurable
|
|
||||||
queryIdLookupLoop firstCacheSnapshot ns 50 targetID
|
|
||||||
|
|
||||||
-- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
|
|
||||||
queryIdLookupLoop :: NodeCache -> LocalNodeState -> Int -> NodeID -> IO RemoteNodeState
|
|
||||||
-- return node itself as default fallback value against infinite recursion.
|
|
||||||
-- TODO: consider using an Either instead of a default value
|
|
||||||
queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns
|
|
||||||
queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
|
|
||||||
let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID
|
|
||||||
-- FOUND can only be returned if targetID is owned by local node
|
|
||||||
case localResult of
|
|
||||||
FOUND thisNode -> pure thisNode
|
|
||||||
FORWARD nodeSet -> do
|
|
||||||
responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
|
|
||||||
now <- getPOSIXTime
|
|
||||||
-- check for a FOUND and return it
|
|
||||||
case responseEntries of
|
|
||||||
FOUND foundNode -> pure foundNode
|
|
||||||
-- if not FOUND, insert entries into local cache copy and recurse
|
|
||||||
FORWARD entrySet ->
|
|
||||||
let newLCache = foldr' (
|
|
||||||
addCacheEntryPure now
|
|
||||||
) cacheSnapshot entrySet
|
|
||||||
in
|
|
||||||
queryIdLookupLoop newLCache ns (maxAttempts - 1) targetID
|
|
||||||
|
|
||||||
|
|
||||||
sendQueryIdMessages :: (Integral i)
|
|
||||||
=> NodeID -- ^ target key ID to look up
|
|
||||||
-> LocalNodeState -- ^ node state of the node doing the query
|
|
||||||
-> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
|
|
||||||
-> [RemoteNodeState] -- ^ nodes to query
|
|
||||||
-> IO QueryResponse -- ^ accumulated response
|
|
||||||
sendQueryIdMessages targetID ns lParam targets = do
|
|
||||||
|
|
||||||
-- create connected sockets to all query targets and use them for request handling
|
|
||||||
-- ToDo: make attempts and timeout configurable
|
|
||||||
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (
|
|
||||||
sendRequestTo 5000 3 (lookupMessage targetID ns Nothing)
|
|
||||||
)) targets
|
|
||||||
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
|
||||||
-- ToDo: exception handling, maybe log them
|
|
||||||
responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
|
|
||||||
-- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
|
|
||||||
now <- getPOSIXTime
|
|
||||||
-- collect cache entries from all responses
|
|
||||||
foldM (\acc resp -> do
|
|
||||||
let entrySet = case queryResult <$> payload resp of
|
|
||||||
Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
|
|
||||||
Just (FORWARD resultset) -> resultset
|
|
||||||
_ -> Set.empty
|
|
||||||
|
|
||||||
-- forward entries to global cache
|
|
||||||
queueAddEntries entrySet ns
|
|
||||||
-- return accumulated QueryResult
|
|
||||||
pure $ case acc of
|
|
||||||
-- once a FOUND as been encountered, return this as a result
|
|
||||||
isFound@FOUND{} -> isFound
|
|
||||||
FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet
|
|
||||||
|
|
||||||
) (FORWARD Set.empty) responses
|
|
||||||
|
|
||||||
-- | Create a QueryID message to be supplied to 'sendRequestTo'
|
|
||||||
lookupMessage :: Integral i
|
|
||||||
=> NodeID -- ^ target ID
|
|
||||||
-> LocalNodeState -- ^ sender node state
|
|
||||||
-> Maybe i -- ^ optionally provide a different l parameter
|
|
||||||
-> (Integer -> FediChordMessage)
|
|
||||||
lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
|
|
||||||
where
|
|
||||||
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam }
|
|
||||||
|
|
||||||
|
|
||||||
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
|
|
||||||
-- return parsed neighbour lists
|
|
||||||
requestStabilise :: LocalNodeState -- ^ sending node
|
|
||||||
-> RemoteNodeState -- ^ neighbour node to send to
|
|
||||||
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
|
|
||||||
requestStabilise ns neighbour = do
|
|
||||||
responses <- bracket (mkSendSocket (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo 5000 3 (\rid ->
|
|
||||||
Request {
|
|
||||||
requestID = rid
|
|
||||||
, sender = toRemoteNodeState ns
|
|
||||||
, part = 1
|
|
||||||
, isFinalPart = False
|
|
||||||
, action = Stabilise
|
|
||||||
, payload = Just StabiliseRequestPayload
|
|
||||||
}
|
|
||||||
)
|
|
||||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
|
||||||
either
|
|
||||||
-- forward IO error messages
|
|
||||||
(pure . Left)
|
|
||||||
(\respSet -> do
|
|
||||||
-- fold all reply parts together
|
|
||||||
let (responsePreds, responseSuccs) = foldr' (\msg (predAcc, succAcc) ->
|
|
||||||
(maybe predAcc (++ predAcc) (stabilisePredecessors <$> payload msg)
|
|
||||||
,maybe succAcc (++ succAcc) (stabiliseSuccessors <$> payload msg))
|
|
||||||
)
|
|
||||||
([],[]) respSet
|
|
||||||
-- update successfully responded neighbour in cache
|
|
||||||
now <- getPOSIXTime
|
|
||||||
maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
|
|
||||||
pure $ if null responsePreds && null responseSuccs
|
|
||||||
then Left "no neighbours returned"
|
|
||||||
else Right (responsePreds, responseSuccs)
|
|
||||||
) responses
|
|
||||||
|
|
||||||
|
|
||||||
requestPing :: LocalNodeState -- ^ sending node
|
|
||||||
-> RemoteNodeState -- ^ node to be PINGed
|
|
||||||
-> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
|
|
||||||
requestPing ns target = do
|
|
||||||
responses <- bracket (mkSendSocket (getDomain target) (getDhtPort target)) close
|
|
||||||
(\sock -> do
|
|
||||||
resp <- sendRequestTo 5000 3 (\rid ->
|
|
||||||
Request {
|
|
||||||
requestID = rid
|
|
||||||
, sender = toRemoteNodeState ns
|
|
||||||
, part = 1
|
|
||||||
, isFinalPart = False
|
|
||||||
, action = Ping
|
|
||||||
, payload = Just PingRequestPayload
|
|
||||||
}
|
|
||||||
) sock
|
|
||||||
(SockAddrInet6 _ _ peerAddr _) <- getPeerName sock
|
|
||||||
pure $ Right (peerAddr, resp)
|
|
||||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
|
||||||
either
|
|
||||||
-- forward IO error messages
|
|
||||||
(pure . Left)
|
|
||||||
(\(peerAddr, respSet) -> do
|
|
||||||
-- fold all reply parts together
|
|
||||||
let responseVss = foldr' (\msg acc ->
|
|
||||||
maybe acc (foldr' (:) acc) (pingNodeStates <$> payload msg)
|
|
||||||
)
|
|
||||||
[] respSet
|
|
||||||
-- recompute ID for each received node and mark as verified in cache
|
|
||||||
now <- getPOSIXTime
|
|
||||||
forM_ responseVss (\vs ->
|
|
||||||
let recomputedID = genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs)
|
|
||||||
in if recomputedID == getNid vs
|
|
||||||
then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs
|
|
||||||
else pure ()
|
|
||||||
)
|
|
||||||
pure $ if null responseVss
|
|
||||||
then Left "no active vServer IDs returned, ignoring node"
|
|
||||||
else Right responseVss
|
|
||||||
) responses
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
|
||||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
|
||||||
sendRequestTo :: Int -- ^ timeout in seconds
|
|
||||||
-> Int -- ^ number of retries
|
|
||||||
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
|
|
||||||
-> Socket -- ^ connected socket to use for sending
|
|
||||||
-> IO (Set.Set FediChordMessage) -- ^ responses
|
|
||||||
sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
|
||||||
-- give the message a random request ID
|
|
||||||
randomID <- randomRIO (0, 2^32-1)
|
|
||||||
let
|
|
||||||
msgComplete = msgIncomplete randomID
|
|
||||||
requests = serialiseMessage sendMessageSize msgComplete
|
|
||||||
putStrLn $ "sending request message " <> show msgComplete
|
|
||||||
-- create a queue for passing received response messages back, even after a timeout
|
|
||||||
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
|
|
||||||
-- start sendAndAck with timeout
|
|
||||||
attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
|
|
||||||
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
|
|
||||||
recvdParts <- atomically $ flushTBQueue responseQ
|
|
||||||
pure $ Set.fromList recvdParts
|
|
||||||
where
|
|
||||||
sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
|
||||||
-> Socket -- ^ the socket used for sending and receiving for this particular remote node
|
|
||||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
|
||||||
-> IO ()
|
|
||||||
sendAndAck responseQueue sock remainingSends = do
|
|
||||||
sendMany sock $ Map.elems remainingSends
|
|
||||||
-- if all requests have been acked/ responded to, return prematurely
|
|
||||||
recvLoop responseQueue remainingSends Set.empty Nothing
|
|
||||||
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
|
||||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
|
||||||
-> Set.Set Integer -- ^ already received response part numbers
|
|
||||||
-> Maybe Integer -- ^ total number of response parts if already known
|
|
||||||
-> IO ()
|
|
||||||
recvLoop responseQueue remainingSends' receivedPartNums totalParts = do
|
|
||||||
-- 65535 is maximum length of UDP packets, as long as
|
|
||||||
-- no IPv6 jumbograms are used
|
|
||||||
response <- deserialiseMessage <$> recv sock 65535
|
|
||||||
case response of
|
|
||||||
Right msg@Response{} -> do
|
|
||||||
atomically $ writeTBQueue responseQueue msg
|
|
||||||
let
|
|
||||||
newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
|
|
||||||
newRemaining = Map.delete (part msg) remainingSends'
|
|
||||||
newReceivedParts = Set.insert (part msg) receivedPartNums
|
|
||||||
if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
|
|
||||||
then pure ()
|
|
||||||
else recvLoop responseQueue newRemaining receivedPartNums newTotalParts
|
|
||||||
-- drop errors and invalid messages
|
|
||||||
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts
|
|
||||||
|
|
||||||
|
|
||||||
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
|
||||||
queueAddEntries :: Foldable c => c RemoteCacheEntry
|
|
||||||
-> LocalNodeState
|
|
||||||
-> IO ()
|
|
||||||
queueAddEntries entries ns = do
|
|
||||||
now <- getPOSIXTime
|
|
||||||
forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry
|
|
||||||
|
|
||||||
|
|
||||||
-- | enque a list of node IDs to be deleted from the global NodeCache
|
|
||||||
queueDeleteEntries :: Foldable c
|
|
||||||
=> c NodeID
|
|
||||||
-> LocalNodeState
|
|
||||||
-> IO ()
|
|
||||||
queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
|
|
||||||
|
|
||||||
|
|
||||||
-- | enque a single node ID to be deleted from the global NodeCache
|
|
||||||
queueDeleteEntry :: NodeID
|
|
||||||
-> LocalNodeState
|
|
||||||
-> IO ()
|
|
||||||
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
|
|
||||||
|
|
||||||
-- | retry an IO action at most *i* times until it delivers a result
|
-- | retry an IO action at most *i* times until it delivers a result
|
||||||
attempts :: Int -- ^ number of retries *i*
|
attempts :: Int -- ^ number of retries *i*
|
||||||
-> IO (Maybe a) -- ^ action to retry
|
-> IO (Maybe a) -- ^ action to retry
|
||||||
|
@ -740,39 +210,3 @@ attempts i action = do
|
||||||
case actionResult of
|
case actionResult of
|
||||||
Nothing -> attempts (i-1) action
|
Nothing -> attempts (i-1) action
|
||||||
Just res -> pure $ Just res
|
Just res -> pure $ Just res
|
||||||
|
|
||||||
-- ====== network socket operations ======
|
|
||||||
|
|
||||||
-- | resolve a specified host and return the 'AddrInfo' for it.
|
|
||||||
-- If no hostname or IP is specified, the 'AddrInfo' can be used to bind to all
|
|
||||||
-- addresses;
|
|
||||||
-- if no port is specified an arbitrary free port is selected.
|
|
||||||
resolve :: Maybe String -- ^ hostname or IP address to be resolved
|
|
||||||
-> Maybe PortNumber -- ^ port number of either local bind or remote
|
|
||||||
-> IO AddrInfo
|
|
||||||
resolve host port = let
|
|
||||||
hints = defaultHints { addrFamily = AF_INET6, addrSocketType = Datagram
|
|
||||||
, addrFlags = [AI_PASSIVE] }
|
|
||||||
in
|
|
||||||
head <$> getAddrInfo (Just hints) host (show <$> port)
|
|
||||||
|
|
||||||
-- | create an unconnected UDP Datagram 'Socket' bound to the specified address
|
|
||||||
mkServerSocket :: HostAddress6 -> PortNumber -> IO Socket
|
|
||||||
mkServerSocket ip port = do
|
|
||||||
sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port)
|
|
||||||
sock <- socket AF_INET6 Datagram defaultProtocol
|
|
||||||
setSocketOption sock IPv6Only 1
|
|
||||||
bind sock sockAddr
|
|
||||||
pure sock
|
|
||||||
|
|
||||||
-- | create a UDP datagram socket, connected to a destination.
|
|
||||||
-- The socket gets an arbitrary free local port assigned.
|
|
||||||
mkSendSocket :: String -- ^ destination hostname or IP
|
|
||||||
-> PortNumber -- ^ destination port
|
|
||||||
-> IO Socket -- ^ a socket with an arbitrary source port
|
|
||||||
mkSendSocket dest destPort = do
|
|
||||||
destAddr <- addrAddress <$> resolve (Just dest) (Just destPort)
|
|
||||||
sendSock <- socket AF_INET6 Datagram defaultProtocol
|
|
||||||
setSocketOption sendSock IPv6Only 1
|
|
||||||
connect sendSock destAddr
|
|
||||||
pure sendSock
|
|
||||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -1,604 +0,0 @@
|
||||||
{-# LANGUAGE DataKinds #-}
|
|
||||||
{-# LANGUAGE DerivingStrategies #-}
|
|
||||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
|
|
||||||
{-# LANGUAGE OverloadedStrings #-}
|
|
||||||
{-# LANGUAGE RankNTypes #-}
|
|
||||||
|
|
||||||
module Hash2Pub.FediChordTypes (
|
|
||||||
NodeID -- abstract, but newtype constructors cannot be hidden
|
|
||||||
, idBits
|
|
||||||
, getNodeID
|
|
||||||
, toNodeID
|
|
||||||
, NodeState (..)
|
|
||||||
, LocalNodeState (..)
|
|
||||||
, LocalNodeStateSTM
|
|
||||||
, RemoteNodeState (..)
|
|
||||||
, RealNode (..)
|
|
||||||
, RealNodeSTM
|
|
||||||
, setSuccessors
|
|
||||||
, setPredecessors
|
|
||||||
, NodeCache
|
|
||||||
, CacheEntry(..)
|
|
||||||
, RingEntry(..)
|
|
||||||
, RingMap(..)
|
|
||||||
, HasKeyID
|
|
||||||
, getKeyID
|
|
||||||
, rMapSize
|
|
||||||
, rMapLookup
|
|
||||||
, rMapLookupPred
|
|
||||||
, rMapLookupSucc
|
|
||||||
, addRMapEntry
|
|
||||||
, addRMapEntryWith
|
|
||||||
, addPredecessors
|
|
||||||
, addSuccessors
|
|
||||||
, takeRMapPredecessors
|
|
||||||
, takeRMapSuccessors
|
|
||||||
, deleteRMapEntry
|
|
||||||
, setRMapEntries
|
|
||||||
, rMapFromList
|
|
||||||
, rMapToList
|
|
||||||
, cacheGetNodeStateUnvalidated
|
|
||||||
, initCache
|
|
||||||
, cacheEntries
|
|
||||||
, cacheLookup
|
|
||||||
, cacheLookupSucc
|
|
||||||
, cacheLookupPred
|
|
||||||
, localCompare
|
|
||||||
, genNodeID
|
|
||||||
, genNodeIDBS
|
|
||||||
, genKeyID
|
|
||||||
, genKeyIDBS
|
|
||||||
, byteStringToUInteger
|
|
||||||
, ipAddrAsBS
|
|
||||||
, bsAsIpAddr
|
|
||||||
, FediChordConf(..)
|
|
||||||
) where
|
|
||||||
|
|
||||||
import Control.Exception
|
|
||||||
import Data.Foldable (foldr')
|
|
||||||
import Data.Function (on)
|
|
||||||
import Data.List (delete, nub, sortBy)
|
|
||||||
import qualified Data.Map.Strict as Map
|
|
||||||
import Data.Maybe (fromJust, fromMaybe, isJust,
|
|
||||||
isNothing, mapMaybe)
|
|
||||||
import qualified Data.Set as Set
|
|
||||||
import Data.Time.Clock.POSIX
|
|
||||||
import Network.Socket
|
|
||||||
|
|
||||||
-- for hashing and ID conversion
|
|
||||||
import Control.Concurrent.STM
|
|
||||||
import Control.Concurrent.STM.TQueue
|
|
||||||
import Control.Concurrent.STM.TVar
|
|
||||||
import Control.Monad (forever)
|
|
||||||
import Crypto.Hash
|
|
||||||
import qualified Data.ByteArray as BA
|
|
||||||
import qualified Data.ByteString as BS
|
|
||||||
import qualified Data.ByteString.UTF8 as BSU
|
|
||||||
import Data.IP (IPv6, fromHostAddress6,
|
|
||||||
toHostAddress6)
|
|
||||||
import Data.Typeable (Typeable (..), typeOf)
|
|
||||||
import Data.Word
|
|
||||||
import qualified Network.ByteOrder as NetworkBytes
|
|
||||||
|
|
||||||
import Hash2Pub.Utils
|
|
||||||
|
|
||||||
import Debug.Trace (trace)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- define protocol constants
|
|
||||||
-- | static definition of ID length in bits
|
|
||||||
idBits :: Integer
|
|
||||||
idBits = 256
|
|
||||||
|
|
||||||
-- |NodeIDs are Integers wrapped in a newtype, to be able to redefine
|
|
||||||
-- their instance behaviour
|
|
||||||
--
|
|
||||||
-- for being able to check value bounds, the constructor should not be used directly
|
|
||||||
-- and new values are created via @toNodeID@ (newtype constructors cannot be hidden)
|
|
||||||
newtype NodeID = NodeID { getNodeID :: Integer } deriving stock (Show, Eq) deriving newtype Enum
|
|
||||||
|
|
||||||
-- |smart data constructor for NodeID that throws a runtime exception for out-of-bounds values.
|
|
||||||
-- When needing a runtime-safe constructor with drawbacks, try @fromInteger@
|
|
||||||
toNodeID :: Integer -> NodeID
|
|
||||||
toNodeID i = assert (i >= getNodeID minBound && i <= getNodeID maxBound) $ NodeID i
|
|
||||||
|
|
||||||
-- |NodeIDs are bounded by the value range of an unsigned Integer of length 'idBits'
|
|
||||||
instance Bounded NodeID where
|
|
||||||
minBound = NodeID 0
|
|
||||||
maxBound = NodeID (2^idBits - 1)
|
|
||||||
|
|
||||||
-- |calculations with NodeIDs are modular arithmetic operations
|
|
||||||
instance Num NodeID where
|
|
||||||
a + b = NodeID $ (getNodeID a + getNodeID b) `mod` (getNodeID maxBound + 1)
|
|
||||||
a * b = NodeID $ (getNodeID a * getNodeID b) `mod` (getNodeID maxBound + 1)
|
|
||||||
a - b = NodeID $ (getNodeID a - getNodeID b) `mod` (getNodeID maxBound + 1)
|
|
||||||
-- |safe constructor for NodeID values with the drawback, that out-of-bound values are wrapped around
|
|
||||||
-- with modulo to fit in the allowed value space. For runtime checking, look at @toNodeID@.
|
|
||||||
fromInteger i = NodeID $ i `mod` (getNodeID maxBound + 1)
|
|
||||||
signum = NodeID . signum . getNodeID
|
|
||||||
abs = NodeID . abs . getNodeID -- ToDo: make sure that at creation time only IDs within the range are used
|
|
||||||
|
|
||||||
-- | use normal strict monotonic ordering of integers, realising the ring structure
|
|
||||||
-- is done in the @NodeCache@ implementation
|
|
||||||
instance Ord NodeID where
|
|
||||||
a `compare` b = getNodeID a `compare` getNodeID b
|
|
||||||
|
|
||||||
-- | local comparison of 2 node IDs, only relevant for determining a successor or predecessor on caches with just 2 nodes
|
|
||||||
localCompare :: NodeID -> NodeID -> Ordering
|
|
||||||
a `localCompare` b
|
|
||||||
| getNodeID a == getNodeID b = EQ
|
|
||||||
| wayForwards > wayBackwards = GT
|
|
||||||
| otherwise = LT
|
|
||||||
where
|
|
||||||
wayForwards = getNodeID (b - a)
|
|
||||||
wayBackwards = getNodeID (a - b)
|
|
||||||
|
|
||||||
-- | Data for managing the virtual server nodes of this real node.
|
|
||||||
-- Also contains shared data and config values.
|
|
||||||
-- TODO: more data structures for k-choices bookkeeping
|
|
||||||
data RealNode = RealNode
|
|
||||||
{ vservers :: [LocalNodeStateSTM]
|
|
||||||
-- ^ references to all active versers
|
|
||||||
, nodeConfig :: FediChordConf
|
|
||||||
-- ^ holds the initial configuration read at program start
|
|
||||||
, bootstrapNodes :: [(String, PortNumber)]
|
|
||||||
-- ^ nodes to be used as bootstrapping points, new ones learned during operation
|
|
||||||
}
|
|
||||||
|
|
||||||
type RealNodeSTM = TVar RealNode
|
|
||||||
|
|
||||||
-- | represents a node and all its important state
|
|
||||||
data RemoteNodeState = RemoteNodeState
|
|
||||||
{ nid :: NodeID
|
|
||||||
, domain :: String
|
|
||||||
-- ^ full public domain name the node is reachable under
|
|
||||||
, ipAddr :: HostAddress6
|
|
||||||
-- the node's public IPv6 address
|
|
||||||
, dhtPort :: PortNumber
|
|
||||||
-- ^ port of the DHT itself
|
|
||||||
, servicePort :: PortNumber
|
|
||||||
-- ^ port of the service provided on top of the DHT
|
|
||||||
, vServerID :: Integer
|
|
||||||
-- ^ ID of this vserver
|
|
||||||
}
|
|
||||||
deriving (Show, Eq)
|
|
||||||
|
|
||||||
instance Ord RemoteNodeState where
|
|
||||||
a `compare` b = nid a `compare` nid b
|
|
||||||
|
|
||||||
-- | represents a node and encapsulates all data and parameters that are not present for remote nodes
|
|
||||||
data LocalNodeState = LocalNodeState
|
|
||||||
{ nodeState :: RemoteNodeState
|
|
||||||
-- ^ represents common data present both in remote and local node representations
|
|
||||||
, nodeCacheSTM :: TVar NodeCache
|
|
||||||
-- ^ EpiChord node cache with expiry times for nodes
|
|
||||||
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
|
||||||
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
|
|
||||||
, successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
|
|
||||||
-- ^ successor nodes in ascending order by distance
|
|
||||||
, predecessors :: [RemoteNodeState]
|
|
||||||
-- ^ predecessor nodes in ascending order by distance
|
|
||||||
, kNeighbours :: Int
|
|
||||||
-- ^ desired length of predecessor and successor list
|
|
||||||
, lNumBestNodes :: Int
|
|
||||||
-- ^ number of best next hops to provide
|
|
||||||
, pNumParallelQueries :: Int
|
|
||||||
-- ^ number of parallel sent queries
|
|
||||||
, jEntriesPerSlice :: Int
|
|
||||||
-- ^ number of desired entries per cache slice
|
|
||||||
, parentRealNode :: RealNodeSTM
|
|
||||||
-- ^ the parent node managing this vserver instance
|
|
||||||
}
|
|
||||||
deriving (Show, Eq)
|
|
||||||
|
|
||||||
-- | for concurrent access, LocalNodeState is wrapped in a TVar
|
|
||||||
type LocalNodeStateSTM = TVar LocalNodeState
|
|
||||||
|
|
||||||
-- | class for various NodeState representations, providing
|
|
||||||
-- getters and setters for common values
|
|
||||||
class NodeState a where
|
|
||||||
-- getters for common properties
|
|
||||||
getNid :: a -> NodeID
|
|
||||||
getDomain :: a -> String
|
|
||||||
getIpAddr :: a -> HostAddress6
|
|
||||||
getDhtPort :: a -> PortNumber
|
|
||||||
getServicePort :: a -> PortNumber
|
|
||||||
getVServerID :: a -> Integer
|
|
||||||
-- setters for common properties
|
|
||||||
setNid :: NodeID -> a -> a
|
|
||||||
setDomain :: String -> a -> a
|
|
||||||
setIpAddr :: HostAddress6 -> a -> a
|
|
||||||
setDhtPort :: PortNumber -> a -> a
|
|
||||||
setServicePort :: PortNumber -> a -> a
|
|
||||||
setVServerID :: Integer -> a -> a
|
|
||||||
toRemoteNodeState :: a -> RemoteNodeState
|
|
||||||
|
|
||||||
instance NodeState RemoteNodeState where
|
|
||||||
getNid = nid
|
|
||||||
getDomain = domain
|
|
||||||
getIpAddr = ipAddr
|
|
||||||
getDhtPort = dhtPort
|
|
||||||
getServicePort = servicePort
|
|
||||||
getVServerID = vServerID
|
|
||||||
setNid nid' ns = ns {nid = nid'}
|
|
||||||
setDomain domain' ns = ns {domain = domain'}
|
|
||||||
setIpAddr ipAddr' ns = ns {ipAddr = ipAddr'}
|
|
||||||
setDhtPort dhtPort' ns = ns {dhtPort = dhtPort'}
|
|
||||||
setServicePort servicePort' ns = ns {servicePort = servicePort'}
|
|
||||||
setVServerID vServerID' ns = ns {vServerID = vServerID'}
|
|
||||||
toRemoteNodeState = id
|
|
||||||
|
|
||||||
-- | helper function for setting values on the 'RemoteNodeState' contained in the 'LocalNodeState'
|
|
||||||
propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState -> LocalNodeState
|
|
||||||
propagateNodeStateSet_ func ns = let
|
|
||||||
newNs = func $ nodeState ns
|
|
||||||
in
|
|
||||||
ns {nodeState = newNs}
|
|
||||||
|
|
||||||
|
|
||||||
instance NodeState LocalNodeState where
|
|
||||||
getNid = getNid . nodeState
|
|
||||||
getDomain = getDomain . nodeState
|
|
||||||
getIpAddr = getIpAddr . nodeState
|
|
||||||
getDhtPort = getDhtPort . nodeState
|
|
||||||
getServicePort = getServicePort . nodeState
|
|
||||||
getVServerID = getVServerID . nodeState
|
|
||||||
setNid nid' = propagateNodeStateSet_ $ setNid nid'
|
|
||||||
setDomain domain' = propagateNodeStateSet_ $ setDomain domain'
|
|
||||||
setIpAddr ipAddr' = propagateNodeStateSet_ $ setIpAddr ipAddr'
|
|
||||||
setDhtPort dhtPort' = propagateNodeStateSet_ $ setDhtPort dhtPort'
|
|
||||||
setServicePort servicePort' = propagateNodeStateSet_ $ setServicePort servicePort'
|
|
||||||
setVServerID vServerID' = propagateNodeStateSet_ $ setVServerID vServerID'
|
|
||||||
toRemoteNodeState = nodeState
|
|
||||||
|
|
||||||
-- | defining Show instances to be able to print NodeState for debug purposes
|
|
||||||
instance Typeable a => Show (TVar a) where
|
|
||||||
show x = show (typeOf x)
|
|
||||||
|
|
||||||
instance Typeable a => Show (TQueue a) where
|
|
||||||
show x = show (typeOf x)
|
|
||||||
|
|
||||||
|
|
||||||
-- | convenience function that replaces the predecessors of a 'LocalNodeState' with the k closest nodes from the provided list
|
|
||||||
setPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
|
|
||||||
setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . filter ((/=) (getNid ns) . getNid) $ preds}
|
|
||||||
|
|
||||||
-- | convenience function that replaces the successors of a 'LocalNodeState' with the k closest nodes from the provided list
|
|
||||||
setSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
|
|
||||||
setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . filter ((/=) (getNid ns) . getNid) $ succs}
|
|
||||||
|
|
||||||
-- | sets the predecessors of a 'LocalNodeState' to the closest k nodes of the current predecessors and the provided list, combined
|
|
||||||
addPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
|
|
||||||
addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (filter ((/=) (getNid ns) . getNid) preds) . rMapFromList $ predecessors ns}
|
|
||||||
|
|
||||||
-- | sets the successors of a 'LocalNodeState' to the closest k nodes of the current successors and the provided list, combined
|
|
||||||
addSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
|
|
||||||
addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (filter ((/=) (getNid ns) . getNid) succs) . rMapFromList $ successors ns}
|
|
||||||
|
|
||||||
-- | Class for all types that can be identified via an EpiChord key.
|
|
||||||
-- Used for restricting the types a 'RingMap' can store
|
|
||||||
class (Eq a, Show a) => HasKeyID a where
|
|
||||||
getKeyID :: a -> NodeID
|
|
||||||
|
|
||||||
instance HasKeyID RemoteNodeState where
|
|
||||||
getKeyID = getNid
|
|
||||||
|
|
||||||
instance HasKeyID CacheEntry where
|
|
||||||
getKeyID (CacheEntry _ ns _) = getNid ns
|
|
||||||
|
|
||||||
instance HasKeyID NodeID where
|
|
||||||
getKeyID = id
|
|
||||||
|
|
||||||
type NodeCache = RingMap CacheEntry
|
|
||||||
|
|
||||||
-- | generic data structure for holding elements with a key and modular lookup
|
|
||||||
newtype RingMap a = RingMap { getRingMap :: HasKeyID a => Map.Map NodeID (RingEntry a) }
|
|
||||||
|
|
||||||
instance (HasKeyID a) => Eq (RingMap a) where
|
|
||||||
a == b = getRingMap a == getRingMap b
|
|
||||||
|
|
||||||
instance (HasKeyID a) => Show (RingMap a) where
|
|
||||||
show rmap = shows "RingMap " (show $ getRingMap rmap)
|
|
||||||
|
|
||||||
-- | entry of a 'RingMap' that holds a value and can also
|
|
||||||
-- wrap around the lookup direction at the edges of the name space.
|
|
||||||
data RingEntry a = KeyEntry a
|
|
||||||
| ProxyEntry (NodeID, ProxyDirection) (Maybe (RingEntry a))
|
|
||||||
deriving (Show, Eq)
|
|
||||||
|
|
||||||
-- | 'RingEntry' type for usage as a node cache
|
|
||||||
data CacheEntry = CacheEntry Bool RemoteNodeState POSIXTime
|
|
||||||
deriving (Show, Eq)
|
|
||||||
|
|
||||||
|
|
||||||
-- | as a compromise, only KeyEntry components are ordered by their NodeID
|
|
||||||
-- while ProxyEntry components should never be tried to be ordered.
|
|
||||||
instance (HasKeyID a, Eq a) => Ord (RingEntry a) where
|
|
||||||
a `compare` b = compare (extractID a) (extractID b)
|
|
||||||
where
|
|
||||||
extractID (KeyEntry e) = getKeyID e
|
|
||||||
extractID ProxyEntry{} = error "proxy entries should never appear outside of the RingMap"
|
|
||||||
|
|
||||||
data ProxyDirection = Backwards
|
|
||||||
| Forwards
|
|
||||||
deriving (Show, Eq)
|
|
||||||
|
|
||||||
instance Enum ProxyDirection where
|
|
||||||
toEnum (-1) = Backwards
|
|
||||||
toEnum 1 = Forwards
|
|
||||||
toEnum _ = error "no such ProxyDirection"
|
|
||||||
fromEnum Backwards = - 1
|
|
||||||
fromEnum Forwards = 1
|
|
||||||
|
|
||||||
-- | helper function for getting the a from a RingEntry a
|
|
||||||
extractRingEntry :: HasKeyID a => RingEntry a -> Maybe a
|
|
||||||
extractRingEntry (KeyEntry entry) = Just entry
|
|
||||||
extractRingEntry (ProxyEntry _ (Just (KeyEntry entry))) = Just entry
|
|
||||||
extractRingEntry _ = Nothing
|
|
||||||
|
|
||||||
--- useful function for getting entries for a full cache transfer
|
|
||||||
cacheEntries :: NodeCache -> [CacheEntry]
|
|
||||||
cacheEntries = mapMaybe extractRingEntry . Map.elems . getRingMap
|
|
||||||
|
|
||||||
-- | An empty 'RingMap' needs to be initialised with 2 proxy entries,
|
|
||||||
-- linking the modular name space together by connecting @minBound@ and @maxBound@
|
|
||||||
emptyRMap :: HasKeyID a => RingMap a
|
|
||||||
emptyRMap = RingMap . Map.fromList $ proxyEntry <$> [(maxBound, (minBound, Forwards)), (minBound, (maxBound, Backwards))]
|
|
||||||
where
|
|
||||||
proxyEntry (from,to) = (from, ProxyEntry to Nothing)
|
|
||||||
|
|
||||||
initCache :: NodeCache
|
|
||||||
initCache = emptyRMap
|
|
||||||
|
|
||||||
-- | Maybe returns the entry stored at given key
|
|
||||||
rMapLookup :: HasKeyID a
|
|
||||||
=> NodeID -- ^lookup key
|
|
||||||
-> RingMap a -- ^lookup cache
|
|
||||||
-> Maybe a
|
|
||||||
rMapLookup key rmap = extractRingEntry =<< Map.lookup key (getRingMap rmap)
|
|
||||||
|
|
||||||
cacheLookup :: NodeID -- ^lookup key
|
|
||||||
-> NodeCache -- ^lookup cache
|
|
||||||
-> Maybe CacheEntry
|
|
||||||
cacheLookup = rMapLookup
|
|
||||||
|
|
||||||
-- | returns number of present 'KeyEntry' in a properly initialised 'RingMap'
|
|
||||||
rMapSize :: (HasKeyID a, Integral i)
|
|
||||||
=> RingMap a
|
|
||||||
-> i
|
|
||||||
rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry minBound - oneIfEntry maxBound
|
|
||||||
where
|
|
||||||
innerMap = getRingMap rmap
|
|
||||||
oneIfEntry :: Integral i => NodeID -> i
|
|
||||||
oneIfEntry nid
|
|
||||||
| isNothing (rMapLookup nid rmap) = 1
|
|
||||||
| otherwise = 0
|
|
||||||
|
|
||||||
-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
|
|
||||||
-- to simulate a modular ring
|
|
||||||
lookupWrapper :: HasKeyID a
|
|
||||||
=> (NodeID -> Map.Map NodeID (RingEntry a) -> Maybe (NodeID, RingEntry a))
|
|
||||||
-> (NodeID -> Map.Map NodeID (RingEntry a) -> Maybe (NodeID, RingEntry a))
|
|
||||||
-> ProxyDirection
|
|
||||||
-> NodeID
|
|
||||||
-> RingMap a
|
|
||||||
-> Maybe a
|
|
||||||
lookupWrapper f fRepeat direction key rmap =
|
|
||||||
case f key $ getRingMap rmap of
|
|
||||||
-- the proxy entry found holds a
|
|
||||||
Just (_, ProxyEntry _ (Just (KeyEntry entry))) -> Just entry
|
|
||||||
-- proxy entry holds another proxy entry, this should not happen
|
|
||||||
Just (_, ProxyEntry _ (Just (ProxyEntry _ _))) -> Nothing
|
|
||||||
-- proxy entry without own entry is a pointer on where to continue
|
|
||||||
-- if lookup direction is the same as pointer direction: follow pointer
|
|
||||||
Just (foundKey, ProxyEntry (pointerID, pointerDirection) Nothing) ->
|
|
||||||
let newKey = if pointerDirection == direction
|
|
||||||
then pointerID
|
|
||||||
else foundKey + (fromInteger . toInteger . fromEnum $ direction)
|
|
||||||
in if rMapNotEmpty rmap
|
|
||||||
then lookupWrapper fRepeat fRepeat direction newKey rmap
|
|
||||||
else Nothing
|
|
||||||
-- normal entries are returned
|
|
||||||
Just (_, KeyEntry entry) -> Just entry
|
|
||||||
Nothing -> Nothing
|
|
||||||
where
|
|
||||||
rMapNotEmpty :: (HasKeyID a) => RingMap a -> Bool
|
|
||||||
rMapNotEmpty rmap' = (Map.size (getRingMap rmap') > 2) -- there are more than the 2 ProxyEntries
|
|
||||||
|| isJust (rMapLookup minBound rmap') -- or one of the ProxyEntries holds a node
|
|
||||||
|| isJust (rMapLookup maxBound rmap')
|
|
||||||
|
|
||||||
-- | find the successor node to a given key on a modular EpiChord ring.
|
|
||||||
-- Note: The EpiChord definition of "successor" includes the node at the key itself,
|
|
||||||
-- if existing.
|
|
||||||
rMapLookupSucc :: HasKeyID a
|
|
||||||
=> NodeID -- ^lookup key
|
|
||||||
-> RingMap a -- ^ring cache
|
|
||||||
-> Maybe a
|
|
||||||
rMapLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards
|
|
||||||
|
|
||||||
cacheLookupSucc :: NodeID -- ^lookup key
|
|
||||||
-> NodeCache -- ^ring cache
|
|
||||||
-> Maybe CacheEntry
|
|
||||||
cacheLookupSucc = rMapLookupSucc
|
|
||||||
|
|
||||||
-- | find the predecessor node to a given key on a modular EpiChord ring.
|
|
||||||
rMapLookupPred :: HasKeyID a
|
|
||||||
=> NodeID -- ^lookup key
|
|
||||||
-> RingMap a -- ^ring cache
|
|
||||||
-> Maybe a
|
|
||||||
rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
|
|
||||||
|
|
||||||
cacheLookupPred :: NodeID -- ^lookup key
|
|
||||||
-> NodeCache -- ^ring cache
|
|
||||||
-> Maybe CacheEntry
|
|
||||||
cacheLookupPred = rMapLookupPred
|
|
||||||
|
|
||||||
addRMapEntryWith :: HasKeyID a
|
|
||||||
=> (RingEntry a -> RingEntry a -> RingEntry a)
|
|
||||||
-> a
|
|
||||||
-> RingMap a
|
|
||||||
-> RingMap a
|
|
||||||
addRMapEntryWith combineFunc entry = RingMap
|
|
||||||
. Map.insertWith combineFunc (getKeyID entry) (KeyEntry entry)
|
|
||||||
. getRingMap
|
|
||||||
|
|
||||||
addRMapEntry :: HasKeyID a
|
|
||||||
=> a
|
|
||||||
-> RingMap a
|
|
||||||
-> RingMap a
|
|
||||||
addRMapEntry = addRMapEntryWith insertCombineFunction
|
|
||||||
where
|
|
||||||
insertCombineFunction newVal oldVal =
|
|
||||||
case oldVal of
|
|
||||||
ProxyEntry n _ -> ProxyEntry n (Just newVal)
|
|
||||||
KeyEntry _ -> newVal
|
|
||||||
|
|
||||||
|
|
||||||
addRMapEntries :: (Foldable t, HasKeyID a)
|
|
||||||
=> t a
|
|
||||||
-> RingMap a
|
|
||||||
-> RingMap a
|
|
||||||
addRMapEntries entries rmap = foldr' addRMapEntry rmap entries
|
|
||||||
|
|
||||||
setRMapEntries :: (Foldable t, HasKeyID a)
|
|
||||||
=> t a
|
|
||||||
-> RingMap a
|
|
||||||
setRMapEntries entries = addRMapEntries entries emptyRMap
|
|
||||||
|
|
||||||
deleteRMapEntry :: (HasKeyID a)
|
|
||||||
=> NodeID
|
|
||||||
-> RingMap a
|
|
||||||
-> RingMap a
|
|
||||||
deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap
|
|
||||||
where
|
|
||||||
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
|
|
||||||
modifier KeyEntry {} = Nothing
|
|
||||||
|
|
||||||
rMapToList :: (HasKeyID a) => RingMap a -> [a]
|
|
||||||
rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap
|
|
||||||
|
|
||||||
rMapFromList :: (HasKeyID a) => [a] -> RingMap a
|
|
||||||
rMapFromList = setRMapEntries
|
|
||||||
|
|
||||||
-- | takes up to i entries from a 'RingMap' by calling a getter function on a
|
|
||||||
-- *startAt* value and after that on the previously returned value.
|
|
||||||
-- Stops once i entries have been taken or an entry has been encountered twice
|
|
||||||
-- (meaning the ring has been traversed completely).
|
|
||||||
-- Forms the basis for 'takeRMapSuccessors' and 'takeRMapPredecessors'.
|
|
||||||
takeRMapEntries_ :: (HasKeyID a, Integral i)
|
|
||||||
=> (NodeID -> RingMap a -> Maybe a)
|
|
||||||
-> NodeID
|
|
||||||
-> i
|
|
||||||
-> RingMap a
|
|
||||||
-> [a]
|
|
||||||
-- TODO: might be more efficient with dlists
|
|
||||||
takeRMapEntries_ getterFunc startAt num rmap = reverse $
|
|
||||||
case getterFunc startAt rmap of
|
|
||||||
Nothing -> []
|
|
||||||
Just anEntry -> takeEntriesUntil (getKeyID anEntry) (getKeyID anEntry) (num-1) [anEntry]
|
|
||||||
where
|
|
||||||
takeEntriesUntil havingReached previousEntry remaining takeAcc
|
|
||||||
| remaining <= 0 = takeAcc
|
|
||||||
| getKeyID (fromJust $ getterFunc previousEntry rmap) == havingReached = takeAcc
|
|
||||||
| otherwise = let (Just gotEntry) = getterFunc previousEntry rmap
|
|
||||||
in takeEntriesUntil havingReached (getKeyID gotEntry) (remaining-1) (gotEntry:takeAcc)
|
|
||||||
|
|
||||||
takeRMapPredecessors :: (HasKeyID a, Integral i)
|
|
||||||
=> NodeID
|
|
||||||
-> i
|
|
||||||
-> RingMap a
|
|
||||||
-> [a]
|
|
||||||
takeRMapPredecessors = takeRMapEntries_ rMapLookupPred
|
|
||||||
|
|
||||||
takeRMapSuccessors :: (HasKeyID a, Integral i)
|
|
||||||
=> NodeID
|
|
||||||
-> i
|
|
||||||
-> RingMap a
|
|
||||||
-> [a]
|
|
||||||
takeRMapSuccessors = takeRMapEntries_ rMapLookupSucc
|
|
||||||
|
|
||||||
-- clean up cache entries: once now - entry > maxAge
|
|
||||||
-- transfer difference now - entry to other node
|
|
||||||
|
|
||||||
-- | return the @NodeState@ data from a cache entry without checking its validation status
|
|
||||||
cacheGetNodeStateUnvalidated :: CacheEntry -> RemoteNodeState
|
|
||||||
cacheGetNodeStateUnvalidated (CacheEntry _ nState _) = nState
|
|
||||||
|
|
||||||
-- | converts a 'HostAddress6' IP address to a big-endian strict ByteString
|
|
||||||
ipAddrAsBS :: HostAddress6 -> BS.ByteString
|
|
||||||
ipAddrAsBS (a, b, c, d) = mconcat $ fmap NetworkBytes.bytestring32 [a, b, c, d]
|
|
||||||
|
|
||||||
-- | converts a ByteString in big endian order to an IPv6 address 'HostAddress6'
|
|
||||||
bsAsIpAddr :: BS.ByteString -> HostAddress6
|
|
||||||
bsAsIpAddr bytes = (a,b,c,d)
|
|
||||||
where
|
|
||||||
a:b:c:d:_ = fmap NetworkBytes.word32 . chunkBytes 4 $ bytes
|
|
||||||
|
|
||||||
|
|
||||||
-- | generates a 256 bit long NodeID using SHAKE128, represented as ByteString
|
|
||||||
genNodeIDBS :: HostAddress6 -- ^a node's IPv6 address
|
|
||||||
-> String -- ^a node's 1st and 2nd level domain name
|
|
||||||
-> Word8 -- ^the used vserver ID
|
|
||||||
-> BS.ByteString -- ^the NodeID as a 256bit ByteString big-endian unsigned integer
|
|
||||||
genNodeIDBS ip nodeDomain vserver =
|
|
||||||
hashIpaddrUpper `BS.append` hashID nodeDomain' `BS.append` hashIpaddLower
|
|
||||||
where
|
|
||||||
vsBS = BS.pack [vserver] -- attention: only works for vserver IDs up to 255
|
|
||||||
ipaddrNet = BS.take 8 (ipAddrAsBS ip) `BS.append` vsBS
|
|
||||||
nodeDomain' = BSU.fromString nodeDomain `BS.append` vsBS
|
|
||||||
hashID bstr = BS.pack . BA.unpack $ (hash bstr :: Digest (SHAKE128 128))
|
|
||||||
(hashIpaddrUpper, hashIpaddLower) = BS.splitAt 64 $ hashID ipaddrNet
|
|
||||||
|
|
||||||
|
|
||||||
-- | generates a 256 bit long @NodeID@ using SHAKE128
|
|
||||||
genNodeID :: HostAddress6 -- ^a node's IPv6 address
|
|
||||||
-> String -- ^a node's 1st and 2nd level domain name
|
|
||||||
-> Word8 -- ^the used vserver ID
|
|
||||||
-> NodeID -- ^the generated @NodeID@
|
|
||||||
genNodeID ip nodeDomain vs = NodeID . byteStringToUInteger $ genNodeIDBS ip nodeDomain vs
|
|
||||||
|
|
||||||
-- | generates a 256 bit long key identifier, represented as ByteString, for looking up its data on the DHT
|
|
||||||
genKeyIDBS :: String -- ^the key string
|
|
||||||
-> BS.ByteString -- ^the key ID represented as a @ByteString@
|
|
||||||
genKeyIDBS key = BS.pack . BA.unpack $ (hash (BSU.fromString key) :: Digest SHA3_256)
|
|
||||||
|
|
||||||
-- | generates a 256 bit long key identifier for looking up its data on the DHT
|
|
||||||
genKeyID :: String -- ^the key string
|
|
||||||
-> NodeID -- ^the key ID
|
|
||||||
genKeyID = NodeID . byteStringToUInteger . genKeyIDBS
|
|
||||||
|
|
||||||
|
|
||||||
-- | parses the bit pattern of a ByteString as an unsigned Integer in Big Endian order
|
|
||||||
-- by iterating it byte-wise from the back and shifting the byte values according to their offset
|
|
||||||
byteStringToUInteger :: BS.ByteString -> Integer
|
|
||||||
byteStringToUInteger bs = sum $ parsedBytes 0 bs
|
|
||||||
where
|
|
||||||
parsedBytes :: Integer -> BS.ByteString -> [ Integer ]
|
|
||||||
parsedBytes offset uintBs = case BS.unsnoc uintBs of
|
|
||||||
Nothing -> []
|
|
||||||
Just (bs', w) -> parseWithOffset offset w : parsedBytes (offset+1) bs'
|
|
||||||
|
|
||||||
parseWithOffset :: Integer -> Word8 -> Integer
|
|
||||||
parseWithOffset 0 word = toInteger word -- a shift of 0 is always 0
|
|
||||||
parseWithOffset offset word = toInteger word * 2^(8 * offset)
|
|
||||||
|
|
||||||
-- Todo: DHT backend can learn potential initial bootstrapping points through the instances mentioned in the received AP-relay messages
|
|
||||||
-- persist them on disk so they can be used for all following bootstraps
|
|
||||||
|
|
||||||
-- | configuration values used for initialising the FediChord DHT
|
|
||||||
data FediChordConf = FediChordConf
|
|
||||||
{ confDomain :: String
|
|
||||||
-- ^ the domain/ hostname the node is reachable under
|
|
||||||
, confIP :: HostAddress6
|
|
||||||
-- ^ IP address of outgoing packets
|
|
||||||
, confDhtPort :: Int
|
|
||||||
-- ^ listening port for the FediChord DHT
|
|
||||||
, confBootstrapNodes :: [(String, PortNumber)]
|
|
||||||
-- ^ list of potential bootstrapping nodes
|
|
||||||
, confBootstrapSamplingInterval :: Int
|
|
||||||
-- ^ pause between sampling the own ID through bootstrap nodes, in seconds
|
|
||||||
}
|
|
||||||
deriving (Show, Eq)
|
|
||||||
|
|
||||||
|
|
|
@ -1,101 +0,0 @@
|
||||||
module Hash2Pub.ProtocolTypes where
|
|
||||||
|
|
||||||
import qualified Data.Map as Map
|
|
||||||
import Data.Maybe (mapMaybe)
|
|
||||||
import qualified Data.Set as Set
|
|
||||||
import Data.Time.Clock.POSIX (POSIXTime)
|
|
||||||
|
|
||||||
import Hash2Pub.FediChordTypes
|
|
||||||
|
|
||||||
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry)
|
|
||||||
| FOUND RemoteNodeState
|
|
||||||
deriving (Show, Eq)
|
|
||||||
|
|
||||||
-- === protocol serialisation data types
|
|
||||||
|
|
||||||
data Action = QueryID
|
|
||||||
| Join
|
|
||||||
| Leave
|
|
||||||
| Stabilise
|
|
||||||
| Ping
|
|
||||||
deriving (Show, Eq, Enum)
|
|
||||||
|
|
||||||
data FediChordMessage = Request
|
|
||||||
{ requestID :: Integer
|
|
||||||
, sender :: RemoteNodeState
|
|
||||||
, part :: Integer
|
|
||||||
, isFinalPart :: Bool
|
|
||||||
-- ^ part starts at 1
|
|
||||||
, action :: Action
|
|
||||||
, payload :: Maybe ActionPayload
|
|
||||||
}
|
|
||||||
| Response
|
|
||||||
{ requestID :: Integer
|
|
||||||
, senderID :: NodeID
|
|
||||||
, part :: Integer
|
|
||||||
, isFinalPart :: Bool
|
|
||||||
, action :: Action
|
|
||||||
, payload :: Maybe ActionPayload
|
|
||||||
}
|
|
||||||
deriving (Show, Eq)
|
|
||||||
|
|
||||||
instance Ord FediChordMessage where
|
|
||||||
compare a@Request{} b@Request{} | requestID a == requestID b = part a `compare` part b
|
|
||||||
| otherwise = requestID a `compare` requestID b
|
|
||||||
compare a@Response{} b@Response{} | requestID a == requestID b = part a `compare` part b
|
|
||||||
| otherwise = requestID a `compare` requestID b
|
|
||||||
-- comparing different constructor types always yields "not equal"
|
|
||||||
compare _ _ = LT
|
|
||||||
|
|
||||||
data ActionPayload = QueryIDRequestPayload
|
|
||||||
{ queryTargetID :: NodeID
|
|
||||||
, queryLBestNodes :: Integer
|
|
||||||
}
|
|
||||||
| JoinRequestPayload
|
|
||||||
| LeaveRequestPayload
|
|
||||||
{ leaveSuccessors :: [RemoteNodeState]
|
|
||||||
, leavePredecessors :: [RemoteNodeState]
|
|
||||||
}
|
|
||||||
| StabiliseRequestPayload
|
|
||||||
| PingRequestPayload
|
|
||||||
| QueryIDResponsePayload
|
|
||||||
{ queryResult :: QueryResponse
|
|
||||||
}
|
|
||||||
| JoinResponsePayload
|
|
||||||
{ joinSuccessors :: [RemoteNodeState]
|
|
||||||
, joinPredecessors :: [RemoteNodeState]
|
|
||||||
, joinCache :: [RemoteCacheEntry]
|
|
||||||
}
|
|
||||||
| LeaveResponsePayload
|
|
||||||
| StabiliseResponsePayload
|
|
||||||
{ stabiliseSuccessors :: [RemoteNodeState]
|
|
||||||
, stabilisePredecessors :: [RemoteNodeState]
|
|
||||||
}
|
|
||||||
| PingResponsePayload
|
|
||||||
{ pingNodeStates :: [RemoteNodeState]
|
|
||||||
}
|
|
||||||
deriving (Show, Eq)
|
|
||||||
|
|
||||||
-- | global limit of parts per message used when (de)serialising messages.
|
|
||||||
-- Used to limit the impact of DOS attempts with partial messages.
|
|
||||||
maximumParts :: Num a => a
|
|
||||||
maximumParts = 150
|
|
||||||
|
|
||||||
-- | dedicated data type for cache entries sent to or received from the network,
|
|
||||||
-- as these have to be considered as unvalidated. Also helps with separation of trust.
|
|
||||||
data RemoteCacheEntry = RemoteCacheEntry RemoteNodeState POSIXTime
|
|
||||||
deriving (Show, Eq)
|
|
||||||
|
|
||||||
instance Ord RemoteCacheEntry where
|
|
||||||
(RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2
|
|
||||||
|
|
||||||
toRemoteCacheEntry :: CacheEntry -> RemoteCacheEntry
|
|
||||||
toRemoteCacheEntry (CacheEntry _ ns ts) = RemoteCacheEntry ns ts
|
|
||||||
|
|
||||||
-- | a list of all entries of a 'NodeCache' as 'RemoteCacheEntry', useful for cache transfers
|
|
||||||
toRemoteCache :: NodeCache -> [RemoteCacheEntry]
|
|
||||||
toRemoteCache cache = toRemoteCacheEntry <$> cacheEntries cache
|
|
||||||
|
|
||||||
-- | extract the 'NodeState' from a 'RemoteCacheEntry'
|
|
||||||
remoteNode :: RemoteCacheEntry -> RemoteNodeState
|
|
||||||
remoteNode (RemoteCacheEntry ns _) = ns
|
|
|
@ -1,13 +1,13 @@
|
||||||
{-# LANGUAGE OverloadedStrings #-}
|
{-# LANGUAGE OverloadedStrings #-}
|
||||||
module FediChordSpec where
|
module FediChordSpec where
|
||||||
|
|
||||||
import Control.Concurrent.STM.TVar
|
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
import Data.ASN1.Parse (runParseASN1)
|
import Data.ASN1.Parse (runParseASN1)
|
||||||
import qualified Data.ByteString as BS
|
import qualified Data.ByteString as BS
|
||||||
import qualified Data.Map.Strict as Map
|
import Data.IORef
|
||||||
import Data.Maybe (fromJust, isJust)
|
import qualified Data.Map.Strict as Map
|
||||||
import qualified Data.Set as Set
|
import Data.Maybe (fromJust)
|
||||||
|
import qualified Data.Set as Set
|
||||||
import Data.Time.Clock.POSIX
|
import Data.Time.Clock.POSIX
|
||||||
import Network.Socket
|
import Network.Socket
|
||||||
import Test.Hspec
|
import Test.Hspec
|
||||||
|
@ -15,7 +15,6 @@ import Test.Hspec
|
||||||
import Hash2Pub.ASN1Coding
|
import Hash2Pub.ASN1Coding
|
||||||
import Hash2Pub.DHTProtocol
|
import Hash2Pub.DHTProtocol
|
||||||
import Hash2Pub.FediChord
|
import Hash2Pub.FediChord
|
||||||
import Hash2Pub.FediChordTypes
|
|
||||||
|
|
||||||
spec :: Spec
|
spec :: Spec
|
||||||
spec = do
|
spec = do
|
||||||
|
@ -56,13 +55,14 @@ spec = do
|
||||||
it "can be initialised" $
|
it "can be initialised" $
|
||||||
print exampleNodeState
|
print exampleNodeState
|
||||||
it "can be initialised partly and then modified later" $ do
|
it "can be initialised partly and then modified later" $ do
|
||||||
let ns = RemoteNodeState {
|
let ns = NodeState {
|
||||||
nid = undefined
|
nid = undefined
|
||||||
, domain = exampleNodeDomain
|
, domain = exampleNodeDomain
|
||||||
, ipAddr = exampleIp
|
, ipAddr = exampleIp
|
||||||
, dhtPort = 2342
|
, dhtPort = 2342
|
||||||
, servicePort = 513
|
, apPort = Nothing
|
||||||
, vServerID = undefined
|
, vServerID = undefined
|
||||||
|
, internals = Nothing
|
||||||
}
|
}
|
||||||
nsReady = ns {
|
nsReady = ns {
|
||||||
nid = genNodeID (ipAddr ns) (domain ns) 3
|
nid = genNodeID (ipAddr ns) (domain ns) 3
|
||||||
|
@ -81,8 +81,8 @@ spec = do
|
||||||
newCache = addCacheEntryPure 10 (RemoteCacheEntry exampleNodeState 10) (addCacheEntryPure 10 (RemoteCacheEntry anotherNode 10) emptyCache)
|
newCache = addCacheEntryPure 10 (RemoteCacheEntry exampleNodeState 10) (addCacheEntryPure 10 (RemoteCacheEntry anotherNode 10) emptyCache)
|
||||||
exampleID = nid exampleNodeState
|
exampleID = nid exampleNodeState
|
||||||
it "entries can be added to a node cache and looked up again" $ do
|
it "entries can be added to a node cache and looked up again" $ do
|
||||||
rMapSize emptyCache `shouldBe` 0
|
-- the cache includes 2 additional proxy elements right from the start
|
||||||
rMapSize newCache `shouldBe` 2
|
Map.size newCache - Map.size emptyCache `shouldBe` 2
|
||||||
-- normal entry lookup
|
-- normal entry lookup
|
||||||
nid . cacheGetNodeStateUnvalidated <$> cacheLookup anotherID newCache `shouldBe` Just anotherID
|
nid . cacheGetNodeStateUnvalidated <$> cacheLookup anotherID newCache `shouldBe` Just anotherID
|
||||||
nid . cacheGetNodeStateUnvalidated <$> cacheLookup (anotherID+1) newCache `shouldBe` Nothing
|
nid . cacheGetNodeStateUnvalidated <$> cacheLookup (anotherID+1) newCache `shouldBe` Nothing
|
||||||
|
@ -121,72 +121,50 @@ spec = do
|
||||||
let
|
let
|
||||||
emptyCache = initCache
|
emptyCache = initCache
|
||||||
nid1 = toNodeID 2^(23::Integer)+1
|
nid1 = toNodeID 2^(23::Integer)+1
|
||||||
node1 = setPredecessors [node4] . setNid nid1 <$> exampleLocalNode
|
node1 = do
|
||||||
|
eln <- exampleLocalNode -- is at 2^23.00000017198264 = 8388609
|
||||||
|
pure $ putPredecessors [nid4] $ eln {nid = nid1}
|
||||||
nid2 = toNodeID 2^(230::Integer)+12
|
nid2 = toNodeID 2^(230::Integer)+12
|
||||||
node2 = exampleNodeState { nid = nid2}
|
node2 = exampleNodeState { nid = nid2}
|
||||||
nid3 = toNodeID 2^(25::Integer)+10
|
nid3 = toNodeID 2^(25::Integer)+10
|
||||||
node3 = exampleNodeState { nid = nid3}
|
node3 = exampleNodeState { nid = nid3}
|
||||||
nid4 = toNodeID 2^(9::Integer)+100
|
nid4 = toNodeID 2^(9::Integer)+100
|
||||||
node4 = exampleNodeState { nid = nid4}
|
node4 = exampleNodeState { nid = nid4}
|
||||||
nid5 = toNodeID 2^(25::Integer)+100
|
cacheWith2Entries :: IO NodeCache
|
||||||
node5 = exampleNodeState { nid = nid5}
|
cacheWith2Entries = addCacheEntryPure 10 <$> (RemoteCacheEntry <$> node1 <*> pure 10) <*> pure (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache)
|
||||||
cacheWith2Entries :: NodeCache
|
cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) <$> (addCacheEntryPure 10 (RemoteCacheEntry node4 10) <$> cacheWith2Entries)
|
||||||
cacheWith2Entries = addCacheEntryPure 10 (RemoteCacheEntry node5 10) (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache)
|
it "works on an empty cache" $ do
|
||||||
cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) (addCacheEntryPure 10 (RemoteCacheEntry node4 10) cacheWith2Entries)
|
|
||||||
it "unjoined nodes should never return themselfs" $ do
|
|
||||||
exampleLocalNodeAsRemote <- toRemoteNodeState <$> exampleLocalNode
|
|
||||||
queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FORWARD Set.empty
|
queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FORWARD Set.empty
|
||||||
(FORWARD fwSet) <- queryLocalCache <$> exampleLocalNode <*> pure cacheWith4Entries <*> pure 1 <*> (getNid <$> exampleLocalNode)
|
queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 2342) `shouldReturn` FORWARD Set.empty
|
||||||
remoteNode (head $ Set.elems fwSet) `shouldBe` node4
|
|
||||||
it "joined nodes do not fall back to the default" $
|
|
||||||
queryLocalCache <$> node1 <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 3) `shouldReturn` FORWARD Set.empty
|
|
||||||
it "works on a cache with less entries than needed" $ do
|
it "works on a cache with less entries than needed" $ do
|
||||||
(FORWARD nodeset) <- queryLocalCache <$> node1 <*> pure cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5)
|
(FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5)
|
||||||
Set.map (nid . remoteNode) nodeset `shouldBe` Set.fromList [ nid5, nid2 ]
|
Set.map (nid . remoteNode_) nodeset `shouldBe` Set.fromList [ nid1, nid2 ]
|
||||||
it "works on a cache with sufficient entries" $ do
|
it "works on a cache with sufficient entries" $ do
|
||||||
(FORWARD nodeset1) <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5)
|
(FORWARD nodeset1) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5)
|
||||||
(FORWARD nodeset2) <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5)
|
(FORWARD nodeset2) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5)
|
||||||
Set.map (nid . remoteNode) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid5]
|
Set.map (nid . remoteNode_) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3]
|
||||||
Set.map (nid . remoteNode) nodeset2 `shouldBe` Set.fromList [nid4]
|
Set.map (nid . remoteNode_) nodeset2 `shouldBe` Set.fromList [nid4]
|
||||||
it "recognises the node's own responsibility" $ do
|
it "recognises the node's own responsibility" $ do
|
||||||
FOUND selfQueryRes <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 3 <*> pure nid1
|
FOUND selfQueryRes <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure nid1
|
||||||
getNid <$> node1 `shouldReturn` getNid selfQueryRes
|
nid <$> node1 `shouldReturn` nid selfQueryRes
|
||||||
FOUND responsibilityResult <- queryLocalCache <$> node1 <*> pure cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(22::Integer))
|
FOUND responsibilityResult <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(22::Integer))
|
||||||
getNid <$> node1 `shouldReturn` getNid responsibilityResult
|
nid <$> node1 `shouldReturn` nid responsibilityResult
|
||||||
describe "successors and predecessors do not disturb the ring characteristics of EpiChord operations (see #48)" $ do
|
it "does not fail on nodes without neighbours (initial state)" $ do
|
||||||
let
|
(FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 11)
|
||||||
emptyCache = initCache
|
Set.map (nid . remoteNode_ ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3]
|
||||||
-- implicitly relies on kNeighbours to be <= 3
|
|
||||||
thisNid = toNodeID 1000
|
|
||||||
thisNode = setNid thisNid <$> exampleLocalNode
|
|
||||||
nid2 = toNodeID 1003
|
|
||||||
node2 = exampleNodeState { nid = nid2}
|
|
||||||
nid3 = toNodeID 1010
|
|
||||||
node3 = exampleNodeState { nid = nid3}
|
|
||||||
nid4 = toNodeID 1020
|
|
||||||
node4 = exampleNodeState { nid = nid4}
|
|
||||||
nid5 = toNodeID 1025
|
|
||||||
node5 = exampleNodeState { nid = nid5}
|
|
||||||
allRemoteNodes = [node2, node3, node4, node5]
|
|
||||||
it "lookups also work for slices larger than 1/2 key space" $ do
|
|
||||||
node <- setSuccessors allRemoteNodes . setPredecessors allRemoteNodes <$> thisNode
|
|
||||||
-- do lookup on empty cache but with successors for a key > 1/2 key space
|
|
||||||
-- succeeding the node
|
|
||||||
queryLocalCache node emptyCache 1 (nid5 + 10) `shouldBe` FOUND (toRemoteNodeState node)
|
|
||||||
|
|
||||||
|
|
||||||
describe "Messages can be encoded to and decoded from ASN.1" $ do
|
describe "Messages can be encoded to and decoded from ASN.1" $ do
|
||||||
-- define test messages
|
-- define test messages
|
||||||
let
|
let
|
||||||
someNodes = fmap (flip setNid exampleNodeState . fromInteger) [3..12]
|
someNodeIDs = fmap fromInteger [3..12]
|
||||||
qidReqPayload = QueryIDRequestPayload {
|
qidReqPayload = QueryIDRequestPayload {
|
||||||
queryTargetID = nid exampleNodeState
|
queryTargetID = nid exampleNodeState
|
||||||
, queryLBestNodes = 3
|
, queryLBestNodes = 3
|
||||||
}
|
}
|
||||||
jReqPayload = JoinRequestPayload
|
jReqPayload = JoinRequestPayload
|
||||||
lReqPayload = LeaveRequestPayload {
|
lReqPayload = LeaveRequestPayload {
|
||||||
leaveSuccessors = someNodes
|
leaveSuccessors = someNodeIDs
|
||||||
, leavePredecessors = someNodes
|
, leavePredecessors = someNodeIDs
|
||||||
}
|
}
|
||||||
stabReqPayload = StabiliseRequestPayload
|
stabReqPayload = StabiliseRequestPayload
|
||||||
pingReqPayload = PingRequestPayload
|
pingReqPayload = PingRequestPayload
|
||||||
|
@ -200,8 +178,8 @@ spec = do
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
jResPayload = JoinResponsePayload {
|
jResPayload = JoinResponsePayload {
|
||||||
joinSuccessors = someNodes
|
joinSuccessors = someNodeIDs
|
||||||
, joinPredecessors = someNodes
|
, joinPredecessors = someNodeIDs
|
||||||
, joinCache = [
|
, joinCache = [
|
||||||
RemoteCacheEntry exampleNodeState (toEnum 23420001)
|
RemoteCacheEntry exampleNodeState (toEnum 23420001)
|
||||||
, RemoteCacheEntry (exampleNodeState {nid = fromInteger (-5)}) (toEnum 0)
|
, RemoteCacheEntry (exampleNodeState {nid = fromInteger (-5)}) (toEnum 0)
|
||||||
|
@ -209,7 +187,7 @@ spec = do
|
||||||
}
|
}
|
||||||
lResPayload = LeaveResponsePayload
|
lResPayload = LeaveResponsePayload
|
||||||
stabResPayload = StabiliseResponsePayload {
|
stabResPayload = StabiliseResponsePayload {
|
||||||
stabiliseSuccessors = someNodes
|
stabiliseSuccessors = someNodeIDs
|
||||||
, stabilisePredecessors = []
|
, stabilisePredecessors = []
|
||||||
}
|
}
|
||||||
pingResPayload = PingResponsePayload {
|
pingResPayload = PingResponsePayload {
|
||||||
|
@ -221,16 +199,16 @@ spec = do
|
||||||
requestTemplate = Request {
|
requestTemplate = Request {
|
||||||
requestID = 2342
|
requestID = 2342
|
||||||
, sender = exampleNodeState
|
, sender = exampleNodeState
|
||||||
|
, parts = 1
|
||||||
, part = 1
|
, part = 1
|
||||||
, isFinalPart = True
|
|
||||||
, action = undefined
|
, action = undefined
|
||||||
, payload = undefined
|
, payload = undefined
|
||||||
}
|
}
|
||||||
responseTemplate = Response {
|
responseTemplate = Response {
|
||||||
requestID = 2342
|
responseTo = 2342
|
||||||
, senderID = nid exampleNodeState
|
, senderID = nid exampleNodeState
|
||||||
|
, parts = 1
|
||||||
, part = 1
|
, part = 1
|
||||||
, isFinalPart = True
|
|
||||||
, action = undefined
|
, action = undefined
|
||||||
, payload = undefined
|
, payload = undefined
|
||||||
}
|
}
|
||||||
|
@ -238,12 +216,6 @@ spec = do
|
||||||
responseWith a pa = responseTemplate {action = a, payload = Just pa}
|
responseWith a pa = responseTemplate {action = a, payload = Just pa}
|
||||||
|
|
||||||
encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg
|
encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg
|
||||||
largeMessage = responseWith Join $ JoinResponsePayload {
|
|
||||||
joinSuccessors = flip setNid exampleNodeState . fromInteger <$> [-20..150]
|
|
||||||
, joinPredecessors = flip setNid exampleNodeState . fromInteger <$> [5..11]
|
|
||||||
, joinCache = [ RemoteCacheEntry (exampleNodeState {nid = node}) 290001 | node <- [50602,506011..60000]]
|
|
||||||
}
|
|
||||||
|
|
||||||
it "messages are encoded and decoded correctly from and to ASN1" $ do
|
it "messages are encoded and decoded correctly from and to ASN1" $ do
|
||||||
encodeDecodeAndCheck $ requestWith QueryID qidReqPayload
|
encodeDecodeAndCheck $ requestWith QueryID qidReqPayload
|
||||||
encodeDecodeAndCheck $ requestWith Join jReqPayload
|
encodeDecodeAndCheck $ requestWith Join jReqPayload
|
||||||
|
@ -259,54 +231,35 @@ spec = do
|
||||||
it "messages are encoded and decoded to ASN.1 DER properly" $
|
it "messages are encoded and decoded to ASN.1 DER properly" $
|
||||||
deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload)
|
deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload)
|
||||||
it "messages too large for a single packet can (often) be split into multiple parts" $ do
|
it "messages too large for a single packet can (often) be split into multiple parts" $ do
|
||||||
|
let largeMessage = responseWith Join $ JoinResponsePayload {
|
||||||
|
joinSuccessors = fromInteger <$> [-20..150]
|
||||||
|
, joinPredecessors = fromInteger <$> [5..11]
|
||||||
|
, joinCache = [ RemoteCacheEntry (exampleNodeState {nid = node}) 290001 | node <- [50602,506011..60000]]
|
||||||
|
}
|
||||||
-- TODO: once splitting works more efficient, test for exact number or payload, see #18
|
-- TODO: once splitting works more efficient, test for exact number or payload, see #18
|
||||||
length (serialiseMessage 600 largeMessage) > 1 `shouldBe` True
|
length (serialiseMessage 600 largeMessage) > 1 `shouldBe` True
|
||||||
length (serialiseMessage 60000 largeMessage) `shouldBe` 1
|
length (serialiseMessage 6000 largeMessage) `shouldBe` 1
|
||||||
it "message part numbering starts at the submitted part number" $ do
|
|
||||||
isJust (Map.lookup 1 (serialiseMessage 600 largeMessage)) `shouldBe` True
|
|
||||||
let startAt5 = serialiseMessage 600 (largeMessage {part = 5})
|
|
||||||
Map.lookup 1 startAt5 `shouldBe` Nothing
|
|
||||||
part <$> (deserialiseMessage . fromJust) (Map.lookup 5 startAt5) `shouldBe` Right 5
|
|
||||||
describe "join cache lookup" $
|
|
||||||
it "A bootstrap cache initialised with just one node returns that one." $ do
|
|
||||||
let
|
|
||||||
bootstrapNid = toNodeID 34804191837661041451755206127000721433747285589603756490902196113256157045194
|
|
||||||
bootstrapNode = setNid bootstrapNid exampleNodeState
|
|
||||||
bootstrapCache = addCacheEntryPure 10 (RemoteCacheEntry bootstrapNode 19) initCache
|
|
||||||
ownId = toNodeID 34804191837661041451755206127000721433707928516052624394829818586723613390165
|
|
||||||
ownNode <- setNid ownId <$> exampleLocalNode
|
|
||||||
let (FORWARD qResult) = queryLocalCache ownNode bootstrapCache 2 ownId
|
|
||||||
remoteNode (head $ Set.elems qResult) `shouldBe` bootstrapNode
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- some example data
|
-- some example data
|
||||||
|
|
||||||
exampleNodeState :: RemoteNodeState
|
exampleNodeState :: NodeState
|
||||||
exampleNodeState = RemoteNodeState {
|
exampleNodeState = NodeState {
|
||||||
nid = toNodeID 12
|
nid = toNodeID 12
|
||||||
, domain = exampleNodeDomain
|
, domain = exampleNodeDomain
|
||||||
, ipAddr = exampleIp
|
, ipAddr = exampleIp
|
||||||
, dhtPort = 2342
|
, dhtPort = 2342
|
||||||
, servicePort = 513
|
, apPort = Nothing
|
||||||
, vServerID = 0
|
, vServerID = 0
|
||||||
|
, internals = Nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
exampleLocalNode :: IO LocalNodeState
|
exampleLocalNode :: IO NodeState
|
||||||
exampleLocalNode = nodeStateInit =<< (newTVarIO $ RealNode {
|
exampleLocalNode = nodeStateInit $ FediChordConf {
|
||||||
vservers = []
|
|
||||||
, nodeConfig = exampleFediConf
|
|
||||||
, bootstrapNodes = confBootstrapNodes exampleFediConf
|
|
||||||
})
|
|
||||||
|
|
||||||
|
|
||||||
exampleFediConf :: FediChordConf
|
|
||||||
exampleFediConf = FediChordConf {
|
|
||||||
confDomain = "example.social"
|
confDomain = "example.social"
|
||||||
, confIP = exampleIp
|
, confIP = exampleIp
|
||||||
, confDhtPort = 2342
|
, confDhtPort = 2342
|
||||||
}
|
}
|
||||||
|
|
||||||
exampleNodeDomain :: String
|
exampleNodeDomain :: String
|
||||||
exampleNodeDomain = "example.social"
|
exampleNodeDomain = "example.social"
|
||||||
exampleVs :: (Integral i) => i
|
exampleVs :: (Integral i) => i
|
||||||
|
|
Loading…
Reference in a new issue