diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 17d585b..8f47227 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -301,55 +301,6 @@ clientAPI = Proxy relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI --- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] --- and their outstanding delivery queue to another instance. --- If the transfer succeeds, the transfered subscribers are removed from the local list. -clientDeliverSubscriptions :: PostService d - -> Hashtag -- ^ fromTag - -> Hashtag -- ^ toTag - -> (String, Int) -- ^ hostname and port of instance to deliver to - -> IO (Either String ()) -- Either signals success or failure -clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do - -- collect tag intearval - intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack toTag) <$> readTVarIO (subscribers serv) - -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] - -- extract subscribers and posts - -- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT - subscriberData <- foldM (\response (subSTM, _, tag) -> do - subMap <- readTVarIO subSTM - thisTagsData <- foldM (\tagResponse (subscriber, (subChan, lease)) -> do - -- duplicate the pending queue to work on a copy, in case of a delivery error - pending <- atomically $ do - queueCopy <- cloneTChan subChan - channelGetAll queueCopy - if null pending - then pure tagResponse - else pure $ tag <> "," <> Txt.pack (show subscriber) <> "," <> Txt.pack (show lease) <> "," <> Txt.unwords pending <> "\n" - ) - "" - (HMap.toList subMap) - pure $ thisTagsData <> response - ) - "" - intervalTags - -- send subscribers - httpMan <- HTTP.newManager HTTP.defaultManagerSettings - resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv httpMan (BaseUrl Http toHost (fromIntegral toPort) "")) - -- on failure return a Left, otherwise delete subscription entry - case resp of - Left err -> pure . Left . show $ err - Right _ -> do - atomically $ - modifyTVar' (subscribers serv) $ \tagMap -> - foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags) - pure . Right $ () - where - channelGetAll :: TChan a -> STM [a] - channelGetAll chan = channelGetAll' chan [] - channelGetAll' :: TChan a -> [a] -> STM [a] - channelGetAll' chan acc = do - haveRead <- tryReadTChan chan - maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead -- currently this is unused code getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI @@ -456,7 +407,7 @@ instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where -- ====== worker threads ====== --- | process the pending relay inbox of incoming posts from the internal queue: +-- | process the pending relays of incoming posts from the internal queue: -- Look up responsible relay node for given hashtag and forward post to it processIncomingPosts :: DHT d => PostService d -> IO () processIncomingPosts serv = forever $ do @@ -471,11 +422,5 @@ processIncomingPosts serv = forever $ do httpMan <- HTTP.newManager HTTP.defaultManagerSettings resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) case resp of - Left err -> do - putStrLn $ "Error: " <> show err - -- 410 error indicates outdated responsibility mapping - -- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post - -- TODO: keep track of maximum retries - _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag) - atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent) + Left err -> putStrLn $ "Error: " <> show err Right yay -> putStrLn $ "Yay! " <> show yay diff --git a/src/Hash2Pub/RingMap.hs b/src/Hash2Pub/RingMap.hs index e99f8b2..9b439e9 100644 --- a/src/Hash2Pub/RingMap.hs +++ b/src/Hash2Pub/RingMap.hs @@ -241,9 +241,9 @@ takeRMapPredecessorsFromTo :: (Bounded k, Ord k, Num k) -> [a] takeRMapPredecessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupPred toVal fromVal Nothing [] -takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k) +takeRMapSuccesorsFromTo :: (Bounded k, Ord k, Num k) => k -- start value for taking -> k -- stop value for taking -> RingMap k a -> [a] -takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing [] +takeRMapSuccesorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []