From f15d83baffe2870d432553c625b42a890d17cbce Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Tue, 9 Jun 2020 15:21:22 +0200 Subject: [PATCH] Refactor predecessors and successors to hold RemoteNodeStates - neighbour nodes need to be contacted reliably - Only holding NodeIDs requires a cache lookup for getting hostname and port. This is brittle as the entry could've been purged from cache. - refactored ASN.1 scheme, types and add/ sort/ remove implementations closes #46 --- FediChord.asn1 | 12 ++++++------ src/Hash2Pub/ASN1Coding.hs | 24 ++++++++++++------------ src/Hash2Pub/DHTProtocol.hs | 34 +++++++++++++++++----------------- src/Hash2Pub/FediChordTypes.hs | 16 ++++++++++------ src/Hash2Pub/ProtocolTypes.hs | 12 ++++++------ test/FediChordSpec.hs | 20 ++++++++++---------- 6 files changed, 61 insertions(+), 57 deletions(-) diff --git a/FediChord.asn1 b/FediChord.asn1 index 41f9650..f278f8f 100644 --- a/FediChord.asn1 +++ b/FediChord.asn1 @@ -62,8 +62,8 @@ NodeCache ::= SEQUENCE OF CacheEntry JoinRequestPayload ::= NULL JoinResponsePayload ::= SEQUENCE { - successors SEQUENCE OF NodeID, - predecessors SEQUENCE OF NodeID, + successors SEQUENCE OF NodeState, + predecessors SEQUENCE OF NodeState, cache NodeCache } @@ -82,14 +82,14 @@ QueryIDResponsePayload ::= SEQUENCE { StabiliseRequestPayload ::= NULL StabiliseResponsePayload ::= SEQUENCE { - successors SEQUENCE OF NodeID, - predecessors SEQUENCE OF NodeID + successors SEQUENCE OF NodeState, + predecessors SEQUENCE OF NodeState -- ToDo: transfer of handled key data, if newly responsible for it } LeaveRequestPayload ::= SEQUENCE { - successors SEQUENCE OF NodeID, - predecessors SEQUENCE OF NodeID + successors SEQUENCE OF NodeState, + predecessors SEQUENCE OF NodeState -- ToDo: transfer of own data to newly responsible node } diff --git a/src/Hash2Pub/ASN1Coding.hs b/src/Hash2Pub/ASN1Coding.hs index d476809..456dac6 100644 --- a/src/Hash2Pub/ASN1Coding.hs +++ b/src/Hash2Pub/ASN1Coding.hs @@ -130,20 +130,20 @@ encodePayload LeaveResponsePayload = [Null] encodePayload payload'@LeaveRequestPayload{} = Start Sequence : Start Sequence - : fmap (IntVal . getNodeID) (leaveSuccessors payload') + : concatMap encodeNodeState (leaveSuccessors payload') <> [End Sequence , Start Sequence] - <> fmap (IntVal . getNodeID) (leavePredecessors payload') + <> concatMap encodeNodeState (leavePredecessors payload') <> [End Sequence , End Sequence] -- currently StabiliseResponsePayload and LeaveRequestPayload are equal encodePayload payload'@StabiliseResponsePayload{} = Start Sequence : Start Sequence - : fmap (IntVal . getNodeID) (stabiliseSuccessors payload') + : concatMap encodeNodeState (stabiliseSuccessors payload') <> [End Sequence , Start Sequence] - <> fmap (IntVal . getNodeID) (stabilisePredecessors payload') + <> concatMap encodeNodeState (stabilisePredecessors payload') <> [End Sequence , End Sequence] encodePayload payload'@StabiliseRequestPayload = [Null] @@ -170,10 +170,10 @@ encodePayload payload'@QueryIDRequestPayload{} = [ encodePayload payload'@JoinResponsePayload{} = Start Sequence : Start Sequence - : fmap (IntVal . getNodeID) (joinSuccessors payload') + : concatMap encodeNodeState (joinSuccessors payload') <> [End Sequence , Start Sequence] - <> fmap (IntVal . getNodeID) (joinPredecessors payload') + <> concatMap encodeNodeState (joinPredecessors payload') <> [End Sequence , Start Sequence] <> concatMap encodeCacheEntry (joinCache payload') @@ -368,8 +368,8 @@ parseJoinRequest = do parseJoinResponse :: ParseASN1 ActionPayload parseJoinResponse = onNextContainer Sequence $ do - succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) - pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) + succ' <- onNextContainer Sequence (getMany parseNodeState) + pred' <- onNextContainer Sequence (getMany parseNodeState) cache <- parseNodeCache pure $ JoinResponsePayload { joinSuccessors = succ' @@ -404,8 +404,8 @@ parseStabiliseRequest = do parseStabiliseResponse :: ParseASN1 ActionPayload parseStabiliseResponse = onNextContainer Sequence $ do - succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) - pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) + succ' <- onNextContainer Sequence (getMany parseNodeState) + pred' <- onNextContainer Sequence (getMany parseNodeState) pure $ StabiliseResponsePayload { stabiliseSuccessors = succ' , stabilisePredecessors = pred' @@ -413,8 +413,8 @@ parseStabiliseResponse = onNextContainer Sequence $ do parseLeaveRequest :: ParseASN1 ActionPayload parseLeaveRequest = onNextContainer Sequence $ do - succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) - pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger) + succ' <- onNextContainer Sequence (getMany parseNodeState) + pred' <- onNextContainer Sequence (getMany parseNodeState) pure $ LeaveRequestPayload { leaveSuccessors = succ' , leavePredecessors = pred' diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 64e4602..f1eda71 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -73,7 +73,7 @@ import Debug.Trace (trace) queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse queryLocalCache ownState nCache lBestNodes targetID -- as target ID falls between own ID and first predecessor, it is handled by this node - | (targetID `localCompare` ownID) `elem` [LT, EQ] && maybe False (\p -> targetID `localCompare` p == GT) (headMay preds) = FOUND . toRemoteNodeState $ ownState + | (targetID `localCompare` ownID) `elem` [LT, EQ] && maybe False (\p -> targetID `localCompare` p == GT) (getNid <$> headMay preds) = FOUND . toRemoteNodeState $ ownState -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and -- the closest succeeding node (like with the p initiated parallel queries | otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors @@ -264,8 +264,8 @@ respondLeave nsSTM msgSet = do writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry senderID writeTVar nsSTM $ -- add predecessors and successors of leaving node to own lists - setPredecessors (delete senderID $ requestPreds <> predecessors nsSnap) - . setSuccessors (delete senderID $ requestSuccs <> successors nsSnap) $ nsSnap + setPredecessors (filter ((/=) senderID . getNid) $ requestPreds <> predecessors nsSnap) + . setSuccessors (filter ((/=) senderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap -- TODO: handle handover of key data let leaveResponse = Response { requestID = requestID aRequestPart @@ -337,7 +337,7 @@ respondJoin nsSTM msgSet = do then do -- if yes, adjust own predecessors/ successors and return those in a response let - newPreds = getNid senderNS:predecessors nsSnap + newPreds = senderNS:predecessors nsSnap joinedNS = setPredecessors newPreds nsSnap responsePayload = JoinResponsePayload { joinSuccessors = successors joinedNS @@ -381,28 +381,28 @@ requestJoin toJoinOn ownStateSTM = (cacheInsertQ, joinedState) <- atomically $ do stateSnap <- readTVar ownStateSTM let - (cacheInsertQ, joinedStateUnsorted) = foldl' - (\(insertQ, nsAcc) msg -> + (cacheInsertQ, predAccSet, succAccSet) = foldl' + (\(insertQ, predAccSet', succAccSet') msg -> let insertQ' = maybe insertQ (\msgPl -> -- collect list of insertion statements into global cache queueAddEntries (joinCache msgPl) : insertQ ) $ payload msg - -- add received predecessors and successors - addPreds ns' = maybe ns' (\msgPl -> - setPredecessors (foldr' (:) (predecessors ns') (joinPredecessors msgPl)) ns' - ) $ payload msg - addSuccs ns' = maybe ns' (\msgPl -> - setSuccessors (foldr' (:) (successors ns') (joinSuccessors msgPl)) ns' - ) $ payload msg + -- collect received predecessors and successors + predAccSet'' = maybe predAccSet' ( + foldr' Set.insert predAccSet' . joinPredecessors + ) $ payload msg + succAccSet'' = maybe succAccSet' ( + foldr' Set.insert succAccSet' . joinSuccessors + ) $ payload msg in - (insertQ', addSuccs . addPreds $ nsAcc) + (insertQ', predAccSet'', succAccSet'') ) -- reset predecessors and successors - ([], setPredecessors [] . setSuccessors [] $ ownState) + ([], Set.empty, Set.empty) responses - -- sort successors and predecessors - newState = setSuccessors (take (kNeighbours joinedStateUnsorted) . sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (take (kNeighbours joinedStateUnsorted) . sortBy (flip localCompare) $ predecessors joinedStateUnsorted) $ joinedStateUnsorted + -- sort, slice and set the accumulated successors and predecessors + newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap writeTVar ownStateSTM newState pure (cacheInsertQ, newState) -- execute the cache insertions diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index d5ea900..410cbe9 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -32,6 +32,7 @@ module Hash2Pub.FediChordTypes ( ) where import Control.Exception +import Data.Function (on) import Data.List (delete, nub, sortBy) import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe, isJust, mapMaybe) @@ -123,6 +124,9 @@ data RemoteNodeState = RemoteNodeState } deriving (Show, Eq) +instance Ord RemoteNodeState where + a `compare` b = nid a `compare` nid b + -- | represents a node and encapsulates all data and parameters that are not present for remote nodes data LocalNodeState = LocalNodeState { nodeState :: RemoteNodeState @@ -131,9 +135,9 @@ data LocalNodeState = LocalNodeState -- ^ EpiChord node cache with expiry times for nodes , cacheWriteQueue :: TQueue (NodeCache -> NodeCache) -- ^ cache updates are not written directly to the 'nodeCache' but queued and - , successors :: [NodeID] -- could be a set instead as these are ordered as well + , successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well -- ^ successor nodes in ascending order by distance - , predecessors :: [NodeID] + , predecessors :: [RemoteNodeState] -- ^ predecessor nodes in ascending order by distance , kNeighbours :: Int -- ^ desired length of predecessor and successor list @@ -213,12 +217,12 @@ instance Typeable a => Show (TQueue a) where show x = show (typeOf x) -- | convenience function that updates the successors of a 'LocalNodeState' -setSuccessors :: [NodeID] -> LocalNodeState -> LocalNodeState -setSuccessors succ' ns = ns {successors = take (kNeighbours ns) . nub . sortBy localCompare $ succ'} +setSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState +setSuccessors succ' ns = ns {successors = take (kNeighbours ns) . nub . sortBy (localCompare `on` getNid) $ succ'} -- | convenience function that updates the predecessors of a 'LocalNodeState' -setPredecessors :: [NodeID] -> LocalNodeState -> LocalNodeState -setPredecessors pred' ns = ns {predecessors = take (kNeighbours ns) . nub . sortBy (flip localCompare) $ pred'} +setPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState +setPredecessors pred' ns = ns {predecessors = take (kNeighbours ns) . nub . sortBy (flip (localCompare `on` getNid)) $ pred'} type NodeCache = Map.Map NodeID CacheEntry diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs index 25cf2d4..afb72d2 100644 --- a/src/Hash2Pub/ProtocolTypes.hs +++ b/src/Hash2Pub/ProtocolTypes.hs @@ -53,8 +53,8 @@ data ActionPayload = QueryIDRequestPayload } | JoinRequestPayload | LeaveRequestPayload - { leaveSuccessors :: [NodeID] - , leavePredecessors :: [NodeID] + { leaveSuccessors :: [RemoteNodeState] + , leavePredecessors :: [RemoteNodeState] } | StabiliseRequestPayload | PingRequestPayload @@ -62,14 +62,14 @@ data ActionPayload = QueryIDRequestPayload { queryResult :: QueryResponse } | JoinResponsePayload - { joinSuccessors :: [NodeID] - , joinPredecessors :: [NodeID] + { joinSuccessors :: [RemoteNodeState] + , joinPredecessors :: [RemoteNodeState] , joinCache :: [RemoteCacheEntry] } | LeaveResponsePayload | StabiliseResponsePayload - { stabiliseSuccessors :: [NodeID] - , stabilisePredecessors :: [NodeID] + { stabiliseSuccessors :: [RemoteNodeState] + , stabilisePredecessors :: [RemoteNodeState] } | PingResponsePayload { pingNodeStates :: [RemoteNodeState] diff --git a/test/FediChordSpec.hs b/test/FediChordSpec.hs index 146afcd..b289c33 100644 --- a/test/FediChordSpec.hs +++ b/test/FediChordSpec.hs @@ -119,7 +119,7 @@ spec = do let emptyCache = initCache nid1 = toNodeID 2^(23::Integer)+1 - node1 = setPredecessors [nid4] . setNid nid1 <$> exampleLocalNode + node1 = setPredecessors [node4] . setNid nid1 <$> exampleLocalNode nid2 = toNodeID 2^(230::Integer)+12 node2 = exampleNodeState { nid = nid2} nid3 = toNodeID 2^(25::Integer)+10 @@ -152,15 +152,15 @@ spec = do describe "Messages can be encoded to and decoded from ASN.1" $ do -- define test messages let - someNodeIDs = fmap fromInteger [3..12] + someNodes = fmap (flip setNid exampleNodeState . fromInteger) [3..12] qidReqPayload = QueryIDRequestPayload { queryTargetID = nid exampleNodeState , queryLBestNodes = 3 } jReqPayload = JoinRequestPayload lReqPayload = LeaveRequestPayload { - leaveSuccessors = someNodeIDs - , leavePredecessors = someNodeIDs + leaveSuccessors = someNodes + , leavePredecessors = someNodes } stabReqPayload = StabiliseRequestPayload pingReqPayload = PingRequestPayload @@ -174,8 +174,8 @@ spec = do ] } jResPayload = JoinResponsePayload { - joinSuccessors = someNodeIDs - , joinPredecessors = someNodeIDs + joinSuccessors = someNodes + , joinPredecessors = someNodes , joinCache = [ RemoteCacheEntry exampleNodeState (toEnum 23420001) , RemoteCacheEntry (exampleNodeState {nid = fromInteger (-5)}) (toEnum 0) @@ -183,7 +183,7 @@ spec = do } lResPayload = LeaveResponsePayload stabResPayload = StabiliseResponsePayload { - stabiliseSuccessors = someNodeIDs + stabiliseSuccessors = someNodes , stabilisePredecessors = [] } pingResPayload = PingResponsePayload { @@ -213,8 +213,8 @@ spec = do encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg largeMessage = responseWith Join $ JoinResponsePayload { - joinSuccessors = fromInteger <$> [-20..150] - , joinPredecessors = fromInteger <$> [5..11] + joinSuccessors = flip setNid exampleNodeState . fromInteger <$> [-20..150] + , joinPredecessors = flip setNid exampleNodeState . fromInteger <$> [5..11] , joinCache = [ RemoteCacheEntry (exampleNodeState {nid = node}) 290001 | node <- [50602,506011..60000]] } @@ -235,7 +235,7 @@ spec = do it "messages too large for a single packet can (often) be split into multiple parts" $ do -- TODO: once splitting works more efficient, test for exact number or payload, see #18 length (serialiseMessage 600 largeMessage) > 1 `shouldBe` True - length (serialiseMessage 6000 largeMessage) `shouldBe` 1 + length (serialiseMessage 60000 largeMessage) `shouldBe` 1 it "message part numbering starts at the submitted part number" $ do isJust (Map.lookup 1 (serialiseMessage 600 largeMessage)) `shouldBe` True let startAt5 = serialiseMessage 600 (largeMessage {part = 5})