modularise VS candidate load querying into own function
This commit is contained in:
parent
8bd4e04bcd
commit
048a6ce391
|
@ -218,7 +218,7 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do
|
||||||
-- kCoicesVsJoin until target is met – unless there's already an active & joined VS causing enough load
|
-- kCoicesVsJoin until target is met – unless there's already an active & joined VS causing enough load
|
||||||
alreadyJoinedVss <- liftIO $ foldM (\sumAcc vsSTM -> readTVarIO vsSTM >>= (\vs -> pure . (+) sumAcc $ if vsIsJoined vs then 1 else 0)) 0 $ vservers node
|
alreadyJoinedVss <- liftIO $ foldM (\sumAcc vsSTM -> readTVarIO vsSTM >>= (\vs -> pure . (+) sumAcc $ if vsIsJoined vs then 1 else 0)) 0 $ vservers node
|
||||||
unless (alreadyJoinedVss > 0 && compensatedLoadSum ownLoadStats >= joinLoadTarget) $ do
|
unless (alreadyJoinedVss > 0 && compensatedLoadSum ownLoadStats >= joinLoadTarget) $ do
|
||||||
joinedVss <- vsJoins vs0STM (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins - alreadyJoinedVss) nodeSTM
|
joinedVss <- vsJoins vs0 (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins - alreadyJoinedVss) nodeSTM
|
||||||
if nullRMap joinedVss
|
if nullRMap joinedVss
|
||||||
then throwError "k-choices join unsuccessful, no vserver joined"
|
then throwError "k-choices join unsuccessful, no vserver joined"
|
||||||
else liftIO . atomically . modifyTVar' nodeSTM $ \node' -> node'
|
else liftIO . atomically . modifyTVar' nodeSTM $ \node' -> node'
|
||||||
|
@ -226,17 +226,17 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do
|
||||||
|
|
||||||
where
|
where
|
||||||
vsJoins :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
|
vsJoins :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
|
||||||
=> LocalNodeStateSTM s -> Double -> VSMap s -> Double -> Int -> RealNodeSTM s -> m (VSMap s)
|
=> LocalNodeState s -> Double -> VSMap s -> Double -> Int -> RealNodeSTM s -> m (VSMap s)
|
||||||
vsJoins _ _ vsmap _ 0 _ = pure vsmap
|
vsJoins _ _ vsmap _ 0 _ = pure vsmap
|
||||||
vsJoins queryVsSTM capacity vsmap remainingTargetLoad remainingJoins nodeSTM'
|
vsJoins queryVs capacity vsmap remainingTargetLoad remainingJoins nodeSTM'
|
||||||
| remainingTargetLoad <= 0 = pure vsmap
|
| remainingTargetLoad <= 0 = pure vsmap
|
||||||
| otherwise = do
|
| otherwise = do
|
||||||
|
|
||||||
(acquiredLoad, (newNid, newVs)) <- kChoicesVsJoin queryVsSTM bootstrapNode capacity vsmap nodeSTM' remainingTargetLoad
|
(acquiredLoad, (newNid, newVs)) <- kChoicesVsJoin queryVs bootstrapNode capacity vsmap nodeSTM' remainingTargetLoad
|
||||||
-- on successful vserver join add the new VS to node and recurse
|
-- on successful vserver join add the new VS to node and recurse
|
||||||
vsJoins queryVsSTM capacity (addRMapEntry newNid newVs vsmap) (remainingTargetLoad - acquiredLoad) (pred remainingJoins) nodeSTM'
|
vsJoins queryVs capacity (addRMapEntry newNid newVs vsmap) (remainingTargetLoad - acquiredLoad) (pred remainingJoins) nodeSTM'
|
||||||
-- on error, just reduce the amount of tries and retry
|
-- on error, just reduce the amount of tries and retry
|
||||||
`catchError` (\e -> liftIO (putStrLn e) >> vsJoins queryVsSTM capacity vsmap remainingTargetLoad (pred remainingJoins) nodeSTM')
|
`catchError` (\e -> liftIO (putStrLn e) >> vsJoins queryVs capacity vsmap remainingTargetLoad (pred remainingJoins) nodeSTM')
|
||||||
|
|
||||||
-- error cause 1: not a single queried node has responded -> indicates permanent failure
|
-- error cause 1: not a single queried node has responded -> indicates permanent failure
|
||||||
-- error cause 2: only a certain join failed, just ignore that join target for now, but problem: it will be the chosen
|
-- error cause 2: only a certain join failed, just ignore that join target for now, but problem: it will be the chosen
|
||||||
|
@ -244,39 +244,17 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do
|
||||||
-- `catchError` (const .
|
-- `catchError` (const .
|
||||||
|
|
||||||
kChoicesVsJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
|
kChoicesVsJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
|
||||||
=> LocalNodeStateSTM s -- ^ vserver to be used for querying
|
=> LocalNodeState s -- ^ vserver to be used for querying
|
||||||
-> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrappinG
|
-> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrapping
|
||||||
-> Double -- ^ own capacity
|
-> Double -- ^ own capacity
|
||||||
-> VSMap s -- ^ currently active VServers
|
-> VSMap s -- ^ currently active VServers
|
||||||
-> RealNodeSTM s -- ^ parent node is needed for initialising a new vserver
|
-> RealNodeSTM s -- ^ parent node is needed for initialising a new vserver
|
||||||
-> Double -- ^ remaining load target
|
-> Double -- ^ remaining load target
|
||||||
-> m (Double, (NodeID, LocalNodeStateSTM s)) -- ^ on success return tuple of acquired load and newly acquired vserver
|
-> m (Double, (NodeID, LocalNodeStateSTM s)) -- ^ on success return tuple of acquired load and newly acquired vserver
|
||||||
kChoicesVsJoin queryVsSTM bootstrapNode capacity activeVss nodeSTM remainingTarget = do
|
kChoicesVsJoin queryVs bootstrapNode capacity activeVss nodeSTM remainingTarget = do
|
||||||
conf <- nodeConfig <$> liftIO (readTVarIO nodeSTM)
|
conf <- nodeConfig <$> liftIO (readTVarIO nodeSTM)
|
||||||
-- generate all possible vs IDs
|
-- generate all possible vs IDs
|
||||||
let
|
segmentLoads <- kChoicesSegmentLoads conf queryVs bootstrapNode activeVss
|
||||||
-- tuples of node IDs and vserver IDs, because vserver IDs are needed for
|
|
||||||
-- LocalNodeState creation
|
|
||||||
nonJoinedIDs = filter (not . flip memberRMap activeVss . fst) [ (genNodeID (confIP conf) (confDomain conf) v, v) | v <- [0..pred (confKChoicesMaxVS conf)]]
|
|
||||||
queryVs <- liftIO $ readTVarIO queryVsSTM
|
|
||||||
|
|
||||||
-- query load of all possible segments
|
|
||||||
-- simplification: treat each load lookup failure as a general unavailability of that segment
|
|
||||||
-- TODO: retries for transient failures
|
|
||||||
segmentLoads <- fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do
|
|
||||||
-- if bootstrap node is provided, do initial lookup via that
|
|
||||||
currentlyResponsible <- maybe
|
|
||||||
(requestQueryID queryVs vsNid)
|
|
||||||
(\bs -> bootstrapQueryId queryVs bs vsNid)
|
|
||||||
bootstrapNode
|
|
||||||
segment <- requestQueryLoad queryVs vsNid currentlyResponsible
|
|
||||||
pure $ Just (segment, vsId, currentlyResponsible)
|
|
||||||
-- store segment stats and vserver ID together, so it's clear
|
|
||||||
-- which vs needs to be joined to acquire this segment
|
|
||||||
) `catchError` const (pure Nothing)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
-- cost calculation and sort by cost
|
-- cost calculation and sort by cost
|
||||||
-- edge case: no possible ID has responded
|
-- edge case: no possible ID has responded
|
||||||
(mincost, vsId, toJoinOn) <- maybe (throwError "received no load information") pure
|
(mincost, vsId, toJoinOn) <- maybe (throwError "received no load information") pure
|
||||||
|
@ -303,6 +281,39 @@ kChoicesVsJoin queryVsSTM bootstrapNode capacity activeVss nodeSTM remainingTarg
|
||||||
-- changes the remainingLoadTarget at each vsJoin. This target change
|
-- changes the remainingLoadTarget at each vsJoin. This target change
|
||||||
-- needs to be accounted for starting from the 2nd vsJoin.
|
-- needs to be accounted for starting from the 2nd vsJoin.
|
||||||
|
|
||||||
|
|
||||||
|
-- | query the load of all still unjoined VS positions
|
||||||
|
kChoicesSegmentLoads :: (Service s (RealNodeSTM s), MonadError String m, MonadIO m)
|
||||||
|
=> FediChordConf -- ^ config params needed for generating all possible VSs
|
||||||
|
-> LocalNodeState s -- ^ vserver to be used for querying
|
||||||
|
-> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrapping
|
||||||
|
-> VSMap s -- ^ currently active VServers
|
||||||
|
-> m [(SegmentLoadStats, Word8, RemoteNodeState)]
|
||||||
|
kChoicesSegmentLoads conf queryVs bootstrapNode activeVss = do
|
||||||
|
let
|
||||||
|
-- tuples of node IDs and vserver IDs, because vserver IDs are needed for
|
||||||
|
-- LocalNodeState creation
|
||||||
|
nonJoinedIDs = filter (not . flip memberRMap activeVss . fst) [ (genNodeID (confIP conf) (confDomain conf) v, v) | v <- [0..pred (confKChoicesMaxVS conf)]]
|
||||||
|
|
||||||
|
-- query load of all possible segments
|
||||||
|
-- simplification: treat each load lookup failure as a general unavailability of that segment
|
||||||
|
-- TODO: retries for transient failures
|
||||||
|
fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do
|
||||||
|
-- if bootstrap node is provided, do initial lookup via that
|
||||||
|
currentlyResponsible <- maybe
|
||||||
|
(requestQueryID queryVs vsNid)
|
||||||
|
(\bs -> bootstrapQueryId queryVs bs vsNid)
|
||||||
|
bootstrapNode
|
||||||
|
segment <- requestQueryLoad queryVs vsNid currentlyResponsible
|
||||||
|
pure $ Just (segment, vsId, currentlyResponsible)
|
||||||
|
-- store segment stats and vserver ID together, so it's clear
|
||||||
|
-- which vs needs to be joined to acquire this segment
|
||||||
|
) `catchError` const (pure Nothing)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
kChoicesJoinCost :: Double -- ^ own remaining load target
|
kChoicesJoinCost :: Double -- ^ own remaining load target
|
||||||
-> Double -- ^ own capacity
|
-> Double -- ^ own capacity
|
||||||
-> SegmentLoadStats -- ^ load stats of neighbour vs
|
-> SegmentLoadStats -- ^ load stats of neighbour vs
|
||||||
|
@ -324,27 +335,19 @@ kChoicesDepartureCost remainingOwnLoad ownCap thisSegmentLoad segment =
|
||||||
|
|
||||||
|
|
||||||
kChoicesRebalanceThread :: (Service s (RealNodeSTM s)) => RealNodeSTM s -> IO ()
|
kChoicesRebalanceThread :: (Service s (RealNodeSTM s)) => RealNodeSTM s -> IO ()
|
||||||
kChoicesRebalanceThread nodeSTM = (confKChoicesRebalanceInterval . nodeConfig <$> readTVarIO nodeSTM) >>= rebalanceVS 0
|
kChoicesRebalanceThread nodeSTM = (confKChoicesRebalanceInterval . nodeConfig <$> readTVarIO nodeSTM) >>= rebalanceVS
|
||||||
where
|
where
|
||||||
rebalanceVS :: NodeID -> Int -> IO ()
|
rebalanceVS :: Int -> IO ()
|
||||||
rebalanceVS startAt interval = do
|
rebalanceVS interval = do
|
||||||
threadDelay interval
|
threadDelay interval
|
||||||
(vsToRebalance', serv) <- atomically $ do
|
-- query load of all possible available VS IDs
|
||||||
node <- readTVar nodeSTM
|
-- query load of all existing VSes neighbours
|
||||||
pure (rMapLookupPred 0 (vservers node), nodeService node)
|
-- calculate all departure costs
|
||||||
maybe
|
-- select VS with lowest departure cost
|
||||||
-- wait and retry if no active VS available
|
-- calculate all relocation costs of that VS
|
||||||
(rebalanceVS startAt interval)
|
|
||||||
(\(vsNid, vsSTM) -> do
|
|
||||||
vs <- readTVarIO vsSTM
|
|
||||||
-- query neighbour node(s) load
|
|
||||||
-- query own load
|
|
||||||
-- calculate departure cost
|
|
||||||
-- if deciding to re-balance, first leave and then join
|
-- if deciding to re-balance, first leave and then join
|
||||||
-- loop
|
-- loop
|
||||||
rebalanceVS vsNid interval
|
rebalanceVS interval
|
||||||
)
|
|
||||||
vsToRebalance'
|
|
||||||
|
|
||||||
|
|
||||||
-- placeholder
|
-- placeholder
|
||||||
|
|
Loading…
Reference in a new issue