From 280d928ad7c40d4949a7173f7229b18f6a5893a0 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sat, 27 Jun 2020 15:57:54 +0200 Subject: [PATCH] Refactor requestQueryID to be able to send a single request in preparation for #30 --- src/Hash2Pub/DHTProtocol.hs | 115 ++++++++++++++++++++------------- src/Hash2Pub/FediChord.hs | 2 +- src/Hash2Pub/FediChordTypes.hs | 3 +- 3 files changed, 74 insertions(+), 46 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 7af7699..83e32d4 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -14,11 +14,13 @@ module Hash2Pub.DHTProtocol , ActionPayload(..) , FediChordMessage(..) , maximumParts - , sendQueryIdMessage + , sendQueryIdMessages , requestQueryID , requestJoin , requestPing , requestStabilise + , lookupMessage + , sendRequestTo , queryIdLookupLoop , queueAddEntries , queueDeleteEntries @@ -30,6 +32,7 @@ module Hash2Pub.DHTProtocol , ackRequest , isPossibleSuccessor , isPossiblePredecessor + , closestCachePredecessors ) where @@ -89,7 +92,7 @@ queryLocalCache ownState nCache lBestNodes targetID | targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and -- the closest succeeding node (like with the p initiated parallel queries - | otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors + | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache where ownID = getNid ownState preds = predecessors ownState @@ -97,18 +100,22 @@ queryLocalCache ownState nCache lBestNodes targetID closestSuccessor :: Set.Set RemoteCacheEntry closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache - closestPredecessors :: Set.Set RemoteCacheEntry - closestPredecessors = closestPredecessor (lBestNodes-1) targetID - closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry - closestPredecessor 0 _ = Set.empty - closestPredecessor remainingLookups lastID - | remainingLookups < 0 = Set.empty - | otherwise = - let result = cacheLookupPred lastID nCache - in - case toRemoteCacheEntry <$> result of - Nothing -> Set.empty - Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestPredecessor (remainingLookups-1) (nid ns) + +-- | look up the 3 direct predecessor cache entries of a given ID +closestCachePredecessors :: (Integral n) + => n -- ^ number of entries to look up + -> NodeID -- ^ target ID to get the predecessors of + -> NodeCache -- ^ cache to use for lookup + -> Set.Set RemoteCacheEntry +closestCachePredecessors 0 _ _ = Set.empty +closestCachePredecessors remainingLookups lastID nCache + | remainingLookups < 0 = Set.empty + | otherwise = + let result = cacheLookupPred lastID nCache + in + case toRemoteCacheEntry <$> result of + Nothing -> Set.empty + Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestCachePredecessors (remainingLookups-1) (nid ns) nCache -- | Determines whether a lookup key is within the responsibility slice of a node, -- as it falls between its first predecessor and the node itself. @@ -496,45 +503,65 @@ queryIdLookupLoop cacheSnapshot ns targetID = do case localResult of FOUND thisNode -> pure thisNode FORWARD nodeSet -> do + responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) + now <- getPOSIXTime + -- check for a FOUND and return it + case responseEntries of + FOUND foundNode -> pure foundNode + -- if not FOUND, insert entries into local cache copy and recurse + FORWARD entrySet -> + let newLCache = foldr' ( + addCacheEntryPure now + ) cacheSnapshot entrySet + in + -- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value + queryIdLookupLoop newLCache ns targetID + + +sendQueryIdMessages :: (Integral i) + => NodeID -- ^ target key ID to look up + -> LocalNodeState -- ^ node state of the node doing the query + -> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned + -> [RemoteNodeState] -- ^ nodes to query + -> IO QueryResponse -- ^ accumulated response +sendQueryIdMessages targetID ns lParam targets = do + -- create connected sockets to all query targets and use them for request handling -- ToDo: make attempts and timeout configurable - queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (sendQueryIdMessage targetID ns)) $ remoteNode <$> Set.toList nodeSet + queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close ( + sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) + )) targets -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 -- ToDo: exception handling, maybe log them responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads - -- insert new cache entries both into global cache as well as in local copy, to make sure it is already up to date at next lookup + -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing now <- getPOSIXTime - newLCache <- foldM (\oldCache resp -> do - let entriesToInsert = case queryResult <$> payload resp of - Just (FOUND result1) -> [RemoteCacheEntry result1 now] - Just (FORWARD resultset) -> Set.elems resultset - _ -> [] + -- collect cache entries from all responses + foldM (\acc resp -> do + let entrySet = case queryResult <$> payload resp of + Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now) + Just (FORWARD resultset) -> resultset + _ -> Set.empty + -- forward entries to global cache - queueAddEntries entriesToInsert ns - -- insert entries into local cache copy - pure $ foldr' ( - addCacheEntryPure now - ) oldCache entriesToInsert - ) cacheSnapshot responses + queueAddEntries entrySet ns + -- return accumulated QueryResult + pure $ case acc of + -- once a FOUND as been encountered, return this as a result + isFound@FOUND{} -> isFound + FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet - -- check for a FOUND and return it - let foundResp = headMay . mapMaybe (\resp -> case queryResult <$> payload resp of - Just (FOUND ns') -> Just ns' - _ -> Nothing - ) $ responses - -- if no FOUND, recursively call lookup again - -- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value - maybe (queryIdLookupLoop newLCache ns targetID) pure foundResp + ) (FORWARD Set.empty) responses - -sendQueryIdMessage :: NodeID -- ^ target key ID to look up - -> LocalNodeState -- ^ node state of the node doing the query - -> Socket -- ^ connected socket to use for sending - -> IO (Set.Set FediChordMessage) -- ^ responses -sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns) - where - lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID) - pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns } +-- | Create a QueryID message to be supplied to 'sendRequestTo' +lookupMessage :: Integral i + => NodeID -- ^ target ID + -> LocalNodeState -- ^ sender node state + -> Maybe i -- ^ optionally provide a different l parameter + -> (Integer -> FediChordMessage) +lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID) + where + pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam } -- | Send a stabilise request to provided 'RemoteNode' and, if successful, diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index fe7fa83..021b94d 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -136,7 +136,7 @@ fediChordBootstrapJoin nsSTM (joinHost, joinPort) = -- 1. get routed to placement of own ID until FOUND: -- Initialise an empty cache only with the responses from a bootstrapping node ns <- readTVarIO nsSTM - bootstrapResponse <- sendQueryIdMessage (getNid ns) ns sock + bootstrapResponse <- sendRequestTo 5000 3 (lookupMessage (getNid ns) ns Nothing) sock if bootstrapResponse == Set.empty then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response." else do diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 26a13f8..7153aec 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -6,6 +6,7 @@ module Hash2Pub.FediChordTypes ( NodeID -- abstract, but newtype constructors cannot be hidden + , idBits , getNodeID , toNodeID , NodeState (..) @@ -380,7 +381,7 @@ lookupWrapper f fRepeat direction key rmap = then lookupWrapper fRepeat fRepeat direction newKey rmap else Nothing -- normal entries are returned - Just (_, (KeyEntry entry)) -> Just entry + Just (_, KeyEntry entry) -> Just entry Nothing -> Nothing where rMapNotEmpty :: (HasKeyID a) => RingMap a -> Bool