diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 059ebe5..81b00a3 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -60,7 +60,8 @@ type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag) type TagSubscribersSTM = TVar TagSubscribers -- | each subscriber is identified by its contact data "hostname" "port" -- and holds a TChan duplicated from the broadcast TChan of the tag -type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID)) +-- + an expiration timestamp +type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID, POSIXTime)) instance DHT d => Service PostService d where @@ -175,10 +176,12 @@ subscriptionDelivery serv subList = do processTag :: TVar RelayTags -> Txt.Text -> IO () processTag subscriberSTM tagData = do let - tag:subText:posts:_ = Txt.splitOn "," tagData + tag:subText:lease:posts:_ = Txt.splitOn "," tagData + -- ignore checking of lease time + leaseTime = fromIntegral (read . Txt.unpack $ lease :: Integer) sub = read . Txt.unpack $ subText :: (String, Int) postList = Txt.words posts - enqueueSubscriptions subscriberSTM (normaliseTag tag) sub postList + enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime postFetch :: PostService d -> Txt.Text -> Handler Txt.Text @@ -232,39 +235,43 @@ tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription fro -- | Write all pending posts of a subscriber-tag-combination to its queue. -- Sets up all necessary data structures if they are still missing. -enqueueSubscriptions :: TVar RelayTags -- tag-subscriber map +enqueueSubscription :: TVar RelayTags -- tag-subscriber map -> Hashtag -- hashtag of pending posts -> (String, Int) -- subscriber's connection information -> [PostID] -- pending posts + -> POSIXTime -- lease expiry time -> IO () -enqueueSubscriptions tagMapSTM tag subscriber posts = do +enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do -- get the tag output queue and, if necessary, create it - subChan <- atomically setupSubscriberChannel + subChan <- atomically $ setupSubscriberChannel tagMapSTM tag subscriber leaseTime forM_ posts (atomically . writeTChan subChan) - where - setupSubscriberChannel :: STM (TChan PostID) - setupSubscriberChannel = do - tagMap <- readTVar tagMapSTM - case lookupRelayTags tag tagMap of - Nothing -> do - -- if no collision/ tag doesn't exist yet, just initialize a - -- new subscriber map - broadcastChan <- newBroadcastTChan - tagOutChan <- dupTChan broadcastChan - newSubMapSTM <- newTVar $ HMap.singleton subscriber tagOutChan - writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap - pure tagOutChan - Just (foundSubMapSTM, broadcastChan, _) -> do - -- otherwise use the existing subscriber map - foundSubMap <- readTVar foundSubMapSTM - case HMap.lookup subscriber foundSubMap of - Nothing -> do - -- for new subscribers, create new output channel - tagOutChan <- dupTChan broadcastChan - writeTVar foundSubMapSTM $ HMap.insert subscriber tagOutChan foundSubMap - pure tagOutChan - -- existing subscriber's channels are just returned - Just tagOutChan -> pure tagOutChan + + +-- | STM operation to return the outgoing post queue of a tag to a specified subscriber. +-- If the queue doesn't exist yet, all necessary data structures are set up accordingly. +setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID) +setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do + tagMap <- readTVar tagMapSTM + case lookupRelayTags tag tagMap of + Nothing -> do + -- if no collision/ tag doesn't exist yet, just initialize a + -- new subscriber map + broadcastChan <- newBroadcastTChan + tagOutChan <- dupTChan broadcastChan + newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime) + writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap + pure tagOutChan + Just (foundSubMapSTM, broadcastChan, _) -> do + -- otherwise use the existing subscriber map + foundSubMap <- readTVar foundSubMapSTM + case HMap.lookup subscriber foundSubMap of + Nothing -> do + -- for new subscribers, create new output channel + tagOutChan <- dupTChan broadcastChan + writeTVar foundSubMapSTM $ HMap.insert subscriber (tagOutChan, leaseTime) foundSubMap + pure tagOutChan + -- existing subscriber's channels are just returned + Just (tagOutChan, _) -> pure tagOutChan -- | returns the broadcast channel of a hashtag if there are any subscribers to it