split up stats summing and evaluating, launch threads

Trolli Schmittlauch 2020-09-07 16:27:19 +02:00
parent c823e6357a
commit 5c338b9cd7


@@ -19,7 +19,7 @@ import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (isJust, fromJust)
import Data.Maybe (fromJust, isJust)
import Data.String (fromString)
import Data.Text.Lazy (Text)
import qualified Data.Text.Lazy as Txt
@@ -54,9 +54,11 @@ data PostService d = PostService
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID
-- ^ queue of posts to be fetched
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
, httpMan :: HTTP.Manager
, statsQueue :: TQueue StatsEvent
, loadStats :: TVar RelayStats
-- ^ current load statistics of this relay, periodically refreshed by the stats threads
}
deriving (Typeable)
@@ -86,6 +88,7 @@ instance DHT d => Service PostService d where
migrationsInProgress' <- newTVarIO HMap.empty
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
statsQueue' <- newTQueueIO
loadStats' <- newTVarIO emptyStats
let
thisService = PostService
{ serviceConf = conf
@@ -99,6 +102,7 @@ instance DHT d => Service PostService d where
, migrationsInProgress = migrationsInProgress'
, httpMan = httpMan'
, statsQueue = statsQueue'
, loadStats = loadStats'
}
port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
@@ -109,7 +113,11 @@ instance DHT d => Service PostService d where
concurrently_
-- web server
(Warp.runSettings warpSettings $ postServiceApplication thisService)
$ concurrently
-- post queue processing
(processIncomingPosts thisService)
-- statistics/measurements
(launchStatsThreads thisService)
-- update thread ID after fork
atomically $ writeTVar threadVar servThreadID
pure thisService
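For orientation, the nesting above combines three long-running actions via the async package: the outer concurrently_ runs the Warp server alongside the inner pair, and the inner concurrently runs the post-queue worker alongside the new stats threads; if any of them throws, the others are cancelled. A minimal sketch of that structure, with placeholder names for the three actions:

-- sketch only, not part of this commit; the three arguments stand in for the
-- Warp server, processIncomingPosts and launchStatsThreads used above
runServiceThreads :: IO () -> IO () -> IO () -> IO ()
runServiceThreads webServer postWorker statsThreads =
    concurrently_
        webServer
        (concurrently postWorker statsThreads)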
@@ -606,84 +614,14 @@ fetchTagPosts serv = forever $ do
-- ======= statistics/measurement and logging =======
data StatsEventType = PostPublishEvent
-- ^ initial publishing of a post by an instance
| RelayReceiveEvent
-- ^ receiving of posts because of being the responsible relay, for estimation of \tau_t
-- TODO: record for which hashtag
| RelayDeliveryEvent
-- ^ delivering (or at least attempt) a post to a subscriber
| IncomingPostFetchEvent
-- ^ another instance fetches a post from this instance
deriving (Enum, Show, Eq)
-- | Represents a measurement event of a 'StatsEventType' with a count relevant for a certain key
data StatsEvent = StatsEvent StatsEventType Int NodeID
-- TODO: make delay configurable
statsAccuDelay = 300000
-- | periodically flush the stats queue and accumulate all events inside
accumulateStatsThread :: TQueue StatsEvent -> IO ()
accumulateStatsThread statsQ = getPOSIXTime >>= flushLoop
where
flushLoop previousRun = do
now <- getPOSIXTime
-- TODO: instead of letting the events accumulate in the queue and allocate linear memory, immediately fold the result
-- but how to achieve the periodicity when blocking on a queue?
-- idea: let another thread periodically exchange the RelayStats and modify it atomically (concept: swap it out from under the accumulator)
threadDelay statsAccuDelay
latestEvents <- atomically $ flushTQueue statsQ
-- accumulate the events
-- and now what? write a log to file, probably as a forkIO
-- persistently store in a TVar so it can be retrieved later by the DHT
flushLoop now
accumulateStats :: POSIXTime -> [StatsEvent] -> RelayStats
accumulateStats timeInterval events =
-- first sum all event numbers, then divide by the number of seconds passed to
-- get the rate per second
RelayStats
{ relayReceiveRates = mapRMap (/ intervalSeconds) $ relayReceiveRates summedStats
, relayDeliveryRates = mapRMap (/ intervalSeconds) $ relayDeliveryRates summedStats
, postPublishRate = postPublishRate summedStats / intervalSeconds
, postFetchRate = postFetchRate summedStats / intervalSeconds
}
where
intervalSeconds = fromIntegral (fromEnum timeInterval) / 10^12
summedStats = foldl (\stats event -> case event of
StatsEvent PostPublishEvent num _ ->
stats {postPublishRate = fromIntegral num + postPublishRate stats}
StatsEvent RelayReceiveEvent num key ->
stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)}
StatsEvent RelayDeliveryEvent num key ->
stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)}
StatsEvent IncomingPostFetchEvent num _ ->
stats {postFetchRate = fromIntegral num + postFetchRate stats}
)
emptyStats
events
sumIfEntryExists = addRMapEntryWith (\newVal oldVal ->
let toInsert = fromJust $ extractRingEntry newVal
in
case oldVal of
KeyEntry n -> KeyEntry (n + toInsert)
ProxyEntry pointer (Just (KeyEntry n)) -> ProxyEntry pointer (Just (KeyEntry $ n + toInsert))
ProxyEntry pointer Nothing -> ProxyEntry pointer (Just newVal)
_ -> error "RingMap nested too deeply"
)
-- idea: first just sum with foldl, and then map the time division over all values
emptyStats :: RelayStats
emptyStats = RelayStats
{ relayReceiveRates = emptyRMap
, relayDeliveryRates = emptyRMap
, postFetchRate = 0
, postPublishRate = 0
}
-- | measured rates of relay performance
-- TODO: maybe include other metrics in here as well, like number of subscribers?
@@ -697,3 +635,98 @@ data RelayStats = RelayStats
, postPublishRate :: Double
-- ^ rate of initially publishing posts through this instance
}
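-- Example sketch (not part of this commit): producers feed the statistics
-- pipeline by enqueueing StatsEvents into the service's statsQueue. A fetch
-- handler could count an incoming post fetch for a given key like this; the
-- function name is hypothetical, atomically/writeTQueue are from stm.
reportIncomingFetch :: PostService d -> NodeID -> IO ()
reportIncomingFetch serv key =
    atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent 1 key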
-- TODO: make delay configurable
statsEvalDelay = 300000 -- microseconds between two statistics evaluation runs
launchStatsThreads :: PostService d -> IO ()
launchStatsThreads serv = do
-- create shared accumulator
sharedAccum <- newTVarIO emptyStats
concurrently_
(accumulateStatsThread sharedAccum $ statsQueue serv)
(evaluateStatsThread serv sharedAccum)
-- | Read stats events from queue and add them to a shared accumulator.
-- Instead of letting the events accumulate in the queue and allocate linear memory, immediately fold the result.
accumulateStatsThread :: TVar RelayStats -> TQueue StatsEvent -> IO ()
accumulateStatsThread statsAccumulator statsQ = forever $ do
-- blocks until a stats event arrives
event <- atomically $ readTQueue statsQ
-- add the event's count to the current accumulator
atomically $ modifyTVar' statsAccumulator $ statsAdder event
-- | Add an incoming stats event to the accumulator value.
statsAdder :: StatsEvent -> RelayStats -> RelayStats
statsAdder event stats = case event of
StatsEvent PostPublishEvent num _ ->
stats {postPublishRate = fromIntegral num + postPublishRate stats}
StatsEvent RelayReceiveEvent num key ->
stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)}
StatsEvent RelayDeliveryEvent num key ->
stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)}
StatsEvent IncomingPostFetchEvent num _ ->
stats {postFetchRate = fromIntegral num + postFetchRate stats}
where
-- sum a new value onto an existing entry of the 'RingMap', preserving proxy entries
sumIfEntryExists = addRMapEntryWith (\newVal oldVal ->
let toInsert = fromJust $ extractRingEntry newVal
in
case oldVal of
KeyEntry n -> KeyEntry (n + toInsert)
ProxyEntry pointer (Just (KeyEntry n)) -> ProxyEntry pointer (Just (KeyEntry $ n + toInsert))
ProxyEntry pointer Nothing -> ProxyEntry pointer (Just newVal)
_ -> error "RingMap nested too deeply"
)
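-- Sketch (assumption, not part of this commit): since statsAdder is a pure
-- per-event step, a whole batch of drained events could be folded into an
-- accumulator with the same function.
accumulateBatch :: [StatsEvent] -> RelayStats -> RelayStats
accumulateBatch events stats = foldl (flip statsAdder) stats events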
-- | Periodically exchange the accumulated statistics with empty ones, evaluate them
-- and make them the current statistics of the service.
evaluateStatsThread :: PostService d -> TVar RelayStats -> IO ()
evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
where
loop previousTs = do
threadDelay statsEvalDelay
-- get and reset the stats accumulator
summedStats <- atomically $ do
stats <- readTVar statsAcc
writeTVar statsAcc emptyStats
pure stats
-- as the transaction might retry several times, current time needs to
-- be read afterwards
now <- getPOSIXTime
-- evaluate stats rate and replace server stats
atomically . writeTVar (loadStats serv) . evaluateStats (now - previousTs) $ summedStats
-- idea: let another thread periodically exchange the RelayStats and modify it atomically (concept: swap it out from under the accumulator)
-- and now what? write a log to file, probably as a forkIO
-- persistently store in a TVar so it can be retrieved later by the DHT
loop now
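-- Sketch (not part of this commit): consumers such as the DHT-facing load
-- balancing logic can read the latest evaluated statistics without blocking;
-- readTVarIO is from the stm package.
currentStats :: PostService d -> IO RelayStats
currentStats serv = readTVarIO (loadStats serv)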
-- | Evaluate the accumulated statistics events: currently this mostly calculates
-- the event rates by dividing the summed counts by the length of the collection time frame.
evaluateStats :: POSIXTime -> RelayStats -> RelayStats
evaluateStats timeInterval summedStats =
-- divide the summed event counts by the number of seconds passed
-- to get the rate per second
RelayStats
{ relayReceiveRates = mapRMap (/ intervalSeconds) $ relayReceiveRates summedStats
, relayDeliveryRates = mapRMap (/ intervalSeconds) $ relayDeliveryRates summedStats
, postPublishRate = postPublishRate summedStats / intervalSeconds
, postFetchRate = postFetchRate summedStats / intervalSeconds
}
where
-- TODO: take speedup into account
-- fromEnum on a POSIXTime yields picoseconds, hence the division by 10^12
intervalSeconds = fromIntegral (fromEnum timeInterval) / 10^12
emptyStats :: RelayStats
emptyStats = RelayStats
{ relayReceiveRates = emptyRMap
, relayDeliveryRates = emptyRMap
, postFetchRate = 0
, postPublishRate = 0
}
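-- Worked example (sketch, not part of this commit): assuming fromEnum on a
-- POSIXTime interval yields picoseconds, as the division by 10^12 above implies,
-- summed counts of 3 fetches and 1 published post over a 0.5 s window evaluate
-- to postFetchRate = 6 and postPublishRate = 2 events per second.
exampleRates :: RelayStats
exampleRates = evaluateStats 0.5 (emptyStats { postFetchRate = 3, postPublishRate = 1 })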