report number of subscriptions
This commit is contained in:
parent
3c28cde942
commit
a0e7142a7d
|
@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md
|
||||||
|
|
||||||
common deps
|
common deps
|
||||||
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
|
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
|
||||||
ghc-options: -Wall -Wpartial-fields
|
ghc-options: -Wall -Wpartial-fields -O2
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ parseSchedule = fmap (parseEntry . Txt.split (== ';')) . Txt.lines
|
||||||
where
|
where
|
||||||
parseEntry [delayT, contactT, tag] =
|
parseEntry [delayT, contactT, tag] =
|
||||||
(read $ Txt.unpack delayT, tag, read $ Txt.unpack contactT)
|
(read $ Txt.unpack delayT, tag, read $ Txt.unpack contactT)
|
||||||
parseEntry _ = error "invalid schedule input format"
|
parseEntry entry = error $ "invalid schedule input format: " <> show entry
|
||||||
|
|
||||||
executeSchedule :: Int -- ^ speedup factor
|
executeSchedule :: Int -- ^ speedup factor
|
||||||
-> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))]
|
-> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))]
|
||||||
|
|
|
@ -74,7 +74,7 @@ readConfig = do
|
||||||
, confServiceHost = confDomainString
|
, confServiceHost = confDomainString
|
||||||
, confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
|
, confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
|
||||||
, confSpeedupFactor = speedup
|
, confSpeedupFactor = speedup
|
||||||
, confStatsEvalDelay = 35 * 10^6 `div` speedup
|
, confStatsEvalDelay = 120 * 10^6 `div` speedup
|
||||||
}
|
}
|
||||||
pure (fConf, sConf)
|
pure (fConf, sConf)
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ import System.IO
|
||||||
import System.Random
|
import System.Random
|
||||||
import Text.Read (readEither)
|
import Text.Read (readEither)
|
||||||
|
|
||||||
import Formatting (fixed, float, format, (%))
|
import Formatting (fixed, format, int, (%))
|
||||||
import qualified Network.Wai.Handler.Warp as Warp
|
import qualified Network.Wai.Handler.Warp as Warp
|
||||||
import Servant
|
import Servant
|
||||||
import Servant.Client
|
import Servant.Client
|
||||||
|
@ -67,6 +67,7 @@ data PostService d = PostService
|
||||||
, httpMan :: HTTP.Manager
|
, httpMan :: HTTP.Manager
|
||||||
, statsQueue :: TQueue StatsEvent
|
, statsQueue :: TQueue StatsEvent
|
||||||
, loadStats :: TVar RelayStats
|
, loadStats :: TVar RelayStats
|
||||||
|
-- ^ current load stats, replaced periodically
|
||||||
, logFileHandle :: Handle
|
, logFileHandle :: Handle
|
||||||
}
|
}
|
||||||
deriving (Typeable)
|
deriving (Typeable)
|
||||||
|
@ -120,7 +121,7 @@ instance DHT d => Service PostService d where
|
||||||
-- log a start message, this also truncates existing files
|
-- log a start message, this also truncates existing files
|
||||||
TxtI.hPutStrLn loggingFile $ Txt.unlines
|
TxtI.hPutStrLn loggingFile $ Txt.unlines
|
||||||
[ "# Starting mock relay implementation"
|
[ "# Starting mock relay implementation"
|
||||||
, "#relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate"
|
, "#relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate ;total subscriptions"
|
||||||
]
|
]
|
||||||
-- Run 'concurrently_' from another thread to be able to return the
|
-- Run 'concurrently_' from another thread to be able to return the
|
||||||
-- 'PostService'.
|
-- 'PostService'.
|
||||||
|
@ -681,7 +682,6 @@ fetchTagPosts serv = forever $ do
|
||||||
--if HTTPT.statusCode (HTTP.responseStatus response) == 200
|
--if HTTPT.statusCode (HTTP.responseStatus response) == 200
|
||||||
-- then
|
-- then
|
||||||
-- -- success, TODO: statistics
|
-- -- success, TODO: statistics
|
||||||
-- putStrLn "post fetch success"
|
|
||||||
-- else
|
-- else
|
||||||
pure ()
|
pure ()
|
||||||
Left _ ->
|
Left _ ->
|
||||||
|
@ -723,6 +723,7 @@ relayWorker serv = forever $ do
|
||||||
runningJobs <- mapM async jobset
|
runningJobs <- mapM async jobset
|
||||||
-- so far just dropping failed attempts, TODO: retry mechanism
|
-- so far just dropping failed attempts, TODO: retry mechanism
|
||||||
successfulResults <- rights <$> mapM waitCatch runningJobs
|
successfulResults <- rights <$> mapM waitCatch runningJobs
|
||||||
|
putStrLn $ "successfully relayed " <> show (length successfulResults)
|
||||||
pure ()
|
pure ()
|
||||||
|
|
||||||
|
|
||||||
|
@ -818,16 +819,26 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
|
||||||
let rateStats = evaluateStats timePassed summedStats
|
let rateStats = evaluateStats timePassed summedStats
|
||||||
atomically $ writeTVar (loadStats serv) rateStats
|
atomically $ writeTVar (loadStats serv) rateStats
|
||||||
-- and now what? write a log to file
|
-- and now what? write a log to file
|
||||||
-- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate
|
-- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
|
||||||
-- later: current (reported) load, target load
|
-- later: current (reported) load, target load
|
||||||
|
subscriberSum <- sumSubscribers
|
||||||
TxtI.hPutStrLn (logFileHandle serv) $
|
TxtI.hPutStrLn (logFileHandle serv) $
|
||||||
format (fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20)
|
format (fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % int )
|
||||||
(sum . relayReceiveRates $ rateStats)
|
(sum . relayReceiveRates $ rateStats)
|
||||||
(sum . relayDeliveryRates $ rateStats)
|
(sum . relayDeliveryRates $ rateStats)
|
||||||
(postPublishRate rateStats)
|
(postPublishRate rateStats)
|
||||||
(postFetchRate rateStats)
|
(postFetchRate rateStats)
|
||||||
|
subscriberSum
|
||||||
loop now
|
loop now
|
||||||
|
|
||||||
|
sumSubscribers = do
|
||||||
|
tagMap <- readTVarIO $ subscribers serv
|
||||||
|
foldM (\subscriberSum (subscriberMapSTM, _, _) -> do
|
||||||
|
subscriberMap <- readTVarIO subscriberMapSTM
|
||||||
|
pure $ subscriberSum + HMap.size subscriberMap
|
||||||
|
)
|
||||||
|
0 tagMap
|
||||||
|
|
||||||
|
|
||||||
-- | Evaluate the accumulated statistic events: Currently mostly calculates the event
|
-- | Evaluate the accumulated statistic events: Currently mostly calculates the event
|
||||||
-- rates by dividing through the collection time frame
|
-- rates by dividing through the collection time frame
|
||||||
|
|
Loading…
Reference in a new issue