refactor relay processing to STM-retry instead of busy-wait

This commit is contained in:
Trolli Schmittlauch 2020-09-09 14:23:36 +02:00
parent 2b39648a77
commit 0ffe9effc0
2 changed files with 34 additions and 16 deletions

View file

@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md
common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist
ghc-options: -Wall -Wpartial-fields -O2

View file

@ -17,6 +17,7 @@ import Control.Monad.IO.Class (liftIO)
import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU
import qualified Data.DList as D
import Data.Either (rights)
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
@ -39,6 +40,7 @@ import Servant.Client
import Hash2Pub.FediChordTypes
import Hash2Pub.PostService.API
import Hash2Pub.RingMap
import Hash2Pub.Utils
data PostService d = PostService
@ -612,27 +614,43 @@ fetchTagPosts serv = forever $ do
pure ()
-- TODO: make configurable
numParallelDeliveries = 10
-- TODO: paralellelisation
-- TODO: make sure it doesn't busy-wait
relayWorker :: PostService d -> IO ()
relayWorker serv = forever $ do
subscriptionMap <- readTVarIO $ subscribers serv
-- for each tag, try to deliver some posts to subscriber
forM_ subscriptionMap (\(subscriberMapSTM, _, tag) -> do
subscriberMap <- readTVarIO subscriberMapSTM
forM_ (HMap.toList subscriberMap) (\((subHost, subPort), (postChan, _)) -> do
-- atomically (to be able to retry) fold a list of due delivery actions
jobsToProcess <- atomically $ do
subscriptionMap <- readTVar $ subscribers serv
jobList <- D.toList <$> foldM (\jobAcc (subscriberMapSTM, _, tag) -> do
subscriberMap <- readTVar subscriberMapSTM
foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do
postsToDeliver <- readUpTo 500 postChan
response <- runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
-- append fetch job to job list
pure $ if not (null postsToDeliver)
then jobAcc' `D.snoc` runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
else jobAcc'
) jobAcc $ HMap.toList subscriberMap
) D.empty subscriptionMap
-- if no relay jobs, then retry
if null jobList
then retry
else pure jobList
-- when processing the list, send several deliveries in parallel
forM_ (chunksOf numParallelDeliveries jobsToProcess) $ \jobset -> do
runningJobs <- mapM async jobset
-- so far just dropping failed attempts, TODO: retry mechanism
successfulResults <- rights <$> mapM waitCatch runningJobs
-- TODO: stats
pure ()
)
)
where
readUpTo :: Int -> TChan a -> IO [a]
readUpTo :: Int -> TChan a -> STM [a]
readUpTo 0 _ = pure []
readUpTo n chan = do
readFromChan <- atomically (tryReadTChan chan)
readFromChan <- tryReadTChan chan
case readFromChan of
Nothing -> pure []
Just val -> do