send prepared subscriptions and clean up on success
This commit is contained in:
parent
2e88a4079b
commit
c1ce386b65
|
@ -311,7 +311,7 @@ clientDeliverSubscriptions :: PostService d
|
||||||
-> IO (Either String ()) -- Either signals success or failure
|
-> IO (Either String ()) -- Either signals success or failure
|
||||||
clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
|
clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
|
||||||
-- collect tag intearval
|
-- collect tag intearval
|
||||||
intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack fromTag) <$> readTVarIO (subscribers serv)
|
intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack toTag) <$> readTVarIO (subscribers serv)
|
||||||
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
|
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
|
||||||
-- extract subscribers and posts
|
-- extract subscribers and posts
|
||||||
-- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT
|
-- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT
|
||||||
|
@ -333,8 +333,16 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
|
||||||
""
|
""
|
||||||
intervalTags
|
intervalTags
|
||||||
-- send subscribers
|
-- send subscribers
|
||||||
-- on failure return a Left, otherwise flush remaining queues atomically, schedule all newly arrived posts to still be relayed and delete subscription entry
|
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
|
||||||
pure . Right $ ()
|
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv httpMan (BaseUrl Http toHost (fromIntegral toPort) ""))
|
||||||
|
-- on failure return a Left, otherwise delete subscription entry
|
||||||
|
case resp of
|
||||||
|
Left err -> pure . Left . show $ err
|
||||||
|
Right _ -> do
|
||||||
|
atomically $
|
||||||
|
modifyTVar' (subscribers serv) $ \tagMap ->
|
||||||
|
foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags)
|
||||||
|
pure . Right $ ()
|
||||||
where
|
where
|
||||||
channelGetAll :: TChan a -> STM [a]
|
channelGetAll :: TChan a -> STM [a]
|
||||||
channelGetAll chan = channelGetAll' chan []
|
channelGetAll chan = channelGetAll' chan []
|
||||||
|
|
Loading…
Reference in a new issue