From 2548b6a507c249462c68a519285c79d17429c344 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Thu, 20 Aug 2020 11:49:23 +0200 Subject: [PATCH] automatically subscribe when publishing to a tag --- src/Hash2Pub/PostService.hs | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index a871343..0eb6e00 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -17,7 +17,7 @@ import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Exception (Exception (..), try) -import Control.Monad (foldM, forM, forM_, forever) +import Control.Monad (foldM, forM, forM_, forever, when, void) import Control.Monad.IO.Class (liftIO) import Control.Monad.STM import Data.Bifunctor @@ -150,7 +150,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB -- ========= HTTP API and handlers ============= type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent - -- delivery endpoint of newly published posts of the relay's instance + -- delivery endpoint at responsible relay for delivering posts of $tag for distribution :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text -- endpoint for delivering the subscriptions and outstanding queue :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text @@ -191,7 +191,7 @@ relayInbox serv tag posts = do if responsible then pure () else - (throwError $ err410 { errBody = "Relay is not responsible for this tag"}) + throwError $ err410 { errBody = "Relay is not responsible for this tag"} broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag maybe -- if noone subscribed to the tag, nothing needs to be done @@ -396,7 +396,8 @@ clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead --- | Subscribe the client to the given hashtag. On success it returns the given lease time. +-- | Subscribe the client to the given hashtag. On success it returns the given lease time, +-- but also records the subscription in its own data structure. clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer) clientSubscribeTo serv tag = do lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) @@ -413,7 +414,9 @@ clientSubscribeTo serv tag = do newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) doSubscribe newRes False Left err -> pure . Left . show $ err - Right lease -> pure . Right $ lease + Right lease -> do + atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (genKeyID . Txt.unpack $ tag) (fromInteger lease) + pure . Right $ lease ) lookupResponse @@ -435,7 +438,9 @@ clientUnsubscribeFrom serv tag = do newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) doUnsubscribe newRes False Left err -> pure . Left . show $ err - Right _ -> pure . Right $ () + Right _ -> do + atomically . modifyTVar' (ownSubscriptions serv) $ HMap.delete (genKeyID . Txt.unpack $ tag) + pure . Right $ () ) lookupResponse @@ -580,7 +585,14 @@ processIncomingPosts serv = forever $ do -- TODO: keep track of maximum retries _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag) atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent) - Right yay -> putStrLn $ "Yay! " <> show yay + Right yay -> do + putStrLn $ "Yay! " <> show yay + -- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags + now <- getPOSIXTime + subscriptionStatus <- HMap.lookup (genKeyID . Txt.unpack $ tag) <$> readTVarIO (ownSubscriptions serv) + -- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag + when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $ + void $ clientSubscribeTo serv tag -- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID