diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index d9f7e05..39eaad2 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -92,8 +92,9 @@ import Hash2Pub.FediChordTypes (CacheEntry (..), cacheLookup, cacheLookupPred, cacheLookupSucc, genNodeID, getKeyID, hasValidNodeId, - localCompare, rMapFromList, - rMapLookupPred, rMapLookupSucc, + loadSliceSum, localCompare, + rMapFromList, rMapLookupPred, + rMapLookupSucc, remainingLoadTarget, setPredecessors, setSuccessors) import Hash2Pub.ProtocolTypes @@ -453,7 +454,7 @@ respondQueryLoad nsSTM msgSet = do lStats <- getServiceLoadStats serv let directSucc = getNid . head . predecessors $ nsSnap - handledTagSum = sum . takeRMapSuccessorsFromTo directSucc (loadSegmentUpperBound pl) $ loadPerTag lStats + handledTagSum = loadSliceSum lStats directSucc (loadSegmentUpperBound pl) pure $ Just LoadResponsePayload { loadSum = handledTagSum , loadRemainingTarget = remainingLoadTarget conf lStats @@ -792,9 +793,9 @@ requestPing ns target = do -- still need a particular vserver as LocalNodeState, because requests need a sender requestQueryLoad :: (MonadError String m, MonadIO m) - => LocalNodeState s - -> NodeID - -> RemoteNodeState + => LocalNodeState s -- ^ the local source vserver for the request + -> NodeID -- ^ upper bound of the segment queried, lower bound is set automatically by the queried node + -> RemoteNodeState -- ^ target node to query -> m SegmentLoadStats requestQueryLoad ns upperIdBound target = do nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns) diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 51c23c5..ab413cf 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -298,6 +298,7 @@ kChoicesSegmentLoads conf queryVs bootstrapNode activeVss = do -- query load of all possible segments -- simplification: treat each load lookup failure as a general unavailability of that segment -- TODO: retries for transient failures + -- TODO: parallel queries fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do -- if bootstrap node is provided, do initial lookup via that currentlyResponsible <- maybe @@ -335,23 +336,96 @@ kChoicesDepartureCost remainingOwnLoad ownCap thisSegmentLoad segment = kChoicesRebalanceThread :: (Service s (RealNodeSTM s)) => RealNodeSTM s -> IO () -kChoicesRebalanceThread nodeSTM = (confKChoicesRebalanceInterval . nodeConfig <$> readTVarIO nodeSTM) >>= rebalanceVS +kChoicesRebalanceThread nodeSTM = do + interval <- confKChoicesRebalanceInterval . nodeConfig <$> readTVarIO nodeSTM + runExceptT $ loop interval + pure () where - rebalanceVS :: Int -> IO () + loop interval = rebalanceVS interval `catchError` \_ -> loop interval + rebalanceVS :: (MonadError String m, MonadIO m) => Int -> m () rebalanceVS interval = do - threadDelay interval - -- query load of all possible available VS IDs + liftIO $ threadDelay interval + node <- liftIO $ readTVarIO nodeSTM + let + activeVssSTM = vservers node + conf = nodeConfig node + -- use an active vserver for load queries + queryVsSTM <- maybe (throwError "no active vserver") pure + $ headMay (rMapToList activeVssSTM) + queryVs <- liftIO . readTVarIO $ queryVsSTM + -- TODO: segment load and neighbour load queries can be done in parallel -- query load of all existing VSes neighbours + -- TODO: what happens if neighbour is one of our own vservers? + neighbourLoadFetches <- liftIO . forM activeVssSTM $ async . (\vsSTM -> runExceptT $ do + vs <- liftIO . readTVarIO $ vsSTM + succNode <- maybe + (throwError "vs has no successor") + pure + . headMay . successors $ vs + neighbourStats <- requestQueryLoad queryVs (getNid succNode) succNode + pure (getNid succNode, neighbourStats) + ) + -- TODO: deal with exceptions + -- TODO: better handling of nested Eithers + -- so far this is a RingMap NodeID (Either SomeException (Either String (NodeID, SegmentLoadStats))) + neighbourLoads <- liftIO . mapM waitCatch $ neighbourLoadFetches + -- get local load stats + ownLoadStats <- liftIO . getServiceLoadStats . nodeService $ node -- calculate all departure costs + let + departureCosts = + sortOn (\(cost, _, _) -> cost) + . foldl (\acc (ownVsId, neighbourLoad) -> case neighbourLoad of + Right (Right (neighbourId, neighbourStats)) -> + let + ownRemainingTarget = remainingLoadTarget conf ownLoadStats + thisSegmentLoad = loadSliceSum ownLoadStats ownVsId neighbourId + in + ( kChoicesDepartureCost ownRemainingTarget (totalCapacity ownLoadStats) thisSegmentLoad neighbourStats + , thisSegmentLoad + , ownVsId) + :acc + _ -> acc + ) + [] + $ rMapToListWithKeys neighbourLoads -- select VS with lowest departure cost + (lowestDepartionCost, departingSegmentLoad, lowestCostDeparter) <- maybe + (throwError "not enough data for calculating departure costs") + pure + $ headMay departureCosts + -- query load of all possible available VS IDs + segmentLoads <- kChoicesSegmentLoads conf queryVs Nothing activeVssSTM -- calculate all relocation costs of that VS + (joinCost, toJoinOn) <- + maybe (throwError "got no segment loads") pure + . headMay + . sortOn fst + . fmap (\(segment, vsId, toJoinOn) -> + let joinCosts = kChoicesJoinCost + -- when relocating a node, the load of the departing node is freed + (remainingLoadTarget conf ownLoadStats + departingSegmentLoad) + (totalCapacity ownLoadStats) + segment + in + (joinCosts, segmentCurrentOwner segment) + ) + $ segmentLoads + -- if deciding to re-balance, first leave and then join + -- combined costs need to be a gain (negative) and that gain needs + -- to be larger than Epsilon + when (lowestDepartionCost + joinCost <= negate kChoicesEpsilon) $ do + liftIO . putStrLn $ "here will be a relocation!" -- loop rebalanceVS interval - -- placeholder - pure () + +-- TODO: make parameterisable +-- | dampening factor constant for deciding whether the match gain is worth relocating +kChoicesEpsilon :: Double +kChoicesEpsilon = 0.05 -- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed -- for resolving the new node's position. diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 3a954d1..347c90c 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -22,6 +22,7 @@ module Hash2Pub.FediChordTypes , LoadStats (..) , emptyLoadStats , remainingLoadTarget + , loadSliceSum , addVserver , SegmentLoadStats (..) , setSuccessors @@ -483,6 +484,15 @@ remainingLoadTarget conf lstats = targetLoad - compensatedLoadSum lstats where targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2 + +-- | calculates the sum of tag load in a contiguous slice between to keys +loadSliceSum :: LoadStats + -> NodeID -- ^ lower segment bound + -> NodeID -- ^ upper segment bound + -> Double -- ^ sum of all tag loads within that segment +loadSliceSum stats from to = sum . takeRMapSuccessorsFromTo from to $ loadPerTag stats + + data SegmentLoadStats = SegmentLoadStats { segmentLowerKeyBound :: NodeID -- ^ segment start key