calculate service load rates, interface for querying loads
- define data type for load representation - this representation can be queried from any Service (closes #72) - loads are periodically calculated from measured rates (contributes to #2)
This commit is contained in:
parent
7dd7e96cce
commit
576ea2c3f6
|
@ -7,8 +7,8 @@
|
|||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
|
||||
module Hash2Pub.FediChordTypes (
|
||||
NodeID -- abstract, but newtype constructors cannot be hidden
|
||||
module Hash2Pub.FediChordTypes
|
||||
( NodeID -- abstract, but newtype constructors cannot be hidden
|
||||
, idBits
|
||||
, getNodeID
|
||||
, toNodeID
|
||||
|
@ -18,6 +18,8 @@ module Hash2Pub.FediChordTypes (
|
|||
, RemoteNodeState (..)
|
||||
, RealNode (..)
|
||||
, RealNodeSTM
|
||||
, LoadStats (..)
|
||||
, emptyLoadStats
|
||||
, setSuccessors
|
||||
, setPredecessors
|
||||
, NodeCache
|
||||
|
@ -60,7 +62,7 @@ module Hash2Pub.FediChordTypes (
|
|||
, DHT(..)
|
||||
, Service(..)
|
||||
, ServiceConf(..)
|
||||
) where
|
||||
) where
|
||||
|
||||
import Control.Exception
|
||||
import Data.Foldable (foldr')
|
||||
|
@ -430,6 +432,23 @@ data FediChordConf = FediChordConf
|
|||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- ====== k-choices load balancing types ======
|
||||
|
||||
data LoadStats = LoadStats
|
||||
{ loadPerTag :: RingMap NodeID Double
|
||||
, totalCapacity :: Double
|
||||
, remainingLoadTarget :: Double
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- TODO: figure out a better way of initialising
|
||||
emptyLoadStats :: LoadStats
|
||||
emptyLoadStats = LoadStats
|
||||
{ loadPerTag = emptyRMap
|
||||
, totalCapacity = 0
|
||||
, remainingLoadTarget = 0
|
||||
}
|
||||
|
||||
-- ====== Service Types ============
|
||||
|
||||
class Service s d where
|
||||
|
@ -445,6 +464,7 @@ class Service s d where
|
|||
-> IO (Either String ()) -- ^ success or failure
|
||||
-- | Wait for an incoming migration from a given node to succeed, may block forever
|
||||
waitForMigrationFrom :: s d -> NodeID -> IO ()
|
||||
getServiceLoadStats :: s d -> IO LoadStats
|
||||
|
||||
instance Hashable.Hashable NodeID where
|
||||
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
|
||||
|
|
|
@ -22,7 +22,7 @@ import qualified Data.DList as D
|
|||
import Data.Either (lefts, rights)
|
||||
import qualified Data.HashMap.Strict as HMap
|
||||
import qualified Data.HashSet as HSet
|
||||
import Data.Maybe (fromJust, isJust)
|
||||
import Data.Maybe (fromJust, fromMaybe, isJust)
|
||||
import Data.String (fromString)
|
||||
import Data.Text.Lazy (Text)
|
||||
import qualified Data.Text.Lazy as Txt
|
||||
|
@ -64,8 +64,10 @@ data PostService d = PostService
|
|||
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
|
||||
, httpMan :: HTTP.Manager
|
||||
, statsQueue :: TQueue StatsEvent
|
||||
, loadStats :: TVar RelayStats
|
||||
-- ^ current load stats, replaced periodically
|
||||
, relayStats :: TVar RelayStats
|
||||
-- ^ current relay stats, replaced periodically
|
||||
, loadStats :: TVar LoadStats
|
||||
-- ^ current load values of the relay, replaced periodically and used by
|
||||
, logFileHandle :: Handle
|
||||
}
|
||||
deriving (Typeable)
|
||||
|
@ -96,7 +98,8 @@ instance DHT d => Service PostService d where
|
|||
migrationsInProgress' <- newTVarIO HMap.empty
|
||||
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
|
||||
statsQueue' <- newTQueueIO
|
||||
loadStats' <- newTVarIO emptyStats
|
||||
relayStats' <- newTVarIO emptyStats
|
||||
loadStats' <- newTVarIO emptyLoadStats
|
||||
loggingFile <- openFile (confLogfilePath conf) WriteMode
|
||||
hSetBuffering loggingFile LineBuffering
|
||||
let
|
||||
|
@ -112,6 +115,7 @@ instance DHT d => Service PostService d where
|
|||
, migrationsInProgress = migrationsInProgress'
|
||||
, httpMan = httpMan'
|
||||
, statsQueue = statsQueue'
|
||||
, relayStats = relayStats'
|
||||
, loadStats = loadStats'
|
||||
, logFileHandle = loggingFile
|
||||
}
|
||||
|
@ -153,6 +157,12 @@ instance DHT d => Service PostService d where
|
|||
-- block until migration finished
|
||||
takeMVar migrationSynchroniser
|
||||
|
||||
getServiceLoadStats = getLoadStats
|
||||
|
||||
|
||||
getLoadStats :: PostService d -> IO LoadStats
|
||||
getLoadStats serv = readTVarIO $ loadStats serv
|
||||
|
||||
|
||||
-- | return a WAI application
|
||||
postServiceApplication :: DHT d => PostService d -> Application
|
||||
|
@ -835,7 +845,12 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
|
|||
-- persistently store in a TVar so it can be retrieved later by the DHT
|
||||
let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
|
||||
rateStats = evaluateStats timePassed summedStats
|
||||
atomically $ writeTVar (loadStats serv) rateStats
|
||||
currentSubscribers <- readTVarIO $ subscribers serv
|
||||
-- translate the rate statistics to load values
|
||||
loads <- evaluateLoadStats rateStats currentSubscribers
|
||||
atomically $
|
||||
writeTVar (relayStats serv) rateStats
|
||||
>> writeTVar (loadStats serv) loads
|
||||
-- and now what? write a log to file
|
||||
-- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
|
||||
-- later: current (reported) load, target load
|
||||
|
@ -859,6 +874,32 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
|
|||
0 tagMap
|
||||
|
||||
|
||||
-- | calculate load values from rate statistics
|
||||
evaluateLoadStats :: RelayStats -> RelayTags -> IO LoadStats
|
||||
evaluateLoadStats currentStats currentSubscribers = do
|
||||
-- load caused by each tag: incomingPostRate * ( 1 + subscribers)
|
||||
-- calculate remaining load target: post publish rate * 2.5 - sum loadPerTag - postFetchRate
|
||||
let
|
||||
totalCapacity' = 2.5 * postPublishRate currentStats
|
||||
(loadSum, loadPerTag') <- foldM (\(loadSum, loadPerTag') (key, (subscriberMapSTM, _, _)) -> do
|
||||
numSubscribers <- HMap.size <$> readTVarIO subscriberMapSTM
|
||||
let
|
||||
thisTagRate = fromMaybe 0 $ rMapLookup key (relayReceiveRates currentStats)
|
||||
thisTagLoad = thisTagRate * (1 + fromIntegral numSubscribers)
|
||||
pure (loadSum + thisTagLoad, addRMapEntry key thisTagLoad loadPerTag')
|
||||
)
|
||||
(0, emptyRMap)
|
||||
$ rMapToListWithKeys currentSubscribers
|
||||
-- TODO: use underload and overload limits instead of capacity
|
||||
let remainingLoadTarget' = totalCapacity' - loadSum - postFetchRate currentStats
|
||||
pure LoadStats
|
||||
{ loadPerTag = loadPerTag'
|
||||
, totalCapacity = totalCapacity'
|
||||
, remainingLoadTarget = remainingLoadTarget'
|
||||
}
|
||||
|
||||
|
||||
|
||||
-- | Evaluate the accumulated statistic events: Currently mostly calculates the event
|
||||
-- rates by dividing through the collection time frame
|
||||
evaluateStats :: POSIXTime -> RelayStats -> RelayStats
|
||||
|
|
Loading…
Reference in a new issue