implement k-choices join functions

large commit, contains a number of things:
- k-choices #2 cost calculation
- k-choices parameters
- adjusting ASN.1 network messages to contain all values required for
  cost calculation #71
- adjusting stats to contain required values
- k-choices node and vserver join functions
- placeholder/ dummy invocation of k-choices join
This commit is contained in:
Trolli Schmittlauch 2020-09-25 00:42:41 +02:00
parent 62da66aade
commit 3b6d129bfc
8 changed files with 190 additions and 23 deletions

View file

@ -111,6 +111,7 @@ LoadRequestPayload ::= SEQUENCE {
LoadResponsePayload ::= SEQUENCE {
loadSum REAL,
remainingLoadTarget REAL,
totalCapacity REAL,
lowerBound NodeID
}

View file

@ -49,6 +49,9 @@ readConfig = do
, confRequestTimeout = 5 * 10^6 `div` speedup
, confRequestRetries = 3
, confEnableKChoices = loadBalancingEnabled /= "off"
, confKChoicesOverload = 0.9
, confKChoicesUnderload = 0.1
, confKChoicesMaxVS = 8
}
sConf = ServiceConf
{ confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup

View file

@ -193,6 +193,7 @@ encodePayload payload'@LoadResponsePayload{} =
[ Start Sequence
, Real $ loadSum payload'
, Real $ loadRemainingTarget payload'
, Real $ loadTotalCapacity payload'
, IntVal . getNodeID $ loadSegmentLowerBound payload'
, End Sequence
]
@ -472,10 +473,12 @@ parseLoadResponsePayload :: ParseASN1 ActionPayload
parseLoadResponsePayload = onNextContainer Sequence $ do
loadSum' <- parseReal
loadRemainingTarget' <- parseReal
loadTotalCapacity' <- parseReal
loadSegmentLowerBound' <- fromInteger <$> parseInteger
pure LoadResponsePayload
{ loadSum = loadSum'
, loadRemainingTarget = loadRemainingTarget'
, loadTotalCapacity = loadTotalCapacity'
, loadSegmentLowerBound = loadSegmentLowerBound'
}

View file

@ -93,6 +93,7 @@ import Hash2Pub.FediChordTypes (CacheEntry (..),
getKeyID, localCompare,
rMapFromList, rMapLookupPred,
rMapLookupSucc,
remainingLoadTarget,
setPredecessors, setSuccessors)
import Hash2Pub.ProtocolTypes
import Hash2Pub.RingMap
@ -292,7 +293,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
-- only when joined
Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing
Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing
QueryLoad -> if isJoined ns then Just <$> respondLoadQuery nsSTM msgSet else pure Nothing
QueryLoad -> if isJoined ns then Just <$> respondQueryLoad nsSTM msgSet else pure Nothing
)
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
@ -433,8 +434,8 @@ respondPing nsSTM msgSet = do
}
pure $ serialiseMessage sendMessageSize pingResponse
respondLoadQuery :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondLoadQuery nsSTM msgSet = do
respondQueryLoad :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryLoad nsSTM msgSet = do
nsSnap <- readTVarIO nsSTM
-- this message cannot be split reasonably, so just
-- consider the first payload
@ -444,14 +445,18 @@ respondLoadQuery nsSTM msgSet = do
responsePl <- maybe (pure Nothing) (\pl ->
case pl of
LoadRequestPayload{} -> do
serv <- nodeService <$> readTVarIO (parentRealNode nsSnap)
parentNode <- readTVarIO (parentRealNode nsSnap)
let
serv = nodeService parentNode
conf = nodeConfig parentNode
lStats <- getServiceLoadStats serv
let
directSucc = getNid . head . predecessors $ nsSnap
handledTagSum = sum . takeRMapSuccessorsFromTo directSucc (loadSegmentUpperBound pl) $ loadPerTag lStats
pure $ Just LoadResponsePayload
{ loadSum = handledTagSum
, loadRemainingTarget = remainingLoadTarget lStats
, loadRemainingTarget = remainingLoadTarget conf lStats
, loadTotalCapacity = totalCapacity lStats
, loadSegmentLowerBound = directSucc
}
_ -> pure Nothing
@ -808,7 +813,9 @@ requestQueryLoad ns upperIdBound target = do
{ segmentLowerKeyBound = loadSegmentLowerBound loadResPl
, segmentUpperKeyBound = upperIdBound
, segmentLoad = loadSum loadResPl
, segmentOwnerLoadTarget = loadRemainingTarget loadResPl
, segmentOwnerRemainingLoadTarget = loadRemainingTarget loadResPl
, segmentOwnerCapacity = loadTotalCapacity loadResPl
, segmentCurrentOwner = target
}

View file

@ -63,6 +63,7 @@ import Control.Exception
import Control.Monad (forM_, forever)
import Control.Monad.Except
import Crypto.Hash
import Data.Bifunctor (first)
import qualified Data.ByteArray as BA
import qualified Data.ByteString as BS
import qualified Data.ByteString.UTF8 as BSU
@ -71,9 +72,11 @@ import Data.Foldable (foldr')
import Data.Functor.Identity
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HMap
import Data.HashSet (HashSet)
import qualified Data.HashSet as HSet
import Data.IP (IPv6, fromHostAddress6,
toHostAddress6)
import Data.List ((\\))
import Data.List (sortBy, sortOn, (\\))
import qualified Data.Map.Strict as Map
import Data.Maybe (catMaybes, fromJust, fromMaybe,
isJust, isNothing, mapMaybe)
@ -123,7 +126,9 @@ fediChordInit initConf serviceRunner = do
fediThreadsAsync <- if confEnableKChoices initConf
then
-- TODO: k-choices way of joining
async (fediMainThreads serverSock realNodeSTM)
-- placeholder
runExceptT (kChoicesNodeJoin realNodeSTM ("foo", fromIntegral 3))
>> async (fediMainThreads serverSock realNodeSTM)
else do
-- without k-choices, just initialise a single vserver
firstVS <- nodeStateInit realNodeSTM 0
@ -148,6 +153,23 @@ fediChordInit initConf serviceRunner = do
joinedState
pure (fediThreadsAsync, realNodeSTM)
-- | Create a new vserver and join it through a provided remote node.
-- TODO: Many fediChord* functions already cover parts of this, refactor these to use
-- this function.
fediChordJoinNewVs :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
=> RealNodeSTM s -- ^ parent real node
-> Integer -- ^ vserver ID
-> RemoteNodeState -- ^ target node to join on
-> m (NodeID, LocalNodeStateSTM s) -- ^ on success: (vserver ID, TVar of vserver)
fediChordJoinNewVs nodeSTM vsId target = do
newVs <- liftIO $ nodeStateInit nodeSTM vsId
newVsSTM <- liftIO $ newTVarIO newVs
liftIO . putStrLn $ "Trying to join on " <> show (getNid target)
liftIO $ putStrLn "send a Join"
_ <- liftIO . requestJoin target $ newVsSTM
pure (getNid newVs, newVsSTM)
-- | initialises the 'NodeState' for this local node.
-- Separated from 'fediChordInit' to be usable in tests.
nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> Integer -> IO (LocalNodeState s)
@ -178,6 +200,114 @@ nodeStateInit realNodeSTM vsID' = do
}
pure initialState
kChoicesNodeJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
=> RealNodeSTM s
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-> m ()
kChoicesNodeJoin nodeSTM bootstrapNode = do
node <- liftIO $ readTVarIO nodeSTM
-- use vserver 0 as origin of bootstrapping messages
vs0 <- liftIO $ nodeStateInit nodeSTM 0
vs0STM <- liftIO $ newTVarIO vs0
ownLoadStats <- liftIO . getServiceLoadStats . nodeService $ node
let
conf = nodeConfig node
-- T_a of k-choices
-- compute load target
joinLoadTarget = totalCapacity ownLoadStats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2
initialJoins = confKChoicesMaxVS conf `div` 2
-- edge case: however small the target is, at least join 1 vs
-- kCoicesVsJoin until target is met
joinedVss <- vsJoins vs0STM (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins) nodeSTM
liftIO . atomically . modifyTVar' nodeSTM $ \node' -> node'
{ vservers = HMap.union (vservers node') joinedVss }
pure ()
where
vsJoins :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
=> LocalNodeStateSTM s -> Double -> VSMap s -> Double -> Int -> RealNodeSTM s -> m (VSMap s)
vsJoins _ _ vsmap _ 0 _ = pure vsmap
vsJoins queryVsSTM capacity vsmap remainingTargetLoad remainingJoins nodeSTM'
| remainingTargetLoad <= 0 = pure vsmap
| otherwise = (do
(acquiredLoad, (newNid, newVs)) <- kChoicesVsJoin queryVsSTM bootstrapNode capacity vsmap nodeSTM' remainingTargetLoad
-- on successful vserver join add the new VS to node and recurse
vsJoins queryVsSTM capacity (HMap.insert newNid newVs vsmap) (remainingTargetLoad - acquiredLoad) (pred remainingJoins) nodeSTM'
)
-- TODO: decide on whether and how to catch errors
-- error cause 1: not a single queried node has responded -> indicates permanent failure
-- error cause 2: only a certain join failed, just ignore that join target for now, but problem: it will be the chosen
-- target even at the next attempt again
-- `catchError` (const .
kChoicesVsJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
=> LocalNodeStateSTM s -- ^ vserver to be used for querying
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-> Double -- ^ own capacity
-> VSMap s -- ^ currently active VServers
-> RealNodeSTM s -- ^ parent node is needed for initialising a new vserver
-> Double -- ^ remaining load target
-> m (Double, (NodeID, LocalNodeStateSTM s)) -- ^ on success return tuple of acquired load and newly acquired vserver
kChoicesVsJoin queryVsSTM bootstrapNode capacity activeVss nodeSTM remainingTarget = do
conf <- nodeConfig <$> liftIO (readTVarIO nodeSTM)
-- generate all possible vs IDs
let
activeVsSet = HMap.keysSet activeVss
-- tuples of node IDs and vserver IDs, because vserver IDs are needed for
-- LocalNodeState creation
nonJoinedIDs = filter (not . flip HSet.member activeVsSet . fst) [ (genNodeID (confIP conf) (confDomain conf) (fromInteger v), v) | v <- [0..confKChoicesMaxVS conf]]
queryVs <- liftIO $ readTVarIO queryVsSTM
-- query load of all possible segments
-- simplification: treat each load lookup failure as a general unavailability of that segment
-- TODO: retries for transient failures
segmentLoads <- fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do
lookupResp <- liftIO $ bootstrapQueryId queryVsSTM bootstrapNode vsNid
currentlyResponsible <- liftEither lookupResp
segment <- requestQueryLoad queryVs vsNid currentlyResponsible
pure $ Just (segment, vsId, currentlyResponsible)
-- store segment stats and vserver ID together, so it's clear
-- which vs needs to be joined to acquire this segment
) `catchError` const (pure Nothing)
)
-- cost calculation and sort by cost
-- edge case: no possible ID has responded
(mincost, vsId, toJoinOn) <- maybe (throwError "received no load information") pure
. headMay
. sortOn (\(cost, _, _) -> cost)
. fmap (\(segment, vsId, toJoinOn) -> (kChoicesJoinCost remainingTarget capacity segment, vsId, toJoinOn))
$ segmentLoads
-- join at min cost
joinedNode <- fediChordJoinNewVs nodeSTM vsId toJoinOn
pure (mincost, joinedNode)
-- Possible optimisation:
-- Instead of sampling all join candidates again at each invocation, querying
-- all segment loads before the first join and trying to re-use these
-- load information can save round trips.
-- possible edge case: detect when joining a subsegment of one already owned
-- joining into own segments => When first joining into segment S1 and then
-- later joining into the subsegment S2, the
-- resulting load l(S1+S2) = l(S1) != l(S1) + l(S2)
-- => need to re-query the load of both S1 and S2
-- possible edge case 2: taking multiple segments from the same owner
-- changes the remainingLoadTarget at each vsJoin. This target change
-- needs to be accounted for starting from the 2nd vsJoin.
kChoicesJoinCost :: Double -- ^ own remaining load target
-> Double -- ^ own capacity
-> SegmentLoadStats
-> Double
kChoicesJoinCost remainingOwnLoad ownCap segment =
abs (segmentOwnerRemainingLoadTarget segment + segmentLoad segment) / segmentOwnerCapacity segment
+ abs (remainingOwnLoad - segmentLoad segment) / ownCap
- abs (segmentOwnerRemainingLoadTarget segment) / segmentOwnerCapacity segment
-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
-- for resolving the new node's position.
fediChordBootstrapJoin :: Service s (RealNodeSTM s)
@ -277,8 +407,7 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
)
initCache resp
currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
pure currentlyResponsible
runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
-- | join a node to the DHT using the global node cache
@ -296,6 +425,7 @@ fediChordVserverJoin nsSTM = do
joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
liftEither joinResult
fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m ()
fediChordVserverLeave ns = do
-- TODO: deal with failure of all successors, e.g. by invoking a stabilise

View file

@ -18,8 +18,10 @@ module Hash2Pub.FediChordTypes
, RemoteNodeState (..)
, RealNode (..)
, RealNodeSTM
, VSMap
, LoadStats (..)
, emptyLoadStats
, remainingLoadTarget
, SegmentLoadStats (..)
, setSuccessors
, setPredecessors
@ -153,7 +155,7 @@ a `localCompare` b
-- Also contains shared data and config values.
-- TODO: more data structures for k-choices bookkeeping
data RealNode s = RealNode
{ vservers :: HashMap NodeID (LocalNodeStateSTM s)
{ vservers :: VSMap s
-- ^ map of all active VServer node IDs to their node state
, nodeConfig :: FediChordConf
-- ^ holds the initial configuration read at program start
@ -168,6 +170,8 @@ data RealNode s = RealNode
, nodeService :: s (RealNodeSTM s)
}
type VSMap s = HashMap NodeID (LocalNodeStateSTM s)
type RealNodeSTM s = TVar (RealNode s)
-- | represents a node and all its important state
@ -438,30 +442,47 @@ data FediChordConf = FediChordConf
-- ^ how often re-sending a timed-out request can be retried
, confEnableKChoices :: Bool
-- ^ whether to enable k-choices load balancing
, confKChoicesOverload :: Double
-- ^ fraction of capacity above which a node considers itself overloaded
, confKChoicesUnderload :: Double
-- ^ fraction of capacity below which a node considers itself underloaded
, confKChoicesMaxVS :: Integer
-- ^ upper limit of vserver index κ
}
deriving (Show, Eq)
-- ====== k-choices load balancing types ======
data LoadStats = LoadStats
{ loadPerTag :: RingMap NodeID Double
{ loadPerTag :: RingMap NodeID Double
-- ^ map of loads for each handled tag
, totalCapacity :: Double
, totalCapacity :: Double
-- ^ total designated capacity of the service
, remainingLoadTarget :: Double
-- ^ current mismatch between actual load and target load/capacity
, compensatedLoadSum :: Double
-- ^ effective load reevant for load balancing after compensating for
}
deriving (Show, Eq)
-- | calculates the mismatch from the target load by taking into account the
-- underload and overload limits
remainingLoadTarget :: FediChordConf -> LoadStats -> Double
remainingLoadTarget conf lstats = targetLoad - compensatedLoadSum lstats
where
targetLoad = totalCapacity lstats * (confKChoicesUnderload conf + confKChoicesOverload conf) / 2
data SegmentLoadStats = SegmentLoadStats
{ segmentLowerKeyBound :: NodeID
{ segmentLowerKeyBound :: NodeID
-- ^ segment start key
, segmentUpperKeyBound :: NodeID
, segmentUpperKeyBound :: NodeID
-- ^ segment end key
, segmentLoad :: Double
, segmentLoad :: Double
-- ^ sum of load of all keys in the segment
, segmentOwnerLoadTarget :: Double
-- ^ remaining load target of the current segment handler
, segmentOwnerRemainingLoadTarget :: Double
-- ^ remaining load target of the current segment handler:
, segmentOwnerCapacity :: Double
-- ^ total capacity of the current segment handler node, used for normalisation
, segmentCurrentOwner :: RemoteNodeState
-- ^ the current owner of the segment that needs to be joined on
}
-- TODO: figure out a better way of initialising
@ -469,7 +490,7 @@ emptyLoadStats :: LoadStats
emptyLoadStats = LoadStats
{ loadPerTag = emptyRMap
, totalCapacity = 0
, remainingLoadTarget = 0
, compensatedLoadSum = 0
}
-- ====== Service Types ============

View file

@ -890,12 +890,13 @@ evaluateLoadStats currentStats currentSubscribers = do
)
(0, emptyRMap)
$ rMapToListWithKeys currentSubscribers
-- TODO: use underload and overload limits instead of capacity
let remainingLoadTarget' = totalCapacity' - loadSum - postFetchRate currentStats
pure LoadStats
{ loadPerTag = loadPerTag'
, totalCapacity = totalCapacity'
, remainingLoadTarget = remainingLoadTarget'
-- load caused by post fetches cannot be influenced by re-balancing nodes,
-- but still reduces the totally available capacity
, compensatedLoadSum = loadSum + postFetchRate currentStats
}

View file

@ -82,6 +82,7 @@ data ActionPayload = QueryIDRequestPayload
| LoadResponsePayload
{ loadSum :: Double
, loadRemainingTarget :: Double
, loadTotalCapacity :: Double
, loadSegmentLowerBound :: NodeID
}
deriving (Show, Eq)