diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 8811080..9be7d1b 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -41,7 +41,7 @@ data PostService d = PostService -- queues, other data structures , baseDHT :: (DHT d) => d , serviceThread :: TVar ThreadId - , subscribers :: TVar (RingMap NodeID TagSubscribers) + , subscribers :: TVar RelayTags -- ^ for each tag store the subscribers + their queue , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) -- ^ tags subscribed by the own node have an assigned lease time @@ -56,10 +56,10 @@ type PostID = Txt.Text type PostContent = Txt.Text -- | For each handled tag, store its subscribers and provide a -- broadcast 'TChan' for enqueuing posts -type RelayTags = RingMap NodeID (TagSubscribers, TChan PostContent) +type RelayTags = RingMap NodeID (TagSubscribers, TChan PostID, Hashtag) -- | each subscriber is identified by its contact data "hostname" "port" -- and holds a TChan duplicated from the broadcast TChan of the tag -type TagSubscribers = HMap.HashMap (String, Int) (TChan PostContent) +type TagSubscribers = TVar (HMap.HashMap (String, Int) (TChan PostID)) instance DHT d => Service PostService d where @@ -114,7 +114,7 @@ exposedPostServiceAPI = Proxy type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent -- ^ delivery endpoint of newly published posts of the relay's instance - :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text + :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent -- ^ endpoint for delivering the subscriptions and outstanding queue :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text -- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid @@ -145,7 +145,7 @@ relayInbox :: PostService d -> Txt.Text -> Handler NoContent relayInbox serv post = do -- extract contained hashtags let - containedTags = fmap (Txt.fromStrict . normalize NFC . Txt.toStrict . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post + containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post -- generate post ID postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^128-1) :: IO Integer) -- add ID to own posts @@ -158,8 +158,25 @@ relayInbox serv post = do -subscriptionDelivery :: PostService d -> Txt.Text -> Handler Txt.Text -subscriptionDelivery serv subList = pure $ "Here be Subscription List dragons: " <> subList +subscriptionDelivery :: PostService d -> Txt.Text -> Handler NoContent +subscriptionDelivery serv subList = do + let + tagSubs = Txt.lines subList + liftIO $ forM_ tagSubs $ processTag (subscribers serv) + pure NoContent + -- TODO: check and only accept tags in own (future?) responsibility + where + processTag :: TVar RelayTags -> Txt.Text -> IO () + processTag subscriberSTM tagData = do + let + tag:subText:posts:_ = Txt.splitOn "," tagData + sub = read . Txt.unpack $ subText :: (String, Int) + postList = Txt.words posts + enqueueSubscriptions subscriberSTM (normaliseTag tag) sub postList + + + + postFetch :: PostService d -> Txt.Text -> Handler Txt.Text postFetch serv postID = pure $ "Here be a post with dragon ID " <> postID @@ -178,6 +195,49 @@ tagUnsubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Tex tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag +-- ======= data structure manipulations ========= + +-- | Write all pending posts of a subscriber-tag-combination to its queue. +-- Sets up all necessary data structures if they are still missing. +enqueueSubscriptions :: TVar RelayTags -- tag-subscriber map + -> Hashtag -- hashtag of pending posts + -> (String, Int) -- subscriber's connection information + -> [PostID] -- pending posts + -> IO () +enqueueSubscriptions tagMapSTM tag subscriber posts = do + -- get the tag output queue and, if necessary, create it + subChan <- atomically setupSubscriberChannel + forM_ posts (atomically . writeTChan subChan) + where + setupSubscriberChannel :: STM (TChan PostID) + setupSubscriberChannel = do + tagMap <- readTVar tagMapSTM + case rMapLookup (genKeyID . Txt.unpack $ tag) tagMap of + Nothing -> do + -- if no collision/ tag doesn't exist yet, just initialize a + -- new subscriber map + broadcastChan <- newBroadcastTChan + tagOutChan <- dupTChan broadcastChan + newSubMapSTM <- newTVar $ HMap.singleton subscriber tagOutChan + writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap + pure tagOutChan + Just (foundSubMapSTM, broadcastChan, _) -> do + -- otherwise use the existing subscriber map + foundSubMap <- readTVar foundSubMapSTM + case HMap.lookup subscriber foundSubMap of + Nothing -> do + -- for new subscribers, create new output channel + tagOutChan <- dupTChan broadcastChan + writeTVar foundSubMapSTM $ HMap.insert subscriber tagOutChan foundSubMap + pure tagOutChan + -- existing subscriber's channels are just returned + Just tagOutChan -> pure tagOutChan + + +-- normalise the unicode representation of a string to NFC +normaliseTag :: Txt.Text -> Txt.Text +normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict + -- | define how to convert all showable types to PlainText -- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯ -- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap