parent
7a87d86c32
commit
1a0de55b8c
|
@ -123,34 +123,21 @@ fediChordInit initConf serviceRunner = do
|
|||
-- prepare for joining: start node cache writer thread
|
||||
-- currently no masking is necessary, as there is nothing to clean up
|
||||
nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM
|
||||
fediThreadsAsync <- if confEnableKChoices initConf
|
||||
then
|
||||
-- TODO: k-choices way of joining
|
||||
-- placeholder
|
||||
runExceptT (kChoicesNodeJoin realNodeSTM ("foo", fromIntegral 3))
|
||||
>> async (fediMainThreads serverSock realNodeSTM)
|
||||
else do
|
||||
-- without k-choices, just initialise a single vserver
|
||||
firstVS <- nodeStateInit realNodeSTM 0
|
||||
firstVSSTM <- newTVarIO firstVS
|
||||
-- add vserver to list at RealNode
|
||||
atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) }
|
||||
-- try joining the DHT using one of the provided bootstrapping nodes
|
||||
joinedState <- tryBootstrapJoining firstVSSTM
|
||||
|
||||
either (\err -> do
|
||||
-- handle unsuccessful join
|
||||
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
||||
-- launch thread attempting to join on new cache entries
|
||||
_ <- forkIO $ joinOnNewEntriesThread firstVSSTM
|
||||
async (fediMainThreads serverSock realNodeSTM)
|
||||
)
|
||||
(\joinedNS -> do
|
||||
-- launch main eventloop with successfully joined state
|
||||
putStrLn "successful join"
|
||||
async (fediMainThreads serverSock realNodeSTM)
|
||||
)
|
||||
joinedState
|
||||
fediThreadsAsync <- do
|
||||
either (\err -> do
|
||||
-- handle unsuccessful join
|
||||
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
||||
-- launch thread attempting to join on new cache entries
|
||||
-- TODO: adjust joinOnNewEntriesThread to k-choices
|
||||
--_ <- forkIO $ joinOnNewEntriesThread firstVSSTM
|
||||
async (fediMainThreads serverSock realNodeSTM)
|
||||
)
|
||||
(\_ -> do
|
||||
-- launch main eventloop with successfully joined state
|
||||
putStrLn "successful join"
|
||||
async (fediMainThreads serverSock realNodeSTM)
|
||||
)
|
||||
=<< tryBootstrapJoining realNodeSTM
|
||||
pure (fediThreadsAsync, realNodeSTM)
|
||||
|
||||
|
||||
|
@ -318,7 +305,7 @@ kChoicesJoinCost remainingOwnLoad ownCap segment =
|
|||
fediChordBootstrapJoin :: Service s (RealNodeSTM s)
|
||||
=> LocalNodeStateSTM s -- ^ the local 'NodeState'
|
||||
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
|
||||
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
|
||||
-> IO (Either String ()) -- ^ the joined 'NodeState' after a
|
||||
-- successful join, otherwise an error message
|
||||
fediChordBootstrapJoin nsSTM bootstrapNode = do
|
||||
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
|
||||
|
@ -330,10 +317,10 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
|
|||
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
|
||||
-- 2. then send a join to the currently responsible node
|
||||
liftIO $ putStrLn "send a bootstrap Join"
|
||||
joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
|
||||
liftEither joinResult
|
||||
_ <- liftEither =<< liftIO (requestJoin currentlyResponsible nsSTM)
|
||||
pure ()
|
||||
|
||||
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
|
||||
-- Periodically lookup own IDs through a random bootstrapping node to discover and merge separated DHT clusters.
|
||||
-- Unjoined try joining instead.
|
||||
convergenceSampleThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
|
||||
convergenceSampleThread nodeSTM = forever $ do
|
||||
|
@ -362,27 +349,41 @@ convergenceSampleThread nodeSTM = forever $ do
|
|||
else pure ()
|
||||
) >> pure ()
|
||||
-- unjoined node: try joining through all bootstrapping nodes
|
||||
else tryBootstrapJoining nsSTM >> pure ()
|
||||
else tryBootstrapJoining nodeSTM >> pure ()
|
||||
|
||||
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ node
|
||||
threadDelay delaySecs
|
||||
|
||||
|
||||
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
|
||||
tryBootstrapJoining :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s))
|
||||
tryBootstrapJoining nsSTM = do
|
||||
bss <- atomically $ do
|
||||
nsSnap <- readTVar nsSTM
|
||||
realNodeSnap <- readTVar $ parentRealNode nsSnap
|
||||
pure $ bootstrapNodes realNodeSnap
|
||||
tryJoining bss
|
||||
tryBootstrapJoining :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (Either String ())
|
||||
tryBootstrapJoining nodeSTM = do
|
||||
node <- readTVarIO nodeSTM
|
||||
let
|
||||
bss = bootstrapNodes node
|
||||
conf = nodeConfig node
|
||||
if confEnableKChoices conf
|
||||
then tryJoining bss $ runExceptT . kChoicesNodeJoin nodeSTM
|
||||
else do
|
||||
firstVS <- nodeStateInit nodeSTM 0
|
||||
firstVSSTM <- newTVarIO firstVS
|
||||
joinResult <- tryJoining bss (fediChordBootstrapJoin firstVSSTM)
|
||||
either
|
||||
(pure . Left)
|
||||
(\_ -> fmap Right . atomically . modifyTVar' nodeSTM $ (\rn -> rn
|
||||
{ vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) }
|
||||
)
|
||||
)
|
||||
(joinResult :: Either String ())
|
||||
|
||||
where
|
||||
tryJoining (bn:bns) = do
|
||||
j <- fediChordBootstrapJoin nsSTM bn
|
||||
tryJoining :: [(String, PortNumber)] -> ((String, PortNumber) -> IO (Either String ())) -> IO (Either String ())
|
||||
tryJoining (bn:bns) joinFunc = do
|
||||
j <- joinFunc bn
|
||||
case j of
|
||||
Left err -> putStrLn ("join error: " <> err) >> tryJoining bns
|
||||
Right joined -> pure $ Right joined
|
||||
tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining."
|
||||
Left err -> putStrLn ("join error: " <> err) >> tryJoining bns joinFunc
|
||||
Right joined -> pure $ Right ()
|
||||
tryJoining [] _ = pure $ Left "Exhausted all bootstrap points for joining."
|
||||
|
||||
|
||||
-- | Look up a key just based on the responses of a single bootstrapping node.
|
||||
|
|
Loading…
Reference in a new issue