set up subscription data structures and transfer subscription endpoint
This commit is contained in:
parent
3b65757406
commit
970c94ff0d
|
@ -41,7 +41,7 @@ data PostService d = PostService
|
||||||
-- queues, other data structures
|
-- queues, other data structures
|
||||||
, baseDHT :: (DHT d) => d
|
, baseDHT :: (DHT d) => d
|
||||||
, serviceThread :: TVar ThreadId
|
, serviceThread :: TVar ThreadId
|
||||||
, subscribers :: TVar (RingMap NodeID TagSubscribers)
|
, subscribers :: TVar RelayTags
|
||||||
-- ^ for each tag store the subscribers + their queue
|
-- ^ for each tag store the subscribers + their queue
|
||||||
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
|
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
|
||||||
-- ^ tags subscribed by the own node have an assigned lease time
|
-- ^ tags subscribed by the own node have an assigned lease time
|
||||||
|
@ -56,10 +56,10 @@ type PostID = Txt.Text
|
||||||
type PostContent = Txt.Text
|
type PostContent = Txt.Text
|
||||||
-- | For each handled tag, store its subscribers and provide a
|
-- | For each handled tag, store its subscribers and provide a
|
||||||
-- broadcast 'TChan' for enqueuing posts
|
-- broadcast 'TChan' for enqueuing posts
|
||||||
type RelayTags = RingMap NodeID (TagSubscribers, TChan PostContent)
|
type RelayTags = RingMap NodeID (TagSubscribers, TChan PostID, Hashtag)
|
||||||
-- | each subscriber is identified by its contact data "hostname" "port"
|
-- | each subscriber is identified by its contact data "hostname" "port"
|
||||||
-- and holds a TChan duplicated from the broadcast TChan of the tag
|
-- and holds a TChan duplicated from the broadcast TChan of the tag
|
||||||
type TagSubscribers = HMap.HashMap (String, Int) (TChan PostContent)
|
type TagSubscribers = TVar (HMap.HashMap (String, Int) (TChan PostID))
|
||||||
|
|
||||||
|
|
||||||
instance DHT d => Service PostService d where
|
instance DHT d => Service PostService d where
|
||||||
|
@ -114,7 +114,7 @@ exposedPostServiceAPI = Proxy
|
||||||
|
|
||||||
type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
|
type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
|
||||||
-- ^ delivery endpoint of newly published posts of the relay's instance
|
-- ^ delivery endpoint of newly published posts of the relay's instance
|
||||||
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
|
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent
|
||||||
-- ^ endpoint for delivering the subscriptions and outstanding queue
|
-- ^ endpoint for delivering the subscriptions and outstanding queue
|
||||||
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
|
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
|
||||||
-- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid
|
-- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid
|
||||||
|
@ -145,7 +145,7 @@ relayInbox :: PostService d -> Txt.Text -> Handler NoContent
|
||||||
relayInbox serv post = do
|
relayInbox serv post = do
|
||||||
-- extract contained hashtags
|
-- extract contained hashtags
|
||||||
let
|
let
|
||||||
containedTags = fmap (Txt.fromStrict . normalize NFC . Txt.toStrict . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
|
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
|
||||||
-- generate post ID
|
-- generate post ID
|
||||||
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^128-1) :: IO Integer)
|
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^128-1) :: IO Integer)
|
||||||
-- add ID to own posts
|
-- add ID to own posts
|
||||||
|
@ -158,8 +158,25 @@ relayInbox serv post = do
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
subscriptionDelivery :: PostService d -> Txt.Text -> Handler Txt.Text
|
subscriptionDelivery :: PostService d -> Txt.Text -> Handler NoContent
|
||||||
subscriptionDelivery serv subList = pure $ "Here be Subscription List dragons: " <> subList
|
subscriptionDelivery serv subList = do
|
||||||
|
let
|
||||||
|
tagSubs = Txt.lines subList
|
||||||
|
liftIO $ forM_ tagSubs $ processTag (subscribers serv)
|
||||||
|
pure NoContent
|
||||||
|
-- TODO: check and only accept tags in own (future?) responsibility
|
||||||
|
where
|
||||||
|
processTag :: TVar RelayTags -> Txt.Text -> IO ()
|
||||||
|
processTag subscriberSTM tagData = do
|
||||||
|
let
|
||||||
|
tag:subText:posts:_ = Txt.splitOn "," tagData
|
||||||
|
sub = read . Txt.unpack $ subText :: (String, Int)
|
||||||
|
postList = Txt.words posts
|
||||||
|
enqueueSubscriptions subscriberSTM (normaliseTag tag) sub postList
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
|
postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
|
||||||
postFetch serv postID = pure $ "Here be a post with dragon ID " <> postID
|
postFetch serv postID = pure $ "Here be a post with dragon ID " <> postID
|
||||||
|
@ -178,6 +195,49 @@ tagUnsubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Tex
|
||||||
tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag
|
tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag
|
||||||
|
|
||||||
|
|
||||||
|
-- ======= data structure manipulations =========
|
||||||
|
|
||||||
|
-- | Write all pending posts of a subscriber-tag-combination to its queue.
|
||||||
|
-- Sets up all necessary data structures if they are still missing.
|
||||||
|
enqueueSubscriptions :: TVar RelayTags -- tag-subscriber map
|
||||||
|
-> Hashtag -- hashtag of pending posts
|
||||||
|
-> (String, Int) -- subscriber's connection information
|
||||||
|
-> [PostID] -- pending posts
|
||||||
|
-> IO ()
|
||||||
|
enqueueSubscriptions tagMapSTM tag subscriber posts = do
|
||||||
|
-- get the tag output queue and, if necessary, create it
|
||||||
|
subChan <- atomically setupSubscriberChannel
|
||||||
|
forM_ posts (atomically . writeTChan subChan)
|
||||||
|
where
|
||||||
|
setupSubscriberChannel :: STM (TChan PostID)
|
||||||
|
setupSubscriberChannel = do
|
||||||
|
tagMap <- readTVar tagMapSTM
|
||||||
|
case rMapLookup (genKeyID . Txt.unpack $ tag) tagMap of
|
||||||
|
Nothing -> do
|
||||||
|
-- if no collision/ tag doesn't exist yet, just initialize a
|
||||||
|
-- new subscriber map
|
||||||
|
broadcastChan <- newBroadcastTChan
|
||||||
|
tagOutChan <- dupTChan broadcastChan
|
||||||
|
newSubMapSTM <- newTVar $ HMap.singleton subscriber tagOutChan
|
||||||
|
writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap
|
||||||
|
pure tagOutChan
|
||||||
|
Just (foundSubMapSTM, broadcastChan, _) -> do
|
||||||
|
-- otherwise use the existing subscriber map
|
||||||
|
foundSubMap <- readTVar foundSubMapSTM
|
||||||
|
case HMap.lookup subscriber foundSubMap of
|
||||||
|
Nothing -> do
|
||||||
|
-- for new subscribers, create new output channel
|
||||||
|
tagOutChan <- dupTChan broadcastChan
|
||||||
|
writeTVar foundSubMapSTM $ HMap.insert subscriber tagOutChan foundSubMap
|
||||||
|
pure tagOutChan
|
||||||
|
-- existing subscriber's channels are just returned
|
||||||
|
Just tagOutChan -> pure tagOutChan
|
||||||
|
|
||||||
|
|
||||||
|
-- normalise the unicode representation of a string to NFC
|
||||||
|
normaliseTag :: Txt.Text -> Txt.Text
|
||||||
|
normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict
|
||||||
|
|
||||||
-- | define how to convert all showable types to PlainText
|
-- | define how to convert all showable types to PlainText
|
||||||
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
|
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
|
||||||
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
|
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
|
||||||
|
|
Loading…
Reference in a new issue