diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs
index 83e32d4..7af7699 100644
--- a/src/Hash2Pub/DHTProtocol.hs
+++ b/src/Hash2Pub/DHTProtocol.hs
@@ -14,13 +14,11 @@ module Hash2Pub.DHTProtocol
     , ActionPayload(..)
     , FediChordMessage(..)
     , maximumParts
-    , sendQueryIdMessages
+    , sendQueryIdMessage
    , requestQueryID
    , requestJoin
    , requestPing
    , requestStabilise
-    , lookupMessage
-    , sendRequestTo
    , queryIdLookupLoop
    , queueAddEntries
    , queueDeleteEntries
@@ -32,7 +30,6 @@ module Hash2Pub.DHTProtocol
    , ackRequest
    , isPossibleSuccessor
    , isPossiblePredecessor
-    , closestCachePredecessors
    )
 where
 
@@ -92,7 +89,7 @@ queryLocalCache ownState nCache lBestNodes targetID
     | targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
     -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
     -- the closest succeeding node (like with the p initiated parallel queries)
-    | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
+    | otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors
   where
     ownID = getNid ownState
     preds = predecessors ownState
@@ -100,22 +97,18 @@ queryLocalCache ownState nCache lBestNodes targetID
     closestSuccessor :: Set.Set RemoteCacheEntry
     closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache
-
--- | look up the 3 direct predecessor cache entries of a given ID
-closestCachePredecessors :: (Integral n)
-    => n          -- ^ number of entries to look up
-    -> NodeID     -- ^ target ID to get the predecessors of
-    -> NodeCache  -- ^ cache to use for lookup
-    -> Set.Set RemoteCacheEntry
-closestCachePredecessors 0 _ _ = Set.empty
-closestCachePredecessors remainingLookups lastID nCache
-    | remainingLookups < 0 = Set.empty
-    | otherwise =
-        let result = cacheLookupPred lastID nCache
-        in
-        case toRemoteCacheEntry <$> result of
-            Nothing -> Set.empty
-            Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestCachePredecessors (remainingLookups-1) (nid ns) nCache
+    closestPredecessors :: Set.Set RemoteCacheEntry
+    closestPredecessors = closestPredecessor (lBestNodes-1) targetID
+    closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry
+    closestPredecessor 0 _ = Set.empty
+    closestPredecessor remainingLookups lastID
+        | remainingLookups < 0 = Set.empty
+        | otherwise =
+            let result = cacheLookupPred lastID nCache
+            in
+            case toRemoteCacheEntry <$> result of
+                Nothing -> Set.empty
+                Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestPredecessor (remainingLookups-1) (nid ns)
 
 -- | Determines whether a lookup key is within the responsibility slice of a node,
 -- as it falls between its first predecessor and the node itself.
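
The inlined closestPredecessor above walks the cache downwards: each step asks
cacheLookupPred for the nearest entry preceding the previous hit, until l-1 entries are
collected or the cache runs out. A minimal standalone sketch of that recursion over plain
Ints (closestPredecessorsDemo is illustrative only and ignores the modular wrap-around of
real NodeIDs):

    -- walk downwards from a start key, taking the nearest smaller element each step
    closestPredecessorsDemo :: Int -> Int -> [Int] -> [Int]
    closestPredecessorsDemo 0 _ _ = []
    closestPredecessorsDemo n lastId ids =
        case filter (< lastId) ids of
            []      -> []                       -- analogue of cacheLookupPred returning Nothing
            smaller -> let p = maximum smaller  -- nearest predecessor of lastId
                       in p : closestPredecessorsDemo (n - 1) p ids

    -- closestPredecessorsDemo 3 50 [10, 20, 30, 40, 60] == [40, 30, 20]
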
@@ -503,65 +496,45 @@ queryIdLookupLoop cacheSnapshot ns targetID = do
     case localResult of
         FOUND thisNode -> pure thisNode
         FORWARD nodeSet -> do
-            responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
-            now <- getPOSIXTime
-            -- check for a FOUND and return it
-            case responseEntries of
-                FOUND foundNode -> pure foundNode
-                -- if not FOUND, insert entries into local cache copy and recurse
-                FORWARD entrySet ->
-                    let newLCache = foldr' (
-                            addCacheEntryPure now
-                            ) cacheSnapshot entrySet
-                    in
-                    -- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value
-                    queryIdLookupLoop newLCache ns targetID
-
-
-sendQueryIdMessages :: (Integral i)
-    => NodeID            -- ^ target key ID to look up
-    -> LocalNodeState    -- ^ node state of the node doing the query
-    -> Maybe i           -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
-    -> [RemoteNodeState] -- ^ nodes to query
-    -> IO QueryResponse  -- ^ accumulated response
-sendQueryIdMessages targetID ns lParam targets = do
-    -- create connected sockets to all query targets and use them for request handling
             -- ToDo: make attempts and timeout configurable
-    queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (
-        sendRequestTo 5000 3 (lookupMessage targetID ns Nothing)
-        )) targets
+            queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (sendQueryIdMessage targetID ns)) $ remoteNode <$> Set.toList nodeSet
             -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
             -- ToDo: exception handling, maybe log them
             responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
-    -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
+            -- insert new cache entries both into global cache as well as in local copy, to make sure it is already up to date at next lookup
             now <- getPOSIXTime
-    -- collect cache entries from all responses
-    foldM (\acc resp -> do
-        let entrySet = case queryResult <$> payload resp of
-                Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
-                Just (FORWARD resultset) -> resultset
-                _ -> Set.empty
-
+            newLCache <- foldM (\oldCache resp -> do
+                let entriesToInsert = case queryResult <$> payload resp of
+                        Just (FOUND result1) -> [RemoteCacheEntry result1 now]
+                        Just (FORWARD resultset) -> Set.elems resultset
+                        _ -> []
                 -- forward entries to global cache
-        queueAddEntries entrySet ns
-        -- return accumulated QueryResult
-        pure $ case acc of
-            -- once a FOUND as been encountered, return this as a result
-            isFound@FOUND{} -> isFound
-            FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet
+                queueAddEntries entriesToInsert ns
+                -- insert entries into local cache copy
+                pure $ foldr' (
+                    addCacheEntryPure now
+                    ) oldCache entriesToInsert
+                ) cacheSnapshot responses
-        ) (FORWARD Set.empty) responses
+            -- check for a FOUND and return it
+            let foundResp = headMay . mapMaybe (\resp -> case queryResult <$> payload resp of
+                    Just (FOUND ns') -> Just ns'
+                    _ -> Nothing
+                    ) $ responses
+            -- if no FOUND, recursively call lookup again
+            -- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value
+            maybe (queryIdLookupLoop newLCache ns targetID) pure foundResp
 
--- | Create a QueryID message to be supplied to 'sendRequestTo'
-lookupMessage :: Integral i
-    => NodeID         -- ^ target ID
-    -> LocalNodeState -- ^ sender node state
-    -> Maybe i        -- ^ optionally provide a different l parameter
-    -> (Integer -> FediChordMessage)
-lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
-    where
-        pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam }
+
+sendQueryIdMessage :: NodeID             -- ^ target key ID to look up
+    -> LocalNodeState                    -- ^ node state of the node doing the query
+    -> Socket                            -- ^ connected socket to use for sending
+    -> IO (Set.Set FediChordMessage)     -- ^ responses
+sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns)
+    where
+        lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
+        pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns }
 
 
 -- | Send a stabilise request to provided 'RemoteNode' and, if successful,
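
With the multi-target sendQueryIdMessages and the standalone lookupMessage gone from the
export list, callers manage the socket lifecycle themselves and go through
sendQueryIdMessage once per target. Both call sites in this patch use the same bracket
pattern; as a usage sketch inside this module (queryOneNode is a hypothetical name,
everything else appears in the diff above):

    queryOneNode :: NodeID -> LocalNodeState -> RemoteNodeState -> IO (Set.Set FediChordMessage)
    queryOneNode targetID ns target =
        bracket
            (mkSendSocket (getDomain target) (getDhtPort target)) -- acquire a connected socket
            close                                                 -- always release it
            (sendQueryIdMessage targetID ns)                      -- timeout/attempts are fixed inside
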
diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index ce8b5b9..fe7fa83 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -136,7 +136,7 @@ fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
             -- 1. get routed to placement of own ID until FOUND:
             -- Initialise an empty cache only with the responses from a bootstrapping node
             ns <- readTVarIO nsSTM
-            bootstrapResponse <- sendRequestTo 5000 3 (lookupMessage (getNid ns) ns Nothing) sock
+            bootstrapResponse <- sendQueryIdMessage (getNid ns) ns sock
             if bootstrapResponse == Set.empty
                 then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response."
                 else do
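
One aside on the new line above: bootstrapResponse == Set.empty needs an Eq instance on the
whole message set, while Data.Set's null performs the same emptiness test without one. A
possible follow-up, not part of this patch:

    -            if bootstrapResponse == Set.empty
    +            if Set.null bootstrapResponse
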
@@ -231,74 +231,10 @@ cacheVerifyThread nsSTM = forever $ do
                     ) pong
                 else pure ()
             )
-
-        -- check the cache invariant per slice and, if necessary, do a single lookup to the
-        -- middle of each slice not verifying the invariant
-        latestNs <- readTVarIO nsSTM
-        latestCache <- readTVarIO $ nodeCacheSTM latestNs
-        let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
-                FOUND node -> [node]
-                FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
-        forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
-            forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
-            )
-
+
         threadDelay $ toEnum (fromEnum maxEntryAge `div` 20)
 
--- | Checks the invariant of at least @jEntries@ per cache slice.
--- If this invariant does not hold, the middle of the slice is returned for
--- making lookups to that ID
-checkCacheSliceInvariants :: LocalNodeState
-                          -> NodeCache
-                          -> [NodeID]   -- ^ list of middle IDs of slices not
-                                        -- ^ fulfilling the invariant
-checkCacheSliceInvariants ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc
-    where
-        jEntries = jEntriesPerSlice ns
-        lastPred = getNid <$> lastMay (predecessors ns)
-        lastSucc = getNid <$> lastMay (successors ns)
-        -- start slice boundary: 1/2 key space
-        startBound = getNid ns + 2^(idBits - 1)
-
-        checkSuccessorSlice :: Integral i => i -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [NodeID]
-        checkSuccessorSlice _ _ _ Nothing _ = []
-        checkSuccessorSlice j ownID upperBound (Just lastSuccID) cache
-            | (upperBound `localCompare` lastSuccID) == LT = []
-            | otherwise =
-                let
-                    diff = getNodeID $ upperBound - ownID
-                    lowerBound = ownID + fromInteger (diff `div` 2)
-                    middleID = lowerBound + fromInteger (diff `div` 4)
-                    lookupResult = Set.map (getNid . remoteNode) $ closestCachePredecessors jEntries upperBound cache
-                in
-                -- check whether j entries are in the slice
-                if length lookupResult == jEntries
-                    && all (\r -> (r `localCompare` lowerBound) == GT) lookupResult
-                    && all (\r -> (r `localCompare` upperBound) == LT) lookupResult
-                    then checkSuccessorSlice j ownID (lowerBound - 1) (Just lastSuccID) cache
-                    -- if not enough entries, add the middle of the slice to list
-                    else middleID : checkSuccessorSlice j ownID (lowerBound - 1) (Just lastSuccID) cache
-
-        checkPredecessorSlice :: Integral i => i -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [NodeID]
-        checkPredecessorSlice _ _ _ Nothing _ = []
-        checkPredecessorSlice j ownID lowerBound (Just lastPredID) cache
-            | (lowerBound `localCompare` lastPredID) == GT = []
-            | otherwise =
-                let
-                    diff = getNodeID $ ownID - lowerBound
-                    upperBound = ownID - fromInteger (diff `div` 2)
-                    middleID = lowerBound + fromInteger (diff `div` 4)
-                    lookupResult = Set.map (getNid . remoteNode) $ closestCachePredecessors jEntries upperBound cache
-                in
-                -- check whether j entries are in the slice
-                if length lookupResult == jEntries
-                    && all (\r -> (r `localCompare` lowerBound) == GT) lookupResult
-                    && all (\r -> (r `localCompare` upperBound) == LT) lookupResult
-                    then checkPredecessorSlice j ownID (upperBound + 1) (Just lastPredID) cache
-                    -- if not enough entries, add the middle of the slice to list
-                    else middleID : checkPredecessorSlice j ownID (upperBound + 1) (Just lastPredID) cache
-
 -- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
 -- one responds, and get their neighbours for maintaining the own neighbour lists.
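
For reference, the arithmetic of the just-removed slice checks: each recursion step halves
the key space between ownID and the current bound, treats the upper (successor) half as one
slice, and nominates its midpoint as a lookup target when fewer than j entries fall inside.
The same computation on plain Integers (sliceMiddle is illustrative; the real code works on
the modular NodeID type with localCompare):

    sliceMiddle :: Integer -> Integer -> (Integer, Integer)
    sliceMiddle ownID upperBound =
        let diff       = upperBound - ownID
            lowerBound = ownID + diff `div` 2      -- slice covers the upper half
            middleID   = lowerBound + diff `div` 4 -- midpoint of that slice
        in (lowerBound, middleID)

    -- sliceMiddle 0 128 == (64, 96): the slice spans [64..128], 96 is queried,
    -- and the recursion continues with upperBound = 63 on the remaining lower half
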
diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs
index 459837f..26a13f8 100644
--- a/src/Hash2Pub/FediChordTypes.hs
+++ b/src/Hash2Pub/FediChordTypes.hs
@@ -6,7 +6,6 @@ module Hash2Pub.FediChordTypes
     ( NodeID -- abstract, but newtype constructors cannot be hidden
-    , idBits
     , getNodeID
     , toNodeID
     , NodeState (..)
@@ -381,7 +380,7 @@ lookupWrapper f fRepeat direction key rmap =
             then lookupWrapper fRepeat fRepeat direction newKey rmap
             else Nothing
         -- normal entries are returned
-        Just (_, KeyEntry entry) -> Just entry
+        Just (_, (KeyEntry entry)) -> Just entry
         Nothing -> Nothing
     where
         rMapNotEmpty :: (HasKeyID a) => RingMap a -> Bool
@@ -564,6 +563,47 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
         parseWithOffset 0 word = toInteger word -- a shift of 0 is always 0
         parseWithOffset offset word = toInteger word * 2^(8 * offset)
+
+
+-- TODO: complete rewrite
+-- | checks whether the cache entries fulfill the logarithmic EpiChord invariant
+-- of having j entries per slice, and creates a list of necessary lookup actions.
+-- Should be invoked periodically.
+--checkCacheSlices :: NodeState -> IO [()]
+--checkCacheSlices state = case getNodeCache state of
+--    -- don't do anything on nodes without a cache
+--    Nothing -> pure [()]
+--    Just cache' -> checkSlice jEntries (nid state) startBound lastSucc =<< readIORef cache'
+--    -- TODO: do the same for predecessors
+--    where
+--        jEntries = fromMaybe 0 $ getInternals_ jEntriesPerSlice state
+--        lastSucc = last <$> maybeEmpty (fromMaybe [] $ getSuccessors state)
+--        startBound = NodeID 2^(255::Integer) + nid state
+--        checkSlice :: Int -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [IO ()]
+--        checkSlice _ _ _ Nothing _ = []
+--        checkSlice j ownID upperBound (Just lastSuccNode) cache
+--            | upperBound < lastSuccNode = []
+--            | otherwise =
+--                -- continuously halve the DHT namespace, take the upper part as a slice,
+--                -- check for existing entries in that slice and create a lookup action
+--                -- and recursively do this on the lower half.
+--                -- recursion edge case: all successors/ predecessors need to be in the
+--                -- first slice.
+--                let
+--                    diff = getNodeID $ upperBound - ownID
+--                    lowerBound = ownID + NodeID (diff `div` 2)
+--                in
+--                    -- TODO: replace empty IO actions with actual lookups to middle of slice
+--                    -- TODO: validate ID before adding to cache
+--                    case Map.lookupLT upperBound cache of
+--                        Nothing -> pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
+--                        Just (matchID, _) ->
+--                            if matchID <= lowerBound
+--                                then pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
+--                                else checkSlice j ownID lowerBound (Just lastSuccNode) cache
+
 
 -- Todo: DHT backend can learn potential initial bootstrapping points through the instances mentioned in the received AP-relay messages
 -- persist them on disk so they can be used for all following bootstraps
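
A closing note on the parseWithOffset context lines above: byteStringToUInteger weights each
byte by 2^(8 * offset), so assuming offsets are counted from the last (least significant)
byte, this is plain big-endian decoding. A self-contained equivalent over a byte list
(uintegerFromBytes is illustrative, not part of the module):

    import Data.Word (Word8)

    -- big-endian byte list to unsigned Integer
    uintegerFromBytes :: [Word8] -> Integer
    uintegerFromBytes = foldl (\acc w -> acc * 256 + toInteger w) 0

    -- uintegerFromBytes [0x01, 0x00] == 256
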