Compare commits
2 commits
abbe664ca1
...
f6481996d7
Author | SHA1 | Date | |
---|---|---|---|
|
f6481996d7 | ||
|
8d349212b4 |
|
@ -32,6 +32,7 @@ module Hash2Pub.DHTProtocol
|
||||||
, ackRequest
|
, ackRequest
|
||||||
, isPossibleSuccessor
|
, isPossibleSuccessor
|
||||||
, isPossiblePredecessor
|
, isPossiblePredecessor
|
||||||
|
, isJoined
|
||||||
, closestCachePredecessors
|
, closestCachePredecessors
|
||||||
)
|
)
|
||||||
where
|
where
|
||||||
|
@ -89,7 +90,9 @@ import Debug.Trace (trace)
|
||||||
queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse
|
queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse
|
||||||
queryLocalCache ownState nCache lBestNodes targetID
|
queryLocalCache ownState nCache lBestNodes targetID
|
||||||
-- as target ID falls between own ID and first predecessor, it is handled by this node
|
-- as target ID falls between own ID and first predecessor, it is handled by this node
|
||||||
| targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
|
-- This only makes sense if the node is part of the DHT by having joined.
|
||||||
|
-- A default answer to nodes querying an unjoined node is provided by 'respondQueryID'.
|
||||||
|
| isJoined ownState && targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
|
||||||
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
|
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
|
||||||
-- the closest succeeding node (like with the p initiated parallel queries
|
-- the closest succeeding node (like with the p initiated parallel queries
|
||||||
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
|
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
|
||||||
|
@ -213,8 +216,8 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g
|
||||||
|
|
||||||
-- | uses the successor and predecessor list of a node as an indicator for whether a
|
-- | uses the successor and predecessor list of a node as an indicator for whether a
|
||||||
-- node has properly joined the DHT
|
-- node has properly joined the DHT
|
||||||
isJoined_ :: LocalNodeState -> Bool
|
isJoined :: LocalNodeState -> Bool
|
||||||
isJoined_ ns = not . all null $ [successors ns, predecessors ns]
|
isJoined ns = not . all null $ [successors ns, predecessors ns]
|
||||||
|
|
||||||
-- | the size limit to be used when serialising messages for sending
|
-- | the size limit to be used when serialising messages for sending
|
||||||
sendMessageSize :: Num i => i
|
sendMessageSize :: Num i => i
|
||||||
|
@ -260,8 +263,8 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
|
||||||
-- ToDo: figure out what happens if not joined
|
-- ToDo: figure out what happens if not joined
|
||||||
QueryID -> Just <$> respondQueryID nsSTM msgSet
|
QueryID -> Just <$> respondQueryID nsSTM msgSet
|
||||||
-- only when joined
|
-- only when joined
|
||||||
Leave -> if isJoined_ ns then Just <$> respondLeave nsSTM msgSet else pure Nothing
|
Leave -> if isJoined ns then Just <$> respondLeave nsSTM msgSet else pure Nothing
|
||||||
Stabilise -> if isJoined_ ns then Just <$> respondStabilise nsSTM msgSet else pure Nothing
|
Stabilise -> if isJoined ns then Just <$> respondStabilise nsSTM msgSet else pure Nothing
|
||||||
)
|
)
|
||||||
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
|
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
|
||||||
|
|
||||||
|
@ -295,7 +298,11 @@ respondQueryID nsSTM msgSet = do
|
||||||
cache <- readTVar $ nodeCacheSTM nsSnap
|
cache <- readTVar $ nodeCacheSTM nsSnap
|
||||||
let
|
let
|
||||||
responsePayload = QueryIDResponsePayload {
|
responsePayload = QueryIDResponsePayload {
|
||||||
queryResult = queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
|
queryResult = if isJoined nsSnap
|
||||||
|
then queryLocalCache nsSnap cache (fromIntegral $ queryLBestNodes senderPayload') (queryTargetID senderPayload')
|
||||||
|
-- if not joined yet, attract responsibility for
|
||||||
|
-- all keys to make bootstrapping possible
|
||||||
|
else FOUND (toRemoteNodeState nsSnap)
|
||||||
}
|
}
|
||||||
queryResponseMsg = Response {
|
queryResponseMsg = Response {
|
||||||
requestID = requestID aRequestPart
|
requestID = requestID aRequestPart
|
||||||
|
|
|
@ -253,7 +253,9 @@ checkCacheSliceInvariants :: LocalNodeState
|
||||||
-> NodeCache
|
-> NodeCache
|
||||||
-> [NodeID] -- ^ list of middle IDs of slices not
|
-> [NodeID] -- ^ list of middle IDs of slices not
|
||||||
-- ^ fulfilling the invariant
|
-- ^ fulfilling the invariant
|
||||||
checkCacheSliceInvariants ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc
|
checkCacheSliceInvariants ns
|
||||||
|
| isJoined ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc
|
||||||
|
| otherwise = const []
|
||||||
where
|
where
|
||||||
jEntries = jEntriesPerSlice ns
|
jEntries = jEntriesPerSlice ns
|
||||||
lastPred = getNid <$> lastMay (predecessors ns)
|
lastPred = getNid <$> lastMay (predecessors ns)
|
||||||
|
@ -340,7 +342,7 @@ stabiliseThread nsSTM = forever $ do
|
||||||
-- try looking up additional neighbours if list too short
|
-- try looking up additional neighbours if list too short
|
||||||
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
||||||
ns' <- readTVarIO nsSTM
|
ns' <- readTVarIO nsSTM
|
||||||
nextEntry <- requestQueryID ns' $ pred . getNid $ atDef (toRemoteNodeState ns') (predecessors ns') (-1)
|
nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
|
||||||
atomically $ do
|
atomically $ do
|
||||||
latestNs <- readTVar nsSTM
|
latestNs <- readTVar nsSTM
|
||||||
writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
|
writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
|
||||||
|
@ -348,7 +350,7 @@ stabiliseThread nsSTM = forever $ do
|
||||||
|
|
||||||
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
||||||
ns' <- readTVarIO nsSTM
|
ns' <- readTVarIO nsSTM
|
||||||
nextEntry <- requestQueryID ns' $ succ . getNid $ atDef (toRemoteNodeState ns') (successors ns') (-1)
|
nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
|
||||||
atomically $ do
|
atomically $ do
|
||||||
latestNs <- readTVar nsSTM
|
latestNs <- readTVar nsSTM
|
||||||
writeTVar nsSTM $ addSuccessors [nextEntry] latestNs
|
writeTVar nsSTM $ addSuccessors [nextEntry] latestNs
|
||||||
|
@ -460,6 +462,9 @@ fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
||||||
-> LocalNodeStateSTM -- ^ acting NodeState
|
-> LocalNodeStateSTM -- ^ acting NodeState
|
||||||
-> IO ()
|
-> IO ()
|
||||||
fediMessageHandler sendQ recvQ nsSTM = do
|
fediMessageHandler sendQ recvQ nsSTM = do
|
||||||
|
-- Read node state just once, assuming that all relevant data for this function does
|
||||||
|
-- not change.
|
||||||
|
-- Other functions are passed the nsSTM reference and thus can get the latest state.
|
||||||
nsSnap <- readTVarIO nsSTM
|
nsSnap <- readTVarIO nsSTM
|
||||||
-- handling multipart messages:
|
-- handling multipart messages:
|
||||||
-- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
|
-- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
|
||||||
|
|
|
@ -132,10 +132,11 @@ spec = do
|
||||||
cacheWith2Entries :: NodeCache
|
cacheWith2Entries :: NodeCache
|
||||||
cacheWith2Entries = addCacheEntryPure 10 (RemoteCacheEntry node5 10) (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache)
|
cacheWith2Entries = addCacheEntryPure 10 (RemoteCacheEntry node5 10) (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache)
|
||||||
cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) (addCacheEntryPure 10 (RemoteCacheEntry node4 10) cacheWith2Entries)
|
cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) (addCacheEntryPure 10 (RemoteCacheEntry node4 10) cacheWith2Entries)
|
||||||
it "nodes not joined provide the default answer FOUND" $ do
|
it "unjoined nodes should never return themselfs" $ do
|
||||||
exampleLocalNodeAsRemote <- toRemoteNodeState <$> exampleLocalNode
|
exampleLocalNodeAsRemote <- toRemoteNodeState <$> exampleLocalNode
|
||||||
queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FOUND exampleLocalNodeAsRemote
|
queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FORWARD Set.empty
|
||||||
queryLocalCache <$> exampleLocalNode <*> pure cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2342) `shouldReturn` FOUND exampleLocalNodeAsRemote
|
(FORWARD fwSet) <- queryLocalCache <$> exampleLocalNode <*> pure cacheWith4Entries <*> pure 1 <*> (getNid <$> exampleLocalNode)
|
||||||
|
remoteNode (head $ Set.elems fwSet) `shouldBe` node4
|
||||||
it "joined nodes do not fall back to the default" $
|
it "joined nodes do not fall back to the default" $
|
||||||
queryLocalCache <$> node1 <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 3) `shouldReturn` FORWARD Set.empty
|
queryLocalCache <$> node1 <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 3) `shouldReturn` FORWARD Set.empty
|
||||||
it "works on a cache with less entries than needed" $ do
|
it "works on a cache with less entries than needed" $ do
|
||||||
|
@ -265,6 +266,17 @@ spec = do
|
||||||
let startAt5 = serialiseMessage 600 (largeMessage {part = 5})
|
let startAt5 = serialiseMessage 600 (largeMessage {part = 5})
|
||||||
Map.lookup 1 startAt5 `shouldBe` Nothing
|
Map.lookup 1 startAt5 `shouldBe` Nothing
|
||||||
part <$> (deserialiseMessage . fromJust) (Map.lookup 5 startAt5) `shouldBe` Right 5
|
part <$> (deserialiseMessage . fromJust) (Map.lookup 5 startAt5) `shouldBe` Right 5
|
||||||
|
describe "join cache lookup" $
|
||||||
|
it "A bootstrap cache initialised with just one node returns that one." $ do
|
||||||
|
let
|
||||||
|
bootstrapNid = toNodeID 34804191837661041451755206127000721433747285589603756490902196113256157045194
|
||||||
|
bootstrapNode = setNid bootstrapNid exampleNodeState
|
||||||
|
bootstrapCache = addCacheEntryPure 10 (RemoteCacheEntry bootstrapNode 19) initCache
|
||||||
|
ownId = toNodeID 34804191837661041451755206127000721433707928516052624394829818586723613390165
|
||||||
|
ownNode <- setNid ownId <$> exampleLocalNode
|
||||||
|
let (FORWARD qResult) = queryLocalCache ownNode bootstrapCache 2 ownId
|
||||||
|
remoteNode (head $ Set.elems qResult) `shouldBe` bootstrapNode
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- some example data
|
-- some example data
|
||||||
|
|
Loading…
Reference in a new issue