From 1258f673da285148c23dd66c43aaffa970f1e4cd Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 12 Aug 2020 12:07:41 +0200 Subject: [PATCH 1/4] flush responsibility cache and retry in post queue delivery --- src/Hash2Pub/PostService.hs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 8f47227..ab3f317 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -301,6 +301,20 @@ clientAPI = Proxy relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI +---- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] +---- and their outstanding delivery queue to another instance. +---- If the transfer succeeds, the transfered subscribers are removed from the local list. +--clientDeliverSubscriptions :: PostService +-- -> Hashtag -- ^ fromTag +-- -> Hashtag -- ^ toTag +-- -> (String, Int) -- ^ hostname and port of instance to deliver to +-- -> IO (Either String ()) -- Either signals success or failure +--clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do +-- -- collect tag intearval +-- intervalTags <- takeRMapSuccesorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack fromTag) =<< readTVarIO $ subscribers serv +-- -- extract subscribers and posts +-- -- send subscribers +-- -- on failure return a Left, otherwise flush remaining queues atomically, schedule all newly arrived posts to still be relayed and delete subscription entry -- currently this is unused code getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI @@ -407,7 +421,7 @@ instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where -- ====== worker threads ====== --- | process the pending relays of incoming posts from the internal queue: +-- | process the pending relay inbox of incoming posts from the internal queue: -- Look up responsible relay node for given hashtag and forward post to it processIncomingPosts :: DHT d => PostService d -> IO () processIncomingPosts serv = forever $ do @@ -422,5 +436,11 @@ processIncomingPosts serv = forever $ do httpMan <- HTTP.newManager HTTP.defaultManagerSettings resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) case resp of - Left err -> putStrLn $ "Error: " <> show err + Left err -> do + putStrLn $ "Error: " <> show err + -- 410 error indicates outdated responsibility mapping + -- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post + -- TODO: keep track of maximum retries + _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag) + atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent) Right yay -> putStrLn $ "Yay! " <> show yay From 1d808b6776c048c06fbf3f202db1a591df843578 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 12 Aug 2020 12:16:20 +0200 Subject: [PATCH 2/4] fix typo --- src/Hash2Pub/RingMap.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Hash2Pub/RingMap.hs b/src/Hash2Pub/RingMap.hs index 9b439e9..e99f8b2 100644 --- a/src/Hash2Pub/RingMap.hs +++ b/src/Hash2Pub/RingMap.hs @@ -241,9 +241,9 @@ takeRMapPredecessorsFromTo :: (Bounded k, Ord k, Num k) -> [a] takeRMapPredecessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupPred toVal fromVal Nothing [] -takeRMapSuccesorsFromTo :: (Bounded k, Ord k, Num k) +takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k) => k -- start value for taking -> k -- stop value for taking -> RingMap k a -> [a] -takeRMapSuccesorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing [] +takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing [] From 2e88a4079b0c36888fe924c8aa72592f93926e89 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 12 Aug 2020 14:07:19 +0200 Subject: [PATCH 3/4] extract and build subscriber payload for sending --- src/Hash2Pub/PostService.hs | 55 +++++++++++++++++++++++++++---------- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index ab3f317..11562d4 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -301,20 +301,47 @@ clientAPI = Proxy relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI ----- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] ----- and their outstanding delivery queue to another instance. ----- If the transfer succeeds, the transfered subscribers are removed from the local list. ---clientDeliverSubscriptions :: PostService --- -> Hashtag -- ^ fromTag --- -> Hashtag -- ^ toTag --- -> (String, Int) -- ^ hostname and port of instance to deliver to --- -> IO (Either String ()) -- Either signals success or failure ---clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do --- -- collect tag intearval --- intervalTags <- takeRMapSuccesorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack fromTag) =<< readTVarIO $ subscribers serv --- -- extract subscribers and posts --- -- send subscribers --- -- on failure return a Left, otherwise flush remaining queues atomically, schedule all newly arrived posts to still be relayed and delete subscription entry +-- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] +-- and their outstanding delivery queue to another instance. +-- If the transfer succeeds, the transfered subscribers are removed from the local list. +clientDeliverSubscriptions :: PostService d + -> Hashtag -- ^ fromTag + -> Hashtag -- ^ toTag + -> (String, Int) -- ^ hostname and port of instance to deliver to + -> IO (Either String ()) -- Either signals success or failure +clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do + -- collect tag intearval + intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack fromTag) <$> readTVarIO (subscribers serv) + -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] + -- extract subscribers and posts + -- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT + subscriberData <- foldM (\response (subSTM, _, tag) -> do + subMap <- readTVarIO subSTM + thisTagsData <- foldM (\tagResponse (subscriber, (subChan, lease)) -> do + -- duplicate the pending queue to work on a copy, in case of a delivery error + pending <- atomically $ do + queueCopy <- cloneTChan subChan + channelGetAll queueCopy + if null pending + then pure tagResponse + else pure $ tag <> "," <> Txt.pack (show subscriber) <> "," <> Txt.pack (show lease) <> "," <> Txt.unwords pending <> "\n" + ) + "" + (HMap.toList subMap) + pure $ thisTagsData <> response + ) + "" + intervalTags + -- send subscribers + -- on failure return a Left, otherwise flush remaining queues atomically, schedule all newly arrived posts to still be relayed and delete subscription entry + pure . Right $ () + where + channelGetAll :: TChan a -> STM [a] + channelGetAll chan = channelGetAll' chan [] + channelGetAll' :: TChan a -> [a] -> STM [a] + channelGetAll' chan acc = do + haveRead <- tryReadTChan chan + maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead -- currently this is unused code getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI From c1ce386b6599931b9e45268c18674457fd4f6ab0 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 12 Aug 2020 15:23:10 +0200 Subject: [PATCH 4/4] send prepared subscriptions and clean up on success --- src/Hash2Pub/PostService.hs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 11562d4..17d585b 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -311,7 +311,7 @@ clientDeliverSubscriptions :: PostService d -> IO (Either String ()) -- Either signals success or failure clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do -- collect tag intearval - intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack fromTag) <$> readTVarIO (subscribers serv) + intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack toTag) <$> readTVarIO (subscribers serv) -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- extract subscribers and posts -- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT @@ -333,8 +333,16 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do "" intervalTags -- send subscribers - -- on failure return a Left, otherwise flush remaining queues atomically, schedule all newly arrived posts to still be relayed and delete subscription entry - pure . Right $ () + httpMan <- HTTP.newManager HTTP.defaultManagerSettings + resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv httpMan (BaseUrl Http toHost (fromIntegral toPort) "")) + -- on failure return a Left, otherwise delete subscription entry + case resp of + Left err -> pure . Left . show $ err + Right _ -> do + atomically $ + modifyTVar' (subscribers serv) $ \tagMap -> + foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags) + pure . Right $ () where channelGetAll :: TChan a -> STM [a] channelGetAll chan = channelGetAll' chan []