add lease time to subscription entries

This commit is contained in:
Trolli Schmittlauch 2020-07-29 23:06:07 +02:00
parent ad52a017aa
commit da47f8062f

View file

@ -60,7 +60,8 @@ type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag)
type TagSubscribersSTM = TVar TagSubscribers
-- | each subscriber is identified by its contact data "hostname" "port"
-- and holds a TChan duplicated from the broadcast TChan of the tag
type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID))
-- + an expiration timestamp
type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID, POSIXTime))
instance DHT d => Service PostService d where
@ -175,10 +176,12 @@ subscriptionDelivery serv subList = do
processTag :: TVar RelayTags -> Txt.Text -> IO ()
processTag subscriberSTM tagData = do
let
tag:subText:posts:_ = Txt.splitOn "," tagData
tag:subText:lease:posts:_ = Txt.splitOn "," tagData
-- ignore checking of lease time
leaseTime = fromIntegral (read . Txt.unpack $ lease :: Integer)
sub = read . Txt.unpack $ subText :: (String, Int)
postList = Txt.words posts
enqueueSubscriptions subscriberSTM (normaliseTag tag) sub postList
enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime
postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
@ -232,18 +235,22 @@ tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription fro
-- | Write all pending posts of a subscriber-tag-combination to its queue.
-- Sets up all necessary data structures if they are still missing.
enqueueSubscriptions :: TVar RelayTags -- tag-subscriber map
enqueueSubscription :: TVar RelayTags -- tag-subscriber map
-> Hashtag -- hashtag of pending posts
-> (String, Int) -- subscriber's connection information
-> [PostID] -- pending posts
-> POSIXTime -- lease expiry time
-> IO ()
enqueueSubscriptions tagMapSTM tag subscriber posts = do
enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do
-- get the tag output queue and, if necessary, create it
subChan <- atomically setupSubscriberChannel
subChan <- atomically $ setupSubscriberChannel tagMapSTM tag subscriber leaseTime
forM_ posts (atomically . writeTChan subChan)
where
setupSubscriberChannel :: STM (TChan PostID)
setupSubscriberChannel = do
-- | STM operation to return the outgoing post queue of a tag to a specified subscriber.
-- If the queue doesn't exist yet, all necessary data structures are set up accordingly.
setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID)
setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
tagMap <- readTVar tagMapSTM
case lookupRelayTags tag tagMap of
Nothing -> do
@ -251,7 +258,7 @@ enqueueSubscriptions tagMapSTM tag subscriber posts = do
-- new subscriber map
broadcastChan <- newBroadcastTChan
tagOutChan <- dupTChan broadcastChan
newSubMapSTM <- newTVar $ HMap.singleton subscriber tagOutChan
newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime)
writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap
pure tagOutChan
Just (foundSubMapSTM, broadcastChan, _) -> do
@ -261,10 +268,10 @@ enqueueSubscriptions tagMapSTM tag subscriber posts = do
Nothing -> do
-- for new subscribers, create new output channel
tagOutChan <- dupTChan broadcastChan
writeTVar foundSubMapSTM $ HMap.insert subscriber tagOutChan foundSubMap
writeTVar foundSubMapSTM $ HMap.insert subscriber (tagOutChan, leaseTime) foundSubMap
pure tagOutChan
-- existing subscriber's channels are just returned
Just tagOutChan -> pure tagOutChan
Just (tagOutChan, _) -> pure tagOutChan
-- | returns the broadcast channel of a hashtag if there are any subscribers to it