diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal
index 2953d97..5e8d25d 100644
--- a/Hash2Pub.cabal
+++ b/Hash2Pub.cabal
@@ -46,7 +46,7 @@ category:            Network
 extra-source-files:  CHANGELOG.md
 
 common deps
-    build-depends:       base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays
+    build-depends:       base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist
     ghc-options: -Wall -Wpartial-fields -O2
 
 
diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs
index 099855d..c556d7f 100644
--- a/src/Hash2Pub/PostService.hs
+++ b/src/Hash2Pub/PostService.hs
@@ -17,6 +17,7 @@ import           Control.Monad.IO.Class         (liftIO)
 import           Data.Bifunctor
 import qualified Data.ByteString.Lazy.UTF8      as BSUL
 import qualified Data.ByteString.UTF8           as BSU
+import qualified Data.DList                     as D
 import           Data.Either                    (rights)
 import qualified Data.HashMap.Strict            as HMap
 import qualified Data.HashSet                   as HSet
@@ -39,6 +40,7 @@ import           Servant.Client
 import           Hash2Pub.FediChordTypes
 import           Hash2Pub.PostService.API
 import           Hash2Pub.RingMap
+import           Hash2Pub.Utils
 
 
 data PostService d = PostService
@@ -612,27 +614,43 @@ fetchTagPosts serv = forever $ do
             pure ()
 
 
+-- TODO: make configurable
+numParallelDeliveries = 10
+
 -- TODO: paralellelisation
--- TODO: make sure it doesn't busy-wait
 relayWorker :: PostService d -> IO ()
 relayWorker serv = forever $ do
-    subscriptionMap <- readTVarIO $ subscribers serv
-    -- for each tag, try to deliver some posts to subscriber
-    forM_ subscriptionMap (\(subscriberMapSTM, _, tag) -> do
-        subscriberMap <- readTVarIO subscriberMapSTM
-        forM_ (HMap.toList subscriberMap) (\((subHost, subPort), (postChan, _)) -> do
-            postsToDeliver <- readUpTo 500 postChan
-            response <- runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
-            -- so far just dropping failed attempts, TODO: retry mechanism
-            -- TODO: stats
-            pure ()
-            )
-        )
+    -- atomically (to be able to retry) fold a list of due delivery actions
+    jobsToProcess <- atomically $ do
+        subscriptionMap <- readTVar $ subscribers serv
+        jobList <- D.toList <$> foldM (\jobAcc (subscriberMapSTM, _, tag) -> do
+            subscriberMap <- readTVar subscriberMapSTM
+            foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do
+                postsToDeliver <- readUpTo 500 postChan
+                -- append fetch job to job list
+                pure $ if not (null postsToDeliver)
+                    then jobAcc' `D.snoc` runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
+                    else jobAcc'
+                ) jobAcc $ HMap.toList subscriberMap
+            ) D.empty subscriptionMap
+        -- if no relay jobs, then retry
+        if null jobList
+            then retry
+            else pure jobList
+
+    -- when processing the list, send several deliveries in parallel
+    forM_ (chunksOf numParallelDeliveries jobsToProcess) $ \jobset -> do
+        runningJobs <- mapM async jobset
+        -- so far just dropping failed attempts, TODO: retry mechanism
+        successfulResults <- rights <$> mapM waitCatch runningJobs
+        -- TODO: stats
+        pure ()
+
     where
-        readUpTo :: Int -> TChan a -> IO [a]
+        readUpTo :: Int -> TChan a -> STM [a]
         readUpTo 0 _ = pure []
         readUpTo n chan = do
-            readFromChan <- atomically (tryReadTChan chan)
+            readFromChan <- tryReadTChan chan
             case readFromChan of
                 Nothing  -> pure []
                 Just val -> do
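
The patch resolves the old "make sure it doesn't busy-wait" TODO by collecting due deliveries inside one STM transaction and calling retry when nothing is queued, so the worker thread sleeps until a subscriber channel changes. Below is a small, self-contained sketch of that pattern, not part of the patch: readUpTo follows the shape of the helper changed above (its full body is truncated in the diff), and collectOrBlock is a hypothetical name used only for illustration.

    -- Sketch: drain up to a bounded number of items from a TChan inside a
    -- single transaction, and block (via retry) instead of busy-waiting
    -- when there is no work.
    import Control.Concurrent.STM

    readUpTo :: Int -> TChan a -> STM [a]
    readUpTo 0 _    = pure []
    readUpTo n chan = do
        next <- tryReadTChan chan
        case next of
            Nothing  -> pure []
            Just val -> (val :) <$> readUpTo (n - 1) chan

    collectOrBlock :: Int -> TChan a -> STM [a]
    collectOrBlock limit chan = do
        items <- readUpTo limit chan
        if null items
            then retry       -- no work: suspend until the channel changes
            else pure items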
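The collected delivery actions are then run in batches of numParallelDeliveries via async/waitCatch, keeping only the successful results. A rough, standalone sketch of that batching idea follows; runInBatches is a made-up name, and chunksOf is re-defined here so the example compiles on its own (in the patch it presumably comes from the newly imported Hash2Pub.Utils).

    -- Sketch: run a list of IO jobs in fixed-size parallel batches,
    -- dropping failed jobs, as relayWorker does with its delivery actions.
    import Control.Concurrent.Async (async, waitCatch)
    import Data.Either (rights)

    chunksOf :: Int -> [a] -> [[a]]
    chunksOf _ [] = []
    chunksOf n xs = take n xs : chunksOf n (drop n xs)

    runInBatches :: Int -> [IO a] -> IO [a]
    runInBatches batchSize jobs =
        concat <$> mapM runBatch (chunksOf batchSize jobs)
      where
        runBatch jobset = do
            runningJobs <- mapM async jobset         -- one thread per job
            rights <$> mapM waitCatch runningJobs    -- keep successes only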