From 50044673a65d0ba0afcc4abc104fa19a70b67757 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Fri, 31 Jul 2020 17:46:33 +0200 Subject: [PATCH] server endpoint for tag-post delivery --- src/Hash2Pub/PostService.hs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index ae122e2..cab4350 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -19,7 +19,7 @@ import Control.Monad.IO.Class (liftIO) import qualified Data.ByteString.Lazy.UTF8 as BSU import qualified Data.HashMap.Strict as HMap import qualified Data.HashSet as HSet -import Data.Maybe (fromMaybe) +import Data.Maybe (fromMaybe, isJust) import Data.String (fromString) import qualified Data.Text.Lazy as Txt import Data.Text.Normalize (NormalizationMode (NFC), @@ -48,6 +48,7 @@ data PostService d = PostService -- ^ just store the existence of posts for saving memory, , relayInQueue :: TQueue (Hashtag, PostID, PostContent) -- ^ Queue for processing incoming posts of own instance asynchronously + , postFetchQueue :: TQueue PostID } deriving (Typeable) @@ -73,6 +74,7 @@ instance DHT d => Service PostService d where ownSubsVar <- newTVarIO HMap.empty ownPostVar <- newTVarIO HSet.empty relayInQueue' <- newTQueueIO + postFetchQueue' <- newTQueueIO let thisService = PostService { serviceConf = conf @@ -82,6 +84,7 @@ instance DHT d => Service PostService d where , ownSubscriptions = ownSubsVar , ownPosts = ownPostVar , relayInQueue = relayInQueue' + , postFetchQueue = postFetchQueue' } port' = fromIntegral (confServicePort conf) warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings @@ -221,7 +224,15 @@ postInbox serv post = do tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text -tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts +tagDelivery serv hashtag posts = do + let postIDs = Txt.lines posts + subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv + if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions) + then -- TODO: increase a counter/ statistics for received posts of this tag + liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv) + else -- silently drop posts from unsubscribed tags + pure () + pure $ "Received a postID for tag " <> hashtag tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer tagSubscribe serv hashtag origin = pure 42 @@ -251,7 +262,7 @@ enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID) setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do tagMap <- readTVar tagMapSTM - case lookupRelayTags tag tagMap of + case lookupTagSubscriptions tag tagMap of Nothing -> do -- if no collision/ tag doesn't exist yet, just initialize a -- new subscriber map @@ -277,7 +288,7 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID)) getTagBroadcastChannel serv tag = do tagMap <- readTVar $ subscribers serv - case lookupRelayTags tag tagMap of + case lookupTagSubscriptions tag tagMap of Nothing -> pure Nothing Just (subscriberSTM, broadcastChan, _) -> do subscriberMap <- readTVar subscriberSTM @@ -287,8 +298,8 @@ getTagBroadcastChannel serv tag = do -- | look up the subscription data of a tag -lookupRelayTags :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag) -lookupRelayTags tag = rMapLookup (genKeyID . Txt.unpack $ tag) +lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a +lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag) -- normalise the unicode representation of a string to NFC