re-structure convergenceSampleThread to work on a RealNode and iterate over all joined vservers
contributes to #34
This commit is contained in:
parent
33ae904d17
commit
8e8ea41dc4
|
@ -133,7 +133,7 @@ fediChordInit initConf serviceRunner = do
|
||||||
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
||||||
-- launch thread attempting to join on new cache entries
|
-- launch thread attempting to join on new cache entries
|
||||||
_ <- forkIO $ joinOnNewEntriesThread firstVSSTM
|
_ <- forkIO $ joinOnNewEntriesThread firstVSSTM
|
||||||
async (fediMainThreads serverSock firstVSSTM)
|
async (fediMainThreads serverSock realNodeSTM)
|
||||||
)
|
)
|
||||||
(\joinedNS -> do
|
(\joinedNS -> do
|
||||||
-- launch main eventloop with successfully joined state
|
-- launch main eventloop with successfully joined state
|
||||||
|
@ -195,33 +195,36 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
|
||||||
|
|
||||||
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
|
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
|
||||||
-- Unjoined try joining instead.
|
-- Unjoined try joining instead.
|
||||||
convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
|
convergenceSampleThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
|
||||||
convergenceSampleThread nsSTM = forever $ do
|
convergenceSampleThread nodeSTM = forever $ do
|
||||||
nsSnap <- readTVarIO nsSTM
|
node <- readTVarIO nodeSTM
|
||||||
parentNode <- readTVarIO $ parentRealNode nsSnap
|
forM_ (vservers node) $ \nsSTM -> do
|
||||||
if isJoined nsSnap
|
nsSnap <- readTVarIO nsSTM
|
||||||
then
|
parentNode <- readTVarIO $ parentRealNode nsSnap
|
||||||
runExceptT (do
|
if isJoined nsSnap
|
||||||
-- joined node: choose random node, do queryIDLoop, compare result with own responsibility
|
then
|
||||||
let bss = bootstrapNodes parentNode
|
runExceptT (do
|
||||||
randIndex <- liftIO $ randomRIO (0, length bss - 1)
|
-- joined node: choose random node, do queryIDLoop, compare result with own responsibility
|
||||||
chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
|
let bss = bootstrapNodes parentNode
|
||||||
lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
|
randIndex <- liftIO $ randomRIO (0, length bss - 1)
|
||||||
currentlyResponsible <- liftEither lookupResult
|
chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
|
||||||
if getNid currentlyResponsible /= getNid nsSnap
|
lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
|
||||||
-- if mismatch, stabilise on the result, else do nothing
|
currentlyResponsible <- liftEither lookupResult
|
||||||
then do
|
if getNid currentlyResponsible /= getNid nsSnap
|
||||||
stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
|
-- if mismatch, stabilise on the result, else do nothing
|
||||||
(preds, succs) <- liftEither stabResult
|
then do
|
||||||
-- TODO: verify neighbours before adding, see #55
|
stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
|
||||||
liftIO . atomically $ do
|
(preds, succs) <- liftEither stabResult
|
||||||
ns <- readTVar nsSTM
|
-- TODO: verify neighbours before adding, see #55
|
||||||
writeTVar nsSTM $ addPredecessors preds ns
|
liftIO . atomically $ do
|
||||||
else pure ()
|
ns <- readTVar nsSTM
|
||||||
) >> pure ()
|
writeTVar nsSTM $ addPredecessors preds ns
|
||||||
-- unjoined node: try joining through all bootstrapping nodes
|
else pure ()
|
||||||
else tryBootstrapJoining nsSTM >> pure ()
|
) >> pure ()
|
||||||
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
|
-- unjoined node: try joining through all bootstrapping nodes
|
||||||
|
else tryBootstrapJoining nsSTM >> pure ()
|
||||||
|
|
||||||
|
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ node
|
||||||
threadDelay delaySecs
|
threadDelay delaySecs
|
||||||
|
|
||||||
|
|
||||||
|
@ -657,7 +660,7 @@ fediMainThreads sock nodeSTM = do
|
||||||
-- choose option 2 for the sake of limiting concurrency in simulation scenario
|
-- choose option 2 for the sake of limiting concurrency in simulation scenario
|
||||||
concurrently_ (stabiliseThread nodeSTM) $
|
concurrently_ (stabiliseThread nodeSTM) $
|
||||||
concurrently_ (nodeCacheVerifyThread nodeSTM) $
|
concurrently_ (nodeCacheVerifyThread nodeSTM) $
|
||||||
concurrently_ (convergenceSampleThread nsSTM) $
|
concurrently_ (convergenceSampleThread nodeSTM) $
|
||||||
concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
|
concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
|
||||||
concurrently_
|
concurrently_
|
||||||
(sendThread sock sendQ)
|
(sendThread sock sendQ)
|
||||||
|
|
Loading…
Reference in a new issue