forked from schmittlauch/Hash2Pub
flush responsibility cache and retry in post queue delivery
This commit is contained in:
parent
96c1963a4f
commit
1258f673da
|
@ -301,6 +301,20 @@ clientAPI = Proxy
|
||||||
relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI
|
relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI
|
||||||
|
|
||||||
|
|
||||||
|
---- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag]
|
||||||
|
---- and their outstanding delivery queue to another instance.
|
||||||
|
---- If the transfer succeeds, the transfered subscribers are removed from the local list.
|
||||||
|
--clientDeliverSubscriptions :: PostService
|
||||||
|
-- -> Hashtag -- ^ fromTag
|
||||||
|
-- -> Hashtag -- ^ toTag
|
||||||
|
-- -> (String, Int) -- ^ hostname and port of instance to deliver to
|
||||||
|
-- -> IO (Either String ()) -- Either signals success or failure
|
||||||
|
--clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
|
||||||
|
-- -- collect tag intearval
|
||||||
|
-- intervalTags <- takeRMapSuccesorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack fromTag) =<< readTVarIO $ subscribers serv
|
||||||
|
-- -- extract subscribers and posts
|
||||||
|
-- -- send subscribers
|
||||||
|
-- -- on failure return a Left, otherwise flush remaining queues atomically, schedule all newly arrived posts to still be relayed and delete subscription entry
|
||||||
|
|
||||||
-- currently this is unused code
|
-- currently this is unused code
|
||||||
getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI
|
getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI
|
||||||
|
@ -407,7 +421,7 @@ instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where
|
||||||
|
|
||||||
-- ====== worker threads ======
|
-- ====== worker threads ======
|
||||||
|
|
||||||
-- | process the pending relays of incoming posts from the internal queue:
|
-- | process the pending relay inbox of incoming posts from the internal queue:
|
||||||
-- Look up responsible relay node for given hashtag and forward post to it
|
-- Look up responsible relay node for given hashtag and forward post to it
|
||||||
processIncomingPosts :: DHT d => PostService d -> IO ()
|
processIncomingPosts :: DHT d => PostService d -> IO ()
|
||||||
processIncomingPosts serv = forever $ do
|
processIncomingPosts serv = forever $ do
|
||||||
|
@ -422,5 +436,11 @@ processIncomingPosts serv = forever $ do
|
||||||
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
|
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
|
||||||
resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
|
resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
|
||||||
case resp of
|
case resp of
|
||||||
Left err -> putStrLn $ "Error: " <> show err
|
Left err -> do
|
||||||
|
putStrLn $ "Error: " <> show err
|
||||||
|
-- 410 error indicates outdated responsibility mapping
|
||||||
|
-- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post
|
||||||
|
-- TODO: keep track of maximum retries
|
||||||
|
_ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
|
||||||
|
atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
|
||||||
Right yay -> putStrLn $ "Yay! " <> show yay
|
Right yay -> putStrLn $ "Yay! " <> show yay
|
||||||
|
|
Loading…
Reference in a new issue