Compare commits

...

3 commits

Author SHA1 Message Date
Trolli Schmittlauch c9783a10cf launch stabilise thread
closes #44

although stabilise functionality is still untested
2020-06-24 02:51:39 +02:00
Trolli Schmittlauch 81e346db4e update responding neighbours and delete unresponding ones from cache
contributes to #44
2020-06-24 02:48:41 +02:00
Trolli Schmittlauch 16b46a8b0b add some comments on stabilise 2020-06-24 02:12:21 +02:00
2 changed files with 26 additions and 14 deletions

View file

@ -486,6 +486,7 @@ queryIdLookupLoop cacheSnapshot ns targetID = do
_ -> Nothing _ -> Nothing
) $ responses ) $ responses
-- if no FOUND, recursively call lookup again -- if no FOUND, recursively call lookup again
-- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value
maybe (queryIdLookupLoop newLCache ns targetID) pure foundResp maybe (queryIdLookupLoop newLCache ns targetID) pure foundResp
@ -526,6 +527,9 @@ requestStabilise ns neighbour = do
,maybe succAcc (++ succAcc) (stabiliseSuccessors <$> payload msg)) ,maybe succAcc (++ succAcc) (stabiliseSuccessors <$> payload msg))
) )
([],[]) respSet ([],[]) respSet
-- update successfully responded neighbour in cache
now <- getPOSIXTime
maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
pure $ if null responsePreds && null responseSuccs pure $ if null responsePreds && null responseSuccs
then Left "no neighbours returned" then Left "no neighbours returned"
else Right (responsePreds, responseSuccs) else Right (responsePreds, responseSuccs)

View file

@ -182,6 +182,10 @@ cacheWriter nsSTM =
cacheModifier <- readTQueue $ cacheWriteQueue ns cacheModifier <- readTQueue $ cacheWriteQueue ns
modifyTVar' (nodeCacheSTM ns) cacheModifier modifyTVar' (nodeCacheSTM ns) cacheModifier
-- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
-- one responds, and get their neighbours for maintaining the own neighbour lists.
-- If necessary, request new neighbours.
stabiliseThread :: LocalNodeStateSTM -> IO () stabiliseThread :: LocalNodeStateSTM -> IO ()
stabiliseThread nsSTM = forever $ do stabiliseThread nsSTM = forever $ do
ns <- readTVarIO nsSTM ns <- readTVarIO nsSTM
@ -213,8 +217,8 @@ stabiliseThread nsSTM = forever $ do
newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
writeTVar nsSTM newNs writeTVar nsSTM newNs
pure newNs pure newNs
-- delete unresponding nodes from cache as well
-- TODO: update successfully stabilised nodes in cache mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes
-- try looking up additional neighbours if list too short -- try looking up additional neighbours if list too short
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
@ -224,7 +228,7 @@ stabiliseThread nsSTM = forever $ do
latestNs <- readTVar nsSTM latestNs <- readTVar nsSTM
writeTVar nsSTM $ addPredecessors [nextEntry] latestNs writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
) )
--
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM ns' <- readTVarIO nsSTM
nextEntry <- requestQueryID ns' $ succ . getNid $ atDef (toRemoteNodeState ns') (successors ns') (-1) nextEntry <- requestQueryID ns' $ succ . getNid $ atDef (toRemoteNodeState ns') (successors ns') (-1)
@ -236,9 +240,14 @@ stabiliseThread nsSTM = forever $ do
-- TODO: make delay configurable -- TODO: make delay configurable
threadDelay (60 * 1000) threadDelay (60 * 1000)
where where
stabiliseClosestResponder :: LocalNodeState -- | send a stabilise request to the n-th neighbour
-> (LocalNodeState -> [RemoteNodeState]) -- (specified by the provided getter function) and on failure retr
-> Int -- with the n+1-th neighbour.
-- On success, return 2 lists: The failed nodes and the potential neighbours
-- returned by the queried node.
stabiliseClosestResponder :: LocalNodeState -- ^ own node
-> (LocalNodeState -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors
-> Int -- ^ index of neighbour to query
-> [RemoteNodeState] -- ^ delete accumulator -> [RemoteNodeState] -- ^ delete accumulator
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours) -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours)
stabiliseClosestResponder ns neighbourGetter neighbourNum deleteAcc stabiliseClosestResponder ns neighbourGetter neighbourNum deleteAcc
@ -261,7 +270,9 @@ stabiliseThread nsSTM = forever $ do
currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns
checkReachability :: LocalNodeState -> RemoteNodeState -> IO (Maybe RemoteNodeState) checkReachability :: LocalNodeState -- ^ this node
-> RemoteNodeState -- ^ node to Ping for reachability
-> IO (Maybe RemoteNodeState) -- ^ if the Pinged node handles the requested node state then that one
checkReachability ns toCheck = do checkReachability ns toCheck = do
resp <- requestPing ns toCheck resp <- requestPing ns toCheck
pure $ either (const Nothing) (\vss -> pure $ either (const Nothing) (\vss ->
@ -269,10 +280,6 @@ stabiliseThread nsSTM = forever $ do
) resp ) resp
-- periodically contact immediate successor and predecessor
-- If they respond, see whether there is a closer neighbour in between
-- If they don't respond, drop them and try the next one
-- | Receives UDP packets and passes them to other threads via the given TQueue. -- | Receives UDP packets and passes them to other threads via the given TQueue.
-- Shall be used as the single receiving thread on the server socket, as multiple -- Shall be used as the single receiving thread on the server socket, as multiple
-- threads blocking on the same socket degrades performance. -- threads blocking on the same socket degrades performance.
@ -300,9 +307,10 @@ fediMainThreads sock nsSTM = do
-- all get cancelled -- all get cancelled
concurrently_ concurrently_
(fediMessageHandler sendQ recvQ nsSTM) $ (fediMessageHandler sendQ recvQ nsSTM) $
concurrently concurrently_ (stabiliseThread nsSTM) $
(sendThread sock sendQ) concurrently_
(recvThread sock recvQ) (sendThread sock sendQ)
(recvThread sock recvQ)
-- defining this here as, for now, the RequestMap is only used by fediMessageHandler. -- defining this here as, for now, the RequestMap is only used by fediMessageHandler.