diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index bfcdf9e..165ec39 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -486,7 +486,6 @@ queryIdLookupLoop cacheSnapshot ns targetID = do _ -> Nothing ) $ responses -- if no FOUND, recursively call lookup again - -- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value maybe (queryIdLookupLoop newLCache ns targetID) pure foundResp @@ -527,9 +526,6 @@ requestStabilise ns neighbour = do ,maybe succAcc (++ succAcc) (stabiliseSuccessors <$> payload msg)) ) ([],[]) respSet - -- update successfully responded neighbour in cache - now <- getPOSIXTime - maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet) pure $ if null responsePreds && null responseSuccs then Left "no neighbours returned" else Right (responsePreds, responseSuccs) diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index c425683..db4ac61 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -182,10 +182,6 @@ cacheWriter nsSTM = cacheModifier <- readTQueue $ cacheWriteQueue ns modifyTVar' (nodeCacheSTM ns) cacheModifier - --- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until --- one responds, and get their neighbours for maintaining the own neighbour lists. --- If necessary, request new neighbours. stabiliseThread :: LocalNodeStateSTM -> IO () stabiliseThread nsSTM = forever $ do ns <- readTVarIO nsSTM @@ -217,8 +213,8 @@ stabiliseThread nsSTM = forever $ do newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap writeTVar nsSTM newNs pure newNs - -- delete unresponding nodes from cache as well - mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes + + -- TODO: update successfully stabilised nodes in cache -- try looking up additional neighbours if list too short forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do @@ -228,7 +224,7 @@ stabiliseThread nsSTM = forever $ do latestNs <- readTVar nsSTM writeTVar nsSTM $ addPredecessors [nextEntry] latestNs ) - +-- forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do ns' <- readTVarIO nsSTM nextEntry <- requestQueryID ns' $ succ . getNid $ atDef (toRemoteNodeState ns') (successors ns') (-1) @@ -240,14 +236,9 @@ stabiliseThread nsSTM = forever $ do -- TODO: make delay configurable threadDelay (60 * 1000) where - -- | send a stabilise request to the n-th neighbour - -- (specified by the provided getter function) and on failure retr - -- with the n+1-th neighbour. - -- On success, return 2 lists: The failed nodes and the potential neighbours - -- returned by the queried node. - stabiliseClosestResponder :: LocalNodeState -- ^ own node - -> (LocalNodeState -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors - -> Int -- ^ index of neighbour to query + stabiliseClosestResponder :: LocalNodeState + -> (LocalNodeState -> [RemoteNodeState]) + -> Int -> [RemoteNodeState] -- ^ delete accumulator -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours) stabiliseClosestResponder ns neighbourGetter neighbourNum deleteAcc @@ -270,9 +261,7 @@ stabiliseThread nsSTM = forever $ do currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns - checkReachability :: LocalNodeState -- ^ this node - -> RemoteNodeState -- ^ node to Ping for reachability - -> IO (Maybe RemoteNodeState) -- ^ if the Pinged node handles the requested node state then that one + checkReachability :: LocalNodeState -> RemoteNodeState -> IO (Maybe RemoteNodeState) checkReachability ns toCheck = do resp <- requestPing ns toCheck pure $ either (const Nothing) (\vss -> @@ -280,6 +269,10 @@ stabiliseThread nsSTM = forever $ do ) resp +-- periodically contact immediate successor and predecessor +-- If they respond, see whether there is a closer neighbour in between +-- If they don't respond, drop them and try the next one + -- | Receives UDP packets and passes them to other threads via the given TQueue. -- Shall be used as the single receiving thread on the server socket, as multiple -- threads blocking on the same socket degrades performance. @@ -307,10 +300,9 @@ fediMainThreads sock nsSTM = do -- all get cancelled concurrently_ (fediMessageHandler sendQ recvQ nsSTM) $ - concurrently_ (stabiliseThread nsSTM) $ - concurrently_ - (sendThread sock sendQ) - (recvThread sock recvQ) + concurrently + (sendThread sock sendQ) + (recvThread sock recvQ) -- defining this here as, for now, the RequestMap is only used by fediMessageHandler.