From daae9d0b38182985963f896018a46c2435e78a80 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 27 Jul 2020 21:39:33 +0200 Subject: [PATCH] process and enqueue incoming posts --- src/Hash2Pub/PostService.hs | 146 ++++++++++++++++++++++------------- src/Hash2Pub/ServiceTypes.hs | 4 +- 2 files changed, 94 insertions(+), 56 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index bc1dc23..fc3e5e8 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -11,16 +11,20 @@ module Hash2Pub.PostService where import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.STM.TChan +import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar -import qualified Data.ByteString.Lazy.UTF8 as BSU -import qualified Data.HashMap.Strict as HMap -import qualified Data.HashSet as HSet -import Data.Maybe (fromMaybe) -import Data.String (fromString) -import qualified Data.Text as Txt -import Data.Time.Clock.POSIX +import Control.Monad (forM_, forever) +import Control.Monad.IO.Class (liftIO) +import qualified Data.ByteString.Lazy.UTF8 as BSU +import qualified Data.HashMap.Strict as HMap +import qualified Data.HashSet as HSet +import Data.Maybe (fromMaybe) +import Data.String (fromString) +import qualified Data.Text.Lazy as Txt +import Data.Time.Clock.POSIX +import System.Random -import qualified Network.Wai.Handler.Warp as Warp +import qualified Network.Wai.Handler.Warp as Warp import Servant import Hash2Pub.FediChordTypes @@ -29,34 +33,23 @@ import Hash2Pub.ServiceTypes data PostService d = PostService - { psPort :: Warp.Port - , psHost :: String + { psPort :: Warp.Port + , psHost :: String -- queues, other data structures - , baseDHT :: (DHT d) => d - , serviceThread :: ThreadId - , subscribers :: TVar (RingMap NodeID TagSubscribers) + , baseDHT :: (DHT d) => d + , serviceThread :: TVar ThreadId + , subscribers :: TVar (RingMap NodeID TagSubscribers) -- ^ for each tag store the subscribers + their queue , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) -- ^ tags subscribed by the own node have an assigned lease time - , ownPosts :: TVar (HSet.HashSet Txt.Text) + , ownPosts :: TVar (HSet.HashSet Txt.Text) -- ^ just store the existence of posts for saving memory, - -- always return the same placeholder + , relayInQueue :: TQueue (Hashtag, PostID, PostContent) + -- ^ Queue for processing incoming posts of own instance asynchronously } -instance DHT d => Service PostService d where - runService dht host port = do - let - port' = fromIntegral port - warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings - servThread <- forkIO $ Warp.runSettings warpSettings postServiceApplication - pure $ PostService { - psPort = port' - , psHost = host - , baseDHT = dht - , serviceThread = servThread - } - getServicePort s = fromIntegral $ psPort s - +type Hashtag = Txt.Text +type PostID = Txt.Text type PostContent = Txt.Text -- | For each handled tag, store its subscribers and provide a -- broadcast 'TChan' for enqueuing posts @@ -65,9 +58,40 @@ type RelayTags = RingMap NodeID (TagSubscribers, TChan PostContent) -- and holds a TChan duplicated from the broadcast TChan of the tag type TagSubscribers = HMap.HashMap (String, Int) (TChan PostContent) + +instance DHT d => Service PostService d where + -- | initialise 'PostService' data structures and run server + runService dht host port = do + -- create necessary TVars + threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder + subscriberVar <- newTVarIO emptyRMap + ownSubsVar <- newTVarIO HMap.empty + ownPostVar <- newTVarIO HSet.empty + relayInQueue' <- newTQueueIO + let + thisService = PostService { + psPort = port' + , psHost = host + , baseDHT = dht + , serviceThread = threadVar + , subscribers = subscriberVar + , ownSubscriptions = ownSubsVar + , ownPosts = ownPostVar + , relayInQueue = relayInQueue' + } + port' = fromIntegral port + warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings + servThreadID <- forkIO $ Warp.runSettings warpSettings $ postServiceApplication thisService + -- update thread ID after fork + atomically $ writeTVar threadVar servThreadID + pure thisService + + getServicePort s = fromIntegral $ psPort s + + -- | return a WAI application -postServiceApplication :: Application -postServiceApplication = serve exposedPostServiceAPI postServer +postServiceApplication :: PostService d -> Application +postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv -- | needed for guiding type inference @@ -78,7 +102,7 @@ exposedPostServiceAPI = Proxy -- ========= HTTP API and handlers ============= -type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text +type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent -- ^ delivery endpoint of newly published posts of the relay's instance :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text -- ^ endpoint for delivering the subscriptions and outstanding queue @@ -97,37 +121,51 @@ type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> Pos -- the Origin header to $hashtag -postServer :: Server PostServiceAPI -postServer = relayInbox - :<|> subscriptionDelivery - :<|> postFetch - :<|> postMultiFetch - :<|> tagDelivery - :<|> tagSubscribe - :<|> tagUnsubscribe +postServer :: PostService d -> Server PostServiceAPI +postServer service = relayInbox service + :<|> subscriptionDelivery service + :<|> postFetch service + :<|> postMultiFetch service + :<|> tagDelivery service + :<|> tagSubscribe service + :<|> tagUnsubscribe service -relayInbox :: Txt.Text -> Handler Txt.Text -relayInbox post = pure $ "Here be InboxDragons with " <> post +relayInbox :: PostService d -> Txt.Text -> Handler NoContent +relayInbox serv post = do + -- extract contained hashtags + let + containedTags = fmap Txt.tail . filter ((==) '#' . Txt.head) . Txt.words $ post + -- generate post ID + postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^128-1) :: IO Integer) + -- add ID to own posts + liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId) + -- enqueue a relay job for each tag + liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag -> + atomically $ writeTQueue (relayInQueue serv) (tag, postId, post) + ) + pure NoContent -subscriptionDelivery :: Txt.Text -> Handler Txt.Text -subscriptionDelivery subList = pure $ "Here be Subscription List dragons: " <> subList -postFetch :: Txt.Text -> Handler Txt.Text -postFetch postID = pure $ "Here be a post with dragon ID " <> postID -postMultiFetch :: Txt.Text -> Handler Txt.Text -postMultiFetch postIDs = pure $ "Here be multiple post dragons: " +subscriptionDelivery :: PostService d -> Txt.Text -> Handler Txt.Text +subscriptionDelivery serv subList = pure $ "Here be Subscription List dragons: " <> subList + +postFetch :: PostService d -> Txt.Text -> Handler Txt.Text +postFetch serv postID = pure $ "Here be a post with dragon ID " <> postID + +postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text +postMultiFetch serv postIDs = pure $ "Here be multiple post dragons: " <> (Txt.unwords . Txt.lines $ postIDs) -tagDelivery :: Txt.Text -> Txt.Text -> Handler Txt.Text -tagDelivery hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts +tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text +tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts -tagSubscribe :: Txt.Text -> Maybe Txt.Text -> Handler Integer -tagSubscribe hashtag origin = pure 42 +tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer +tagSubscribe serv hashtag origin = pure 42 -tagUnsubscribe :: Txt.Text -> Maybe Txt.Text -> Handler Txt.Text -tagUnsubscribe hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag +tagUnsubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text +tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag -- | define how to convert all showable types to PlainText diff --git a/src/Hash2Pub/ServiceTypes.hs b/src/Hash2Pub/ServiceTypes.hs index 430dc74..5e2b37c 100644 --- a/src/Hash2Pub/ServiceTypes.hs +++ b/src/Hash2Pub/ServiceTypes.hs @@ -1,9 +1,9 @@ {-# LANGUAGE MultiParamTypeClasses #-} module Hash2Pub.ServiceTypes where -import Data.Hashable (Hashable(..)) +import Data.Hashable (Hashable (..)) -import Hash2Pub.FediChord (DHT (..), NodeID(..)) +import Hash2Pub.FediChord (DHT (..), NodeID (..)) class Service s d where -- | run the service