kChoices cost calculations for rebalance decisions
- loop with load queries and cost calculations on whether to do a vs relocation - actual relocation still missing though - untested
This commit is contained in:
parent
6aebd982f8
commit
4aa4667a1d
|
@ -92,8 +92,9 @@ import Hash2Pub.FediChordTypes (CacheEntry (..),
|
||||||
cacheLookup, cacheLookupPred,
|
cacheLookup, cacheLookupPred,
|
||||||
cacheLookupSucc, genNodeID,
|
cacheLookupSucc, genNodeID,
|
||||||
getKeyID, hasValidNodeId,
|
getKeyID, hasValidNodeId,
|
||||||
localCompare, rMapFromList,
|
loadSliceSum, localCompare,
|
||||||
rMapLookupPred, rMapLookupSucc,
|
rMapFromList, rMapLookupPred,
|
||||||
|
rMapLookupSucc,
|
||||||
remainingLoadTarget,
|
remainingLoadTarget,
|
||||||
setPredecessors, setSuccessors)
|
setPredecessors, setSuccessors)
|
||||||
import Hash2Pub.ProtocolTypes
|
import Hash2Pub.ProtocolTypes
|
||||||
|
@ -453,7 +454,7 @@ respondQueryLoad nsSTM msgSet = do
|
||||||
lStats <- getServiceLoadStats serv
|
lStats <- getServiceLoadStats serv
|
||||||
let
|
let
|
||||||
directSucc = getNid . head . predecessors $ nsSnap
|
directSucc = getNid . head . predecessors $ nsSnap
|
||||||
handledTagSum = sum . takeRMapSuccessorsFromTo directSucc (loadSegmentUpperBound pl) $ loadPerTag lStats
|
handledTagSum = loadSliceSum lStats directSucc (loadSegmentUpperBound pl)
|
||||||
pure $ Just LoadResponsePayload
|
pure $ Just LoadResponsePayload
|
||||||
{ loadSum = handledTagSum
|
{ loadSum = handledTagSum
|
||||||
, loadRemainingTarget = remainingLoadTarget conf lStats
|
, loadRemainingTarget = remainingLoadTarget conf lStats
|
||||||
|
@ -792,9 +793,9 @@ requestPing ns target = do
|
||||||
|
|
||||||
-- still need a particular vserver as LocalNodeState, because requests need a sender
|
-- still need a particular vserver as LocalNodeState, because requests need a sender
|
||||||
requestQueryLoad :: (MonadError String m, MonadIO m)
|
requestQueryLoad :: (MonadError String m, MonadIO m)
|
||||||
=> LocalNodeState s
|
=> LocalNodeState s -- ^ the local source vserver for the request
|
||||||
-> NodeID
|
-> NodeID -- ^ upper bound of the segment queried, lower bound is set automatically by the queried node
|
||||||
-> RemoteNodeState
|
-> RemoteNodeState -- ^ target node to query
|
||||||
-> m SegmentLoadStats
|
-> m SegmentLoadStats
|
||||||
requestQueryLoad ns upperIdBound target = do
|
requestQueryLoad ns upperIdBound target = do
|
||||||
nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns)
|
nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns)
|
||||||
|
|
|
@ -298,6 +298,7 @@ kChoicesSegmentLoads conf queryVs bootstrapNode activeVss = do
|
||||||
-- query load of all possible segments
|
-- query load of all possible segments
|
||||||
-- simplification: treat each load lookup failure as a general unavailability of that segment
|
-- simplification: treat each load lookup failure as a general unavailability of that segment
|
||||||
-- TODO: retries for transient failures
|
-- TODO: retries for transient failures
|
||||||
|
-- TODO: parallel queries
|
||||||
fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do
|
fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do
|
||||||
-- if bootstrap node is provided, do initial lookup via that
|
-- if bootstrap node is provided, do initial lookup via that
|
||||||
currentlyResponsible <- maybe
|
currentlyResponsible <- maybe
|
||||||
|
@ -335,23 +336,96 @@ kChoicesDepartureCost remainingOwnLoad ownCap thisSegmentLoad segment =
|
||||||
|
|
||||||
|
|
||||||
kChoicesRebalanceThread :: (Service s (RealNodeSTM s)) => RealNodeSTM s -> IO ()
|
kChoicesRebalanceThread :: (Service s (RealNodeSTM s)) => RealNodeSTM s -> IO ()
|
||||||
kChoicesRebalanceThread nodeSTM = (confKChoicesRebalanceInterval . nodeConfig <$> readTVarIO nodeSTM) >>= rebalanceVS
|
kChoicesRebalanceThread nodeSTM = do
|
||||||
|
interval <- confKChoicesRebalanceInterval . nodeConfig <$> readTVarIO nodeSTM
|
||||||
|
runExceptT $ loop interval
|
||||||
|
pure ()
|
||||||
where
|
where
|
||||||
rebalanceVS :: Int -> IO ()
|
loop interval = rebalanceVS interval `catchError` \_ -> loop interval
|
||||||
|
rebalanceVS :: (MonadError String m, MonadIO m) => Int -> m ()
|
||||||
rebalanceVS interval = do
|
rebalanceVS interval = do
|
||||||
threadDelay interval
|
liftIO $ threadDelay interval
|
||||||
-- query load of all possible available VS IDs
|
node <- liftIO $ readTVarIO nodeSTM
|
||||||
|
let
|
||||||
|
activeVssSTM = vservers node
|
||||||
|
conf = nodeConfig node
|
||||||
|
-- use an active vserver for load queries
|
||||||
|
queryVsSTM <- maybe (throwError "no active vserver") pure
|
||||||
|
$ headMay (rMapToList activeVssSTM)
|
||||||
|
queryVs <- liftIO . readTVarIO $ queryVsSTM
|
||||||
|
-- TODO: segment load and neighbour load queries can be done in parallel
|
||||||
-- query load of all existing VSes neighbours
|
-- query load of all existing VSes neighbours
|
||||||
|
-- TODO: what happens if neighbour is one of our own vservers?
|
||||||
|
neighbourLoadFetches <- liftIO . forM activeVssSTM $ async . (\vsSTM -> runExceptT $ do
|
||||||
|
vs <- liftIO . readTVarIO $ vsSTM
|
||||||
|
succNode <- maybe
|
||||||
|
(throwError "vs has no successor")
|
||||||
|
pure
|
||||||
|
. headMay . successors $ vs
|
||||||
|
neighbourStats <- requestQueryLoad queryVs (getNid succNode) succNode
|
||||||
|
pure (getNid succNode, neighbourStats)
|
||||||
|
)
|
||||||
|
-- TODO: deal with exceptions
|
||||||
|
-- TODO: better handling of nested Eithers
|
||||||
|
-- so far this is a RingMap NodeID (Either SomeException (Either String (NodeID, SegmentLoadStats)))
|
||||||
|
neighbourLoads <- liftIO . mapM waitCatch $ neighbourLoadFetches
|
||||||
|
-- get local load stats
|
||||||
|
ownLoadStats <- liftIO . getServiceLoadStats . nodeService $ node
|
||||||
-- calculate all departure costs
|
-- calculate all departure costs
|
||||||
|
let
|
||||||
|
departureCosts =
|
||||||
|
sortOn (\(cost, _, _) -> cost)
|
||||||
|
. foldl (\acc (ownVsId, neighbourLoad) -> case neighbourLoad of
|
||||||
|
Right (Right (neighbourId, neighbourStats)) ->
|
||||||
|
let
|
||||||
|
ownRemainingTarget = remainingLoadTarget conf ownLoadStats
|
||||||
|
thisSegmentLoad = loadSliceSum ownLoadStats ownVsId neighbourId
|
||||||
|
in
|
||||||
|
( kChoicesDepartureCost ownRemainingTarget (totalCapacity ownLoadStats) thisSegmentLoad neighbourStats
|
||||||
|
, thisSegmentLoad
|
||||||
|
, ownVsId)
|
||||||
|
:acc
|
||||||
|
_ -> acc
|
||||||
|
)
|
||||||
|
[]
|
||||||
|
$ rMapToListWithKeys neighbourLoads
|
||||||
-- select VS with lowest departure cost
|
-- select VS with lowest departure cost
|
||||||
|
(lowestDepartionCost, departingSegmentLoad, lowestCostDeparter) <- maybe
|
||||||
|
(throwError "not enough data for calculating departure costs")
|
||||||
|
pure
|
||||||
|
$ headMay departureCosts
|
||||||
|
-- query load of all possible available VS IDs
|
||||||
|
segmentLoads <- kChoicesSegmentLoads conf queryVs Nothing activeVssSTM
|
||||||
-- calculate all relocation costs of that VS
|
-- calculate all relocation costs of that VS
|
||||||
|
(joinCost, toJoinOn) <-
|
||||||
|
maybe (throwError "got no segment loads") pure
|
||||||
|
. headMay
|
||||||
|
. sortOn fst
|
||||||
|
. fmap (\(segment, vsId, toJoinOn) ->
|
||||||
|
let joinCosts = kChoicesJoinCost
|
||||||
|
-- when relocating a node, the load of the departing node is freed
|
||||||
|
(remainingLoadTarget conf ownLoadStats + departingSegmentLoad)
|
||||||
|
(totalCapacity ownLoadStats)
|
||||||
|
segment
|
||||||
|
in
|
||||||
|
(joinCosts, segmentCurrentOwner segment)
|
||||||
|
)
|
||||||
|
$ segmentLoads
|
||||||
|
|
||||||
-- if deciding to re-balance, first leave and then join
|
-- if deciding to re-balance, first leave and then join
|
||||||
|
-- combined costs need to be a gain (negative) and that gain needs
|
||||||
|
-- to be larger than Epsilon
|
||||||
|
when (lowestDepartionCost + joinCost <= negate kChoicesEpsilon) $ do
|
||||||
|
liftIO . putStrLn $ "here will be a relocation!"
|
||||||
-- loop
|
-- loop
|
||||||
rebalanceVS interval
|
rebalanceVS interval
|
||||||
|
|
||||||
|
|
||||||
-- placeholder
|
|
||||||
pure ()
|
-- TODO: make parameterisable
|
||||||
|
-- | dampening factor constant for deciding whether the match gain is worth relocating
|
||||||
|
kChoicesEpsilon :: Double
|
||||||
|
kChoicesEpsilon = 0.05
|
||||||
|
|
||||||
-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
|
-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
|
||||||
-- for resolving the new node's position.
|
-- for resolving the new node's position.
|
||||||
|
|
|
@ -22,6 +22,7 @@ module Hash2Pub.FediChordTypes
|
||||||
, LoadStats (..)
|
, LoadStats (..)
|
||||||
, emptyLoadStats
|
, emptyLoadStats
|
||||||
, remainingLoadTarget
|
, remainingLoadTarget
|
||||||
|
, loadSliceSum
|
||||||
, addVserver
|
, addVserver
|
||||||
, SegmentLoadStats (..)
|
, SegmentLoadStats (..)
|
||||||
, setSuccessors
|
, setSuccessors
|
||||||
|
@ -483,6 +484,15 @@ remainingLoadTarget conf lstats = targetLoad - compensatedLoadSum lstats
|
||||||
where
|
where
|
||||||
targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2
|
targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2
|
||||||
|
|
||||||
|
|
||||||
|
-- | calculates the sum of tag load in a contiguous slice between to keys
|
||||||
|
loadSliceSum :: LoadStats
|
||||||
|
-> NodeID -- ^ lower segment bound
|
||||||
|
-> NodeID -- ^ upper segment bound
|
||||||
|
-> Double -- ^ sum of all tag loads within that segment
|
||||||
|
loadSliceSum stats from to = sum . takeRMapSuccessorsFromTo from to $ loadPerTag stats
|
||||||
|
|
||||||
|
|
||||||
data SegmentLoadStats = SegmentLoadStats
|
data SegmentLoadStats = SegmentLoadStats
|
||||||
{ segmentLowerKeyBound :: NodeID
|
{ segmentLowerKeyBound :: NodeID
|
||||||
-- ^ segment start key
|
-- ^ segment start key
|
||||||
|
|
Loading…
Reference in a new issue