diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 9565bae..307dc51 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -133,7 +133,7 @@ fediChordInit initConf serviceRunner = do putStrLn $ err <> " Error joining, start listening for incoming requests anyways" -- launch thread attempting to join on new cache entries _ <- forkIO $ joinOnNewEntriesThread firstVSSTM - async (fediMainThreads serverSock firstVSSTM) + async (fediMainThreads serverSock realNodeSTM) ) (\joinedNS -> do -- launch main eventloop with successfully joined state @@ -195,33 +195,36 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do -- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters. -- Unjoined try joining instead. -convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO () -convergenceSampleThread nsSTM = forever $ do - nsSnap <- readTVarIO nsSTM - parentNode <- readTVarIO $ parentRealNode nsSnap - if isJoined nsSnap - then - runExceptT (do - -- joined node: choose random node, do queryIDLoop, compare result with own responsibility - let bss = bootstrapNodes parentNode - randIndex <- liftIO $ randomRIO (0, length bss - 1) - chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex - lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap) - currentlyResponsible <- liftEither lookupResult - if getNid currentlyResponsible /= getNid nsSnap - -- if mismatch, stabilise on the result, else do nothing - then do - stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible - (preds, succs) <- liftEither stabResult - -- TODO: verify neighbours before adding, see #55 - liftIO . atomically $ do - ns <- readTVar nsSTM - writeTVar nsSTM $ addPredecessors preds ns - else pure () - ) >> pure () - -- unjoined node: try joining through all bootstrapping nodes - else tryBootstrapJoining nsSTM >> pure () - let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode +convergenceSampleThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO () +convergenceSampleThread nodeSTM = forever $ do + node <- readTVarIO nodeSTM + forM_ (vservers node) $ \nsSTM -> do + nsSnap <- readTVarIO nsSTM + parentNode <- readTVarIO $ parentRealNode nsSnap + if isJoined nsSnap + then + runExceptT (do + -- joined node: choose random node, do queryIDLoop, compare result with own responsibility + let bss = bootstrapNodes parentNode + randIndex <- liftIO $ randomRIO (0, length bss - 1) + chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex + lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap) + currentlyResponsible <- liftEither lookupResult + if getNid currentlyResponsible /= getNid nsSnap + -- if mismatch, stabilise on the result, else do nothing + then do + stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible + (preds, succs) <- liftEither stabResult + -- TODO: verify neighbours before adding, see #55 + liftIO . atomically $ do + ns <- readTVar nsSTM + writeTVar nsSTM $ addPredecessors preds ns + else pure () + ) >> pure () + -- unjoined node: try joining through all bootstrapping nodes + else tryBootstrapJoining nsSTM >> pure () + + let delaySecs = confBootstrapSamplingInterval . nodeConfig $ node threadDelay delaySecs @@ -657,7 +660,7 @@ fediMainThreads sock nodeSTM = do -- choose option 2 for the sake of limiting concurrency in simulation scenario concurrently_ (stabiliseThread nodeSTM) $ concurrently_ (nodeCacheVerifyThread nodeSTM) $ - concurrently_ (convergenceSampleThread nsSTM) $ + concurrently_ (convergenceSampleThread nodeSTM) $ concurrently_ (lookupCacheCleanup $ parentRealNode ns) $ concurrently_ (sendThread sock sendQ)