adjust joinOnNewEntreisThread to k-choices join
This commit is contained in:
parent
21ecf9b041
commit
0ee8f0dc43
|
@ -133,9 +133,7 @@ fediChordInit initConf serviceRunner = do
|
|||
placeholderVSSTM <- newTVarIO placeholderVS
|
||||
atomically . modifyTVar' realNodeSTM $
|
||||
addVserver (getNid placeholderVS, placeholderVSSTM)
|
||||
-- TODO: on which bootstrap node vserver to join? (#77)
|
||||
-- launch thread attempting to join on new cache entries
|
||||
-- TODO: adjust joinOnNewEntriesThread to k-choices
|
||||
_ <- forkIO $ joinOnNewEntriesThread realNodeSTM
|
||||
async (fediMainThreads serverSock realNodeSTM)
|
||||
)
|
||||
|
@ -195,9 +193,12 @@ nodeStateInit realNodeSTM vsID' = do
|
|||
pure initialState
|
||||
|
||||
|
||||
-- | Joins a 'RealNode' to the DHT by joining several vservers, trying to match
|
||||
-- the own load target best.
|
||||
-- Triggers 'kChoicesVsJoin'
|
||||
kChoicesNodeJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
|
||||
=> RealNodeSTM s
|
||||
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
|
||||
-> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrap joining
|
||||
-> m ()
|
||||
kChoicesNodeJoin nodeSTM bootstrapNode = do
|
||||
node <- liftIO $ readTVarIO nodeSTM
|
||||
|
@ -213,13 +214,14 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do
|
|||
joinLoadTarget = totalCapacity ownLoadStats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2
|
||||
initialJoins = confKChoicesMaxVS conf `div` 2
|
||||
-- edge case: however small the target is, at least join 1 vs
|
||||
-- kCoicesVsJoin until target is met
|
||||
joinedVss <- vsJoins vs0STM (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins) nodeSTM
|
||||
if HMap.null joinedVss
|
||||
then throwError "k-choices join unsuccessful, no vserver joined"
|
||||
else liftIO . atomically . modifyTVar' nodeSTM $ \node' -> node'
|
||||
{ vservers = HMap.union (vservers node') joinedVss }
|
||||
pure ()
|
||||
-- kCoicesVsJoin until target is met – unless there's already an active & joined VS causing enough load
|
||||
alreadyJoinedVss <- liftIO $ foldM (\sumAcc vsSTM -> readTVarIO vsSTM >>= (\vs -> pure . (+) sumAcc $ if isJoined vs then 1 else 0)) 0 $ vservers node
|
||||
unless (alreadyJoinedVss > 0 && compensatedLoadSum ownLoadStats >= joinLoadTarget) $ do
|
||||
joinedVss <- vsJoins vs0STM (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins - alreadyJoinedVss) nodeSTM
|
||||
if HMap.null joinedVss
|
||||
then throwError "k-choices join unsuccessful, no vserver joined"
|
||||
else liftIO . atomically . modifyTVar' nodeSTM $ \node' -> node'
|
||||
{ vservers = HMap.union (vservers node') joinedVss }
|
||||
|
||||
where
|
||||
vsJoins :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
|
||||
|
@ -242,7 +244,7 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do
|
|||
|
||||
kChoicesVsJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
|
||||
=> LocalNodeStateSTM s -- ^ vserver to be used for querying
|
||||
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
|
||||
-> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrappinG
|
||||
-> Double -- ^ own capacity
|
||||
-> VSMap s -- ^ currently active VServers
|
||||
-> RealNodeSTM s -- ^ parent node is needed for initialising a new vserver
|
||||
|
@ -262,7 +264,11 @@ kChoicesVsJoin queryVsSTM bootstrapNode capacity activeVss nodeSTM remainingTarg
|
|||
-- simplification: treat each load lookup failure as a general unavailability of that segment
|
||||
-- TODO: retries for transient failures
|
||||
segmentLoads <- fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do
|
||||
currentlyResponsible <- bootstrapQueryId queryVsSTM bootstrapNode vsNid
|
||||
-- if bootstrap node is provided, do initial lookup via that
|
||||
currentlyResponsible <- maybe
|
||||
(requestQueryID queryVs vsNid)
|
||||
(\bs -> bootstrapQueryId queryVsSTM bs vsNid)
|
||||
bootstrapNode
|
||||
segment <- requestQueryLoad queryVs vsNid currentlyResponsible
|
||||
pure $ Just (segment, vsId, currentlyResponsible)
|
||||
-- store segment stats and vserver ID together, so it's clear
|
||||
|
@ -367,7 +373,7 @@ tryBootstrapJoining nodeSTM = do
|
|||
bss = bootstrapNodes node
|
||||
conf = nodeConfig node
|
||||
if confEnableKChoices conf
|
||||
then tryJoining bss $ runExceptT . kChoicesNodeJoin nodeSTM
|
||||
then tryJoining bss $ runExceptT . kChoicesNodeJoin nodeSTM . Just
|
||||
else do
|
||||
firstVS <- nodeStateInit nodeSTM 0
|
||||
firstVSSTM <- newTVarIO firstVS
|
||||
|
@ -517,9 +523,10 @@ joinOnNewEntriesThread nodeSTM = loop
|
|||
-- periodic rebalance
|
||||
-- TODO: document this approach in the docs
|
||||
loop = do
|
||||
lookupResult <- atomically $ do
|
||||
(lookupResult, conf, firstVSSTM) <- atomically $ do
|
||||
nodeSnap <- readTVar nodeSTM
|
||||
case headMay (HMap.toList $ vservers nodeSnap) of
|
||||
let conf = nodeConfig nodeSnap
|
||||
case headMay (HMap.elems $ vservers nodeSnap) of
|
||||
Nothing -> retry
|
||||
Just vsSTM -> do
|
||||
-- take any active vserver as heuristic for whether this node has
|
||||
|
@ -532,7 +539,7 @@ joinOnNewEntriesThread nodeSTM = loop
|
|||
-- …which, having no neighbours, returns an empty forward list
|
||||
-- -> block until cache changes and then retry
|
||||
(FORWARD s) | Set.null s -> retry
|
||||
result -> pure result
|
||||
result -> pure (result, conf, vsSTM)
|
||||
case lookupResult of
|
||||
-- already joined
|
||||
FOUND _ ->
|
||||
|
@ -540,10 +547,13 @@ joinOnNewEntriesThread nodeSTM = loop
|
|||
-- otherwise try joining
|
||||
FORWARD _ -> do
|
||||
-- do normal join, but without bootstrap nodes
|
||||
joinResult <- runExceptT $ fediChordVserverJoin nsSTM
|
||||
joinResult <- if confEnableKChoices conf
|
||||
then runExceptT $ kChoicesNodeJoin nodeSTM Nothing
|
||||
else runExceptT $ fediChordVserverJoin firstVSSTM
|
||||
>> pure ()
|
||||
either
|
||||
-- on join failure, sleep and retry
|
||||
(const $ threadDelay (confJoinAttemptsInterval . nodeConfig $ parentNode) >> loop)
|
||||
(const $ threadDelay (confJoinAttemptsInterval conf) >> loop)
|
||||
(const $ pure ())
|
||||
joinResult
|
||||
|
||||
|
|
Loading…
Reference in a new issue