From 1258f673da285148c23dd66c43aaffa970f1e4cd Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 12 Aug 2020 12:07:41 +0200 Subject: [PATCH] flush responsibility cache and retry in post queue delivery --- src/Hash2Pub/PostService.hs | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 8f47227..ab3f317 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -301,6 +301,20 @@ clientAPI = Proxy relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI +---- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] +---- and their outstanding delivery queue to another instance. +---- If the transfer succeeds, the transfered subscribers are removed from the local list. +--clientDeliverSubscriptions :: PostService +-- -> Hashtag -- ^ fromTag +-- -> Hashtag -- ^ toTag +-- -> (String, Int) -- ^ hostname and port of instance to deliver to +-- -> IO (Either String ()) -- Either signals success or failure +--clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do +-- -- collect tag intearval +-- intervalTags <- takeRMapSuccesorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack fromTag) =<< readTVarIO $ subscribers serv +-- -- extract subscribers and posts +-- -- send subscribers +-- -- on failure return a Left, otherwise flush remaining queues atomically, schedule all newly arrived posts to still be relayed and delete subscription entry -- currently this is unused code getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI @@ -407,7 +421,7 @@ instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where -- ====== worker threads ====== --- | process the pending relays of incoming posts from the internal queue: +-- | process the pending relay inbox of incoming posts from the internal queue: -- Look up responsible relay node for given hashtag and forward post to it processIncomingPosts :: DHT d => PostService d -> IO () processIncomingPosts serv = forever $ do @@ -422,5 +436,11 @@ processIncomingPosts serv = forever $ do httpMan <- HTTP.newManager HTTP.defaultManagerSettings resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) case resp of - Left err -> putStrLn $ "Error: " <> show err + Left err -> do + putStrLn $ "Error: " <> show err + -- 410 error indicates outdated responsibility mapping + -- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post + -- TODO: keep track of maximum retries + _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag) + atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent) Right yay -> putStrLn $ "Yay! " <> show yay