extract and build subscriber payload for sending
This commit is contained in:
parent
1d808b6776
commit
2e88a4079b
|
@ -301,20 +301,47 @@ clientAPI = Proxy
|
|||
relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI
|
||||
|
||||
|
||||
---- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag]
|
||||
---- and their outstanding delivery queue to another instance.
|
||||
---- If the transfer succeeds, the transfered subscribers are removed from the local list.
|
||||
--clientDeliverSubscriptions :: PostService
|
||||
-- -> Hashtag -- ^ fromTag
|
||||
-- -> Hashtag -- ^ toTag
|
||||
-- -> (String, Int) -- ^ hostname and port of instance to deliver to
|
||||
-- -> IO (Either String ()) -- Either signals success or failure
|
||||
--clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
|
||||
-- -- collect tag intearval
|
||||
-- intervalTags <- takeRMapSuccesorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack fromTag) =<< readTVarIO $ subscribers serv
|
||||
-- -- extract subscribers and posts
|
||||
-- -- send subscribers
|
||||
-- -- on failure return a Left, otherwise flush remaining queues atomically, schedule all newly arrived posts to still be relayed and delete subscription entry
|
||||
-- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag]
|
||||
-- and their outstanding delivery queue to another instance.
|
||||
-- If the transfer succeeds, the transfered subscribers are removed from the local list.
|
||||
clientDeliverSubscriptions :: PostService d
|
||||
-> Hashtag -- ^ fromTag
|
||||
-> Hashtag -- ^ toTag
|
||||
-> (String, Int) -- ^ hostname and port of instance to deliver to
|
||||
-> IO (Either String ()) -- Either signals success or failure
|
||||
clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
|
||||
-- collect tag intearval
|
||||
intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack fromTag) <$> readTVarIO (subscribers serv)
|
||||
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
|
||||
-- extract subscribers and posts
|
||||
-- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT
|
||||
subscriberData <- foldM (\response (subSTM, _, tag) -> do
|
||||
subMap <- readTVarIO subSTM
|
||||
thisTagsData <- foldM (\tagResponse (subscriber, (subChan, lease)) -> do
|
||||
-- duplicate the pending queue to work on a copy, in case of a delivery error
|
||||
pending <- atomically $ do
|
||||
queueCopy <- cloneTChan subChan
|
||||
channelGetAll queueCopy
|
||||
if null pending
|
||||
then pure tagResponse
|
||||
else pure $ tag <> "," <> Txt.pack (show subscriber) <> "," <> Txt.pack (show lease) <> "," <> Txt.unwords pending <> "\n"
|
||||
)
|
||||
""
|
||||
(HMap.toList subMap)
|
||||
pure $ thisTagsData <> response
|
||||
)
|
||||
""
|
||||
intervalTags
|
||||
-- send subscribers
|
||||
-- on failure return a Left, otherwise flush remaining queues atomically, schedule all newly arrived posts to still be relayed and delete subscription entry
|
||||
pure . Right $ ()
|
||||
where
|
||||
channelGetAll :: TChan a -> STM [a]
|
||||
channelGetAll chan = channelGetAll' chan []
|
||||
channelGetAll' :: TChan a -> [a] -> STM [a]
|
||||
channelGetAll' chan acc = do
|
||||
haveRead <- tryReadTChan chan
|
||||
maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead
|
||||
|
||||
-- currently this is unused code
|
||||
getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI
|
||||
|
|
Loading…
Reference in a new issue