From c036dea7f9f91a03f3d068ac9cb3a827961c5db4 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 14 Sep 2020 15:49:44 +0200 Subject: [PATCH] periodically purge expired subscriptions --- app/Main.hs | 2 +- src/Hash2Pub/PostService.hs | 26 ++++++++++++++++++++++---- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/app/Main.hs b/app/Main.hs index c10e0c8..d7be0a5 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -69,7 +69,7 @@ readConfig = do , confRequestRetries = 3 } sConf = ServiceConf - { confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` speedup + { confSubscriptionExpiryTime = fromIntegral 12*3600 / fromIntegral speedup , confServicePort = read servicePortString , confServiceHost = confDomainString , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log" diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index f2a8a18..69f1b13 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -618,8 +618,26 @@ launchWorkerThreads :: DHT d => PostService d -> IO () launchWorkerThreads serv = concurrently_ (processIncomingPosts serv) $ concurrently_ - (fetchTagPosts serv) - (relayWorker serv) + (purgeSubscriptionsThread serv) + $ concurrently_ + (fetchTagPosts serv) + (relayWorker serv) + + +-- | periodically remove expired subscription entries from relay subscribers +purgeSubscriptionsThread :: PostService d -> IO () +purgeSubscriptionsThread serv = forever $ do + -- read config + now <- getPOSIXTime + let + purgeInterval = confSubscriptionExpiryTime (serviceConf serv) / 10 + -- no need to atomically lock this, as newly incoming subscriptions do not + -- need to be purged + tagMap <- readTVarIO $ subscribers serv + forM_ tagMap $ \(subscriberMapSTM, _, _) -> + -- but each subscriberMap needs to be modified atomically + atomically . modifyTVar' subscriberMapSTM $ HMap.filter (\(_, ts) -> ts > now) + threadDelay $ fromEnum purgeInterval `div` 10^6 -- | process the pending relay inbox of incoming posts from the internal queue: @@ -652,8 +670,8 @@ processIncomingPosts serv = forever $ do -- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags now <- getPOSIXTime subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv) - -- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag - when (maybe True (\subLease -> now - subLease < 120) subscriptionStatus) $ + -- if not yet subscribed or subscription expires within 5 minutes, (re)subscribe to tag + when (maybe True (\subLease -> now - subLease < 300) subscriptionStatus) $ void $ clientSubscribeTo serv tag -- for evaluation, return the tag of the successfully forwarded post