From 0ee8f0dc43ace1173d18095c0f99f20131d43cfa Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 28 Sep 2020 00:55:45 +0200 Subject: [PATCH] adjust joinOnNewEntreisThread to k-choices join --- src/Hash2Pub/FediChord.hs | 46 ++++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 6c90b5d..e8a3260 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -133,9 +133,7 @@ fediChordInit initConf serviceRunner = do placeholderVSSTM <- newTVarIO placeholderVS atomically . modifyTVar' realNodeSTM $ addVserver (getNid placeholderVS, placeholderVSSTM) - -- TODO: on which bootstrap node vserver to join? (#77) -- launch thread attempting to join on new cache entries - -- TODO: adjust joinOnNewEntriesThread to k-choices _ <- forkIO $ joinOnNewEntriesThread realNodeSTM async (fediMainThreads serverSock realNodeSTM) ) @@ -195,9 +193,12 @@ nodeStateInit realNodeSTM vsID' = do pure initialState +-- | Joins a 'RealNode' to the DHT by joining several vservers, trying to match +-- the own load target best. +-- Triggers 'kChoicesVsJoin' kChoicesNodeJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => RealNodeSTM s - -> (String, PortNumber) -- ^ domain and port of a bootstrapping node + -> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrap joining -> m () kChoicesNodeJoin nodeSTM bootstrapNode = do node <- liftIO $ readTVarIO nodeSTM @@ -213,13 +214,14 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do joinLoadTarget = totalCapacity ownLoadStats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2 initialJoins = confKChoicesMaxVS conf `div` 2 -- edge case: however small the target is, at least join 1 vs - -- kCoicesVsJoin until target is met - joinedVss <- vsJoins vs0STM (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins) nodeSTM - if HMap.null joinedVss - then throwError "k-choices join unsuccessful, no vserver joined" - else liftIO . atomically . modifyTVar' nodeSTM $ \node' -> node' - { vservers = HMap.union (vservers node') joinedVss } - pure () + -- kCoicesVsJoin until target is met – unless there's already an active & joined VS causing enough load + alreadyJoinedVss <- liftIO $ foldM (\sumAcc vsSTM -> readTVarIO vsSTM >>= (\vs -> pure . (+) sumAcc $ if isJoined vs then 1 else 0)) 0 $ vservers node + unless (alreadyJoinedVss > 0 && compensatedLoadSum ownLoadStats >= joinLoadTarget) $ do + joinedVss <- vsJoins vs0STM (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins - alreadyJoinedVss) nodeSTM + if HMap.null joinedVss + then throwError "k-choices join unsuccessful, no vserver joined" + else liftIO . atomically . modifyTVar' nodeSTM $ \node' -> node' + { vservers = HMap.union (vservers node') joinedVss } where vsJoins :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) @@ -242,7 +244,7 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do kChoicesVsJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeStateSTM s -- ^ vserver to be used for querying - -> (String, PortNumber) -- ^ domain and port of a bootstrapping node + -> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrappinG -> Double -- ^ own capacity -> VSMap s -- ^ currently active VServers -> RealNodeSTM s -- ^ parent node is needed for initialising a new vserver @@ -262,7 +264,11 @@ kChoicesVsJoin queryVsSTM bootstrapNode capacity activeVss nodeSTM remainingTarg -- simplification: treat each load lookup failure as a general unavailability of that segment -- TODO: retries for transient failures segmentLoads <- fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do - currentlyResponsible <- bootstrapQueryId queryVsSTM bootstrapNode vsNid + -- if bootstrap node is provided, do initial lookup via that + currentlyResponsible <- maybe + (requestQueryID queryVs vsNid) + (\bs -> bootstrapQueryId queryVsSTM bs vsNid) + bootstrapNode segment <- requestQueryLoad queryVs vsNid currentlyResponsible pure $ Just (segment, vsId, currentlyResponsible) -- store segment stats and vserver ID together, so it's clear @@ -367,7 +373,7 @@ tryBootstrapJoining nodeSTM = do bss = bootstrapNodes node conf = nodeConfig node if confEnableKChoices conf - then tryJoining bss $ runExceptT . kChoicesNodeJoin nodeSTM + then tryJoining bss $ runExceptT . kChoicesNodeJoin nodeSTM . Just else do firstVS <- nodeStateInit nodeSTM 0 firstVSSTM <- newTVarIO firstVS @@ -517,9 +523,10 @@ joinOnNewEntriesThread nodeSTM = loop -- periodic rebalance -- TODO: document this approach in the docs loop = do - lookupResult <- atomically $ do + (lookupResult, conf, firstVSSTM) <- atomically $ do nodeSnap <- readTVar nodeSTM - case headMay (HMap.toList $ vservers nodeSnap) of + let conf = nodeConfig nodeSnap + case headMay (HMap.elems $ vservers nodeSnap) of Nothing -> retry Just vsSTM -> do -- take any active vserver as heuristic for whether this node has @@ -532,7 +539,7 @@ joinOnNewEntriesThread nodeSTM = loop -- …which, having no neighbours, returns an empty forward list -- -> block until cache changes and then retry (FORWARD s) | Set.null s -> retry - result -> pure result + result -> pure (result, conf, vsSTM) case lookupResult of -- already joined FOUND _ -> @@ -540,10 +547,13 @@ joinOnNewEntriesThread nodeSTM = loop -- otherwise try joining FORWARD _ -> do -- do normal join, but without bootstrap nodes - joinResult <- runExceptT $ fediChordVserverJoin nsSTM + joinResult <- if confEnableKChoices conf + then runExceptT $ kChoicesNodeJoin nodeSTM Nothing + else runExceptT $ fediChordVserverJoin firstVSSTM + >> pure () either -- on join failure, sleep and retry - (const $ threadDelay (confJoinAttemptsInterval . nodeConfig $ parentNode) >> loop) + (const $ threadDelay (confJoinAttemptsInterval conf) >> loop) (const $ pure ()) joinResult