add relay inbox endpoint

This commit is contained in:
Trolli Schmittlauch 2020-07-29 22:15:14 +02:00
parent bd70e2dff0
commit ad52a017aa

View file

@ -56,10 +56,11 @@ type PostID = Txt.Text
type PostContent = Txt.Text type PostContent = Txt.Text
-- | For each handled tag, store its subscribers and provide a -- | For each handled tag, store its subscribers and provide a
-- broadcast 'TChan' for enqueuing posts -- broadcast 'TChan' for enqueuing posts
type RelayTags = RingMap NodeID (TagSubscribers, TChan PostID, Hashtag) type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag)
type TagSubscribersSTM = TVar TagSubscribers
-- | each subscriber is identified by its contact data "hostname" "port" -- | each subscriber is identified by its contact data "hostname" "port"
-- and holds a TChan duplicated from the broadcast TChan of the tag -- and holds a TChan duplicated from the broadcast TChan of the tag
type TagSubscribers = TVar (HMap.HashMap (String, Int) (TChan PostID)) type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID))
instance DHT d => Service PostService d where instance DHT d => Service PostService d where
@ -115,7 +116,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
-- ========= HTTP API and handlers ============= -- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- ^ delivery endpoint of newly published posts of the relay's instance -- ^ delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent
-- ^ endpoint for delivering the subscriptions and outstanding queue -- ^ endpoint for delivering the subscriptions and outstanding queue
@ -123,6 +124,8 @@ type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> Put
-- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid -- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text :<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- ^ endpoint for fetching multiple posts at once -- ^ endpoint for fetching multiple posts at once
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- ^ delivery endpoint of newly published posts of the relay's instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text :<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
-- ^ delivery endpoint for posts of $tag at subscribing instance -- ^ delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer :<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
@ -139,28 +142,28 @@ postServer service = relayInbox service
:<|> subscriptionDelivery service :<|> subscriptionDelivery service
:<|> postFetch service :<|> postFetch service
:<|> postMultiFetch service :<|> postMultiFetch service
:<|> postInbox service
:<|> tagDelivery service :<|> tagDelivery service
:<|> tagSubscribe service :<|> tagSubscribe service
:<|> tagUnsubscribe service :<|> tagUnsubscribe service
relayInbox :: PostService d -> Txt.Text -> Handler NoContent relayInbox :: PostService d -> Hashtag -> Txt.Text -> Handler NoContent
relayInbox serv post = do relayInbox serv tag posts = do
-- extract contained hashtags
let let
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post -- skip checking whether the post actually contains the tag, just drop full post
-- generate post ID postIDs = head . Txt.splitOn "," <$> Txt.lines posts
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer) broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag
-- add ID to own posts -- if tag is not in own responsibility, return a 410 Gone
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId) maybe
-- enqueue a relay job for each tag (throwError $ err410 { errBody = "Relay is not responsible for this tag"})
liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag -> -- otherwise enqueue posts into broadcast queue of the tag
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post) (\queue ->
liftIO $ forM_ postIDs (atomically . writeTChan queue)
) )
broadcastChan
pure NoContent pure NoContent
subscriptionDelivery :: PostService d -> Txt.Text -> Handler NoContent subscriptionDelivery :: PostService d -> Txt.Text -> Handler NoContent
subscriptionDelivery serv subList = do subscriptionDelivery serv subList = do
let let
@ -198,6 +201,23 @@ postMultiFetch serv postIDs = do
else throwError $ err404 { errBody = "No post found with this ID" } else throwError $ err404 { errBody = "No post found with this ID" }
) "" idList ) "" idList
postInbox :: PostService d -> Txt.Text -> Handler NoContent
postInbox serv post = do
-- extract contained hashtags
let
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
-- generate post ID
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer)
-- add ID to own posts
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
-- enqueue a relay job for each tag
liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag ->
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
)
pure NoContent
tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts
@ -225,7 +245,7 @@ enqueueSubscriptions tagMapSTM tag subscriber posts = do
setupSubscriberChannel :: STM (TChan PostID) setupSubscriberChannel :: STM (TChan PostID)
setupSubscriberChannel = do setupSubscriberChannel = do
tagMap <- readTVar tagMapSTM tagMap <- readTVar tagMapSTM
case rMapLookup (genKeyID . Txt.unpack $ tag) tagMap of case lookupRelayTags tag tagMap of
Nothing -> do Nothing -> do
-- if no collision/ tag doesn't exist yet, just initialize a -- if no collision/ tag doesn't exist yet, just initialize a
-- new subscriber map -- new subscriber map
@ -247,6 +267,24 @@ enqueueSubscriptions tagMapSTM tag subscriber posts = do
Just tagOutChan -> pure tagOutChan Just tagOutChan -> pure tagOutChan
-- | returns the broadcast channel of a hashtag if there are any subscribers to it
getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID))
getTagBroadcastChannel serv tag = do
tagMap <- readTVar $ subscribers serv
case lookupRelayTags tag tagMap of
Nothing -> pure Nothing
Just (subscriberSTM, broadcastChan, _) -> do
subscriberMap <- readTVar subscriberSTM
if HMap.null subscriberMap
then pure Nothing
else pure (Just broadcastChan)
-- | look up the subscription data of a tag
lookupRelayTags :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag)
lookupRelayTags tag = rMapLookup (genKeyID . Txt.unpack $ tag)
-- normalise the unicode representation of a string to NFC -- normalise the unicode representation of a string to NFC
normaliseTag :: Txt.Text -> Txt.Text normaliseTag :: Txt.Text -> Txt.Text
normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict