re-structure cacheVerifyThread to work on a RealNode and iterate over all joined vservers
contributes to #34
This commit is contained in:
parent
68de73d919
commit
33ae904d17
|
@ -269,7 +269,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
|
||||||
Nothing -> pure ()
|
Nothing -> pure ()
|
||||||
Just aPart -> do
|
Just aPart -> do
|
||||||
let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
|
let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
|
||||||
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
|
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns)
|
||||||
-- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
|
-- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
|
||||||
maybe (pure ()) (
|
maybe (pure ()) (
|
||||||
mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
|
mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
|
||||||
|
@ -542,7 +542,7 @@ requestJoin toJoinOn ownStateSTM = do
|
||||||
writeTVar ownStateSTM newState
|
writeTVar ownStateSTM newState
|
||||||
pure (cacheInsertQ, newState)
|
pure (cacheInsertQ, newState)
|
||||||
-- execute the cache insertions
|
-- execute the cache insertions
|
||||||
mapM_ (\f -> f joinedState) cacheInsertQ
|
mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ
|
||||||
if responses == Set.empty
|
if responses == Set.empty
|
||||||
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
||||||
else do
|
else do
|
||||||
|
@ -624,7 +624,7 @@ sendQueryIdMessages targetID ns lParam targets = do
|
||||||
_ -> Set.empty
|
_ -> Set.empty
|
||||||
|
|
||||||
-- forward entries to global cache
|
-- forward entries to global cache
|
||||||
queueAddEntries entrySet ns
|
queueAddEntries entrySet (cacheWriteQueue ns)
|
||||||
-- return accumulated QueryResult
|
-- return accumulated QueryResult
|
||||||
pure $ case acc of
|
pure $ case acc of
|
||||||
-- once a FOUND as been encountered, return this as a result
|
-- once a FOUND as been encountered, return this as a result
|
||||||
|
@ -670,7 +670,7 @@ requestStabilise ns neighbour = do
|
||||||
)
|
)
|
||||||
([],[]) respSet
|
([],[]) respSet
|
||||||
-- update successfully responded neighbour in cache
|
-- update successfully responded neighbour in cache
|
||||||
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
|
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet)
|
||||||
pure $ if null responsePreds && null responseSuccs
|
pure $ if null responsePreds && null responseSuccs
|
||||||
then Left "no neighbours returned"
|
then Left "no neighbours returned"
|
||||||
else Right (responsePreds, responseSuccs)
|
else Right (responsePreds, responseSuccs)
|
||||||
|
@ -832,24 +832,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
||||||
|
|
||||||
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
||||||
queueAddEntries :: Foldable c => c RemoteCacheEntry
|
queueAddEntries :: Foldable c => c RemoteCacheEntry
|
||||||
-> LocalNodeState s
|
-> TQueue (NodeCache -> NodeCache)
|
||||||
-> IO ()
|
-> IO ()
|
||||||
queueAddEntries entries ns = do
|
queueAddEntries entries cacheQ = do
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry
|
forM_ entries $ \entry -> atomically $ writeTQueue cacheQ $ addCacheEntryPure now entry
|
||||||
|
|
||||||
|
|
||||||
-- | enque a list of node IDs to be deleted from the global NodeCache
|
-- | enque a list of node IDs to be deleted from the global NodeCache
|
||||||
queueDeleteEntries :: Foldable c
|
queueDeleteEntries :: Foldable c
|
||||||
=> c NodeID
|
=> c NodeID
|
||||||
-> LocalNodeState s
|
-> TQueue (NodeCache -> NodeCache)
|
||||||
-> IO ()
|
-> IO ()
|
||||||
queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
|
queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry
|
||||||
|
|
||||||
|
|
||||||
-- | enque a single node ID to be deleted from the global NodeCache
|
-- | enque a single node ID to be deleted from the global NodeCache
|
||||||
queueDeleteEntry :: NodeID
|
queueDeleteEntry :: NodeID
|
||||||
-> LocalNodeState s
|
-> TQueue (NodeCache -> NodeCache)
|
||||||
-> IO ()
|
-> IO ()
|
||||||
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
|
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
|
||||||
|
|
||||||
|
@ -858,11 +858,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
|
||||||
-- global 'NodeCache'.
|
-- global 'NodeCache'.
|
||||||
queueUpdateVerifieds :: Foldable c
|
queueUpdateVerifieds :: Foldable c
|
||||||
=> c NodeID
|
=> c NodeID
|
||||||
-> LocalNodeState s
|
-> TQueue (NodeCache -> NodeCache)
|
||||||
-> IO ()
|
-> IO ()
|
||||||
queueUpdateVerifieds nIds ns = do
|
queueUpdateVerifieds nIds cacheQ = do
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
|
forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $
|
||||||
markCacheEntryAsVerified (Just now) nid'
|
markCacheEntryAsVerified (Just now) nid'
|
||||||
|
|
||||||
-- | retry an IO action at most *i* times until it delivers a result
|
-- | retry an IO action at most *i* times until it delivers a result
|
||||||
|
|
|
@ -366,51 +366,63 @@ nodeCacheWriter nodeSTM = do
|
||||||
|
|
||||||
|
|
||||||
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
|
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
|
||||||
nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
|
nodeCacheVerifyThread :: RealNodeSTM s -> IO ()
|
||||||
nodeCacheVerifyThread nsSTM = forever $ do
|
nodeCacheVerifyThread nodeSTM = forever $ do
|
||||||
-- get cache
|
(node, firstVSSTM) <- atomically $ do
|
||||||
(ns, cache, maxEntryAge) <- atomically $ do
|
node <- readTVar nodeSTM
|
||||||
ns <- readTVar nsSTM
|
case headMay (HMap.elems $ vservers node) of
|
||||||
cache <- readTVar $ nodeCacheSTM ns
|
-- wait until first VS is joined
|
||||||
maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns)
|
Nothing -> retry
|
||||||
pure (ns, cache, maxEntryAge)
|
Just vs' -> pure (node, vs')
|
||||||
|
let
|
||||||
|
maxEntryAge = confMaxNodeCacheAge $ nodeConfig node
|
||||||
|
cacheQ = globalCacheWriteQueue node
|
||||||
|
cache <- readTVarIO $ globalNodeCacheSTM node
|
||||||
|
-- always use the first active VS as a sender for operations like Ping
|
||||||
|
firstVS <- readTVarIO firstVSSTM
|
||||||
-- iterate entries:
|
-- iterate entries:
|
||||||
-- for avoiding too many time syscalls, get current time before iterating.
|
-- for avoiding too many time syscalls, get current time before iterating.
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
forM_ (nodeCacheEntries cache) (\(CacheEntry validated node ts) ->
|
forM_ (nodeCacheEntries cache) (\(CacheEntry validated cacheNode ts) ->
|
||||||
-- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
|
-- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
|
||||||
if (now - ts) > maxEntryAge
|
if (now - ts) > maxEntryAge
|
||||||
then
|
then
|
||||||
queueDeleteEntry (getNid node) ns
|
queueDeleteEntry (getNid cacheNode) cacheQ
|
||||||
-- case unverified: try verifying, otherwise delete
|
-- case unverified: try verifying, otherwise delete
|
||||||
else if not validated
|
else if not validated
|
||||||
then do
|
then do
|
||||||
-- marking as verified is done by 'requestPing' as well
|
-- marking as verified is done by 'requestPing' as well
|
||||||
pong <- requestPing ns node
|
pong <- requestPing firstVS cacheNode
|
||||||
either (\_->
|
either (\_->
|
||||||
queueDeleteEntry (getNid node) ns
|
queueDeleteEntry (getNid cacheNode) cacheQ
|
||||||
)
|
)
|
||||||
(\vss ->
|
(\vss ->
|
||||||
if node `notElem` vss
|
if cacheNode `notElem` vss
|
||||||
then queueDeleteEntry (getNid node) ns
|
then queueDeleteEntry (getNid cacheNode) firstVS
|
||||||
-- after verifying a node, check whether it can be a closer neighbour
|
-- after verifying a node, check whether it can be a closer neighbour
|
||||||
else do
|
-- do this for each node
|
||||||
if node `isPossiblePredecessor` ns
|
-- TODO: optimisation: place all LocalNodeStates on the cache ring and check whether any of them is the predecessor/ successor
|
||||||
|
else forM_ (vservers node) (\nsSTM -> do
|
||||||
|
ns <- readTVarIO nsSTM
|
||||||
|
if cacheNode `isPossiblePredecessor` ns
|
||||||
then atomically $ do
|
then atomically $ do
|
||||||
ns' <- readTVar nsSTM
|
ns' <- readTVar nsSTM
|
||||||
writeTVar nsSTM $ addPredecessors [node] ns'
|
writeTVar nsSTM $ addPredecessors [cacheNode] ns'
|
||||||
else pure ()
|
else pure ()
|
||||||
if node `isPossibleSuccessor` ns
|
if cacheNode `isPossibleSuccessor` ns
|
||||||
then atomically $ do
|
then atomically $ do
|
||||||
ns' <- readTVar nsSTM
|
ns' <- readTVar nsSTM
|
||||||
writeTVar nsSTM $ addSuccessors [node] ns'
|
writeTVar nsSTM $ addSuccessors [cacheNode] ns'
|
||||||
else pure ()
|
else pure ()
|
||||||
|
)
|
||||||
) pong
|
) pong
|
||||||
else pure ()
|
else pure ()
|
||||||
)
|
)
|
||||||
|
|
||||||
-- check the cache invariant per slice and, if necessary, do a single lookup to the
|
-- check the cache invariant per slice and, if necessary, do a single lookup to the
|
||||||
-- middle of each slice not verifying the invariant
|
-- middle of each slice not verifying the invariant
|
||||||
|
latestNode <- readTVarIO nodeSTM
|
||||||
|
forM_ (vservers latestNode) (\nsSTM -> do
|
||||||
latestNs <- readTVarIO nsSTM
|
latestNs <- readTVarIO nsSTM
|
||||||
latestCache <- readTVarIO $ nodeCacheSTM latestNs
|
latestCache <- readTVarIO $ nodeCacheSTM latestNs
|
||||||
let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
|
let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
|
||||||
|
@ -419,6 +431,7 @@ nodeCacheVerifyThread nsSTM = forever $ do
|
||||||
forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
|
forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
|
||||||
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
|
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
|
||||||
)
|
)
|
||||||
|
)
|
||||||
|
|
||||||
threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to milliseconds
|
threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to milliseconds
|
||||||
|
|
||||||
|
@ -482,8 +495,10 @@ checkCacheSliceInvariants ns
|
||||||
-- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
|
-- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
|
||||||
-- one responds, and get their neighbours for maintaining the own neighbour lists.
|
-- one responds, and get their neighbours for maintaining the own neighbour lists.
|
||||||
-- If necessary, request new neighbours.
|
-- If necessary, request new neighbours.
|
||||||
stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
|
stabiliseThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
|
||||||
stabiliseThread nsSTM = forever $ do
|
stabiliseThread nodeSTM = forever $ do
|
||||||
|
node <- readTVarIO nodeSTM
|
||||||
|
forM_ (vservers node) (\nsSTM -> do
|
||||||
oldNs <- readTVarIO nsSTM
|
oldNs <- readTVarIO nsSTM
|
||||||
|
|
||||||
|
|
||||||
|
@ -564,8 +579,9 @@ stabiliseThread nsSTM = forever $ do
|
||||||
)
|
)
|
||||||
newPredecessor
|
newPredecessor
|
||||||
|
|
||||||
stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs)
|
)
|
||||||
threadDelay stabiliseDelay
|
|
||||||
|
threadDelay . confStabiliseInterval . nodeConfig $ node
|
||||||
where
|
where
|
||||||
-- | send a stabilise request to the n-th neighbour
|
-- | send a stabilise request to the n-th neighbour
|
||||||
-- (specified by the provided getter function) and on failure retry
|
-- (specified by the provided getter function) and on failure retry
|
||||||
|
@ -636,8 +652,11 @@ fediMainThreads sock nodeSTM = do
|
||||||
-- all get cancelled
|
-- all get cancelled
|
||||||
concurrently_
|
concurrently_
|
||||||
(fediMessageHandler sendQ recvQ nodeSTM) $
|
(fediMessageHandler sendQ recvQ nodeSTM) $
|
||||||
concurrently_ (stabiliseThread nsSTM) $
|
-- decision whether to [1] launch 1 thread per VS or [2] let a single
|
||||||
concurrently_ (nodeCacheVerifyThread nsSTM) $
|
-- thread process all VSes sequentially:
|
||||||
|
-- choose option 2 for the sake of limiting concurrency in simulation scenario
|
||||||
|
concurrently_ (stabiliseThread nodeSTM) $
|
||||||
|
concurrently_ (nodeCacheVerifyThread nodeSTM) $
|
||||||
concurrently_ (convergenceSampleThread nsSTM) $
|
concurrently_ (convergenceSampleThread nsSTM) $
|
||||||
concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
|
concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
|
||||||
concurrently_
|
concurrently_
|
||||||
|
|
Loading…
Reference in a new issue