From a0e7142a7d8f8eff19469d71d4d07b8c8fc29021 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 14 Sep 2020 14:57:25 +0200 Subject: [PATCH] report number of subscriptions --- Hash2Pub.cabal | 2 +- app/Experiment.hs | 2 +- app/Main.hs | 2 +- src/Hash2Pub/PostService.hs | 21 ++++++++++++++++----- 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 92ec096..7be7ecf 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md common deps build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting - ghc-options: -Wall -Wpartial-fields + ghc-options: -Wall -Wpartial-fields -O2 diff --git a/app/Experiment.hs b/app/Experiment.hs index ffa8869..a999dea 100644 --- a/app/Experiment.hs +++ b/app/Experiment.hs @@ -33,7 +33,7 @@ parseSchedule = fmap (parseEntry . Txt.split (== ';')) . Txt.lines where parseEntry [delayT, contactT, tag] = (read $ Txt.unpack delayT, tag, read $ Txt.unpack contactT) - parseEntry _ = error "invalid schedule input format" + parseEntry entry = error $ "invalid schedule input format: " <> show entry executeSchedule :: Int -- ^ speedup factor -> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))] diff --git a/app/Main.hs b/app/Main.hs index a620fe8..c10e0c8 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -74,7 +74,7 @@ readConfig = do , confServiceHost = confDomainString , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log" , confSpeedupFactor = speedup - , confStatsEvalDelay = 35 * 10^6 `div` speedup + , confStatsEvalDelay = 120 * 10^6 `div` speedup } pure (fConf, sConf) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index bb94e86..f2a8a18 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -36,7 +36,7 @@ import System.IO import System.Random import Text.Read (readEither) -import Formatting (fixed, float, format, (%)) +import Formatting (fixed, format, int, (%)) import qualified Network.Wai.Handler.Warp as Warp import Servant import Servant.Client @@ -67,6 +67,7 @@ data PostService d = PostService , httpMan :: HTTP.Manager , statsQueue :: TQueue StatsEvent , loadStats :: TVar RelayStats + -- ^ current load stats, replaced periodically , logFileHandle :: Handle } deriving (Typeable) @@ -120,7 +121,7 @@ instance DHT d => Service PostService d where -- log a start message, this also truncates existing files TxtI.hPutStrLn loggingFile $ Txt.unlines [ "# Starting mock relay implementation" - , "#relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate" + , "#relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate ;total subscriptions" ] -- Run 'concurrently_' from another thread to be able to return the -- 'PostService'. @@ -681,7 +682,6 @@ fetchTagPosts serv = forever $ do --if HTTPT.statusCode (HTTP.responseStatus response) == 200 -- then -- -- success, TODO: statistics - -- putStrLn "post fetch success" -- else pure () Left _ -> @@ -723,6 +723,7 @@ relayWorker serv = forever $ do runningJobs <- mapM async jobset -- so far just dropping failed attempts, TODO: retry mechanism successfulResults <- rights <$> mapM waitCatch runningJobs + putStrLn $ "successfully relayed " <> show (length successfulResults) pure () @@ -818,16 +819,26 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop let rateStats = evaluateStats timePassed summedStats atomically $ writeTVar (loadStats serv) rateStats -- and now what? write a log to file - -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate + -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum -- later: current (reported) load, target load + subscriberSum <- sumSubscribers TxtI.hPutStrLn (logFileHandle serv) $ - format (fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20) + format (fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % int ) (sum . relayReceiveRates $ rateStats) (sum . relayDeliveryRates $ rateStats) (postPublishRate rateStats) (postFetchRate rateStats) + subscriberSum loop now + sumSubscribers = do + tagMap <- readTVarIO $ subscribers serv + foldM (\subscriberSum (subscriberMapSTM, _, _) -> do + subscriberMap <- readTVarIO subscriberMapSTM + pure $ subscriberSum + HMap.size subscriberMap + ) + 0 tagMap + -- | Evaluate the accumulated statistic events: Currently mostly calculates the event -- rates by dividing through the collection time frame