From 1a0de55b8c6a8a9da4838866155c93d9a9cecbe7 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Fri, 25 Sep 2020 20:40:45 +0200 Subject: [PATCH] integrate k-choices into `tryBootstrapJoin` flow part of #2 --- src/Hash2Pub/FediChord.hs | 91 ++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 45 deletions(-) diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index ec970cb..befa8ce 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -123,34 +123,21 @@ fediChordInit initConf serviceRunner = do -- prepare for joining: start node cache writer thread -- currently no masking is necessary, as there is nothing to clean up nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM - fediThreadsAsync <- if confEnableKChoices initConf - then - -- TODO: k-choices way of joining - -- placeholder - runExceptT (kChoicesNodeJoin realNodeSTM ("foo", fromIntegral 3)) - >> async (fediMainThreads serverSock realNodeSTM) - else do - -- without k-choices, just initialise a single vserver - firstVS <- nodeStateInit realNodeSTM 0 - firstVSSTM <- newTVarIO firstVS - -- add vserver to list at RealNode - atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) } - -- try joining the DHT using one of the provided bootstrapping nodes - joinedState <- tryBootstrapJoining firstVSSTM - - either (\err -> do - -- handle unsuccessful join - putStrLn $ err <> " Error joining, start listening for incoming requests anyways" - -- launch thread attempting to join on new cache entries - _ <- forkIO $ joinOnNewEntriesThread firstVSSTM - async (fediMainThreads serverSock realNodeSTM) - ) - (\joinedNS -> do - -- launch main eventloop with successfully joined state - putStrLn "successful join" - async (fediMainThreads serverSock realNodeSTM) - ) - joinedState + fediThreadsAsync <- do + either (\err -> do + -- handle unsuccessful join + putStrLn $ err <> " Error joining, start listening for incoming requests anyways" + -- launch thread attempting to join on new cache entries + -- TODO: adjust joinOnNewEntriesThread to k-choices + --_ <- forkIO $ joinOnNewEntriesThread firstVSSTM + async (fediMainThreads serverSock realNodeSTM) + ) + (\_ -> do + -- launch main eventloop with successfully joined state + putStrLn "successful join" + async (fediMainThreads serverSock realNodeSTM) + ) + =<< tryBootstrapJoining realNodeSTM pure (fediThreadsAsync, realNodeSTM) @@ -318,7 +305,7 @@ kChoicesJoinCost remainingOwnLoad ownCap segment = fediChordBootstrapJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -- ^ the local 'NodeState' -> (String, PortNumber) -- ^ domain and port of a bootstrapping node - -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a + -> IO (Either String ()) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordBootstrapJoin nsSTM bootstrapNode = do -- can be invoked multiple times with all known bootstrapping nodes until successfully joined @@ -330,10 +317,10 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node liftIO $ putStrLn "send a bootstrap Join" - joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM - liftEither joinResult + _ <- liftEither =<< liftIO (requestJoin currentlyResponsible nsSTM) + pure () --- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters. +-- Periodically lookup own IDs through a random bootstrapping node to discover and merge separated DHT clusters. -- Unjoined try joining instead. convergenceSampleThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO () convergenceSampleThread nodeSTM = forever $ do @@ -362,27 +349,41 @@ convergenceSampleThread nodeSTM = forever $ do else pure () ) >> pure () -- unjoined node: try joining through all bootstrapping nodes - else tryBootstrapJoining nsSTM >> pure () + else tryBootstrapJoining nodeSTM >> pure () let delaySecs = confBootstrapSamplingInterval . nodeConfig $ node threadDelay delaySecs -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. -tryBootstrapJoining :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s)) -tryBootstrapJoining nsSTM = do - bss <- atomically $ do - nsSnap <- readTVar nsSTM - realNodeSnap <- readTVar $ parentRealNode nsSnap - pure $ bootstrapNodes realNodeSnap - tryJoining bss +tryBootstrapJoining :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (Either String ()) +tryBootstrapJoining nodeSTM = do + node <- readTVarIO nodeSTM + let + bss = bootstrapNodes node + conf = nodeConfig node + if confEnableKChoices conf + then tryJoining bss $ runExceptT . kChoicesNodeJoin nodeSTM + else do + firstVS <- nodeStateInit nodeSTM 0 + firstVSSTM <- newTVarIO firstVS + joinResult <- tryJoining bss (fediChordBootstrapJoin firstVSSTM) + either + (pure . Left) + (\_ -> fmap Right . atomically . modifyTVar' nodeSTM $ (\rn -> rn + { vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) } + ) + ) + (joinResult :: Either String ()) + where - tryJoining (bn:bns) = do - j <- fediChordBootstrapJoin nsSTM bn + tryJoining :: [(String, PortNumber)] -> ((String, PortNumber) -> IO (Either String ())) -> IO (Either String ()) + tryJoining (bn:bns) joinFunc = do + j <- joinFunc bn case j of - Left err -> putStrLn ("join error: " <> err) >> tryJoining bns - Right joined -> pure $ Right joined - tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining." + Left err -> putStrLn ("join error: " <> err) >> tryJoining bns joinFunc + Right joined -> pure $ Right () + tryJoining [] _ = pure $ Left "Exhausted all bootstrap points for joining." -- | Look up a key just based on the responses of a single bootstrapping node.