diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 8f47227..17d585b 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -301,6 +301,55 @@ clientAPI = Proxy relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI +-- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] +-- and their outstanding delivery queue to another instance. +-- If the transfer succeeds, the transfered subscribers are removed from the local list. +clientDeliverSubscriptions :: PostService d + -> Hashtag -- ^ fromTag + -> Hashtag -- ^ toTag + -> (String, Int) -- ^ hostname and port of instance to deliver to + -> IO (Either String ()) -- Either signals success or failure +clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do + -- collect tag intearval + intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack toTag) <$> readTVarIO (subscribers serv) + -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] + -- extract subscribers and posts + -- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT + subscriberData <- foldM (\response (subSTM, _, tag) -> do + subMap <- readTVarIO subSTM + thisTagsData <- foldM (\tagResponse (subscriber, (subChan, lease)) -> do + -- duplicate the pending queue to work on a copy, in case of a delivery error + pending <- atomically $ do + queueCopy <- cloneTChan subChan + channelGetAll queueCopy + if null pending + then pure tagResponse + else pure $ tag <> "," <> Txt.pack (show subscriber) <> "," <> Txt.pack (show lease) <> "," <> Txt.unwords pending <> "\n" + ) + "" + (HMap.toList subMap) + pure $ thisTagsData <> response + ) + "" + intervalTags + -- send subscribers + httpMan <- HTTP.newManager HTTP.defaultManagerSettings + resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv httpMan (BaseUrl Http toHost (fromIntegral toPort) "")) + -- on failure return a Left, otherwise delete subscription entry + case resp of + Left err -> pure . Left . show $ err + Right _ -> do + atomically $ + modifyTVar' (subscribers serv) $ \tagMap -> + foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags) + pure . Right $ () + where + channelGetAll :: TChan a -> STM [a] + channelGetAll chan = channelGetAll' chan [] + channelGetAll' :: TChan a -> [a] -> STM [a] + channelGetAll' chan acc = do + haveRead <- tryReadTChan chan + maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead -- currently this is unused code getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI @@ -407,7 +456,7 @@ instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where -- ====== worker threads ====== --- | process the pending relays of incoming posts from the internal queue: +-- | process the pending relay inbox of incoming posts from the internal queue: -- Look up responsible relay node for given hashtag and forward post to it processIncomingPosts :: DHT d => PostService d -> IO () processIncomingPosts serv = forever $ do @@ -422,5 +471,11 @@ processIncomingPosts serv = forever $ do httpMan <- HTTP.newManager HTTP.defaultManagerSettings resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) case resp of - Left err -> putStrLn $ "Error: " <> show err + Left err -> do + putStrLn $ "Error: " <> show err + -- 410 error indicates outdated responsibility mapping + -- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post + -- TODO: keep track of maximum retries + _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag) + atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent) Right yay -> putStrLn $ "Yay! " <> show yay diff --git a/src/Hash2Pub/RingMap.hs b/src/Hash2Pub/RingMap.hs index 9b439e9..e99f8b2 100644 --- a/src/Hash2Pub/RingMap.hs +++ b/src/Hash2Pub/RingMap.hs @@ -241,9 +241,9 @@ takeRMapPredecessorsFromTo :: (Bounded k, Ord k, Num k) -> [a] takeRMapPredecessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupPred toVal fromVal Nothing [] -takeRMapSuccesorsFromTo :: (Bounded k, Ord k, Num k) +takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k) => k -- start value for taking -> k -- stop value for taking -> RingMap k a -> [a] -takeRMapSuccesorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing [] +takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []