Refactor requestQueryID to be able to send a single request
in preparation for #30
This commit is contained in:
parent
7f5dac55ea
commit
280d928ad7
|
@ -14,11 +14,13 @@ module Hash2Pub.DHTProtocol
|
||||||
, ActionPayload(..)
|
, ActionPayload(..)
|
||||||
, FediChordMessage(..)
|
, FediChordMessage(..)
|
||||||
, maximumParts
|
, maximumParts
|
||||||
, sendQueryIdMessage
|
, sendQueryIdMessages
|
||||||
, requestQueryID
|
, requestQueryID
|
||||||
, requestJoin
|
, requestJoin
|
||||||
, requestPing
|
, requestPing
|
||||||
, requestStabilise
|
, requestStabilise
|
||||||
|
, lookupMessage
|
||||||
|
, sendRequestTo
|
||||||
, queryIdLookupLoop
|
, queryIdLookupLoop
|
||||||
, queueAddEntries
|
, queueAddEntries
|
||||||
, queueDeleteEntries
|
, queueDeleteEntries
|
||||||
|
@ -30,6 +32,7 @@ module Hash2Pub.DHTProtocol
|
||||||
, ackRequest
|
, ackRequest
|
||||||
, isPossibleSuccessor
|
, isPossibleSuccessor
|
||||||
, isPossiblePredecessor
|
, isPossiblePredecessor
|
||||||
|
, closestCachePredecessors
|
||||||
)
|
)
|
||||||
where
|
where
|
||||||
|
|
||||||
|
@ -89,7 +92,7 @@ queryLocalCache ownState nCache lBestNodes targetID
|
||||||
| targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
|
| targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
|
||||||
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
|
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
|
||||||
-- the closest succeeding node (like with the p initiated parallel queries
|
-- the closest succeeding node (like with the p initiated parallel queries
|
||||||
| otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors
|
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
|
||||||
where
|
where
|
||||||
ownID = getNid ownState
|
ownID = getNid ownState
|
||||||
preds = predecessors ownState
|
preds = predecessors ownState
|
||||||
|
@ -97,18 +100,22 @@ queryLocalCache ownState nCache lBestNodes targetID
|
||||||
closestSuccessor :: Set.Set RemoteCacheEntry
|
closestSuccessor :: Set.Set RemoteCacheEntry
|
||||||
closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache
|
closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache
|
||||||
|
|
||||||
closestPredecessors :: Set.Set RemoteCacheEntry
|
|
||||||
closestPredecessors = closestPredecessor (lBestNodes-1) targetID
|
-- | look up the 3 direct predecessor cache entries of a given ID
|
||||||
closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry
|
closestCachePredecessors :: (Integral n)
|
||||||
closestPredecessor 0 _ = Set.empty
|
=> n -- ^ number of entries to look up
|
||||||
closestPredecessor remainingLookups lastID
|
-> NodeID -- ^ target ID to get the predecessors of
|
||||||
| remainingLookups < 0 = Set.empty
|
-> NodeCache -- ^ cache to use for lookup
|
||||||
| otherwise =
|
-> Set.Set RemoteCacheEntry
|
||||||
let result = cacheLookupPred lastID nCache
|
closestCachePredecessors 0 _ _ = Set.empty
|
||||||
in
|
closestCachePredecessors remainingLookups lastID nCache
|
||||||
case toRemoteCacheEntry <$> result of
|
| remainingLookups < 0 = Set.empty
|
||||||
Nothing -> Set.empty
|
| otherwise =
|
||||||
Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestPredecessor (remainingLookups-1) (nid ns)
|
let result = cacheLookupPred lastID nCache
|
||||||
|
in
|
||||||
|
case toRemoteCacheEntry <$> result of
|
||||||
|
Nothing -> Set.empty
|
||||||
|
Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestCachePredecessors (remainingLookups-1) (nid ns) nCache
|
||||||
|
|
||||||
-- | Determines whether a lookup key is within the responsibility slice of a node,
|
-- | Determines whether a lookup key is within the responsibility slice of a node,
|
||||||
-- as it falls between its first predecessor and the node itself.
|
-- as it falls between its first predecessor and the node itself.
|
||||||
|
@ -496,45 +503,65 @@ queryIdLookupLoop cacheSnapshot ns targetID = do
|
||||||
case localResult of
|
case localResult of
|
||||||
FOUND thisNode -> pure thisNode
|
FOUND thisNode -> pure thisNode
|
||||||
FORWARD nodeSet -> do
|
FORWARD nodeSet -> do
|
||||||
|
responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
|
||||||
|
now <- getPOSIXTime
|
||||||
|
-- check for a FOUND and return it
|
||||||
|
case responseEntries of
|
||||||
|
FOUND foundNode -> pure foundNode
|
||||||
|
-- if not FOUND, insert entries into local cache copy and recurse
|
||||||
|
FORWARD entrySet ->
|
||||||
|
let newLCache = foldr' (
|
||||||
|
addCacheEntryPure now
|
||||||
|
) cacheSnapshot entrySet
|
||||||
|
in
|
||||||
|
-- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value
|
||||||
|
queryIdLookupLoop newLCache ns targetID
|
||||||
|
|
||||||
|
|
||||||
|
sendQueryIdMessages :: (Integral i)
|
||||||
|
=> NodeID -- ^ target key ID to look up
|
||||||
|
-> LocalNodeState -- ^ node state of the node doing the query
|
||||||
|
-> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
|
||||||
|
-> [RemoteNodeState] -- ^ nodes to query
|
||||||
|
-> IO QueryResponse -- ^ accumulated response
|
||||||
|
sendQueryIdMessages targetID ns lParam targets = do
|
||||||
|
|
||||||
-- create connected sockets to all query targets and use them for request handling
|
-- create connected sockets to all query targets and use them for request handling
|
||||||
-- ToDo: make attempts and timeout configurable
|
-- ToDo: make attempts and timeout configurable
|
||||||
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (sendQueryIdMessage targetID ns)) $ remoteNode <$> Set.toList nodeSet
|
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (
|
||||||
|
sendRequestTo 5000 3 (lookupMessage targetID ns Nothing)
|
||||||
|
)) targets
|
||||||
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||||
-- ToDo: exception handling, maybe log them
|
-- ToDo: exception handling, maybe log them
|
||||||
responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
|
responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
|
||||||
-- insert new cache entries both into global cache as well as in local copy, to make sure it is already up to date at next lookup
|
-- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
newLCache <- foldM (\oldCache resp -> do
|
-- collect cache entries from all responses
|
||||||
let entriesToInsert = case queryResult <$> payload resp of
|
foldM (\acc resp -> do
|
||||||
Just (FOUND result1) -> [RemoteCacheEntry result1 now]
|
let entrySet = case queryResult <$> payload resp of
|
||||||
Just (FORWARD resultset) -> Set.elems resultset
|
Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
|
||||||
_ -> []
|
Just (FORWARD resultset) -> resultset
|
||||||
|
_ -> Set.empty
|
||||||
|
|
||||||
-- forward entries to global cache
|
-- forward entries to global cache
|
||||||
queueAddEntries entriesToInsert ns
|
queueAddEntries entrySet ns
|
||||||
-- insert entries into local cache copy
|
-- return accumulated QueryResult
|
||||||
pure $ foldr' (
|
pure $ case acc of
|
||||||
addCacheEntryPure now
|
-- once a FOUND as been encountered, return this as a result
|
||||||
) oldCache entriesToInsert
|
isFound@FOUND{} -> isFound
|
||||||
) cacheSnapshot responses
|
FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet
|
||||||
|
|
||||||
-- check for a FOUND and return it
|
) (FORWARD Set.empty) responses
|
||||||
let foundResp = headMay . mapMaybe (\resp -> case queryResult <$> payload resp of
|
|
||||||
Just (FOUND ns') -> Just ns'
|
|
||||||
_ -> Nothing
|
|
||||||
) $ responses
|
|
||||||
-- if no FOUND, recursively call lookup again
|
|
||||||
-- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value
|
|
||||||
maybe (queryIdLookupLoop newLCache ns targetID) pure foundResp
|
|
||||||
|
|
||||||
|
-- | Create a QueryID message to be supplied to 'sendRequestTo'
|
||||||
sendQueryIdMessage :: NodeID -- ^ target key ID to look up
|
lookupMessage :: Integral i
|
||||||
-> LocalNodeState -- ^ node state of the node doing the query
|
=> NodeID -- ^ target ID
|
||||||
-> Socket -- ^ connected socket to use for sending
|
-> LocalNodeState -- ^ sender node state
|
||||||
-> IO (Set.Set FediChordMessage) -- ^ responses
|
-> Maybe i -- ^ optionally provide a different l parameter
|
||||||
sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns)
|
-> (Integer -> FediChordMessage)
|
||||||
where
|
lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
|
||||||
lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
|
where
|
||||||
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns }
|
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam }
|
||||||
|
|
||||||
|
|
||||||
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
|
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
|
||||||
|
|
|
@ -136,7 +136,7 @@ fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
|
||||||
-- 1. get routed to placement of own ID until FOUND:
|
-- 1. get routed to placement of own ID until FOUND:
|
||||||
-- Initialise an empty cache only with the responses from a bootstrapping node
|
-- Initialise an empty cache only with the responses from a bootstrapping node
|
||||||
ns <- readTVarIO nsSTM
|
ns <- readTVarIO nsSTM
|
||||||
bootstrapResponse <- sendQueryIdMessage (getNid ns) ns sock
|
bootstrapResponse <- sendRequestTo 5000 3 (lookupMessage (getNid ns) ns Nothing) sock
|
||||||
if bootstrapResponse == Set.empty
|
if bootstrapResponse == Set.empty
|
||||||
then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response."
|
then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response."
|
||||||
else do
|
else do
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
|
|
||||||
module Hash2Pub.FediChordTypes (
|
module Hash2Pub.FediChordTypes (
|
||||||
NodeID -- abstract, but newtype constructors cannot be hidden
|
NodeID -- abstract, but newtype constructors cannot be hidden
|
||||||
|
, idBits
|
||||||
, getNodeID
|
, getNodeID
|
||||||
, toNodeID
|
, toNodeID
|
||||||
, NodeState (..)
|
, NodeState (..)
|
||||||
|
@ -380,7 +381,7 @@ lookupWrapper f fRepeat direction key rmap =
|
||||||
then lookupWrapper fRepeat fRepeat direction newKey rmap
|
then lookupWrapper fRepeat fRepeat direction newKey rmap
|
||||||
else Nothing
|
else Nothing
|
||||||
-- normal entries are returned
|
-- normal entries are returned
|
||||||
Just (_, (KeyEntry entry)) -> Just entry
|
Just (_, KeyEntry entry) -> Just entry
|
||||||
Nothing -> Nothing
|
Nothing -> Nothing
|
||||||
where
|
where
|
||||||
rMapNotEmpty :: (HasKeyID a) => RingMap a -> Bool
|
rMapNotEmpty :: (HasKeyID a) => RingMap a -> Bool
|
||||||
|
|
Loading…
Reference in a new issue