From ad52a017aa18c92188ddf89a3edc8e16340d1132 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 29 Jul 2020 22:15:14 +0200 Subject: [PATCH] add relay inbox endpoint --- src/Hash2Pub/PostService.hs | 74 ++++++++++++++++++++++++++++--------- 1 file changed, 56 insertions(+), 18 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 169d2b7..059ebe5 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -56,10 +56,11 @@ type PostID = Txt.Text type PostContent = Txt.Text -- | For each handled tag, store its subscribers and provide a -- broadcast 'TChan' for enqueuing posts -type RelayTags = RingMap NodeID (TagSubscribers, TChan PostID, Hashtag) +type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag) +type TagSubscribersSTM = TVar TagSubscribers -- | each subscriber is identified by its contact data "hostname" "port" -- and holds a TChan duplicated from the broadcast TChan of the tag -type TagSubscribers = TVar (HMap.HashMap (String, Int) (TChan PostID)) +type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID)) instance DHT d => Service PostService d where @@ -115,7 +116,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB -- ========= HTTP API and handlers ============= -type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent +type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent -- ^ delivery endpoint of newly published posts of the relay's instance :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent -- ^ endpoint for delivering the subscriptions and outstanding queue @@ -123,6 +124,8 @@ type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> Put -- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid :<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text -- ^ endpoint for fetching multiple posts at once + :<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent + -- ^ delivery endpoint of newly published posts of the relay's instance :<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text -- ^ delivery endpoint for posts of $tag at subscribing instance :<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer @@ -139,28 +142,28 @@ postServer service = relayInbox service :<|> subscriptionDelivery service :<|> postFetch service :<|> postMultiFetch service + :<|> postInbox service :<|> tagDelivery service :<|> tagSubscribe service :<|> tagUnsubscribe service -relayInbox :: PostService d -> Txt.Text -> Handler NoContent -relayInbox serv post = do - -- extract contained hashtags +relayInbox :: PostService d -> Hashtag -> Txt.Text -> Handler NoContent +relayInbox serv tag posts = do let - containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post - -- generate post ID - postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer) - -- add ID to own posts - liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId) - -- enqueue a relay job for each tag - liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag -> - atomically $ writeTQueue (relayInQueue serv) (tag, postId, post) - ) + -- skip checking whether the post actually contains the tag, just drop full post + postIDs = head . Txt.splitOn "," <$> Txt.lines posts + broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag + -- if tag is not in own responsibility, return a 410 Gone + maybe + (throwError $ err410 { errBody = "Relay is not responsible for this tag"}) + -- otherwise enqueue posts into broadcast queue of the tag + (\queue -> + liftIO $ forM_ postIDs (atomically . writeTChan queue) + ) + broadcastChan pure NoContent - - subscriptionDelivery :: PostService d -> Txt.Text -> Handler NoContent subscriptionDelivery serv subList = do let @@ -198,6 +201,23 @@ postMultiFetch serv postIDs = do else throwError $ err404 { errBody = "No post found with this ID" } ) "" idList + +postInbox :: PostService d -> Txt.Text -> Handler NoContent +postInbox serv post = do + -- extract contained hashtags + let + containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post + -- generate post ID + postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer) + -- add ID to own posts + liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId) + -- enqueue a relay job for each tag + liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag -> + atomically $ writeTQueue (relayInQueue serv) (tag, postId, post) + ) + pure NoContent + + tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts @@ -225,7 +245,7 @@ enqueueSubscriptions tagMapSTM tag subscriber posts = do setupSubscriberChannel :: STM (TChan PostID) setupSubscriberChannel = do tagMap <- readTVar tagMapSTM - case rMapLookup (genKeyID . Txt.unpack $ tag) tagMap of + case lookupRelayTags tag tagMap of Nothing -> do -- if no collision/ tag doesn't exist yet, just initialize a -- new subscriber map @@ -247,6 +267,24 @@ enqueueSubscriptions tagMapSTM tag subscriber posts = do Just tagOutChan -> pure tagOutChan +-- | returns the broadcast channel of a hashtag if there are any subscribers to it +getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID)) +getTagBroadcastChannel serv tag = do + tagMap <- readTVar $ subscribers serv + case lookupRelayTags tag tagMap of + Nothing -> pure Nothing + Just (subscriberSTM, broadcastChan, _) -> do + subscriberMap <- readTVar subscriberSTM + if HMap.null subscriberMap + then pure Nothing + else pure (Just broadcastChan) + + +-- | look up the subscription data of a tag +lookupRelayTags :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag) +lookupRelayTags tag = rMapLookup (genKeyID . Txt.unpack $ tag) + + -- normalise the unicode representation of a string to NFC normaliseTag :: Txt.Text -> Txt.Text normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict