diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs
index 7af7699..83e32d4 100644
--- a/src/Hash2Pub/DHTProtocol.hs
+++ b/src/Hash2Pub/DHTProtocol.hs
@@ -14,11 +14,13 @@ module Hash2Pub.DHTProtocol
     , ActionPayload(..)
     , FediChordMessage(..)
     , maximumParts
-    , sendQueryIdMessage
+    , sendQueryIdMessages
    , requestQueryID
    , requestJoin
    , requestPing
    , requestStabilise
+    , lookupMessage
+    , sendRequestTo
    , queryIdLookupLoop
    , queueAddEntries
    , queueDeleteEntries
@@ -30,6 +32,7 @@ module Hash2Pub.DHTProtocol
    , ackRequest
    , isPossibleSuccessor
    , isPossiblePredecessor
+    , closestCachePredecessors
    )
    where
 
@@ -89,7 +92,7 @@ queryLocalCache ownState nCache lBestNodes targetID
     | targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
     -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
     -- the closest succeeding node (like with the p initiated parallel queries
-    | otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors
+    | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
     where
         ownID = getNid ownState
         preds = predecessors ownState
@@ -97,18 +100,22 @@
         closestSuccessor :: Set.Set RemoteCacheEntry
         closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache
 
-        closestPredecessors :: Set.Set RemoteCacheEntry
-        closestPredecessors = closestPredecessor (lBestNodes-1) targetID
-        closestPredecessor :: (Integral n, Show n) => n -> NodeID -> Set.Set RemoteCacheEntry
-        closestPredecessor 0 _ = Set.empty
-        closestPredecessor remainingLookups lastID
-            | remainingLookups < 0 = Set.empty
-            | otherwise =
-                let result = cacheLookupPred lastID nCache
-                in
-                    case toRemoteCacheEntry <$> result of
-                        Nothing -> Set.empty
-                        Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestPredecessor (remainingLookups-1) (nid ns)
+
+-- | look up the given number of closest preceding cache entries of an ID
+closestCachePredecessors :: (Integral n)
+                         => n         -- ^ number of entries to look up
+                         -> NodeID    -- ^ target ID to get the predecessors of
+                         -> NodeCache -- ^ cache to use for lookup
+                         -> Set.Set RemoteCacheEntry
+closestCachePredecessors 0 _ _ = Set.empty
+closestCachePredecessors remainingLookups lastID nCache
+    | remainingLookups < 0 = Set.empty
+    | otherwise =
+        let result = cacheLookupPred lastID nCache
+        in
+            case toRemoteCacheEntry <$> result of
+                Nothing -> Set.empty
+                Just nPred@(RemoteCacheEntry ns ts) -> Set.insert nPred $ closestCachePredecessors (remainingLookups-1) (nid ns) nCache
 
 -- | Determines whether a lookup key is within the responsibility slice of a node,
 -- as it falls between its first predecessor and the node itself.
@@ -496,45 +503,65 @@ queryIdLookupLoop cacheSnapshot ns targetID = do
     case localResult of
         FOUND thisNode -> pure thisNode
         FORWARD nodeSet -> do
+            responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
+            now <- getPOSIXTime
+            -- check for a FOUND and return it
+            case responseEntries of
+                FOUND foundNode -> pure foundNode
+                -- if not FOUND, insert entries into local cache copy and recurse
+                FORWARD entrySet ->
+                    let newLCache = foldr' (
+                            addCacheEntryPure now
+                            ) cacheSnapshot entrySet
+                    in
+                        -- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value
+                        queryIdLookupLoop newLCache ns targetID
+
+
+sendQueryIdMessages :: (Integral i)
+                    => NodeID            -- ^ target key ID to look up
+                    -> LocalNodeState    -- ^ node state of the node doing the query
+                    -> Maybe i           -- ^ optionally provide an explicit @l@ parameter for the number of nodes to be returned
+                    -> [RemoteNodeState] -- ^ nodes to query
+                    -> IO QueryResponse  -- ^ accumulated response
+sendQueryIdMessages targetID ns lParam targets = do
+    -- create connected sockets to all query targets and use them for request handling
     -- ToDo: make attempts and timeout configurable
-    queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (sendQueryIdMessage targetID ns)) $ remoteNode <$> Set.toList nodeSet
+    queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (
+            sendRequestTo 5000 3 (lookupMessage targetID ns lParam)
+        )) targets
     -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
     -- ToDo: exception handling, maybe log them
     responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
-    -- insert new cache entries both into global cache as well as in local copy, to make sure it is already up to date at next lookup
+    -- insert new cache entries into the global cache and return the accumulated QueryResponses for further processing
     now <- getPOSIXTime
-    newLCache <- foldM (\oldCache resp -> do
-        let entriesToInsert = case queryResult <$> payload resp of
-                Just (FOUND result1) -> [RemoteCacheEntry result1 now]
-                Just (FORWARD resultset) -> Set.elems resultset
-                _ -> []
+    -- collect cache entries from all responses
+    foldM (\acc resp -> do
+        let entrySet = case queryResult <$> payload resp of
+                Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
+                Just (FORWARD resultset) -> resultset
+                _ -> Set.empty
+
         -- forward entries to global cache
-        queueAddEntries entriesToInsert ns
-        -- insert entries into local cache copy
-        pure $ foldr' (
-            addCacheEntryPure now
-            ) oldCache entriesToInsert
-        ) cacheSnapshot responses
+        queueAddEntries entrySet ns
+        -- return accumulated QueryResponse
+        pure $ case acc of
+            -- once a FOUND has been encountered, return this as a result
+            isFound@FOUND{} -> isFound
+            FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet
 
-    -- check for a FOUND and return it
-    let foundResp = headMay . mapMaybe (\resp -> case queryResult <$> payload resp of
-            Just (FOUND ns') -> Just ns'
-            _ -> Nothing
-            ) $ responses
-    -- if no FOUND, recursively call lookup again
-    -- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value
-    maybe (queryIdLookupLoop newLCache ns targetID) pure foundResp
+        ) (FORWARD Set.empty) responses
 
-
-sendQueryIdMessage :: NodeID -- ^ target key ID to look up
-                   -> LocalNodeState -- ^ node state of the node doing the query
-                   -> Socket -- ^ connected socket to use for sending
-                   -> IO (Set.Set FediChordMessage) -- ^ responses
-sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns)
-    where
-        lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
-        pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns }
+-- | Create a QueryID message to be supplied to 'sendRequestTo'
+lookupMessage :: Integral i
+              => NodeID          -- ^ target ID
+              -> LocalNodeState  -- ^ sender node state
+              -> Maybe i         -- ^ optionally provide a different l parameter
+              -> (Integer -> FediChordMessage)
+lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
+    where
+        pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam }
 
 
 -- | Send a stabilise request to provided 'RemoteNode' and, if successful,
diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index fe7fa83..ce8b5b9 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -136,7 +136,7 @@ fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
         -- 1. get routed to placement of own ID until FOUND:
         -- Initialise an empty cache only with the responses from a bootstrapping node
         ns <- readTVarIO nsSTM
-        bootstrapResponse <- sendQueryIdMessage (getNid ns) ns sock
+        bootstrapResponse <- sendRequestTo 5000 3 (lookupMessage (getNid ns) ns Nothing) sock
         if bootstrapResponse == Set.empty
             then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response."
             else do
@@ -231,10 +231,74 @@ cacheVerifyThread nsSTM = forever $ do
                     ) pong
                 else pure ()
             )
-
+
+    -- check the cache invariant per slice and, if necessary, do a single lookup to the
+    -- middle of each slice not fulfilling the invariant
+    latestNs <- readTVarIO nsSTM
+    latestCache <- readTVarIO $ nodeCacheSTM latestNs
+    let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
+            FOUND node -> [node]
+            FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
+    forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
+        forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
+        )
+
+    threadDelay $ toEnum (fromEnum maxEntryAge `div` 20)
+
+
+-- | Checks the invariant of at least @jEntries@ entries per cache slice.
+-- If this invariant does not hold, the middle of the slice is returned for
+-- making lookups to that ID.
+checkCacheSliceInvariants :: LocalNodeState
                          -> NodeCache
                          -> [NodeID]  -- ^ list of middle IDs of slices not
+                                       --   fulfilling the invariant
+checkCacheSliceInvariants ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc
+    where
+        jEntries = jEntriesPerSlice ns
+        lastPred = getNid <$> lastMay (predecessors ns)
+        lastSucc = getNid <$> lastMay (successors ns)
+        -- start slice boundary: 1/2 key space
+        startBound = getNid ns + 2^(idBits - 1)
+
+        checkSuccessorSlice :: Integral i => i -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [NodeID]
+        checkSuccessorSlice _ _ _ Nothing _ = []
+        checkSuccessorSlice j ownID upperBound (Just lastSuccID) cache
+            | (upperBound `localCompare` lastSuccID) == LT = []
+            | otherwise =
+                let
+                    diff = getNodeID $ upperBound - ownID
+                    lowerBound = ownID + fromInteger (diff `div` 2)
+                    middleID = lowerBound + fromInteger (diff `div` 4)
+                    lookupResult = Set.map (getNid . remoteNode) $ closestCachePredecessors jEntries upperBound cache
+                in
+                    -- check whether j entries are in the slice
+                    if length lookupResult == jEntries
+                        && all (\r -> (r `localCompare` lowerBound) == GT) lookupResult
+                        && all (\r -> (r `localCompare` upperBound) == LT) lookupResult
+                        then checkSuccessorSlice j ownID (lowerBound - 1) (Just lastSuccID) cache
+                        -- if not enough entries, add the middle of the slice to list
+                        else middleID : checkSuccessorSlice j ownID (lowerBound - 1) (Just lastSuccID) cache
+
+        checkPredecessorSlice :: Integral i => i -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [NodeID]
+        checkPredecessorSlice _ _ _ Nothing _ = []
+        checkPredecessorSlice j ownID lowerBound (Just lastPredID) cache
+            | (lowerBound `localCompare` lastPredID) == GT = []
+            | otherwise =
+                let
+                    diff = getNodeID $ ownID - lowerBound
+                    upperBound = ownID - fromInteger (diff `div` 2)
+                    middleID = lowerBound + fromInteger (diff `div` 4)
+                    lookupResult = Set.map (getNid . remoteNode) $ closestCachePredecessors jEntries upperBound cache
+                in
+                    -- check whether j entries are in the slice
+                    if length lookupResult == jEntries
+                        && all (\r -> (r `localCompare` lowerBound) == GT) lookupResult
+                        && all (\r -> (r `localCompare` upperBound) == LT) lookupResult
+                        then checkPredecessorSlice j ownID (upperBound + 1) (Just lastPredID) cache
+                        -- if not enough entries, add the middle of the slice to list
+                        else middleID : checkPredecessorSlice j ownID (upperBound + 1) (Just lastPredID) cache
+
 
 -- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
 -- one responds, and get their neighbours for maintaining the own neighbour lists.
diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs
index 26a13f8..459837f 100644
--- a/src/Hash2Pub/FediChordTypes.hs
+++ b/src/Hash2Pub/FediChordTypes.hs
@@ -6,6 +6,7 @@ module Hash2Pub.FediChordTypes (
     NodeID -- abstract, but newtype constructors cannot be hidden
+    , idBits
     , getNodeID
     , toNodeID
     , NodeState (..)
@@ -380,7 +381,7 @@ lookupWrapper f fRepeat direction key rmap =
                 then lookupWrapper fRepeat fRepeat direction newKey rmap
                 else Nothing
             -- normal entries are returned
-            Just (_, (KeyEntry entry)) -> Just entry
+            Just (_, KeyEntry entry) -> Just entry
             Nothing -> Nothing
     where
         rMapNotEmpty :: (HasKeyID a) => RingMap a -> Bool
@@ -563,47 +564,6 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
         parseWithOffset 0 word = toInteger word -- a shift of 0 is always 0
         parseWithOffset offset word = toInteger word * 2^(8 * offset)
-
-
--- TODO: complete rewrite
--- |checks wether the cache entries fulfill the logarithmic EpiChord invariant
--- of having j entries per slice, and creates a list of necessary lookup actions.
--- Should be invoked periodically.
---checkCacheSlices :: NodeState -> IO [()]
---checkCacheSlices state = case getNodeCache state of
---        -- don't do anything on nodes without a cache
---        Nothing -> pure [()]
---        Just cache' -> checkSlice jEntries (nid state) startBound lastSucc =<< readIORef cache'
---    -- TODO: do the same for predecessors
---    where
---        jEntries = fromMaybe 0 $ getInternals_ jEntriesPerSlice state
---        lastSucc = last <$> maybeEmpty (fromMaybe [] $ getSuccessors state)
---        startBound = NodeID 2^(255::Integer) + nid state
---        checkSlice :: Int -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [IO ()]
---        checkSlice _ _ _ Nothing _ = []
---        checkSlice j ownID upperBound (Just lastSuccNode) cache
---            | upperBound < lastSuccNode = []
---            | otherwise =
---                -- continuously half the DHT namespace, take the upper part as a slice,
---                -- check for existing entries in that slice and create a lookup action
---                -- and recursively do this on the lower half.
---                -- recursion edge case: all successors/ predecessors need to be in the
---                -- first slice.
---                let
---                    diff = getNodeID $ upperBound - ownID
---                    lowerBound = ownID + NodeID (diff `div` 2)
---                in
---                    -- TODO: replace empty IO actions with actual lookups to middle of slice
---                    -- TODO: validate ID before adding to cache
---                    case Map.lookupLT upperBound cache of
---                        Nothing -> pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
---                        Just (matchID, _) ->
---                            if
---                                matchID <= lowerBound then pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
---                            else
---                                checkSlice j ownID lowerBound (Just lastSuccNode) cache
-
-
 -- Todo: DHT backend can learn potential initial bootstrapping points through the instances mentioned in the received AP-relay messages
 -- persist them on disk so they can be used for all following bootstraps
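
The slice-halving walk that checkCacheSliceInvariants performs can be pictured independently of the Hash2Pub types. The sketch below is illustrative only: it assumes a toy 8-bit modular key space, uses plain Integers instead of NodeID, and leaves out the j-entries-per-slice check against a cache. It merely enumerates the successor-side slices between an own ID and its furthest known successor and yields the middle ID of each slice, which is the ID the patch above would direct a lookup to.

-- Standalone toy sketch (not part of the patch): slice halving in the spirit of
-- checkCacheSliceInvariants, on an assumed 8-bit modular ID space with plain Integers.
module Main where

idBits :: Integer
idBits = 8                     -- toy key space of 2^8 IDs

modulus :: Integer
modulus = 2 ^ idBits

-- | Middle IDs of successively halved successor-side slices, starting from the
-- point opposite the own ID and stopping once a slice no longer reaches the
-- furthest known successor (mirroring the localCompare-based stop condition).
sliceMiddles :: Integer        -- ^ own ID
             -> Integer        -- ^ ID of the furthest known successor
             -> [Integer]
sliceMiddles ownID lastSucc = go ((ownID + 2 ^ (idBits - 1)) `mod` modulus)
  where
    -- clockwise distance from ownID on the ring
    dist x = (x - ownID) `mod` modulus
    go upperBound
      | upperBound == ownID             = []   -- degenerate slice, nothing left to halve
      | dist upperBound < dist lastSucc = []   -- slice lies beyond the furthest successor
      | otherwise =
          let diff       = dist upperBound
              lowerBound = (ownID + diff `div` 2) `mod` modulus      -- keep the upper half as this slice
              middleID   = (lowerBound + diff `div` 4) `mod` modulus -- its middle is the lookup target
          in middleID : go ((lowerBound - 1) `mod` modulus)          -- recurse into the lower half

main :: IO ()
main = print (sliceMiddles 10 60)   -- middles of the slices between own ID 10 and successor 60: [106,56]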