diff --git a/FediChord.asn1 b/FediChord.asn1 index f978151..eafd303 100644 --- a/FediChord.asn1 +++ b/FediChord.asn1 @@ -111,6 +111,7 @@ LoadRequestPayload ::= SEQUENCE { LoadResponsePayload ::= SEQUENCE { loadSum REAL, remainingLoadTarget REAL, + totalCapacity REAL, lowerBound NodeID } diff --git a/app/Main.hs b/app/Main.hs index 964748a..24d66a9 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -49,6 +49,9 @@ readConfig = do , confRequestTimeout = 5 * 10^6 `div` speedup , confRequestRetries = 3 , confEnableKChoices = loadBalancingEnabled /= "off" + , confKChoicesOverload = 0.9 + , confKChoicesUnderload = 0.1 + , confKChoicesMaxVS = 8 } sConf = ServiceConf { confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup diff --git a/src/Hash2Pub/ASN1Coding.hs b/src/Hash2Pub/ASN1Coding.hs index 7701097..c2a5cc4 100644 --- a/src/Hash2Pub/ASN1Coding.hs +++ b/src/Hash2Pub/ASN1Coding.hs @@ -193,6 +193,7 @@ encodePayload payload'@LoadResponsePayload{} = [ Start Sequence , Real $ loadSum payload' , Real $ loadRemainingTarget payload' + , Real $ loadTotalCapacity payload' , IntVal . getNodeID $ loadSegmentLowerBound payload' , End Sequence ] @@ -472,10 +473,12 @@ parseLoadResponsePayload :: ParseASN1 ActionPayload parseLoadResponsePayload = onNextContainer Sequence $ do loadSum' <- parseReal loadRemainingTarget' <- parseReal + loadTotalCapacity' <- parseReal loadSegmentLowerBound' <- fromInteger <$> parseInteger pure LoadResponsePayload { loadSum = loadSum' , loadRemainingTarget = loadRemainingTarget' + , loadTotalCapacity = loadTotalCapacity' , loadSegmentLowerBound = loadSegmentLowerBound' } diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index c3cc858..1682e16 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -93,6 +93,7 @@ import Hash2Pub.FediChordTypes (CacheEntry (..), getKeyID, localCompare, rMapFromList, rMapLookupPred, rMapLookupSucc, + remainingLoadTarget, setPredecessors, setSuccessors) import Hash2Pub.ProtocolTypes import Hash2Pub.RingMap @@ -292,7 +293,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do -- only when joined Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing - QueryLoad -> if isJoined ns then Just <$> respondLoadQuery nsSTM msgSet else pure Nothing + QueryLoad -> if isJoined ns then Just <$> respondQueryLoad nsSTM msgSet else pure Nothing ) -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. @@ -433,8 +434,8 @@ respondPing nsSTM msgSet = do } pure $ serialiseMessage sendMessageSize pingResponse -respondLoadQuery :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) -respondLoadQuery nsSTM msgSet = do +respondQueryLoad :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondQueryLoad nsSTM msgSet = do nsSnap <- readTVarIO nsSTM -- this message cannot be split reasonably, so just -- consider the first payload @@ -444,14 +445,18 @@ respondLoadQuery nsSTM msgSet = do responsePl <- maybe (pure Nothing) (\pl -> case pl of LoadRequestPayload{} -> do - serv <- nodeService <$> readTVarIO (parentRealNode nsSnap) + parentNode <- readTVarIO (parentRealNode nsSnap) + let + serv = nodeService parentNode + conf = nodeConfig parentNode lStats <- getServiceLoadStats serv let directSucc = getNid . head . predecessors $ nsSnap handledTagSum = sum . takeRMapSuccessorsFromTo directSucc (loadSegmentUpperBound pl) $ loadPerTag lStats pure $ Just LoadResponsePayload { loadSum = handledTagSum - , loadRemainingTarget = remainingLoadTarget lStats + , loadRemainingTarget = remainingLoadTarget conf lStats + , loadTotalCapacity = totalCapacity lStats , loadSegmentLowerBound = directSucc } _ -> pure Nothing @@ -808,7 +813,9 @@ requestQueryLoad ns upperIdBound target = do { segmentLowerKeyBound = loadSegmentLowerBound loadResPl , segmentUpperKeyBound = upperIdBound , segmentLoad = loadSum loadResPl - , segmentOwnerLoadTarget = loadRemainingTarget loadResPl + , segmentOwnerRemainingLoadTarget = loadRemainingTarget loadResPl + , segmentOwnerCapacity = loadTotalCapacity loadResPl + , segmentCurrentOwner = target } diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index bab064d..c5836c0 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -63,6 +63,7 @@ import Control.Exception import Control.Monad (forM_, forever) import Control.Monad.Except import Crypto.Hash +import Data.Bifunctor (first) import qualified Data.ByteArray as BA import qualified Data.ByteString as BS import qualified Data.ByteString.UTF8 as BSU @@ -71,9 +72,11 @@ import Data.Foldable (foldr') import Data.Functor.Identity import Data.HashMap.Strict (HashMap) import qualified Data.HashMap.Strict as HMap +import Data.HashSet (HashSet) +import qualified Data.HashSet as HSet import Data.IP (IPv6, fromHostAddress6, toHostAddress6) -import Data.List ((\\)) +import Data.List (sortBy, sortOn, (\\)) import qualified Data.Map.Strict as Map import Data.Maybe (catMaybes, fromJust, fromMaybe, isJust, isNothing, mapMaybe) @@ -123,7 +126,9 @@ fediChordInit initConf serviceRunner = do fediThreadsAsync <- if confEnableKChoices initConf then -- TODO: k-choices way of joining - async (fediMainThreads serverSock realNodeSTM) + -- placeholder + runExceptT (kChoicesNodeJoin realNodeSTM ("foo", fromIntegral 3)) + >> async (fediMainThreads serverSock realNodeSTM) else do -- without k-choices, just initialise a single vserver firstVS <- nodeStateInit realNodeSTM 0 @@ -148,6 +153,23 @@ fediChordInit initConf serviceRunner = do joinedState pure (fediThreadsAsync, realNodeSTM) + +-- | Create a new vserver and join it through a provided remote node. +-- TODO: Many fediChord* functions already cover parts of this, refactor these to use +-- this function. +fediChordJoinNewVs :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) + => RealNodeSTM s -- ^ parent real node + -> Integer -- ^ vserver ID + -> RemoteNodeState -- ^ target node to join on + -> m (NodeID, LocalNodeStateSTM s) -- ^ on success: (vserver ID, TVar of vserver) +fediChordJoinNewVs nodeSTM vsId target = do + newVs <- liftIO $ nodeStateInit nodeSTM vsId + newVsSTM <- liftIO $ newTVarIO newVs + liftIO . putStrLn $ "Trying to join on " <> show (getNid target) + liftIO $ putStrLn "send a Join" + _ <- liftIO . requestJoin target $ newVsSTM + pure (getNid newVs, newVsSTM) + -- | initialises the 'NodeState' for this local node. -- Separated from 'fediChordInit' to be usable in tests. nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> Integer -> IO (LocalNodeState s) @@ -178,6 +200,114 @@ nodeStateInit realNodeSTM vsID' = do } pure initialState + +kChoicesNodeJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) + => RealNodeSTM s + -> (String, PortNumber) -- ^ domain and port of a bootstrapping node + -> m () +kChoicesNodeJoin nodeSTM bootstrapNode = do + node <- liftIO $ readTVarIO nodeSTM + -- use vserver 0 as origin of bootstrapping messages + vs0 <- liftIO $ nodeStateInit nodeSTM 0 + vs0STM <- liftIO $ newTVarIO vs0 + + ownLoadStats <- liftIO . getServiceLoadStats . nodeService $ node + let + conf = nodeConfig node + -- T_a of k-choices + -- compute load target + joinLoadTarget = totalCapacity ownLoadStats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2 + initialJoins = confKChoicesMaxVS conf `div` 2 + -- edge case: however small the target is, at least join 1 vs + -- kCoicesVsJoin until target is met + joinedVss <- vsJoins vs0STM (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins) nodeSTM + liftIO . atomically . modifyTVar' nodeSTM $ \node' -> node' + { vservers = HMap.union (vservers node') joinedVss } + pure () + + where + vsJoins :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) + => LocalNodeStateSTM s -> Double -> VSMap s -> Double -> Int -> RealNodeSTM s -> m (VSMap s) + vsJoins _ _ vsmap _ 0 _ = pure vsmap + vsJoins queryVsSTM capacity vsmap remainingTargetLoad remainingJoins nodeSTM' + | remainingTargetLoad <= 0 = pure vsmap + | otherwise = (do + + (acquiredLoad, (newNid, newVs)) <- kChoicesVsJoin queryVsSTM bootstrapNode capacity vsmap nodeSTM' remainingTargetLoad + -- on successful vserver join add the new VS to node and recurse + vsJoins queryVsSTM capacity (HMap.insert newNid newVs vsmap) (remainingTargetLoad - acquiredLoad) (pred remainingJoins) nodeSTM' + ) + -- TODO: decide on whether and how to catch errors + -- error cause 1: not a single queried node has responded -> indicates permanent failure + -- error cause 2: only a certain join failed, just ignore that join target for now, but problem: it will be the chosen + -- target even at the next attempt again + -- `catchError` (const . + +kChoicesVsJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) + => LocalNodeStateSTM s -- ^ vserver to be used for querying + -> (String, PortNumber) -- ^ domain and port of a bootstrapping node + -> Double -- ^ own capacity + -> VSMap s -- ^ currently active VServers + -> RealNodeSTM s -- ^ parent node is needed for initialising a new vserver + -> Double -- ^ remaining load target + -> m (Double, (NodeID, LocalNodeStateSTM s)) -- ^ on success return tuple of acquired load and newly acquired vserver +kChoicesVsJoin queryVsSTM bootstrapNode capacity activeVss nodeSTM remainingTarget = do + conf <- nodeConfig <$> liftIO (readTVarIO nodeSTM) + -- generate all possible vs IDs + let + activeVsSet = HMap.keysSet activeVss + -- tuples of node IDs and vserver IDs, because vserver IDs are needed for + -- LocalNodeState creation + nonJoinedIDs = filter (not . flip HSet.member activeVsSet . fst) [ (genNodeID (confIP conf) (confDomain conf) (fromInteger v), v) | v <- [0..confKChoicesMaxVS conf]] + queryVs <- liftIO $ readTVarIO queryVsSTM + + -- query load of all possible segments + -- simplification: treat each load lookup failure as a general unavailability of that segment + -- TODO: retries for transient failures + segmentLoads <- fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do + lookupResp <- liftIO $ bootstrapQueryId queryVsSTM bootstrapNode vsNid + currentlyResponsible <- liftEither lookupResp + segment <- requestQueryLoad queryVs vsNid currentlyResponsible + pure $ Just (segment, vsId, currentlyResponsible) + -- store segment stats and vserver ID together, so it's clear + -- which vs needs to be joined to acquire this segment + ) `catchError` const (pure Nothing) + ) + + + -- cost calculation and sort by cost + -- edge case: no possible ID has responded + (mincost, vsId, toJoinOn) <- maybe (throwError "received no load information") pure + . headMay + . sortOn (\(cost, _, _) -> cost) + . fmap (\(segment, vsId, toJoinOn) -> (kChoicesJoinCost remainingTarget capacity segment, vsId, toJoinOn)) + $ segmentLoads + -- join at min cost + joinedNode <- fediChordJoinNewVs nodeSTM vsId toJoinOn + pure (mincost, joinedNode) + + -- Possible optimisation: + -- Instead of sampling all join candidates again at each invocation, querying + -- all segment loads before the first join and trying to re-use these + -- load information can save round trips. + -- possible edge case: detect when joining a subsegment of one already owned + -- joining into own segments => When first joining into segment S1 and then + -- later joining into the subsegment S2, the + -- resulting load l(S1+S2) = l(S1) != l(S1) + l(S2) + -- => need to re-query the load of both S1 and S2 + -- possible edge case 2: taking multiple segments from the same owner + -- changes the remainingLoadTarget at each vsJoin. This target change + -- needs to be accounted for starting from the 2nd vsJoin. + +kChoicesJoinCost :: Double -- ^ own remaining load target + -> Double -- ^ own capacity + -> SegmentLoadStats + -> Double +kChoicesJoinCost remainingOwnLoad ownCap segment = + abs (segmentOwnerRemainingLoadTarget segment + segmentLoad segment) / segmentOwnerCapacity segment + + abs (remainingOwnLoad - segmentLoad segment) / ownCap + - abs (segmentOwnerRemainingLoadTarget segment) / segmentOwnerCapacity segment + -- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed -- for resolving the new node's position. fediChordBootstrapJoin :: Service s (RealNodeSTM s) @@ -277,8 +407,7 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset ) initCache resp - currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns - pure currentlyResponsible + runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns -- | join a node to the DHT using the global node cache @@ -296,6 +425,7 @@ fediChordVserverJoin nsSTM = do joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM liftEither joinResult + fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m () fediChordVserverLeave ns = do -- TODO: deal with failure of all successors, e.g. by invoking a stabilise diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index d8b9ce2..2ddcaf2 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -18,8 +18,10 @@ module Hash2Pub.FediChordTypes , RemoteNodeState (..) , RealNode (..) , RealNodeSTM + , VSMap , LoadStats (..) , emptyLoadStats + , remainingLoadTarget , SegmentLoadStats (..) , setSuccessors , setPredecessors @@ -153,7 +155,7 @@ a `localCompare` b -- Also contains shared data and config values. -- TODO: more data structures for k-choices bookkeeping data RealNode s = RealNode - { vservers :: HashMap NodeID (LocalNodeStateSTM s) + { vservers :: VSMap s -- ^ map of all active VServer node IDs to their node state , nodeConfig :: FediChordConf -- ^ holds the initial configuration read at program start @@ -168,6 +170,8 @@ data RealNode s = RealNode , nodeService :: s (RealNodeSTM s) } + +type VSMap s = HashMap NodeID (LocalNodeStateSTM s) type RealNodeSTM s = TVar (RealNode s) -- | represents a node and all its important state @@ -438,30 +442,47 @@ data FediChordConf = FediChordConf -- ^ how often re-sending a timed-out request can be retried , confEnableKChoices :: Bool -- ^ whether to enable k-choices load balancing + , confKChoicesOverload :: Double + -- ^ fraction of capacity above which a node considers itself overloaded + , confKChoicesUnderload :: Double + -- ^ fraction of capacity below which a node considers itself underloaded + , confKChoicesMaxVS :: Integer + -- ^ upper limit of vserver index κ } deriving (Show, Eq) -- ====== k-choices load balancing types ====== data LoadStats = LoadStats - { loadPerTag :: RingMap NodeID Double + { loadPerTag :: RingMap NodeID Double -- ^ map of loads for each handled tag - , totalCapacity :: Double + , totalCapacity :: Double -- ^ total designated capacity of the service - , remainingLoadTarget :: Double - -- ^ current mismatch between actual load and target load/capacity + , compensatedLoadSum :: Double + -- ^ effective load reevant for load balancing after compensating for } deriving (Show, Eq) +-- | calculates the mismatch from the target load by taking into account the +-- underload and overload limits +remainingLoadTarget :: FediChordConf -> LoadStats -> Double +remainingLoadTarget conf lstats = targetLoad - compensatedLoadSum lstats + where + targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2 + data SegmentLoadStats = SegmentLoadStats - { segmentLowerKeyBound :: NodeID + { segmentLowerKeyBound :: NodeID -- ^ segment start key - , segmentUpperKeyBound :: NodeID + , segmentUpperKeyBound :: NodeID -- ^ segment end key - , segmentLoad :: Double + , segmentLoad :: Double -- ^ sum of load of all keys in the segment - , segmentOwnerLoadTarget :: Double - -- ^ remaining load target of the current segment handler + , segmentOwnerRemainingLoadTarget :: Double + -- ^ remaining load target of the current segment handler: + , segmentOwnerCapacity :: Double + -- ^ total capacity of the current segment handler node, used for normalisation + , segmentCurrentOwner :: RemoteNodeState + -- ^ the current owner of the segment that needs to be joined on } -- TODO: figure out a better way of initialising @@ -469,7 +490,7 @@ emptyLoadStats :: LoadStats emptyLoadStats = LoadStats { loadPerTag = emptyRMap , totalCapacity = 0 - , remainingLoadTarget = 0 + , compensatedLoadSum = 0 } -- ====== Service Types ============ diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index a02d1d7..f1376e4 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -890,12 +890,13 @@ evaluateLoadStats currentStats currentSubscribers = do ) (0, emptyRMap) $ rMapToListWithKeys currentSubscribers - -- TODO: use underload and overload limits instead of capacity let remainingLoadTarget' = totalCapacity' - loadSum - postFetchRate currentStats pure LoadStats { loadPerTag = loadPerTag' , totalCapacity = totalCapacity' - , remainingLoadTarget = remainingLoadTarget' + -- load caused by post fetches cannot be influenced by re-balancing nodes, + -- but still reduces the totally available capacity + , compensatedLoadSum = loadSum + postFetchRate currentStats } diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs index b5438fa..b5ce0a9 100644 --- a/src/Hash2Pub/ProtocolTypes.hs +++ b/src/Hash2Pub/ProtocolTypes.hs @@ -82,6 +82,7 @@ data ActionPayload = QueryIDRequestPayload | LoadResponsePayload { loadSum :: Double , loadRemainingTarget :: Double + , loadTotalCapacity :: Double , loadSegmentLowerBound :: NodeID } deriving (Show, Eq)