diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index ab3f317..11562d4 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -301,20 +301,47 @@ clientAPI = Proxy relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI ----- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] ----- and their outstanding delivery queue to another instance. ----- If the transfer succeeds, the transfered subscribers are removed from the local list. ---clientDeliverSubscriptions :: PostService --- -> Hashtag -- ^ fromTag --- -> Hashtag -- ^ toTag --- -> (String, Int) -- ^ hostname and port of instance to deliver to --- -> IO (Either String ()) -- Either signals success or failure ---clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do --- -- collect tag intearval --- intervalTags <- takeRMapSuccesorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack fromTag) =<< readTVarIO $ subscribers serv --- -- extract subscribers and posts --- -- send subscribers --- -- on failure return a Left, otherwise flush remaining queues atomically, schedule all newly arrived posts to still be relayed and delete subscription entry +-- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] +-- and their outstanding delivery queue to another instance. +-- If the transfer succeeds, the transfered subscribers are removed from the local list. +clientDeliverSubscriptions :: PostService d + -> Hashtag -- ^ fromTag + -> Hashtag -- ^ toTag + -> (String, Int) -- ^ hostname and port of instance to deliver to + -> IO (Either String ()) -- Either signals success or failure +clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do + -- collect tag intearval + intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack fromTag) <$> readTVarIO (subscribers serv) + -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] + -- extract subscribers and posts + -- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT + subscriberData <- foldM (\response (subSTM, _, tag) -> do + subMap <- readTVarIO subSTM + thisTagsData <- foldM (\tagResponse (subscriber, (subChan, lease)) -> do + -- duplicate the pending queue to work on a copy, in case of a delivery error + pending <- atomically $ do + queueCopy <- cloneTChan subChan + channelGetAll queueCopy + if null pending + then pure tagResponse + else pure $ tag <> "," <> Txt.pack (show subscriber) <> "," <> Txt.pack (show lease) <> "," <> Txt.unwords pending <> "\n" + ) + "" + (HMap.toList subMap) + pure $ thisTagsData <> response + ) + "" + intervalTags + -- send subscribers + -- on failure return a Left, otherwise flush remaining queues atomically, schedule all newly arrived posts to still be relayed and delete subscription entry + pure . Right $ () + where + channelGetAll :: TChan a -> STM [a] + channelGetAll chan = channelGetAll' chan [] + channelGetAll' :: TChan a -> [a] -> STM [a] + channelGetAll' chan acc = do + haveRead <- tryReadTChan chan + maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead -- currently this is unused code getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI