automatically subscribe when publishing to a tag

This commit is contained in:
Trolli Schmittlauch 2020-08-20 11:49:23 +02:00
parent 2ee40a7f64
commit 2548b6a507

View file

@ -17,7 +17,7 @@ import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever)
import Control.Monad (foldM, forM, forM_, forever, when, void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.STM
import Data.Bifunctor
@ -150,7 +150,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
-- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance
-- delivery endpoint at responsible relay for delivering posts of $tag for distribution
:<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
-- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
@ -191,7 +191,7 @@ relayInbox serv tag posts = do
if responsible
then pure ()
else
(throwError $ err410 { errBody = "Relay is not responsible for this tag"})
throwError $ err410 { errBody = "Relay is not responsible for this tag"}
broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag
maybe
-- if noone subscribed to the tag, nothing needs to be done
@ -396,7 +396,8 @@ clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead
-- | Subscribe the client to the given hashtag. On success it returns the given lease time.
-- | Subscribe the client to the given hashtag. On success it returns the given lease time,
-- but also records the subscription in its own data structure.
clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer)
clientSubscribeTo serv tag = do
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
@ -413,7 +414,9 @@ clientSubscribeTo serv tag = do
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
doSubscribe newRes False
Left err -> pure . Left . show $ err
Right lease -> pure . Right $ lease
Right lease -> do
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (genKeyID . Txt.unpack $ tag) (fromInteger lease)
pure . Right $ lease
)
lookupResponse
@ -435,7 +438,9 @@ clientUnsubscribeFrom serv tag = do
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
doUnsubscribe newRes False
Left err -> pure . Left . show $ err
Right _ -> pure . Right $ ()
Right _ -> do
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.delete (genKeyID . Txt.unpack $ tag)
pure . Right $ ()
)
lookupResponse
@ -580,7 +585,14 @@ processIncomingPosts serv = forever $ do
-- TODO: keep track of maximum retries
_ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
Right yay -> putStrLn $ "Yay! " <> show yay
Right yay -> do
putStrLn $ "Yay! " <> show yay
-- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
now <- getPOSIXTime
subscriptionStatus <- HMap.lookup (genKeyID . Txt.unpack $ tag) <$> readTVarIO (ownSubscriptions serv)
-- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag
when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $
void $ clientSubscribeTo serv tag
-- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID