Improve general readability #69
					 3 changed files with 64 additions and 45 deletions
				
			
		|  | @ -21,6 +21,7 @@ import qualified Data.HashMap.Strict       as HMap | ||||||
| import qualified Data.HashSet              as HSet | import qualified Data.HashSet              as HSet | ||||||
| import           Data.Maybe                (isJust) | import           Data.Maybe                (isJust) | ||||||
| import           Data.String               (fromString) | import           Data.String               (fromString) | ||||||
|  | import           Data.Text.Lazy            (Text) | ||||||
| import qualified Data.Text.Lazy            as Txt | import qualified Data.Text.Lazy            as Txt | ||||||
| import           Data.Text.Normalize       (NormalizationMode (NFC), normalize) | import           Data.Text.Normalize       (NormalizationMode (NFC), normalize) | ||||||
| import           Data.Time.Clock.POSIX | import           Data.Time.Clock.POSIX | ||||||
|  | @ -35,6 +36,7 @@ import           Servant | ||||||
| import           Servant.Client | import           Servant.Client | ||||||
| 
 | 
 | ||||||
| import           Hash2Pub.FediChordTypes | import           Hash2Pub.FediChordTypes | ||||||
|  | import           Hash2Pub.PostService.API | ||||||
| import           Hash2Pub.RingMap | import           Hash2Pub.RingMap | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -47,7 +49,7 @@ data PostService d = PostService | ||||||
|     -- ^ for each tag store the subscribers + their queue |     -- ^ for each tag store the subscribers + their queue | ||||||
|     , ownSubscriptions     :: TVar (HMap.HashMap NodeID POSIXTime) |     , ownSubscriptions     :: TVar (HMap.HashMap NodeID POSIXTime) | ||||||
|     -- ^ tags subscribed by the own node have an assigned lease time |     -- ^ tags subscribed by the own node have an assigned lease time | ||||||
|     , ownPosts             :: TVar (HSet.HashSet Txt.Text) |     , ownPosts             :: TVar (HSet.HashSet Text) | ||||||
|     -- ^ just store the existence of posts for saving memory, |     -- ^ just store the existence of posts for saving memory, | ||||||
|     , relayInQueue         :: TQueue (Hashtag, PostID, PostContent) |     , relayInQueue         :: TQueue (Hashtag, PostID, PostContent) | ||||||
|     -- ^ Queue for processing incoming posts of own instance asynchronously |     -- ^ Queue for processing incoming posts of own instance asynchronously | ||||||
|  | @ -57,9 +59,9 @@ data PostService d = PostService | ||||||
|     } |     } | ||||||
|     deriving (Typeable) |     deriving (Typeable) | ||||||
| 
 | 
 | ||||||
| type Hashtag = Txt.Text | type Hashtag = Text | ||||||
| type PostID = Txt.Text | type PostID = Text | ||||||
| type PostContent = Txt.Text | type PostContent = Text | ||||||
| -- | For each handled tag, store its subscribers and provide a | -- | For each handled tag, store its subscribers and provide a | ||||||
| -- broadcast 'TChan' for enqueuing posts | -- broadcast 'TChan' for enqueuing posts | ||||||
| type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag) | type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag) | ||||||
|  | @ -130,38 +132,13 @@ postServiceApplication :: DHT d => PostService d -> Application | ||||||
| postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv | postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- | needed for guiding type inference |  | ||||||
| exposedPostServiceAPI :: Proxy PostServiceAPI |  | ||||||
| exposedPostServiceAPI = Proxy |  | ||||||
| 
 |  | ||||||
| -- ========= constants =========== | -- ========= constants =========== | ||||||
| 
 | 
 | ||||||
| placeholderPost :: Txt.Text | placeholderPost :: Text | ||||||
| placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB | placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB | ||||||
| 
 | 
 | ||||||
| -- ========= HTTP API and handlers ============= | -- ========= HTTP API and handlers ============= | ||||||
| 
 | 
 | ||||||
| type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent |  | ||||||
|                  -- delivery endpoint at responsible relay for delivering posts of $tag for distribution |  | ||||||
|                  :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text |  | ||||||
|                  -- endpoint for delivering the subscriptions and outstanding queue |  | ||||||
|                  :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text |  | ||||||
|                  -- fetch endpoint for posts, full post ID is http://$domain/post/$postid |  | ||||||
|                  :<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text |  | ||||||
|                  -- endpoint for fetching multiple posts at once |  | ||||||
|                  :<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent |  | ||||||
|                  -- delivery endpoint of newly published posts of the relay's instance |  | ||||||
|                  :<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text |  | ||||||
|                  -- delivery endpoint for posts of $tag at subscribing instance |  | ||||||
|                  :<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer |  | ||||||
|                  -- endpoint for subscribing the instance specified in |  | ||||||
|                  -- the Origin header to $hashtag. |  | ||||||
|                  -- Returns subscription lease time in seconds. |  | ||||||
|                  :<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text |  | ||||||
|                  -- endpoint for unsubscribing the instance specified in |  | ||||||
|                  -- the Origin header to $hashtag |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| postServer :: DHT d => PostService d -> Server PostServiceAPI | postServer :: DHT d => PostService d -> Server PostServiceAPI | ||||||
| postServer service = relayInbox service | postServer service = relayInbox service | ||||||
|         :<|> subscriptionDelivery service |         :<|> subscriptionDelivery service | ||||||
|  | @ -173,7 +150,7 @@ postServer service = relayInbox service | ||||||
|         :<|> tagUnsubscribe service |         :<|> tagUnsubscribe service | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| relayInbox :: DHT d => PostService d -> Hashtag -> Txt.Text -> Handler NoContent | relayInbox :: DHT d => PostService d -> Hashtag -> Text -> Handler NoContent | ||||||
| relayInbox serv tag posts = do | relayInbox serv tag posts = do | ||||||
|     let |     let | ||||||
|     -- skip checking whether the post actually contains the tag, just drop full post |     -- skip checking whether the post actually contains the tag, just drop full post | ||||||
|  | @ -201,7 +178,7 @@ newtype UnhandledTagException = UnhandledTagException String | ||||||
| 
 | 
 | ||||||
| instance Exception UnhandledTagException | instance Exception UnhandledTagException | ||||||
| 
 | 
 | ||||||
| subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text | subscriptionDelivery :: DHT d => PostService d -> Integer -> Text -> Handler Text | ||||||
| subscriptionDelivery serv senderID subList = do | subscriptionDelivery serv senderID subList = do | ||||||
|     let |     let | ||||||
|         tagSubs = Txt.lines subList |         tagSubs = Txt.lines subList | ||||||
|  | @ -235,7 +212,7 @@ subscriptionDelivery serv senderID subList = do | ||||||
|       Right _  -> pure "" |       Right _  -> pure "" | ||||||
|     -- TODO: check and only accept tags in own (future?) responsibility |     -- TODO: check and only accept tags in own (future?) responsibility | ||||||
|   where |   where | ||||||
|       processTag :: TVar RelayTags -> Txt.Text -> STM () |       processTag :: TVar RelayTags -> Text -> STM () | ||||||
|       processTag subscriberSTM tagData = do |       processTag subscriberSTM tagData = do | ||||||
|           let |           let | ||||||
|             tag:subText:lease:posts:_ = Txt.splitOn "," tagData |             tag:subText:lease:posts:_ = Txt.splitOn "," tagData | ||||||
|  | @ -246,7 +223,7 @@ subscriptionDelivery serv senderID subList = do | ||||||
|           enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime |           enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| postFetch :: PostService d -> Txt.Text -> Handler Txt.Text | postFetch :: PostService d -> Text -> Handler Text | ||||||
| postFetch serv postID = do | postFetch serv postID = do | ||||||
|     postSet <- liftIO . readTVarIO . ownPosts $ serv |     postSet <- liftIO . readTVarIO . ownPosts $ serv | ||||||
|     if HSet.member postID postSet |     if HSet.member postID postSet | ||||||
|  | @ -255,7 +232,7 @@ postFetch serv postID = do | ||||||
|        else throwError $ err404 { errBody = "No post found with this ID" } |        else throwError $ err404 { errBody = "No post found with this ID" } | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text | postMultiFetch :: PostService d -> Text -> Handler Text | ||||||
| postMultiFetch serv postIDs = do | postMultiFetch serv postIDs = do | ||||||
|     let idList = Txt.lines postIDs |     let idList = Txt.lines postIDs | ||||||
|     postSet <- liftIO . readTVarIO . ownPosts $ serv |     postSet <- liftIO . readTVarIO . ownPosts $ serv | ||||||
|  | @ -267,7 +244,7 @@ postMultiFetch serv postIDs = do | ||||||
|           ) "" idList |           ) "" idList | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| postInbox :: PostService d -> Txt.Text -> Handler NoContent | postInbox :: PostService d -> Text -> Handler NoContent | ||||||
| postInbox serv post = do | postInbox serv post = do | ||||||
|     -- extract contained hashtags |     -- extract contained hashtags | ||||||
|     let |     let | ||||||
|  | @ -277,13 +254,13 @@ postInbox serv post = do | ||||||
|     -- add ID to own posts |     -- add ID to own posts | ||||||
|     liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId) |     liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId) | ||||||
|     -- enqueue a relay job for each tag |     -- enqueue a relay job for each tag | ||||||
|     liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag -> |     liftIO $ forM_ (containedTags :: [Text]) (\tag -> | ||||||
|         atomically $ writeTQueue (relayInQueue serv) (tag, postId,  post) |         atomically $ writeTQueue (relayInQueue serv) (tag, postId,  post) | ||||||
|                                  ) |                                  ) | ||||||
|     pure NoContent |     pure NoContent | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text | tagDelivery :: PostService d -> Text -> Text -> Handler Text | ||||||
| tagDelivery serv hashtag posts = do | tagDelivery serv hashtag posts = do | ||||||
|     let postIDs = Txt.lines posts |     let postIDs = Txt.lines posts | ||||||
|     subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv |     subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv | ||||||
|  | @ -294,7 +271,7 @@ tagDelivery serv hashtag posts = do | ||||||
|             pure () |             pure () | ||||||
|     pure $ "Received a postID for tag " <> hashtag |     pure $ "Received a postID for tag " <> hashtag | ||||||
| 
 | 
 | ||||||
| tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer | tagSubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Integer | ||||||
| tagSubscribe serv hashtag origin = do | tagSubscribe serv hashtag origin = do | ||||||
|     responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) |     responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) | ||||||
|     if not responsible |     if not responsible | ||||||
|  | @ -313,7 +290,7 @@ tagSubscribe serv hashtag origin = do | ||||||
|     pure $ round leaseTime |     pure $ round leaseTime | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text | tagUnsubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Text | ||||||
| tagUnsubscribe serv hashtag origin = do | tagUnsubscribe serv hashtag origin = do | ||||||
|     responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) |     responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) | ||||||
|     if not responsible |     if not responsible | ||||||
|  | @ -333,8 +310,15 @@ tagUnsubscribe serv hashtag origin = do | ||||||
| clientAPI :: Proxy PostServiceAPI | clientAPI :: Proxy PostServiceAPI | ||||||
| clientAPI = Proxy | clientAPI = Proxy | ||||||
| 
 | 
 | ||||||
| 
 | relayInboxClient :: Text -> Text -> ClientM NoContent | ||||||
| relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI | relayInboxClient :<|> subscriptionDeliveryClient | ||||||
|  |                  :<|> postFetchClient | ||||||
|  |                  :<|> postMultiFetchClient | ||||||
|  |                  :<|> postInboxClient | ||||||
|  |                  :<|> tagDeliveryClient | ||||||
|  |                  :<|> tagSubscribeClient | ||||||
|  |                  :<|> tagUnsubscribeClient | ||||||
|  |                  = client clientAPI | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] | -- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] | ||||||
|  | @ -542,7 +526,7 @@ lookupTagSubscriptions tag = rMapLookup (hashtagToId tag) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| -- normalise the unicode representation of a string to NFC | -- normalise the unicode representation of a string to NFC | ||||||
| normaliseTag :: Txt.Text -> Txt.Text | normaliseTag :: Text -> Text | ||||||
| normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict | normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
							
								
								
									
										37
									
								
								src/Hash2Pub/PostService/API.hs
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										37
									
								
								src/Hash2Pub/PostService/API.hs
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,37 @@ | ||||||
|  | {-# LANGUAGE DataKinds             #-} | ||||||
|  | {-# LANGUAGE FlexibleInstances     #-} | ||||||
|  | {-# LANGUAGE InstanceSigs          #-} | ||||||
|  | {-# LANGUAGE MultiParamTypeClasses #-} | ||||||
|  | {-# LANGUAGE OverloadedStrings     #-} | ||||||
|  | {-# LANGUAGE RankNTypes            #-} | ||||||
|  | {-# LANGUAGE TypeOperators         #-} | ||||||
|  | module Hash2Pub.PostService.API where | ||||||
|  | 
 | ||||||
|  | import           Data.Text.Lazy (Text) | ||||||
|  | 
 | ||||||
|  | import           Servant | ||||||
|  | 
 | ||||||
|  | type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent | ||||||
|  |                  -- delivery endpoint at responsible relay for delivering posts of $tag for distribution | ||||||
|  |                  :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Text :> PostNoContent '[PlainText] Text | ||||||
|  |                  -- endpoint for delivering the subscriptions and outstanding queue | ||||||
|  |                  :<|> "post" :> Capture "postid" Text :> Get '[PlainText] Text | ||||||
|  |                  -- fetch endpoint for posts, full post ID is http://$domain/post/$postid | ||||||
|  |                  :<|> "posts" :> ReqBody '[PlainText] Text :> Post '[PlainText] Text | ||||||
|  |                  -- endpoint for fetching multiple posts at once | ||||||
|  |                  :<|> "posts" :> "inbox" :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent | ||||||
|  |                  -- delivery endpoint of newly published posts of the relay's instance | ||||||
|  |                  :<|> "tags" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PostCreated '[PlainText] Text | ||||||
|  |                  -- delivery endpoint for posts of $tag at subscribing instance | ||||||
|  |                  :<|> "tags" :> Capture "hashtag" Text :> "subscribe" :> Header "Origin" Text :> Get '[PlainText] Integer | ||||||
|  |                  -- endpoint for subscribing the instance specified in | ||||||
|  |                  -- the Origin header to $hashtag. | ||||||
|  |                  -- Returns subscription lease time in seconds. | ||||||
|  |                  :<|> "tags" :> Capture "hashtag" Text :> "unsubscribe" :> Header "Origin" Text :> Get '[PlainText] Text | ||||||
|  |                  -- endpoint for unsubscribing the instance specified in | ||||||
|  |                  -- the Origin header to $hashtag | ||||||
|  | 
 | ||||||
|  | -- | needed for guiding type inference | ||||||
|  | exposedPostServiceAPI :: Proxy PostServiceAPI | ||||||
|  | exposedPostServiceAPI = Proxy | ||||||
|  | 
 | ||||||
|  | @ -1,7 +1,5 @@ | ||||||
| module Hash2Pub.ProtocolTypes where | module Hash2Pub.ProtocolTypes where | ||||||
| 
 | 
 | ||||||
| import qualified Data.Map                as Map |  | ||||||
| import           Data.Maybe              (mapMaybe) |  | ||||||
| import qualified Data.Set                as Set | import qualified Data.Set                as Set | ||||||
| import           Data.Time.Clock.POSIX   (POSIXTime) | import           Data.Time.Clock.POSIX   (POSIXTime) | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue