diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal
index 7be7ecf..92ec096 100644
--- a/Hash2Pub.cabal
+++ b/Hash2Pub.cabal
@@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md
 
 common deps
   build-depends:       base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
-  ghc-options:         -Wall -Wpartial-fields -O2
+  ghc-options:         -Wall -Wpartial-fields
 
diff --git a/app/Experiment.hs b/app/Experiment.hs
index a999dea..ffa8869 100644
--- a/app/Experiment.hs
+++ b/app/Experiment.hs
@@ -33,7 +33,7 @@ parseSchedule = fmap (parseEntry . Txt.split (== ';')) . Txt.lines
   where
     parseEntry [delayT, contactT, tag] = (read $ Txt.unpack delayT, tag, read $ Txt.unpack contactT)
-    parseEntry entry = error $ "invalid schedule input format: " <> show entry
+    parseEntry _ = error "invalid schedule input format"
 
 
 executeSchedule :: Int -- ^ speedup factor
                 -> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))]
diff --git a/app/Main.hs b/app/Main.hs
index d7be0a5..a620fe8 100644
--- a/app/Main.hs
+++ b/app/Main.hs
@@ -69,12 +69,12 @@ readConfig = do
             , confRequestRetries = 3
             }
         sConf = ServiceConf
-            { confSubscriptionExpiryTime = fromIntegral 12*3600 / fromIntegral speedup
+            { confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` speedup
             , confServicePort = read servicePortString
             , confServiceHost = confDomainString
             , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
             , confSpeedupFactor = speedup
-            , confStatsEvalDelay = 120 * 10^6 `div` speedup
+            , confStatsEvalDelay = 35 * 10^6 `div` speedup
             }
 
     pure (fConf, sConf)
diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs
index 69f1b13..bb94e86 100644
--- a/src/Hash2Pub/PostService.hs
+++ b/src/Hash2Pub/PostService.hs
@@ -36,7 +36,7 @@ import           System.IO
 import           System.Random
 import           Text.Read                      (readEither)
 
-import           Formatting                     (fixed, format, int, (%))
+import           Formatting                     (fixed, float, format, (%))
 import qualified Network.Wai.Handler.Warp       as Warp
 import           Servant
 import           Servant.Client
@@ -67,7 +67,6 @@ data PostService d = PostService
     , httpMan         :: HTTP.Manager
     , statsQueue      :: TQueue StatsEvent
     , loadStats       :: TVar RelayStats
-    -- ^ current load stats, replaced periodically
    , logFileHandle   :: Handle
     }
     deriving (Typeable)
@@ -121,7 +120,7 @@ instance DHT d => Service PostService d where
         -- log a start message, this also truncates existing files
         TxtI.hPutStrLn loggingFile $ Txt.unlines
             [ "# Starting mock relay implementation"
-            , "#relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate ;total subscriptions"
+            , "#relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate"
             ]
         -- Run 'concurrently_' from another thread to be able to return the
         -- 'PostService'.
@@ -618,26 +617,8 @@ launchWorkerThreads :: DHT d => PostService d -> IO ()
 launchWorkerThreads serv = concurrently_
     (processIncomingPosts serv)
     $ concurrently_
-        (purgeSubscriptionsThread serv)
-        $ concurrently_
-            (fetchTagPosts serv)
-            (relayWorker serv)
-
-
--- | periodically remove expired subscription entries from relay subscribers
-purgeSubscriptionsThread :: PostService d -> IO ()
-purgeSubscriptionsThread serv = forever $ do
-    -- read config
-    now <- getPOSIXTime
-    let
-        purgeInterval = confSubscriptionExpiryTime (serviceConf serv) / 10
-    -- no need to atomically lock this, as newly incoming subscriptions do not
-    -- need to be purged
-    tagMap <- readTVarIO $ subscribers serv
-    forM_ tagMap $ \(subscriberMapSTM, _, _) ->
-        -- but each subscriberMap needs to be modified atomically
-        atomically . modifyTVar' subscriberMapSTM $ HMap.filter (\(_, ts) -> ts > now)
-    threadDelay $ fromEnum purgeInterval `div` 10^6
+        (fetchTagPosts serv)
+        (relayWorker serv)
 
 
 -- | process the pending relay inbox of incoming posts from the internal queue:
@@ -670,8 +651,8 @@ processIncomingPosts serv = forever $ do
                 -- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
                 now <- getPOSIXTime
                 subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
-                -- if not yet subscribed or subscription expires within 5 minutes, (re)subscribe to tag
-                when (maybe True (\subLease -> now - subLease < 300) subscriptionStatus) $
+                -- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag
+                when (maybe True (\subLease -> now - subLease < 120) subscriptionStatus) $
                     void $ clientSubscribeTo serv tag
 
                 -- for evaluation, return the tag of the successfully forwarded post
@@ -700,6 +681,7 @@ fetchTagPosts serv = forever $ do
                 --if HTTPT.statusCode (HTTP.responseStatus response) == 200
                 --  then
                 --    -- success, TODO: statistics
+                --    putStrLn "post fetch success"
                 --  else
                 pure ()
             Left _ ->
@@ -741,7 +723,6 @@ relayWorker serv = forever $ do
             runningJobs <- mapM async jobset
             -- so far just dropping failed attempts, TODO: retry mechanism
             successfulResults <- rights <$> mapM waitCatch runningJobs
-            putStrLn $ "successfully relayed " <> show (length successfulResults)
             pure ()
 
 
@@ -837,26 +818,16 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
         let rateStats = evaluateStats timePassed summedStats
         atomically $ writeTVar (loadStats serv) rateStats
         -- and now what? write a log to file
-        -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
+        -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate
         -- later: current (reported) load, target load
-        subscriberSum <- sumSubscribers
         TxtI.hPutStrLn (logFileHandle serv) $
-            format (fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % int )
+            format (fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20)
             (sum . relayReceiveRates $ rateStats)
             (sum . relayDeliveryRates $ rateStats)
             (postPublishRate rateStats)
             (postFetchRate rateStats)
-            subscriberSum
         loop now
 
-    sumSubscribers = do
-        tagMap <- readTVarIO $ subscribers serv
-        foldM (\subscriberSum (subscriberMapSTM, _, _) -> do
-            subscriberMap <- readTVarIO subscriberMapSTM
-            pure $ subscriberSum + HMap.size subscriberMap
-            )
-            0 tagMap
-
 
 -- | Evaluate the accumulated statistic events: Currently mostly calculates the event
 -- rates by dividing through the collection time frame