diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 89c14d2..81cf552 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -12,16 +12,16 @@ import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception (Exception (..), try) -import Control.Monad (foldM, forM_, forever, void, when) +import Control.Monad (foldM, forM, forM_, forever, void, + when) import Control.Monad.IO.Class (liftIO) import Data.Bifunctor import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.UTF8 as BSU import qualified Data.HashMap.Strict as HMap import qualified Data.HashSet as HSet -import Data.Maybe (isJust) +import Data.Maybe (fromMaybe, isJust) import Data.String (fromString) -import Data.Text.Lazy (Text) import qualified Data.Text.Lazy as Txt import Data.Text.Normalize (NormalizationMode (NFC), normalize) import Data.Time.Clock.POSIX @@ -36,7 +36,6 @@ import Servant import Servant.Client import Hash2Pub.FediChordTypes -import Hash2Pub.PostService.API import Hash2Pub.RingMap @@ -49,7 +48,7 @@ data PostService d = PostService -- ^ for each tag store the subscribers + their queue , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) -- ^ tags subscribed by the own node have an assigned lease time - , ownPosts :: TVar (HSet.HashSet Text) + , ownPosts :: TVar (HSet.HashSet Txt.Text) -- ^ just store the existence of posts for saving memory, , relayInQueue :: TQueue (Hashtag, PostID, PostContent) -- ^ Queue for processing incoming posts of own instance asynchronously @@ -59,9 +58,9 @@ data PostService d = PostService } deriving (Typeable) -type Hashtag = Text -type PostID = Text -type PostContent = Text +type Hashtag = Txt.Text +type PostID = Txt.Text +type PostContent = Txt.Text -- | For each handled tag, store its subscribers and provide a -- broadcast 'TChan' for enqueuing posts type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag) @@ -132,13 +131,38 @@ postServiceApplication :: DHT d => PostService d -> Application postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv +-- | needed for guiding type inference +exposedPostServiceAPI :: Proxy PostServiceAPI +exposedPostServiceAPI = Proxy + -- ========= constants =========== -placeholderPost :: Text +placeholderPost :: Txt.Text placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB -- ========= HTTP API and handlers ============= +type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent + -- delivery endpoint at responsible relay for delivering posts of $tag for distribution + :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text + -- endpoint for delivering the subscriptions and outstanding queue + :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text + -- fetch endpoint for posts, full post ID is http://$domain/post/$postid + :<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text + -- endpoint for fetching multiple posts at once + :<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent + -- delivery endpoint of newly published posts of the relay's instance + :<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text + -- delivery endpoint for posts of $tag at subscribing instance + :<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer + -- endpoint for subscribing the instance specified in + -- the Origin header to $hashtag. + -- Returns subscription lease time in seconds. + :<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text + -- endpoint for unsubscribing the instance specified in + -- the Origin header to $hashtag + + postServer :: DHT d => PostService d -> Server PostServiceAPI postServer service = relayInbox service :<|> subscriptionDelivery service @@ -150,7 +174,7 @@ postServer service = relayInbox service :<|> tagUnsubscribe service -relayInbox :: DHT d => PostService d -> Hashtag -> Text -> Handler NoContent +relayInbox :: DHT d => PostService d -> Hashtag -> Txt.Text -> Handler NoContent relayInbox serv tag posts = do let -- skip checking whether the post actually contains the tag, just drop full post @@ -178,7 +202,7 @@ newtype UnhandledTagException = UnhandledTagException String instance Exception UnhandledTagException -subscriptionDelivery :: DHT d => PostService d -> Integer -> Text -> Handler Text +subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text subscriptionDelivery serv senderID subList = do let tagSubs = Txt.lines subList @@ -212,7 +236,7 @@ subscriptionDelivery serv senderID subList = do Right _ -> pure "" -- TODO: check and only accept tags in own (future?) responsibility where - processTag :: TVar RelayTags -> Text -> STM () + processTag :: TVar RelayTags -> Txt.Text -> STM () processTag subscriberSTM tagData = do let tag:subText:lease:posts:_ = Txt.splitOn "," tagData @@ -223,7 +247,7 @@ subscriptionDelivery serv senderID subList = do enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime -postFetch :: PostService d -> Text -> Handler Text +postFetch :: PostService d -> Txt.Text -> Handler Txt.Text postFetch serv postID = do postSet <- liftIO . readTVarIO . ownPosts $ serv if HSet.member postID postSet @@ -232,7 +256,7 @@ postFetch serv postID = do else throwError $ err404 { errBody = "No post found with this ID" } -postMultiFetch :: PostService d -> Text -> Handler Text +postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text postMultiFetch serv postIDs = do let idList = Txt.lines postIDs postSet <- liftIO . readTVarIO . ownPosts $ serv @@ -244,7 +268,7 @@ postMultiFetch serv postIDs = do ) "" idList -postInbox :: PostService d -> Text -> Handler NoContent +postInbox :: PostService d -> Txt.Text -> Handler NoContent postInbox serv post = do -- extract contained hashtags let @@ -254,13 +278,13 @@ postInbox serv post = do -- add ID to own posts liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId) -- enqueue a relay job for each tag - liftIO $ forM_ (containedTags :: [Text]) (\tag -> + liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag -> atomically $ writeTQueue (relayInQueue serv) (tag, postId, post) ) pure NoContent -tagDelivery :: PostService d -> Text -> Text -> Handler Text +tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text tagDelivery serv hashtag posts = do let postIDs = Txt.lines posts subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv @@ -271,7 +295,7 @@ tagDelivery serv hashtag posts = do pure () pure $ "Received a postID for tag " <> hashtag -tagSubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Integer +tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer tagSubscribe serv hashtag origin = do responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) if not responsible @@ -290,7 +314,7 @@ tagSubscribe serv hashtag origin = do pure $ round leaseTime -tagUnsubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Text +tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text tagUnsubscribe serv hashtag origin = do responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) if not responsible @@ -310,15 +334,8 @@ tagUnsubscribe serv hashtag origin = do clientAPI :: Proxy PostServiceAPI clientAPI = Proxy -relayInboxClient :: Text -> Text -> ClientM NoContent -relayInboxClient :<|> subscriptionDeliveryClient - :<|> postFetchClient - :<|> postMultiFetchClient - :<|> postInboxClient - :<|> tagDeliveryClient - :<|> tagSubscribeClient - :<|> tagUnsubscribeClient - = client clientAPI + +relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI -- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] @@ -526,7 +543,7 @@ lookupTagSubscriptions tag = rMapLookup (hashtagToId tag) -- normalise the unicode representation of a string to NFC -normaliseTag :: Text -> Text +normaliseTag :: Txt.Text -> Txt.Text normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict @@ -584,7 +601,7 @@ fetchTagPosts serv = forever $ do -- TODO: batching, retry -- TODO: process multiple in parallel pIdUri <- atomically . readTQueue $ postFetchQueue serv - fetchReq <- HTTP.parseRequest . Txt.unpack $ pIdUri + fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString)) case resp of Right response -> diff --git a/src/Hash2Pub/PostService/API.hs b/src/Hash2Pub/PostService/API.hs deleted file mode 100644 index 1484631..0000000 --- a/src/Hash2Pub/PostService/API.hs +++ /dev/null @@ -1,37 +0,0 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE InstanceSigs #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE TypeOperators #-} -module Hash2Pub.PostService.API where - -import Data.Text.Lazy (Text) - -import Servant - -type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent - -- delivery endpoint at responsible relay for delivering posts of $tag for distribution - :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Text :> PostNoContent '[PlainText] Text - -- endpoint for delivering the subscriptions and outstanding queue - :<|> "post" :> Capture "postid" Text :> Get '[PlainText] Text - -- fetch endpoint for posts, full post ID is http://$domain/post/$postid - :<|> "posts" :> ReqBody '[PlainText] Text :> Post '[PlainText] Text - -- endpoint for fetching multiple posts at once - :<|> "posts" :> "inbox" :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent - -- delivery endpoint of newly published posts of the relay's instance - :<|> "tags" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PostCreated '[PlainText] Text - -- delivery endpoint for posts of $tag at subscribing instance - :<|> "tags" :> Capture "hashtag" Text :> "subscribe" :> Header "Origin" Text :> Get '[PlainText] Integer - -- endpoint for subscribing the instance specified in - -- the Origin header to $hashtag. - -- Returns subscription lease time in seconds. - :<|> "tags" :> Capture "hashtag" Text :> "unsubscribe" :> Header "Origin" Text :> Get '[PlainText] Text - -- endpoint for unsubscribing the instance specified in - -- the Origin header to $hashtag - --- | needed for guiding type inference -exposedPostServiceAPI :: Proxy PostServiceAPI -exposedPostServiceAPI = Proxy - diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs index a5af10c..86825a7 100644 --- a/src/Hash2Pub/ProtocolTypes.hs +++ b/src/Hash2Pub/ProtocolTypes.hs @@ -1,5 +1,7 @@ module Hash2Pub.ProtocolTypes where +import qualified Data.Map as Map +import Data.Maybe (mapMaybe) import qualified Data.Set as Set import Data.Time.Clock.POSIX (POSIXTime) diff --git a/src/Hash2Pub/RingMap.hs b/src/Hash2Pub/RingMap.hs index ae1ec15..e99f8b2 100644 --- a/src/Hash2Pub/RingMap.hs +++ b/src/Hash2Pub/RingMap.hs @@ -23,7 +23,7 @@ instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where a == b = getRingMap a == getRingMap b instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where - show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap) + show rmap = shows "RingMap " (show $ getRingMap rmap) -- | entry of a 'RingMap' that holds a value and can also -- wrap around the lookup direction at the edges of the name space. @@ -207,7 +207,7 @@ takeEntriesUntil_ :: (Integral i, Bounded k, Ord k) -> Maybe i -- possible number limit -> [a] -> [a] -takeEntriesUntil_ _rmap' _getterFunc' _havingReached _previousEntry (Just remaining) takeAcc +takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry (Just remaining) takeAcc -- length limit reached | remaining <= 0 = takeAcc takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry numLimit takeAcc =