process and enqueue incoming posts

This commit is contained in:
Trolli Schmittlauch 2020-07-27 21:39:33 +02:00
parent 04423171fd
commit daae9d0b38
2 changed files with 94 additions and 56 deletions

View file

@ -11,14 +11,18 @@ module Hash2Pub.PostService where
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Monad (forM_, forever)
import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString.Lazy.UTF8 as BSU import qualified Data.ByteString.Lazy.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet import qualified Data.HashSet as HSet
import Data.Maybe (fromMaybe) import Data.Maybe (fromMaybe)
import Data.String (fromString) import Data.String (fromString)
import qualified Data.Text as Txt import qualified Data.Text.Lazy as Txt
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import System.Random
import qualified Network.Wai.Handler.Warp as Warp import qualified Network.Wai.Handler.Warp as Warp
import Servant import Servant
@ -33,30 +37,19 @@ data PostService d = PostService
, psHost :: String , psHost :: String
-- queues, other data structures -- queues, other data structures
, baseDHT :: (DHT d) => d , baseDHT :: (DHT d) => d
, serviceThread :: ThreadId , serviceThread :: TVar ThreadId
, subscribers :: TVar (RingMap NodeID TagSubscribers) , subscribers :: TVar (RingMap NodeID TagSubscribers)
-- ^ for each tag store the subscribers + their queue -- ^ for each tag store the subscribers + their queue
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
-- ^ tags subscribed by the own node have an assigned lease time -- ^ tags subscribed by the own node have an assigned lease time
, ownPosts :: TVar (HSet.HashSet Txt.Text) , ownPosts :: TVar (HSet.HashSet Txt.Text)
-- ^ just store the existence of posts for saving memory, -- ^ just store the existence of posts for saving memory,
-- always return the same placeholder , relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously
} }
instance DHT d => Service PostService d where type Hashtag = Txt.Text
runService dht host port = do type PostID = Txt.Text
let
port' = fromIntegral port
warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings
servThread <- forkIO $ Warp.runSettings warpSettings postServiceApplication
pure $ PostService {
psPort = port'
, psHost = host
, baseDHT = dht
, serviceThread = servThread
}
getServicePort s = fromIntegral $ psPort s
type PostContent = Txt.Text type PostContent = Txt.Text
-- | For each handled tag, store its subscribers and provide a -- | For each handled tag, store its subscribers and provide a
-- broadcast 'TChan' for enqueuing posts -- broadcast 'TChan' for enqueuing posts
@ -65,9 +58,40 @@ type RelayTags = RingMap NodeID (TagSubscribers, TChan PostContent)
-- and holds a TChan duplicated from the broadcast TChan of the tag -- and holds a TChan duplicated from the broadcast TChan of the tag
type TagSubscribers = HMap.HashMap (String, Int) (TChan PostContent) type TagSubscribers = HMap.HashMap (String, Int) (TChan PostContent)
instance DHT d => Service PostService d where
-- | initialise 'PostService' data structures and run server
runService dht host port = do
-- create necessary TVars
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
subscriberVar <- newTVarIO emptyRMap
ownSubsVar <- newTVarIO HMap.empty
ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO
let
thisService = PostService {
psPort = port'
, psHost = host
, baseDHT = dht
, serviceThread = threadVar
, subscribers = subscriberVar
, ownSubscriptions = ownSubsVar
, ownPosts = ownPostVar
, relayInQueue = relayInQueue'
}
port' = fromIntegral port
warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings
servThreadID <- forkIO $ Warp.runSettings warpSettings $ postServiceApplication thisService
-- update thread ID after fork
atomically $ writeTVar threadVar servThreadID
pure thisService
getServicePort s = fromIntegral $ psPort s
-- | return a WAI application -- | return a WAI application
postServiceApplication :: Application postServiceApplication :: PostService d -> Application
postServiceApplication = serve exposedPostServiceAPI postServer postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv
-- | needed for guiding type inference -- | needed for guiding type inference
@ -78,7 +102,7 @@ exposedPostServiceAPI = Proxy
-- ========= HTTP API and handlers ============= -- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- ^ delivery endpoint of newly published posts of the relay's instance -- ^ delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- ^ endpoint for delivering the subscriptions and outstanding queue -- ^ endpoint for delivering the subscriptions and outstanding queue
@ -97,37 +121,51 @@ type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> Pos
-- the Origin header to $hashtag -- the Origin header to $hashtag
postServer :: Server PostServiceAPI postServer :: PostService d -> Server PostServiceAPI
postServer = relayInbox postServer service = relayInbox service
:<|> subscriptionDelivery :<|> subscriptionDelivery service
:<|> postFetch :<|> postFetch service
:<|> postMultiFetch :<|> postMultiFetch service
:<|> tagDelivery :<|> tagDelivery service
:<|> tagSubscribe :<|> tagSubscribe service
:<|> tagUnsubscribe :<|> tagUnsubscribe service
relayInbox :: Txt.Text -> Handler Txt.Text relayInbox :: PostService d -> Txt.Text -> Handler NoContent
relayInbox post = pure $ "Here be InboxDragons with " <> post relayInbox serv post = do
-- extract contained hashtags
let
containedTags = fmap Txt.tail . filter ((==) '#' . Txt.head) . Txt.words $ post
-- generate post ID
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^128-1) :: IO Integer)
-- add ID to own posts
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
-- enqueue a relay job for each tag
liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag ->
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
)
pure NoContent
subscriptionDelivery :: Txt.Text -> Handler Txt.Text
subscriptionDelivery subList = pure $ "Here be Subscription List dragons: " <> subList
postFetch :: Txt.Text -> Handler Txt.Text
postFetch postID = pure $ "Here be a post with dragon ID " <> postID
postMultiFetch :: Txt.Text -> Handler Txt.Text subscriptionDelivery :: PostService d -> Txt.Text -> Handler Txt.Text
postMultiFetch postIDs = pure $ "Here be multiple post dragons: " subscriptionDelivery serv subList = pure $ "Here be Subscription List dragons: " <> subList
postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
postFetch serv postID = pure $ "Here be a post with dragon ID " <> postID
postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text
postMultiFetch serv postIDs = pure $ "Here be multiple post dragons: "
<> (Txt.unwords . Txt.lines $ postIDs) <> (Txt.unwords . Txt.lines $ postIDs)
tagDelivery :: Txt.Text -> Txt.Text -> Handler Txt.Text tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts
tagSubscribe :: Txt.Text -> Maybe Txt.Text -> Handler Integer tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
tagSubscribe hashtag origin = pure 42 tagSubscribe serv hashtag origin = pure 42
tagUnsubscribe :: Txt.Text -> Maybe Txt.Text -> Handler Txt.Text tagUnsubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
tagUnsubscribe hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag
-- | define how to convert all showable types to PlainText -- | define how to convert all showable types to PlainText

View file

@ -1,9 +1,9 @@
{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE MultiParamTypeClasses #-}
module Hash2Pub.ServiceTypes where module Hash2Pub.ServiceTypes where
import Data.Hashable (Hashable(..)) import Data.Hashable (Hashable (..))
import Hash2Pub.FediChord (DHT (..), NodeID(..)) import Hash2Pub.FediChord (DHT (..), NodeID (..))
class Service s d where class Service s d where
-- | run the service -- | run the service