Compare commits

...

3 commits

Author SHA1 Message Date
Trolli Schmittlauch c9783a10cf launch stabilise thread
closes #44

although stabilise functionality is still untested
2020-06-24 02:51:39 +02:00
Trolli Schmittlauch 81e346db4e update responding neighbours and delete unresponding ones from cache
contributes to #44
2020-06-24 02:48:41 +02:00
Trolli Schmittlauch 16b46a8b0b add some comments on stabilise 2020-06-24 02:12:21 +02:00
2 changed files with 26 additions and 14 deletions

View file

@ -486,6 +486,7 @@ queryIdLookupLoop cacheSnapshot ns targetID = do
_ -> Nothing
) $ responses
-- if no FOUND, recursively call lookup again
-- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value
maybe (queryIdLookupLoop newLCache ns targetID) pure foundResp
@ -526,6 +527,9 @@ requestStabilise ns neighbour = do
,maybe succAcc (++ succAcc) (stabiliseSuccessors <$> payload msg))
)
([],[]) respSet
-- update successfully responded neighbour in cache
now <- getPOSIXTime
maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
pure $ if null responsePreds && null responseSuccs
then Left "no neighbours returned"
else Right (responsePreds, responseSuccs)

View file

@ -182,6 +182,10 @@ cacheWriter nsSTM =
cacheModifier <- readTQueue $ cacheWriteQueue ns
modifyTVar' (nodeCacheSTM ns) cacheModifier
-- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
-- one responds, and get their neighbours for maintaining the own neighbour lists.
-- If necessary, request new neighbours.
stabiliseThread :: LocalNodeStateSTM -> IO ()
stabiliseThread nsSTM = forever $ do
ns <- readTVarIO nsSTM
@ -213,8 +217,8 @@ stabiliseThread nsSTM = forever $ do
newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
writeTVar nsSTM newNs
pure newNs
-- TODO: update successfully stabilised nodes in cache
-- delete unresponding nodes from cache as well
mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes
-- try looking up additional neighbours if list too short
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
@ -224,7 +228,7 @@ stabiliseThread nsSTM = forever $ do
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
)
--
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM
nextEntry <- requestQueryID ns' $ succ . getNid $ atDef (toRemoteNodeState ns') (successors ns') (-1)
@ -236,9 +240,14 @@ stabiliseThread nsSTM = forever $ do
-- TODO: make delay configurable
threadDelay (60 * 1000)
where
stabiliseClosestResponder :: LocalNodeState
-> (LocalNodeState -> [RemoteNodeState])
-> Int
-- | send a stabilise request to the n-th neighbour
-- (specified by the provided getter function) and on failure retr
-- with the n+1-th neighbour.
-- On success, return 2 lists: The failed nodes and the potential neighbours
-- returned by the queried node.
stabiliseClosestResponder :: LocalNodeState -- ^ own node
-> (LocalNodeState -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors
-> Int -- ^ index of neighbour to query
-> [RemoteNodeState] -- ^ delete accumulator
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours)
stabiliseClosestResponder ns neighbourGetter neighbourNum deleteAcc
@ -261,7 +270,9 @@ stabiliseThread nsSTM = forever $ do
currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns
checkReachability :: LocalNodeState -> RemoteNodeState -> IO (Maybe RemoteNodeState)
checkReachability :: LocalNodeState -- ^ this node
-> RemoteNodeState -- ^ node to Ping for reachability
-> IO (Maybe RemoteNodeState) -- ^ if the Pinged node handles the requested node state then that one
checkReachability ns toCheck = do
resp <- requestPing ns toCheck
pure $ either (const Nothing) (\vss ->
@ -269,10 +280,6 @@ stabiliseThread nsSTM = forever $ do
) resp
-- periodically contact immediate successor and predecessor
-- If they respond, see whether there is a closer neighbour in between
-- If they don't respond, drop them and try the next one
-- | Receives UDP packets and passes them to other threads via the given TQueue.
-- Shall be used as the single receiving thread on the server socket, as multiple
-- threads blocking on the same socket degrades performance.
@ -300,7 +307,8 @@ fediMainThreads sock nsSTM = do
-- all get cancelled
concurrently_
(fediMessageHandler sendQ recvQ nsSTM) $
concurrently
concurrently_ (stabiliseThread nsSTM) $
concurrently_
(sendThread sock sendQ)
(recvThread sock recvQ)