Compare commits

...

2 commits

4 changed files with 42 additions and 13 deletions

View file

@@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md
 common deps
     build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
-    ghc-options: -Wall -Wpartial-fields
+    ghc-options: -Wall -Wpartial-fields -O2

View file

@@ -33,7 +33,7 @@ parseSchedule = fmap (parseEntry . Txt.split (== ';')) . Txt.lines
     where
         parseEntry [delayT, contactT, tag] =
             (read $ Txt.unpack delayT, tag, read $ Txt.unpack contactT)
-        parseEntry _ = error "invalid schedule input format"
+        parseEntry entry = error $ "invalid schedule input format: " <> show entry

 executeSchedule :: Int -- ^ speedup factor
                 -> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))]

View file

@@ -69,12 +69,12 @@ readConfig = do
             , confRequestRetries = 3
             }
         sConf = ServiceConf
-            { confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` speedup
+            { confSubscriptionExpiryTime = fromIntegral 12*3600 / fromIntegral speedup
             , confServicePort = read servicePortString
             , confServiceHost = confDomainString
             , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
             , confSpeedupFactor = speedup
-            , confStatsEvalDelay = 35 * 10^6 `div` speedup
+            , confStatsEvalDelay = 120 * 10^6 `div` speedup
             }
     pure (fConf, sConf)

View file

@@ -36,7 +36,7 @@ import System.IO
 import System.Random
 import Text.Read (readEither)
-import Formatting (fixed, float, format, (%))
+import Formatting (fixed, format, int, (%))
 import qualified Network.Wai.Handler.Warp as Warp
 import Servant
 import Servant.Client
@@ -67,6 +67,7 @@ data PostService d = PostService
     , httpMan :: HTTP.Manager
     , statsQueue :: TQueue StatsEvent
     , loadStats :: TVar RelayStats
+    -- ^ current load stats, replaced periodically
     , logFileHandle :: Handle
     }
     deriving (Typeable)
@@ -120,7 +121,7 @@ instance DHT d => Service PostService d where
         -- log a start message, this also truncates existing files
         TxtI.hPutStrLn loggingFile $ Txt.unlines
             [ "# Starting mock relay implementation"
-            , "#relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate"
+            , "#relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate ;total subscriptions"
             ]
         -- Run 'concurrently_' from another thread to be able to return the
         -- 'PostService'.
@@ -616,11 +617,29 @@ numParallelDeliveries = 10
 launchWorkerThreads :: DHT d => PostService d -> IO ()
 launchWorkerThreads serv = concurrently_
     (processIncomingPosts serv)
+    $ concurrently_
+        (purgeSubscriptionsThread serv)
     $ concurrently_
         (fetchTagPosts serv)
         (relayWorker serv)


+-- | periodically remove expired subscription entries from relay subscribers
+purgeSubscriptionsThread :: PostService d -> IO ()
+purgeSubscriptionsThread serv = forever $ do
+    -- read config
+    now <- getPOSIXTime
+    let
+        purgeInterval = confSubscriptionExpiryTime (serviceConf serv) / 10
+    -- no need to atomically lock this, as newly incoming subscriptions do not
+    -- need to be purged
+    tagMap <- readTVarIO $ subscribers serv
+    forM_ tagMap $ \(subscriberMapSTM, _, _) ->
+        -- but each subscriberMap needs to be modified atomically
+        atomically . modifyTVar' subscriberMapSTM $ HMap.filter (\(_, ts) -> ts > now)
+    threadDelay $ fromEnum purgeInterval `div` 10^6
+
+
 -- | process the pending relay inbox of incoming posts from the internal queue:
 -- Look up responsible relay node for given hashtag and forward post to it
 processIncomingPosts :: DHT d => PostService d -> IO ()
@@ -651,8 +670,8 @@ processIncomingPosts serv = forever $ do
             -- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
             now <- getPOSIXTime
             subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
-            -- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag
-            when (maybe True (\subLease -> now - subLease < 120) subscriptionStatus) $
+            -- if not yet subscribed or subscription expires within 5 minutes, (re)subscribe to tag
+            when (maybe True (\subLease -> now - subLease < 300) subscriptionStatus) $
                 void $ clientSubscribeTo serv tag

             -- for evaluation, return the tag of the successfully forwarded post
@@ -681,7 +700,6 @@ fetchTagPosts serv = forever $ do
                 --if HTTPT.statusCode (HTTP.responseStatus response) == 200
                 --  then
                 --      -- success, TODO: statistics
-                --      putStrLn "post fetch success"
                 --  else
                 pure ()
             Left _ ->
@@ -723,6 +741,7 @@ relayWorker serv = forever $ do
         runningJobs <- mapM async jobset
         -- so far just dropping failed attempts, TODO: retry mechanism
         successfulResults <- rights <$> mapM waitCatch runningJobs
+        putStrLn $ "successfully relayed " <> show (length successfulResults)
         pure ()
@@ -818,16 +837,26 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
         let rateStats = evaluateStats timePassed summedStats
         atomically $ writeTVar (loadStats serv) rateStats
         -- and now what? write a log to file
-        -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate
+        -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
         -- later: current (reported) load, target load
+        subscriberSum <- sumSubscribers
         TxtI.hPutStrLn (logFileHandle serv) $
-            format (fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20)
+            format (fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % int)
                 (sum . relayReceiveRates $ rateStats)
                 (sum . relayDeliveryRates $ rateStats)
                 (postPublishRate rateStats)
                 (postFetchRate rateStats)
+                subscriberSum
         loop now
+
+    sumSubscribers = do
+        tagMap <- readTVarIO $ subscribers serv
+        foldM (\subscriberSum (subscriberMapSTM, _, _) -> do
+                subscriberMap <- readTVarIO subscriberMapSTM
+                pure $ subscriberSum + HMap.size subscriberMap
+            )
+            0 tagMap


 -- | Evaluate the accumulated statistic events: Currently mostly calculates the event
 -- rates by dividing through the collection time frame
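Note on the log format above: each evaluation interval now writes one semicolon-separated line of four fixed-point rates (relay receive, relay delivery, instance publish, instance fetch) followed by the new integer column for total subscriptions. As a rough illustration only, assuming exactly that format and using made-up names (StatsLine, parseStatsLine) that are not part of these commits, such a line could be read back for evaluation like this:

import qualified Data.Text as Txt
import Text.Read (readMaybe)

-- hypothetical record for one parsed log line; field names are illustrative only
data StatsLine = StatsLine
    { lineRelayReceiveRate   :: Double
    , lineRelayDeliveryRate  :: Double
    , linePublishRate        :: Double
    , lineFetchRate          :: Double
    , lineTotalSubscriptions :: Int
    }
    deriving (Show)

-- parse a single data line of the stats log; header lines starting with '#'
-- do not parse as numbers and simply yield Nothing
parseStatsLine :: Txt.Text -> Maybe StatsLine
parseStatsLine line = case map (Txt.unpack . Txt.strip) (Txt.split (== ';') line) of
    [recv, deliv, publish, fetch, subs] -> StatsLine
        <$> readMaybe recv
        <*> readMaybe deliv
        <*> readMaybe publish
        <*> readMaybe fetch
        <*> readMaybe subs
    _ -> Nothing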