Compare commits
3 commits
7f5dac55ea
...
f7ed0ee8d8
Author | SHA1 | Date | |
---|---|---|---|
f7ed0ee8d8 | |||
2c3ef44064 | |||
280d928ad7 |
3 changed files with 139 additions and 88 deletions
|
@ -14,11 +14,13 @@ module Hash2Pub.DHTProtocol
|
|||
, ActionPayload(..)
|
||||
, FediChordMessage(..)
|
||||
, maximumParts
|
||||
, sendQueryIdMessage
|
||||
, sendQueryIdMessages
|
||||
, requestQueryID
|
||||
, requestJoin
|
||||
, requestPing
|
||||
, requestStabilise
|
||||
, lookupMessage
|
||||
, sendRequestTo
|
||||
, queryIdLookupLoop
|
||||
, queueAddEntries
|
||||
, queueDeleteEntries
|
||||
|
@ -30,6 +32,7 @@ module Hash2Pub.DHTProtocol
|
|||
, ackRequest
|
||||
, isPossibleSuccessor
|
||||
, isPossiblePredecessor
|
||||
, closestCachePredecessors
|
||||
)
|
||||
where
|
||||
|
||||
|
@ -89,7 +92,7 @@ queryLocalCache ownState nCache lBestNodes targetID
|
|||
| targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
|
||||
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
|
||||
-- the closest succeeding node (like with the p initiated parallel queries
|
||||
| otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors
|
||||
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
|
||||
where
|
||||
ownID = getNid ownState
|
||||
preds = predecessors ownState
|
||||
|
@ -97,18 +100,22 @@ queryLocalCache ownState nCache lBestNodes targetID
|
|||
closestSuccessor :: Set.Set RemoteCacheEntry
|
||||
closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache
|
||||
|
||||
closestPredecessors :: Set.Set RemoteCacheEntry
|
||||
closestPredecessors = closestPredecessor (lBestNodes-1) targetID
|
||||
closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry
|
||||
closestPredecessor 0 _ = Set.empty
|
||||
closestPredecessor remainingLookups lastID
|
||||
| remainingLookups < 0 = Set.empty
|
||||
| otherwise =
|
||||
let result = cacheLookupPred lastID nCache
|
||||
in
|
||||
case toRemoteCacheEntry <$> result of
|
||||
Nothing -> Set.empty
|
||||
Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestPredecessor (remainingLookups-1) (nid ns)
|
||||
|
||||
-- | look up the 3 direct predecessor cache entries of a given ID
|
||||
closestCachePredecessors :: (Integral n)
|
||||
=> n -- ^ number of entries to look up
|
||||
-> NodeID -- ^ target ID to get the predecessors of
|
||||
-> NodeCache -- ^ cache to use for lookup
|
||||
-> Set.Set RemoteCacheEntry
|
||||
closestCachePredecessors 0 _ _ = Set.empty
|
||||
closestCachePredecessors remainingLookups lastID nCache
|
||||
| remainingLookups < 0 = Set.empty
|
||||
| otherwise =
|
||||
let result = cacheLookupPred lastID nCache
|
||||
in
|
||||
case toRemoteCacheEntry <$> result of
|
||||
Nothing -> Set.empty
|
||||
Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestCachePredecessors (remainingLookups-1) (nid ns) nCache
|
||||
|
||||
-- | Determines whether a lookup key is within the responsibility slice of a node,
|
||||
-- as it falls between its first predecessor and the node itself.
|
||||
|
@ -496,45 +503,65 @@ queryIdLookupLoop cacheSnapshot ns targetID = do
|
|||
case localResult of
|
||||
FOUND thisNode -> pure thisNode
|
||||
FORWARD nodeSet -> do
|
||||
responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
|
||||
now <- getPOSIXTime
|
||||
-- check for a FOUND and return it
|
||||
case responseEntries of
|
||||
FOUND foundNode -> pure foundNode
|
||||
-- if not FOUND, insert entries into local cache copy and recurse
|
||||
FORWARD entrySet ->
|
||||
let newLCache = foldr' (
|
||||
addCacheEntryPure now
|
||||
) cacheSnapshot entrySet
|
||||
in
|
||||
-- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value
|
||||
queryIdLookupLoop newLCache ns targetID
|
||||
|
||||
|
||||
sendQueryIdMessages :: (Integral i)
|
||||
=> NodeID -- ^ target key ID to look up
|
||||
-> LocalNodeState -- ^ node state of the node doing the query
|
||||
-> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
|
||||
-> [RemoteNodeState] -- ^ nodes to query
|
||||
-> IO QueryResponse -- ^ accumulated response
|
||||
sendQueryIdMessages targetID ns lParam targets = do
|
||||
|
||||
-- create connected sockets to all query targets and use them for request handling
|
||||
-- ToDo: make attempts and timeout configurable
|
||||
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (sendQueryIdMessage targetID ns)) $ remoteNode <$> Set.toList nodeSet
|
||||
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (
|
||||
sendRequestTo 5000 3 (lookupMessage targetID ns Nothing)
|
||||
)) targets
|
||||
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||
-- ToDo: exception handling, maybe log them
|
||||
responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
|
||||
-- insert new cache entries both into global cache as well as in local copy, to make sure it is already up to date at next lookup
|
||||
-- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
|
||||
now <- getPOSIXTime
|
||||
newLCache <- foldM (\oldCache resp -> do
|
||||
let entriesToInsert = case queryResult <$> payload resp of
|
||||
Just (FOUND result1) -> [RemoteCacheEntry result1 now]
|
||||
Just (FORWARD resultset) -> Set.elems resultset
|
||||
_ -> []
|
||||
-- collect cache entries from all responses
|
||||
foldM (\acc resp -> do
|
||||
let entrySet = case queryResult <$> payload resp of
|
||||
Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
|
||||
Just (FORWARD resultset) -> resultset
|
||||
_ -> Set.empty
|
||||
|
||||
-- forward entries to global cache
|
||||
queueAddEntries entriesToInsert ns
|
||||
-- insert entries into local cache copy
|
||||
pure $ foldr' (
|
||||
addCacheEntryPure now
|
||||
) oldCache entriesToInsert
|
||||
) cacheSnapshot responses
|
||||
queueAddEntries entrySet ns
|
||||
-- return accumulated QueryResult
|
||||
pure $ case acc of
|
||||
-- once a FOUND as been encountered, return this as a result
|
||||
isFound@FOUND{} -> isFound
|
||||
FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet
|
||||
|
||||
-- check for a FOUND and return it
|
||||
let foundResp = headMay . mapMaybe (\resp -> case queryResult <$> payload resp of
|
||||
Just (FOUND ns') -> Just ns'
|
||||
_ -> Nothing
|
||||
) $ responses
|
||||
-- if no FOUND, recursively call lookup again
|
||||
-- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value
|
||||
maybe (queryIdLookupLoop newLCache ns targetID) pure foundResp
|
||||
) (FORWARD Set.empty) responses
|
||||
|
||||
|
||||
sendQueryIdMessage :: NodeID -- ^ target key ID to look up
|
||||
-> LocalNodeState -- ^ node state of the node doing the query
|
||||
-> Socket -- ^ connected socket to use for sending
|
||||
-> IO (Set.Set FediChordMessage) -- ^ responses
|
||||
sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns)
|
||||
where
|
||||
lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
|
||||
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns }
|
||||
-- | Create a QueryID message to be supplied to 'sendRequestTo'
|
||||
lookupMessage :: Integral i
|
||||
=> NodeID -- ^ target ID
|
||||
-> LocalNodeState -- ^ sender node state
|
||||
-> Maybe i -- ^ optionally provide a different l parameter
|
||||
-> (Integer -> FediChordMessage)
|
||||
lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
|
||||
where
|
||||
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam }
|
||||
|
||||
|
||||
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
|
||||
|
|
|
@ -136,7 +136,7 @@ fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
|
|||
-- 1. get routed to placement of own ID until FOUND:
|
||||
-- Initialise an empty cache only with the responses from a bootstrapping node
|
||||
ns <- readTVarIO nsSTM
|
||||
bootstrapResponse <- sendQueryIdMessage (getNid ns) ns sock
|
||||
bootstrapResponse <- sendRequestTo 5000 3 (lookupMessage (getNid ns) ns Nothing) sock
|
||||
if bootstrapResponse == Set.empty
|
||||
then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response."
|
||||
else do
|
||||
|
@ -231,10 +231,74 @@ cacheVerifyThread nsSTM = forever $ do
|
|||
) pong
|
||||
else pure ()
|
||||
)
|
||||
|
||||
|
||||
-- check the cache invariant per slice and, if necessary, do a single lookup to the
|
||||
-- middle of each slice not verifying the invariant
|
||||
latestNs <- readTVarIO nsSTM
|
||||
latestCache <- readTVarIO $ nodeCacheSTM latestNs
|
||||
let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
|
||||
FOUND node -> [node]
|
||||
FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
|
||||
forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
|
||||
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
|
||||
)
|
||||
|
||||
threadDelay $ toEnum (fromEnum maxEntryAge `div` 20)
|
||||
|
||||
|
||||
-- | Checks the invariant of at least @jEntries@ per cache slice.
|
||||
-- If this invariant does not hold, the middle of the slice is returned for
|
||||
-- making lookups to that ID
|
||||
checkCacheSliceInvariants :: LocalNodeState
|
||||
-> NodeCache
|
||||
-> [NodeID] -- ^ list of middle IDs of slices not
|
||||
-- ^ fulfilling the invariant
|
||||
checkCacheSliceInvariants ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc
|
||||
where
|
||||
jEntries = jEntriesPerSlice ns
|
||||
lastPred = getNid <$> lastMay (predecessors ns)
|
||||
lastSucc = getNid <$> lastMay (successors ns)
|
||||
-- start slice boundary: 1/2 key space
|
||||
startBound = getNid ns + 2^(idBits - 1)
|
||||
|
||||
checkSuccessorSlice :: Integral i => i -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [NodeID]
|
||||
checkSuccessorSlice _ _ _ Nothing _ = []
|
||||
checkSuccessorSlice j ownID upperBound (Just lastSuccID) cache
|
||||
| (upperBound `localCompare` lastSuccID) == LT = []
|
||||
| otherwise =
|
||||
let
|
||||
diff = getNodeID $ upperBound - ownID
|
||||
lowerBound = ownID + fromInteger (diff `div` 2)
|
||||
middleID = lowerBound + fromInteger (diff `div` 4)
|
||||
lookupResult = Set.map (getNid . remoteNode) $ closestCachePredecessors jEntries upperBound cache
|
||||
in
|
||||
-- check whether j entries are in the slice
|
||||
if length lookupResult == jEntries
|
||||
&& all (\r -> (r `localCompare` lowerBound) == GT) lookupResult
|
||||
&& all (\r -> (r `localCompare` upperBound) == LT) lookupResult
|
||||
then checkSuccessorSlice j ownID (lowerBound - 1) (Just lastSuccID) cache
|
||||
-- if not enough entries, add the middle of the slice to list
|
||||
else middleID : checkSuccessorSlice j ownID (lowerBound - 1) (Just lastSuccID) cache
|
||||
|
||||
checkPredecessorSlice :: Integral i => i -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [NodeID]
|
||||
checkPredecessorSlice _ _ _ Nothing _ = []
|
||||
checkPredecessorSlice j ownID lowerBound (Just lastPredID) cache
|
||||
| (lowerBound `localCompare` lastPredID) == GT = []
|
||||
| otherwise =
|
||||
let
|
||||
diff = getNodeID $ ownID - lowerBound
|
||||
upperBound = ownID - fromInteger (diff `div` 2)
|
||||
middleID = lowerBound + fromInteger (diff `div` 4)
|
||||
lookupResult = Set.map (getNid . remoteNode) $ closestCachePredecessors jEntries upperBound cache
|
||||
in
|
||||
-- check whether j entries are in the slice
|
||||
if length lookupResult == jEntries
|
||||
&& all (\r -> (r `localCompare` lowerBound) == GT) lookupResult
|
||||
&& all (\r -> (r `localCompare` upperBound) == LT) lookupResult
|
||||
then checkPredecessorSlice j ownID (upperBound + 1) (Just lastPredID) cache
|
||||
-- if not enough entries, add the middle of the slice to list
|
||||
else middleID : checkPredecessorSlice j ownID (upperBound + 1) (Just lastPredID) cache
|
||||
|
||||
|
||||
-- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
|
||||
-- one responds, and get their neighbours for maintaining the own neighbour lists.
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
module Hash2Pub.FediChordTypes (
|
||||
NodeID -- abstract, but newtype constructors cannot be hidden
|
||||
, idBits
|
||||
, getNodeID
|
||||
, toNodeID
|
||||
, NodeState (..)
|
||||
|
@ -380,7 +381,7 @@ lookupWrapper f fRepeat direction key rmap =
|
|||
then lookupWrapper fRepeat fRepeat direction newKey rmap
|
||||
else Nothing
|
||||
-- normal entries are returned
|
||||
Just (_, (KeyEntry entry)) -> Just entry
|
||||
Just (_, KeyEntry entry) -> Just entry
|
||||
Nothing -> Nothing
|
||||
where
|
||||
rMapNotEmpty :: (HasKeyID a) => RingMap a -> Bool
|
||||
|
@ -563,47 +564,6 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
|
|||
parseWithOffset 0 word = toInteger word -- a shift of 0 is always 0
|
||||
parseWithOffset offset word = toInteger word * 2^(8 * offset)
|
||||
|
||||
|
||||
|
||||
-- TODO: complete rewrite
|
||||
-- |checks wether the cache entries fulfill the logarithmic EpiChord invariant
|
||||
-- of having j entries per slice, and creates a list of necessary lookup actions.
|
||||
-- Should be invoked periodically.
|
||||
--checkCacheSlices :: NodeState -> IO [()]
|
||||
--checkCacheSlices state = case getNodeCache state of
|
||||
-- -- don't do anything on nodes without a cache
|
||||
-- Nothing -> pure [()]
|
||||
-- Just cache' -> checkSlice jEntries (nid state) startBound lastSucc =<< readIORef cache'
|
||||
-- -- TODO: do the same for predecessors
|
||||
-- where
|
||||
-- jEntries = fromMaybe 0 $ getInternals_ jEntriesPerSlice state
|
||||
-- lastSucc = last <$> maybeEmpty (fromMaybe [] $ getSuccessors state)
|
||||
-- startBound = NodeID 2^(255::Integer) + nid state
|
||||
-- checkSlice :: Int -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [IO ()]
|
||||
-- checkSlice _ _ _ Nothing _ = []
|
||||
-- checkSlice j ownID upperBound (Just lastSuccNode) cache
|
||||
-- | upperBound < lastSuccNode = []
|
||||
-- | otherwise =
|
||||
-- -- continuously half the DHT namespace, take the upper part as a slice,
|
||||
-- -- check for existing entries in that slice and create a lookup action
|
||||
-- -- and recursively do this on the lower half.
|
||||
-- -- recursion edge case: all successors/ predecessors need to be in the
|
||||
-- -- first slice.
|
||||
-- let
|
||||
-- diff = getNodeID $ upperBound - ownID
|
||||
-- lowerBound = ownID + NodeID (diff `div` 2)
|
||||
-- in
|
||||
-- -- TODO: replace empty IO actions with actual lookups to middle of slice
|
||||
-- -- TODO: validate ID before adding to cache
|
||||
-- case Map.lookupLT upperBound cache of
|
||||
-- Nothing -> pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
|
||||
-- Just (matchID, _) ->
|
||||
-- if
|
||||
-- matchID <= lowerBound then pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
|
||||
-- else
|
||||
-- checkSlice j ownID lowerBound (Just lastSuccNode) cache
|
||||
|
||||
|
||||
-- Todo: DHT backend can learn potential initial bootstrapping points through the instances mentioned in the received AP-relay messages
|
||||
-- persist them on disk so they can be used for all following bootstraps
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue