Compare commits

..

No commits in common. "f6481996d777a33c95d2a43dcdc7e54a4e9486f0" and "abbe664ca14debfe571f1e32d4d8d015d2f8d5b4" have entirely different histories.

3 changed files with 12 additions and 36 deletions

View file

@ -32,7 +32,6 @@ module Hash2Pub.DHTProtocol
, ackRequest , ackRequest
, isPossibleSuccessor , isPossibleSuccessor
, isPossiblePredecessor , isPossiblePredecessor
, isJoined
, closestCachePredecessors , closestCachePredecessors
) )
where where
@ -90,9 +89,7 @@ import Debug.Trace (trace)
queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache ownState nCache lBestNodes targetID queryLocalCache ownState nCache lBestNodes targetID
-- as target ID falls between own ID and first predecessor, it is handled by this node -- as target ID falls between own ID and first predecessor, it is handled by this node
-- This only makes sense if the node is part of the DHT by having joined. | targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
-- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'.
| isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
-- the closest succeeding node (like with the p initiated parallel queries -- the closest succeeding node (like with the p initiated parallel queries
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
@ -216,8 +213,8 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g
-- | uses the successor and predecessor list of a node as an indicator for whether a -- | uses the successor and predecessor list of a node as an indicator for whether a
-- node has properly joined the DHT -- node has properly joined the DHT
isJoined :: LocalNodeState -> Bool isJoined_ :: LocalNodeState -> Bool
isJoined ns = not . all null $ [successors ns, predecessors ns] isJoined_ ns = not . all null $ [successors ns, predecessors ns]
-- | the size limit to be used when serialising messages for sending -- | the size limit to be used when serialising messages for sending
sendMessageSize :: Num i => i sendMessageSize :: Num i => i
@ -263,8 +260,8 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
-- ToDo: figure out what happens if not joined -- ToDo: figure out what happens if not joined
QueryID -> Just <$> respondQueryID nsSTM msgSet QueryID -> Just <$> respondQueryID nsSTM msgSet
-- only when joined -- only when joined
Leave -> if isJoined ns then Just <$> respondLeave nsSTM msgSet else pure Nothing Leave -> if isJoined_ ns then Just <$> respondLeave nsSTM msgSet else pure Nothing
Stabilise -> if isJoined ns then Just <$> respondStabilise nsSTM msgSet else pure Nothing Stabilise -> if isJoined_ ns then Just <$> respondStabilise nsSTM msgSet else pure Nothing
) )
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
@ -298,11 +295,7 @@ respondQueryID nsSTM msgSet = do
cache <- readTVar $ nodeCacheSTM nsSnap cache <- readTVar $ nodeCacheSTM nsSnap
let let
responsePayload = QueryIDResponsePayload { responsePayload = QueryIDResponsePayload {
queryResult = if isJoined nsSnap queryResult = queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
-- if not joined yet, attract responsibility for
-- all keys to make bootstrapping possible
else FOUND (toRemoteNodeState nsSnap)
} }
queryResponseMsg = Response { queryResponseMsg = Response {
requestID = requestID aRequestPart requestID = requestID aRequestPart

View file

@ -253,9 +253,7 @@ checkCacheSliceInvariants :: LocalNodeState
-> NodeCache -> NodeCache
-> [NodeID] -- ^ list of middle IDs of slices not -> [NodeID] -- ^ list of middle IDs of slices not
-- ^ fulfilling the invariant -- ^ fulfilling the invariant
checkCacheSliceInvariants ns checkCacheSliceInvariants ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc
| isJoined ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc
| otherwise = const []
where where
jEntries = jEntriesPerSlice ns jEntries = jEntriesPerSlice ns
lastPred = getNid <$> lastMay (predecessors ns) lastPred = getNid <$> lastMay (predecessors ns)
@ -342,7 +340,7 @@ stabiliseThread nsSTM = forever $ do
-- try looking up additional neighbours if list too short -- try looking up additional neighbours if list too short
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM ns' <- readTVarIO nsSTM
nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') nextEntry <- requestQueryID ns' $ pred . getNid $ atDef (toRemoteNodeState ns') (predecessors ns') (-1)
atomically $ do atomically $ do
latestNs <- readTVar nsSTM latestNs <- readTVar nsSTM
writeTVar nsSTM $ addPredecessors [nextEntry] latestNs writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
@ -350,7 +348,7 @@ stabiliseThread nsSTM = forever $ do
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM ns' <- readTVarIO nsSTM
nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') nextEntry <- requestQueryID ns' $ succ . getNid $ atDef (toRemoteNodeState ns') (successors ns') (-1)
atomically $ do atomically $ do
latestNs <- readTVar nsSTM latestNs <- readTVar nsSTM
writeTVar nsSTM $ addSuccessors [nextEntry] latestNs writeTVar nsSTM $ addSuccessors [nextEntry] latestNs
@ -462,9 +460,6 @@ fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> LocalNodeStateSTM -- ^ acting NodeState -> LocalNodeStateSTM -- ^ acting NodeState
-> IO () -> IO ()
fediMessageHandler sendQ recvQ nsSTM = do fediMessageHandler sendQ recvQ nsSTM = do
-- Read node state just once, assuming that all relevant data for this function does
-- not change.
-- Other functions are passed the nsSTM reference and thus can get the latest state.
nsSnap <- readTVarIO nsSTM nsSnap <- readTVarIO nsSTM
-- handling multipart messages: -- handling multipart messages:
-- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.

View file

@ -132,11 +132,10 @@ spec = do
cacheWith2Entries :: NodeCache cacheWith2Entries :: NodeCache
cacheWith2Entries = addCacheEntryPure 10 (RemoteCacheEntry node5 10) (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache) cacheWith2Entries = addCacheEntryPure 10 (RemoteCacheEntry node5 10) (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache)
cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) (addCacheEntryPure 10 (RemoteCacheEntry node4 10) cacheWith2Entries) cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) (addCacheEntryPure 10 (RemoteCacheEntry node4 10) cacheWith2Entries)
it "unjoined nodes should never return themselfs" $ do it "nodes not joined provide the default answer FOUND" $ do
exampleLocalNodeAsRemote <- toRemoteNodeState <$> exampleLocalNode exampleLocalNodeAsRemote <- toRemoteNodeState <$> exampleLocalNode
queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FORWARD Set.empty queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FOUND exampleLocalNodeAsRemote
(FORWARD fwSet) <- queryLocalCache <$> exampleLocalNode <*> pure cacheWith4Entries <*> pure 1 <*> (getNid <$> exampleLocalNode) queryLocalCache <$> exampleLocalNode <*> pure cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2342) `shouldReturn` FOUND exampleLocalNodeAsRemote
remoteNode (head $ Set.elems fwSet) `shouldBe` node4
it "joined nodes do not fall back to the default" $ it "joined nodes do not fall back to the default" $
queryLocalCache <$> node1 <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 3) `shouldReturn` FORWARD Set.empty queryLocalCache <$> node1 <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 3) `shouldReturn` FORWARD Set.empty
it "works on a cache with less entries than needed" $ do it "works on a cache with less entries than needed" $ do
@ -266,17 +265,6 @@ spec = do
let startAt5 = serialiseMessage 600 (largeMessage {part = 5}) let startAt5 = serialiseMessage 600 (largeMessage {part = 5})
Map.lookup 1 startAt5 `shouldBe` Nothing Map.lookup 1 startAt5 `shouldBe` Nothing
part <$> (deserialiseMessage . fromJust) (Map.lookup 5 startAt5) `shouldBe` Right 5 part <$> (deserialiseMessage . fromJust) (Map.lookup 5 startAt5) `shouldBe` Right 5
describe "join cache lookup" $
it "A bootstrap cache initialised with just one node returns that one." $ do
let
bootstrapNid = toNodeID 34804191837661041451755206127000721433747285589603756490902196113256157045194
bootstrapNode = setNid bootstrapNid exampleNodeState
bootstrapCache = addCacheEntryPure 10 (RemoteCacheEntry bootstrapNode 19) initCache
ownId = toNodeID 34804191837661041451755206127000721433707928516052624394829818586723613390165
ownNode <- setNid ownId <$> exampleLocalNode
let (FORWARD qResult) = queryLocalCache ownNode bootstrapCache 2 ownId
remoteNode (head $ Set.elems qResult) `shouldBe` bootstrapNode
-- some example data -- some example data