From c1ce386b6599931b9e45268c18674457fd4f6ab0 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 12 Aug 2020 15:23:10 +0200 Subject: [PATCH] send prepared subscriptions and clean up on success --- src/Hash2Pub/PostService.hs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 11562d4..17d585b 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -311,7 +311,7 @@ clientDeliverSubscriptions :: PostService d -> IO (Either String ()) -- Either signals success or failure clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do -- collect tag intearval - intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack fromTag) <$> readTVarIO (subscribers serv) + intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack toTag) <$> readTVarIO (subscribers serv) -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- extract subscribers and posts -- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT @@ -333,8 +333,16 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do "" intervalTags -- send subscribers - -- on failure return a Left, otherwise flush remaining queues atomically, schedule all newly arrived posts to still be relayed and delete subscription entry - pure . Right $ () + httpMan <- HTTP.newManager HTTP.defaultManagerSettings + resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv httpMan (BaseUrl Http toHost (fromIntegral toPort) "")) + -- on failure return a Left, otherwise delete subscription entry + case resp of + Left err -> pure . Left . show $ err + Right _ -> do + atomically $ + modifyTVar' (subscribers serv) $ \tagMap -> + foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags) + pure . Right $ () where channelGetAll :: TChan a -> STM [a] channelGetAll chan = channelGetAll' chan []