accumulate all statistics/measurement events into a measurement summary
- RingMap can now be mapped over
This commit is contained in:
parent
4d2d6faf1b
commit
c823e6357a
|
@ -19,7 +19,7 @@ import qualified Data.ByteString.Lazy.UTF8 as BSUL
|
||||||
import qualified Data.ByteString.UTF8 as BSU
|
import qualified Data.ByteString.UTF8 as BSU
|
||||||
import qualified Data.HashMap.Strict as HMap
|
import qualified Data.HashMap.Strict as HMap
|
||||||
import qualified Data.HashSet as HSet
|
import qualified Data.HashSet as HSet
|
||||||
import Data.Maybe (isJust)
|
import Data.Maybe (isJust, fromJust)
|
||||||
import Data.String (fromString)
|
import Data.String (fromString)
|
||||||
import Data.Text.Lazy (Text)
|
import Data.Text.Lazy (Text)
|
||||||
import qualified Data.Text.Lazy as Txt
|
import qualified Data.Text.Lazy as Txt
|
||||||
|
@ -56,6 +56,7 @@ data PostService d = PostService
|
||||||
, postFetchQueue :: TQueue PostID
|
, postFetchQueue :: TQueue PostID
|
||||||
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
|
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
|
||||||
, httpMan :: HTTP.Manager
|
, httpMan :: HTTP.Manager
|
||||||
|
, statsQueue :: TQueue StatsEvent
|
||||||
}
|
}
|
||||||
deriving (Typeable)
|
deriving (Typeable)
|
||||||
|
|
||||||
|
@ -84,9 +85,10 @@ instance DHT d => Service PostService d where
|
||||||
postFetchQueue' <- newTQueueIO
|
postFetchQueue' <- newTQueueIO
|
||||||
migrationsInProgress' <- newTVarIO HMap.empty
|
migrationsInProgress' <- newTVarIO HMap.empty
|
||||||
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
|
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
|
||||||
|
statsQueue' <- newTQueueIO
|
||||||
let
|
let
|
||||||
thisService = PostService {
|
thisService = PostService
|
||||||
serviceConf = conf
|
{ serviceConf = conf
|
||||||
, baseDHT = dht
|
, baseDHT = dht
|
||||||
, serviceThread = threadVar
|
, serviceThread = threadVar
|
||||||
, subscribers = subscriberVar
|
, subscribers = subscriberVar
|
||||||
|
@ -96,7 +98,8 @@ instance DHT d => Service PostService d where
|
||||||
, postFetchQueue = postFetchQueue'
|
, postFetchQueue = postFetchQueue'
|
||||||
, migrationsInProgress = migrationsInProgress'
|
, migrationsInProgress = migrationsInProgress'
|
||||||
, httpMan = httpMan'
|
, httpMan = httpMan'
|
||||||
}
|
, statsQueue = statsQueue'
|
||||||
|
}
|
||||||
port' = fromIntegral (confServicePort conf)
|
port' = fromIntegral (confServicePort conf)
|
||||||
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
|
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
|
||||||
-- Run 'concurrently_' from another thread to be able to return the
|
-- Run 'concurrently_' from another thread to be able to return the
|
||||||
|
@ -599,3 +602,98 @@ fetchTagPosts serv = forever $ do
|
||||||
-- TODO error handling, retry
|
-- TODO error handling, retry
|
||||||
pure ()
|
pure ()
|
||||||
|
|
||||||
|
|
||||||
|
-- ======= statistics/measurement and logging =======
|
||||||
|
|
||||||
|
-- | Kinds of measurement events that can be recorded.
data StatsEventType
    = PostPublishEvent
    -- ^ initial publishing of a post by an instance
    | RelayReceiveEvent
    -- ^ receiving of posts because of being the responsible relay, for estimation of \tau_t
    -- TODO: record for which hashtag
    | RelayDeliveryEvent
    -- ^ delivering (or at least attempting to deliver) a post to a subscriber
    | IncomingPostFetchEvent
    -- ^ another instance fetches a post from this instance
    deriving (Enum, Show, Eq)
|
||||||
|
|
||||||
|
-- | A single measurement: an event of some 'StatsEventType', together with
-- a count that is relevant for a certain key.
data StatsEvent
    = StatsEvent
        StatsEventType -- kind of event that was measured
        Int            -- count attached to the event
        NodeID         -- key the count is relevant for
|
||||||
|
|
||||||
|
-- | Delay between two flushes of the stats queue. The value is handed to
-- 'threadDelay', so it is in microseconds (300000 µs = 0.3 s).
-- TODO: make delay configurable
statsAccuDelay :: Int
statsAccuDelay = 300000
|
||||||
|
|
||||||
|
-- | Runs forever: waits for 'statsAccuDelay', then atomically drains every
-- 'StatsEvent' queued so far, and starts over.
-- The drained events and the recorded timestamps are not processed any
-- further yet (see the TODO notes below).
accumulateStatsThread :: TQueue StatsEvent -> IO ()
accumulateStatsThread statsQ = do
    startTime <- getPOSIXTime
    flushLoop startTime
  where
    flushLoop lastFlush = do
        -- timestamp taken before sleeping; it is handed to the next iteration
        now <- getPOSIXTime
        -- TODO: instead of letting the events accumulate in the queue and
        -- allocate linear memory, immediately fold the result
        -- but how to achieve the periodicity when blocking on a queue?
        -- idea: let another thread periodically exchange the RelayStats and
        -- modify it atomically (i.e. pull it out from under the writers)
        threadDelay statsAccuDelay
        drained <- atomically (flushTQueue statsQ)
        -- TODO: accumulate the events
        -- and now what? write a log to file, probably as a forkIO
        -- persistently store in a TVar so it can be retrieved later by the DHT
        flushLoop now
|
||||||
|
|
||||||
|
|
||||||
|
-- | Summarise a batch of 'StatsEvent's into per-second rates.
--
-- All event counts are summed first (per key for relay receive/delivery
-- events, globally for publish and fetch events). Every sum is then divided
-- by the length of the measurement interval in seconds.
-- The division is plain 'Double' division, so an interval of length zero
-- yields infinite or NaN rates.
accumulateStats
    :: POSIXTime     -- ^ length of the interval the events were collected in
    -> [StatsEvent]  -- ^ events collected during that interval
    -> RelayStats
accumulateStats timeInterval events =
    RelayStats
        { relayReceiveRates = mapRMap perSecond $ relayReceiveRates summedStats
        , relayDeliveryRates = mapRMap perSecond $ relayDeliveryRates summedStats
        , postPublishRate = perSecond $ postPublishRate summedStats
        , postFetchRate = perSecond $ postFetchRate summedStats
        }
  where
    -- 'realToFrac' reads the interval as seconds directly. Unlike going
    -- through 'fromEnum', it does not depend on the picosecond representation
    -- behind the 'Enum' instance and does not pass through a bounded 'Int'.
    intervalSeconds :: Double
    intervalSeconds = realToFrac timeInterval

    perSecond :: Double -> Double
    perSecond = (/ intervalSeconds)

    -- Plain event sums; the fields still hold counts here, not rates.
    -- NOTE(review): this is a lazy left fold; consider a strict fold if
    -- event batches can become large.
    summedStats = foldl addEvent emptyStats events

    -- Add the count of a single event to the matching field.
    addEvent stats (StatsEvent eventType num key) = case eventType of
        PostPublishEvent ->
            stats {postPublishRate = fromIntegral num + postPublishRate stats}
        RelayReceiveEvent ->
            stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)}
        RelayDeliveryEvent ->
            stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)}
        IncomingPostFetchEvent ->
            stats {postFetchRate = fromIntegral num + postFetchRate stats}

    -- Insert a count for a key; the combining function adds the new count to
    -- a value that is already stored.
    sumIfEntryExists = addRMapEntryWith (\newVal oldVal ->
        let -- presumably the new entry always wraps a plain value, which is
            -- what makes 'fromJust' safe here; confirm against
            -- 'addRMapEntryWith'
            toInsert = fromJust $ extractRingEntry newVal
        in
        case oldVal of
            KeyEntry n -> KeyEntry (n + toInsert)
            ProxyEntry pointer (Just (KeyEntry n)) -> ProxyEntry pointer (Just (KeyEntry $ n + toInsert))
            ProxyEntry pointer Nothing -> ProxyEntry pointer (Just newVal)
            _ -> error "RingMap nested too deeply"
        )
|
||||||
|
|
||||||
|
-- idea: first just sum with foldl, and then map the time division over all values
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
-- | 'RelayStats' without any measurements: both per-key maps are empty and
-- both scalar values are zero.
emptyStats :: RelayStats
emptyStats =
    RelayStats
        { relayReceiveRates = emptyRMap
        , relayDeliveryRates = emptyRMap
        , postPublishRate = 0
        , postFetchRate = 0
        }
|
||||||
|
|
||||||
|
-- | Measured rates of relay performance.
-- TODO: maybe include other metrics in here as well, like number of subscribers?
data RelayStats = RelayStats
    { relayReceiveRates :: RingMap NodeID Double
    -- ^ rate of incoming posts in the responsibility of this relay, per key
    , relayDeliveryRates :: RingMap NodeID Double
    -- ^ rate of relayed outgoing posts, per key
    , postFetchRate :: Double
    -- ^ rate of post-fetches delivered (no need to differentiate between tags)
    , postPublishRate :: Double
    -- ^ rate of initially publishing posts through this instance
    }
|
||||||
|
|
|
@ -133,7 +133,7 @@ rMapLookupPred :: (Bounded k, Ord k, Num k)
|
||||||
rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
|
rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
|
||||||
|
|
||||||
addRMapEntryWith :: (Bounded k, Ord k)
|
addRMapEntryWith :: (Bounded k, Ord k)
|
||||||
=> (RingEntry k a -> RingEntry k a -> RingEntry k a)
|
=> (RingEntry k a -> RingEntry k a -> RingEntry k a) -- ^ f new_value old_value
|
||||||
-> k -- ^ key
|
-> k -- ^ key
|
||||||
-> a -- ^ value
|
-> a -- ^ value
|
||||||
-> RingMap k a
|
-> RingMap k a
|
||||||
|
@ -247,3 +247,14 @@ takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k)
|
||||||
-> RingMap k a
|
-> RingMap k a
|
||||||
-> [a]
|
-> [a]
|
||||||
takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []
|
takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []
|
||||||
|
|
||||||
|
|
||||||
|
-- | Apply a function to every payload value stored in a 'RingMap'.
-- Keys and proxy pointers are kept as they are; an entry nested inside a
-- 'ProxyEntry' is transformed recursively.
mapRMap :: (Bounded k, Ord k, Num k)
        => (a -> b) -> RingMap k a -> RingMap k b
mapRMap f rmap = RingMap (Map.map onEntry (getRingMap rmap))
  where
    -- onEntry :: RingEntry k a -> RingEntry k b
    onEntry entry = case entry of
        KeyEntry payload -> KeyEntry (f payload)
        ProxyEntry pointer (Just nested) -> ProxyEntry pointer (Just (onEntry nested))
        ProxyEntry pointer Nothing -> ProxyEntry pointer Nothing
|
||||||
|
|
Loading…
Reference in a new issue