Compare commits
	
		
			2 commits
		
	
	
		
			abbe664ca1
			...
			f6481996d7
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| f6481996d7 | |||
| 8d349212b4 | 
					 3 changed files with 36 additions and 12 deletions
				
			
		|  | @ -32,6 +32,7 @@ module Hash2Pub.DHTProtocol | ||||||
|     , ackRequest |     , ackRequest | ||||||
|     , isPossibleSuccessor |     , isPossibleSuccessor | ||||||
|     , isPossiblePredecessor |     , isPossiblePredecessor | ||||||
|  |     , isJoined | ||||||
|     , closestCachePredecessors |     , closestCachePredecessors | ||||||
|     ) |     ) | ||||||
|         where |         where | ||||||
|  | @ -89,7 +90,9 @@ import           Debug.Trace                    (trace) | ||||||
| queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse | queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse | ||||||
| queryLocalCache ownState nCache lBestNodes targetID | queryLocalCache ownState nCache lBestNodes targetID | ||||||
|     -- as target ID falls between own ID and first predecessor, it is handled by this node |     -- as target ID falls between own ID and first predecessor, it is handled by this node | ||||||
|       | targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState |     -- This only makes sense if the node is part of the DHT by having joined. | ||||||
|  |     -- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'. | ||||||
|  |       | isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState | ||||||
|     -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and |     -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and | ||||||
|     -- the closest succeeding node (like with the p initiated parallel queries |     -- the closest succeeding node (like with the p initiated parallel queries | ||||||
|       | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache |       | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache | ||||||
|  | @ -213,8 +216,8 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g | ||||||
| 
 | 
 | ||||||
| -- | uses the successor and predecessor list of a node as an indicator for whether a | -- | uses the successor and predecessor list of a node as an indicator for whether a | ||||||
| -- node has properly joined the DHT | -- node has properly joined the DHT | ||||||
| isJoined_ :: LocalNodeState -> Bool | isJoined :: LocalNodeState -> Bool | ||||||
| isJoined_ ns = not . all null $ [successors ns, predecessors ns] | isJoined ns = not . all null $ [successors ns, predecessors ns] | ||||||
| 
 | 
 | ||||||
| -- | the size limit to be used when serialising messages for sending | -- | the size limit to be used when serialising messages for sending | ||||||
| sendMessageSize :: Num i => i | sendMessageSize :: Num i => i | ||||||
|  | @ -260,8 +263,8 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do | ||||||
|                 -- ToDo: figure out what happens if not joined |                 -- ToDo: figure out what happens if not joined | ||||||
|                 QueryID -> Just <$> respondQueryID nsSTM msgSet |                 QueryID -> Just <$> respondQueryID nsSTM msgSet | ||||||
|                 -- only when joined |                 -- only when joined | ||||||
|                 Leave -> if isJoined_ ns then Just <$> respondLeave nsSTM msgSet else pure Nothing |                 Leave -> if isJoined ns then Just <$> respondLeave nsSTM msgSet else pure Nothing | ||||||
|                 Stabilise -> if isJoined_ ns then Just <$> respondStabilise nsSTM msgSet else pure Nothing |                 Stabilise -> if isJoined ns then Just <$> respondStabilise nsSTM msgSet else pure Nothing | ||||||
|             ) |             ) | ||||||
|       -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. |       -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. | ||||||
| 
 | 
 | ||||||
|  | @ -295,7 +298,11 @@ respondQueryID nsSTM msgSet = do | ||||||
|             cache <- readTVar $ nodeCacheSTM nsSnap |             cache <- readTVar $ nodeCacheSTM nsSnap | ||||||
|             let |             let | ||||||
|                 responsePayload = QueryIDResponsePayload { |                 responsePayload = QueryIDResponsePayload { | ||||||
|                     queryResult = queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload') |                     queryResult = if isJoined nsSnap | ||||||
|  |                                      then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload') | ||||||
|  |                                      -- if not joined yet, attract responsibility for | ||||||
|  |                                      -- all keys to make bootstrapping possible | ||||||
|  |                                      else FOUND (toRemoteNodeState nsSnap) | ||||||
|                                                         } |                                                         } | ||||||
|                 queryResponseMsg = Response { |                 queryResponseMsg = Response { | ||||||
|                     requestID = requestID aRequestPart |                     requestID = requestID aRequestPart | ||||||
|  |  | ||||||
|  | @ -253,7 +253,9 @@ checkCacheSliceInvariants :: LocalNodeState | ||||||
|                           -> NodeCache |                           -> NodeCache | ||||||
|                           -> [NodeID]   -- ^ list of middle IDs of slices not |                           -> [NodeID]   -- ^ list of middle IDs of slices not | ||||||
|                                         -- ^ fulfilling the invariant |                                         -- ^ fulfilling the invariant | ||||||
| checkCacheSliceInvariants ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc | checkCacheSliceInvariants ns  | ||||||
|  |   | isJoined ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc | ||||||
|  |   | otherwise = const [] | ||||||
|   where |   where | ||||||
|     jEntries = jEntriesPerSlice ns |     jEntries = jEntriesPerSlice ns | ||||||
|     lastPred = getNid <$> lastMay (predecessors ns) |     lastPred = getNid <$> lastMay (predecessors ns) | ||||||
|  | @ -340,7 +342,7 @@ stabiliseThread nsSTM = forever $ do | ||||||
|     -- try looking up additional neighbours if list too short |     -- try looking up additional neighbours if list too short | ||||||
|     forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do |     forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do | ||||||
|         ns' <- readTVarIO nsSTM |         ns' <- readTVarIO nsSTM | ||||||
|         nextEntry <- requestQueryID ns' $ pred . getNid $ atDef (toRemoteNodeState ns') (predecessors ns') (-1) |         nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') | ||||||
|         atomically $ do |         atomically $ do | ||||||
|             latestNs <- readTVar nsSTM |             latestNs <- readTVar nsSTM | ||||||
|             writeTVar nsSTM $ addPredecessors [nextEntry] latestNs |             writeTVar nsSTM $ addPredecessors [nextEntry] latestNs | ||||||
|  | @ -348,7 +350,7 @@ stabiliseThread nsSTM = forever $ do | ||||||
| 
 | 
 | ||||||
|     forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do |     forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do | ||||||
|         ns' <- readTVarIO nsSTM |         ns' <- readTVarIO nsSTM | ||||||
|         nextEntry <- requestQueryID ns' $ succ . getNid $ atDef (toRemoteNodeState ns') (successors ns') (-1) |         nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') | ||||||
|         atomically $ do |         atomically $ do | ||||||
|             latestNs <- readTVar nsSTM |             latestNs <- readTVar nsSTM | ||||||
|             writeTVar nsSTM $ addSuccessors [nextEntry] latestNs |             writeTVar nsSTM $ addSuccessors [nextEntry] latestNs | ||||||
|  | @ -460,6 +462,9 @@ fediMessageHandler :: TQueue (BS.ByteString, SockAddr)  -- ^ send queue | ||||||
|                    -> LocalNodeStateSTM                    -- ^ acting NodeState |                    -> LocalNodeStateSTM                    -- ^ acting NodeState | ||||||
|                    -> IO () |                    -> IO () | ||||||
| fediMessageHandler sendQ recvQ nsSTM = do | fediMessageHandler sendQ recvQ nsSTM = do | ||||||
|  |     -- Read node state just once, assuming that all relevant data for this function does | ||||||
|  |     -- not change. | ||||||
|  |     -- Other functions are passed the nsSTM reference and thus can get the latest state. | ||||||
|     nsSnap <- readTVarIO nsSTM |     nsSnap <- readTVarIO nsSTM | ||||||
|     -- handling multipart messages: |     -- handling multipart messages: | ||||||
|     -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. |     -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. | ||||||
|  |  | ||||||
|  | @ -132,10 +132,11 @@ spec = do | ||||||
|             cacheWith2Entries :: NodeCache |             cacheWith2Entries :: NodeCache | ||||||
|             cacheWith2Entries = addCacheEntryPure 10 (RemoteCacheEntry node5 10) (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache) |             cacheWith2Entries = addCacheEntryPure 10 (RemoteCacheEntry node5 10) (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache) | ||||||
|             cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) (addCacheEntryPure 10 (RemoteCacheEntry node4 10) cacheWith2Entries) |             cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) (addCacheEntryPure 10 (RemoteCacheEntry node4 10) cacheWith2Entries) | ||||||
|         it "nodes not joined provide the default answer FOUND" $ do |         it "unjoined nodes should never return themselfs" $ do | ||||||
|             exampleLocalNodeAsRemote <- toRemoteNodeState <$> exampleLocalNode |             exampleLocalNodeAsRemote <- toRemoteNodeState <$> exampleLocalNode | ||||||
|             queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FOUND exampleLocalNodeAsRemote |             queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FORWARD Set.empty | ||||||
|             queryLocalCache <$> exampleLocalNode <*> pure cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2342) `shouldReturn` FOUND exampleLocalNodeAsRemote |             (FORWARD fwSet) <- queryLocalCache <$> exampleLocalNode <*> pure cacheWith4Entries <*> pure 1 <*> (getNid <$> exampleLocalNode) | ||||||
|  |             remoteNode (head $ Set.elems fwSet) `shouldBe` node4 | ||||||
|         it "joined nodes do not fall back to the default" $ |         it "joined nodes do not fall back to the default" $ | ||||||
|             queryLocalCache <$> node1 <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 3) `shouldReturn` FORWARD Set.empty |             queryLocalCache <$> node1 <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 3) `shouldReturn` FORWARD Set.empty | ||||||
|         it "works on a cache with less entries than needed" $ do |         it "works on a cache with less entries than needed" $ do | ||||||
|  | @ -265,6 +266,17 @@ spec = do | ||||||
|             let startAt5 = serialiseMessage 600 (largeMessage {part = 5}) |             let startAt5 = serialiseMessage 600 (largeMessage {part = 5}) | ||||||
|             Map.lookup 1 startAt5 `shouldBe` Nothing |             Map.lookup 1 startAt5 `shouldBe` Nothing | ||||||
|             part <$> (deserialiseMessage . fromJust) (Map.lookup 5 startAt5) `shouldBe` Right 5 |             part <$> (deserialiseMessage . fromJust) (Map.lookup 5 startAt5) `shouldBe` Right 5 | ||||||
|  |     describe "join cache lookup" $ | ||||||
|  |         it "A bootstrap cache initialised with just one node returns that one." $ do | ||||||
|  |             let | ||||||
|  |                 bootstrapNid = toNodeID 34804191837661041451755206127000721433747285589603756490902196113256157045194 | ||||||
|  |                 bootstrapNode = setNid bootstrapNid exampleNodeState | ||||||
|  |                 bootstrapCache = addCacheEntryPure 10 (RemoteCacheEntry bootstrapNode 19) initCache | ||||||
|  |                 ownId = toNodeID 34804191837661041451755206127000721433707928516052624394829818586723613390165 | ||||||
|  |             ownNode <- setNid ownId <$> exampleLocalNode | ||||||
|  |             let (FORWARD qResult) = queryLocalCache ownNode bootstrapCache 2 ownId | ||||||
|  |             remoteNode (head $ Set.elems qResult) `shouldBe` bootstrapNode | ||||||
|  |              | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- some example data | -- some example data | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue