periodically purge expired subscriptions
This commit is contained in:
parent
a0e7142a7d
commit
c036dea7f9
|
@ -69,7 +69,7 @@ readConfig = do
|
||||||
, confRequestRetries = 3
|
, confRequestRetries = 3
|
||||||
}
|
}
|
||||||
sConf = ServiceConf
|
sConf = ServiceConf
|
||||||
{ confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` speedup
|
{ confSubscriptionExpiryTime = fromIntegral 12*3600 / fromIntegral speedup
|
||||||
, confServicePort = read servicePortString
|
, confServicePort = read servicePortString
|
||||||
, confServiceHost = confDomainString
|
, confServiceHost = confDomainString
|
||||||
, confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
|
, confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
|
||||||
|
|
|
@ -617,11 +617,29 @@ numParallelDeliveries = 10
|
||||||
launchWorkerThreads :: DHT d => PostService d -> IO ()
|
launchWorkerThreads :: DHT d => PostService d -> IO ()
|
||||||
launchWorkerThreads serv = concurrently_
|
launchWorkerThreads serv = concurrently_
|
||||||
(processIncomingPosts serv)
|
(processIncomingPosts serv)
|
||||||
|
$ concurrently_
|
||||||
|
(purgeSubscriptionsThread serv)
|
||||||
$ concurrently_
|
$ concurrently_
|
||||||
(fetchTagPosts serv)
|
(fetchTagPosts serv)
|
||||||
(relayWorker serv)
|
(relayWorker serv)
|
||||||
|
|
||||||
|
|
||||||
|
-- | periodically remove expired subscription entries from relay subscribers
|
||||||
|
purgeSubscriptionsThread :: PostService d -> IO ()
|
||||||
|
purgeSubscriptionsThread serv = forever $ do
|
||||||
|
-- read config
|
||||||
|
now <- getPOSIXTime
|
||||||
|
let
|
||||||
|
purgeInterval = confSubscriptionExpiryTime (serviceConf serv) / 10
|
||||||
|
-- no need to atomically lock this, as newly incoming subscriptions do not
|
||||||
|
-- need to be purged
|
||||||
|
tagMap <- readTVarIO $ subscribers serv
|
||||||
|
forM_ tagMap $ \(subscriberMapSTM, _, _) ->
|
||||||
|
-- but each subscriberMap needs to be modified atomically
|
||||||
|
atomically . modifyTVar' subscriberMapSTM $ HMap.filter (\(_, ts) -> ts > now)
|
||||||
|
threadDelay $ fromEnum purgeInterval `div` 10^6
|
||||||
|
|
||||||
|
|
||||||
-- | process the pending relay inbox of incoming posts from the internal queue:
|
-- | process the pending relay inbox of incoming posts from the internal queue:
|
||||||
-- Look up responsible relay node for given hashtag and forward post to it
|
-- Look up responsible relay node for given hashtag and forward post to it
|
||||||
processIncomingPosts :: DHT d => PostService d -> IO ()
|
processIncomingPosts :: DHT d => PostService d -> IO ()
|
||||||
|
@ -652,8 +670,8 @@ processIncomingPosts serv = forever $ do
|
||||||
-- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
|
-- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
|
subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
|
||||||
-- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag
|
-- if not yet subscribed or subscription expires within 5 minutes, (re)subscribe to tag
|
||||||
when (maybe True (\subLease -> now - subLease < 120) subscriptionStatus) $
|
when (maybe True (\subLease -> now - subLease < 300) subscriptionStatus) $
|
||||||
void $ clientSubscribeTo serv tag
|
void $ clientSubscribeTo serv tag
|
||||||
|
|
||||||
-- for evaluation, return the tag of the successfully forwarded post
|
-- for evaluation, return the tag of the successfully forwarded post
|
||||||
|
|
Loading…
Reference in a new issue