Compare commits
No commits in common. "c9783a10cf1a3df42dab5691d55e1644bfc5593c" and "25f44f3a4572e2f7cbc6f4d9e5193989d6d21fd1" have entirely different histories.
c9783a10cf
...
25f44f3a45
|
@ -486,7 +486,6 @@ queryIdLookupLoop cacheSnapshot ns targetID = do
|
||||||
_ -> Nothing
|
_ -> Nothing
|
||||||
) $ responses
|
) $ responses
|
||||||
-- if no FOUND, recursively call lookup again
|
-- if no FOUND, recursively call lookup again
|
||||||
-- TODO: this could lead to infinite recursion on an empty cache. Consider returning the node itself as default value
|
|
||||||
maybe (queryIdLookupLoop newLCache ns targetID) pure foundResp
|
maybe (queryIdLookupLoop newLCache ns targetID) pure foundResp
|
||||||
|
|
||||||
|
|
||||||
|
@ -527,9 +526,6 @@ requestStabilise ns neighbour = do
|
||||||
,maybe succAcc (++ succAcc) (stabiliseSuccessors <$> payload msg))
|
,maybe succAcc (++ succAcc) (stabiliseSuccessors <$> payload msg))
|
||||||
)
|
)
|
||||||
([],[]) respSet
|
([],[]) respSet
|
||||||
-- update successfully responded neighbour in cache
|
|
||||||
now <- getPOSIXTime
|
|
||||||
maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
|
|
||||||
pure $ if null responsePreds && null responseSuccs
|
pure $ if null responsePreds && null responseSuccs
|
||||||
then Left "no neighbours returned"
|
then Left "no neighbours returned"
|
||||||
else Right (responsePreds, responseSuccs)
|
else Right (responsePreds, responseSuccs)
|
||||||
|
|
|
@ -182,10 +182,6 @@ cacheWriter nsSTM =
|
||||||
cacheModifier <- readTQueue $ cacheWriteQueue ns
|
cacheModifier <- readTQueue $ cacheWriteQueue ns
|
||||||
modifyTVar' (nodeCacheSTM ns) cacheModifier
|
modifyTVar' (nodeCacheSTM ns) cacheModifier
|
||||||
|
|
||||||
|
|
||||||
-- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
|
|
||||||
-- one responds, and get their neighbours for maintaining the own neighbour lists.
|
|
||||||
-- If necessary, request new neighbours.
|
|
||||||
stabiliseThread :: LocalNodeStateSTM -> IO ()
|
stabiliseThread :: LocalNodeStateSTM -> IO ()
|
||||||
stabiliseThread nsSTM = forever $ do
|
stabiliseThread nsSTM = forever $ do
|
||||||
ns <- readTVarIO nsSTM
|
ns <- readTVarIO nsSTM
|
||||||
|
@ -217,8 +213,8 @@ stabiliseThread nsSTM = forever $ do
|
||||||
newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
|
newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
|
||||||
writeTVar nsSTM newNs
|
writeTVar nsSTM newNs
|
||||||
pure newNs
|
pure newNs
|
||||||
-- delete unresponding nodes from cache as well
|
|
||||||
mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes
|
-- TODO: update successfully stabilised nodes in cache
|
||||||
|
|
||||||
-- try looking up additional neighbours if list too short
|
-- try looking up additional neighbours if list too short
|
||||||
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
||||||
|
@ -228,7 +224,7 @@ stabiliseThread nsSTM = forever $ do
|
||||||
latestNs <- readTVar nsSTM
|
latestNs <- readTVar nsSTM
|
||||||
writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
|
writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
|
||||||
)
|
)
|
||||||
|
--
|
||||||
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
||||||
ns' <- readTVarIO nsSTM
|
ns' <- readTVarIO nsSTM
|
||||||
nextEntry <- requestQueryID ns' $ succ . getNid $ atDef (toRemoteNodeState ns') (successors ns') (-1)
|
nextEntry <- requestQueryID ns' $ succ . getNid $ atDef (toRemoteNodeState ns') (successors ns') (-1)
|
||||||
|
@ -240,14 +236,9 @@ stabiliseThread nsSTM = forever $ do
|
||||||
-- TODO: make delay configurable
|
-- TODO: make delay configurable
|
||||||
threadDelay (60 * 1000)
|
threadDelay (60 * 1000)
|
||||||
where
|
where
|
||||||
-- | send a stabilise request to the n-th neighbour
|
stabiliseClosestResponder :: LocalNodeState
|
||||||
-- (specified by the provided getter function) and on failure retr
|
-> (LocalNodeState -> [RemoteNodeState])
|
||||||
-- with the n+1-th neighbour.
|
-> Int
|
||||||
-- On success, return 2 lists: The failed nodes and the potential neighbours
|
|
||||||
-- returned by the queried node.
|
|
||||||
stabiliseClosestResponder :: LocalNodeState -- ^ own node
|
|
||||||
-> (LocalNodeState -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors
|
|
||||||
-> Int -- ^ index of neighbour to query
|
|
||||||
-> [RemoteNodeState] -- ^ delete accumulator
|
-> [RemoteNodeState] -- ^ delete accumulator
|
||||||
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours)
|
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours)
|
||||||
stabiliseClosestResponder ns neighbourGetter neighbourNum deleteAcc
|
stabiliseClosestResponder ns neighbourGetter neighbourNum deleteAcc
|
||||||
|
@ -270,9 +261,7 @@ stabiliseThread nsSTM = forever $ do
|
||||||
|
|
||||||
currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns
|
currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns
|
||||||
|
|
||||||
checkReachability :: LocalNodeState -- ^ this node
|
checkReachability :: LocalNodeState -> RemoteNodeState -> IO (Maybe RemoteNodeState)
|
||||||
-> RemoteNodeState -- ^ node to Ping for reachability
|
|
||||||
-> IO (Maybe RemoteNodeState) -- ^ if the Pinged node handles the requested node state then that one
|
|
||||||
checkReachability ns toCheck = do
|
checkReachability ns toCheck = do
|
||||||
resp <- requestPing ns toCheck
|
resp <- requestPing ns toCheck
|
||||||
pure $ either (const Nothing) (\vss ->
|
pure $ either (const Nothing) (\vss ->
|
||||||
|
@ -280,6 +269,10 @@ stabiliseThread nsSTM = forever $ do
|
||||||
) resp
|
) resp
|
||||||
|
|
||||||
|
|
||||||
|
-- periodically contact immediate successor and predecessor
|
||||||
|
-- If they respond, see whether there is a closer neighbour in between
|
||||||
|
-- If they don't respond, drop them and try the next one
|
||||||
|
|
||||||
-- | Receives UDP packets and passes them to other threads via the given TQueue.
|
-- | Receives UDP packets and passes them to other threads via the given TQueue.
|
||||||
-- Shall be used as the single receiving thread on the server socket, as multiple
|
-- Shall be used as the single receiving thread on the server socket, as multiple
|
||||||
-- threads blocking on the same socket degrades performance.
|
-- threads blocking on the same socket degrades performance.
|
||||||
|
@ -307,8 +300,7 @@ fediMainThreads sock nsSTM = do
|
||||||
-- all get cancelled
|
-- all get cancelled
|
||||||
concurrently_
|
concurrently_
|
||||||
(fediMessageHandler sendQ recvQ nsSTM) $
|
(fediMessageHandler sendQ recvQ nsSTM) $
|
||||||
concurrently_ (stabiliseThread nsSTM) $
|
concurrently
|
||||||
concurrently_
|
|
||||||
(sendThread sock sendQ)
|
(sendThread sock sendQ)
|
||||||
(recvThread sock recvQ)
|
(recvThread sock recvQ)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue