process incoming posts in parallel

This commit is contained in:
Trolli Schmittlauch 2020-09-09 18:50:45 +02:00
parent 12fcd13754
commit e3a8912360

View file

@ -12,7 +12,8 @@ import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM_, forever, void, when)
import Control.Monad (foldM, forM, forM_, forever, void,
when)
import Control.Monad.IO.Class (liftIO)
import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL
@ -560,6 +561,28 @@ normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict
hashtagToId :: Hashtag -> NodeID
hashtagToId = genKeyID . Txt.unpack
readUpToTChan :: Int -> TChan a -> STM [a]
readUpToTChan 0 _ = pure []
readUpToTChan n chan = do
readFromChan <- tryReadTChan chan
case readFromChan of
Nothing -> pure []
Just val -> do
moreReads <- readUpToTChan (pred n) chan
pure (val:moreReads)
readUpToTQueue :: Int -> TQueue a -> STM [a]
readUpToTQueue 0 _ = pure []
readUpToTQueue n q = do
readFromQueue <- tryReadTQueue q
case readFromQueue of
Nothing -> pure []
Just val -> do
moreReads <- readUpToTQueue (pred n) q
pure (val:moreReads)
-- | define how to convert all showable types to PlainText
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
@ -571,30 +594,37 @@ instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where
-- ====== worker threads ======
-- TODO: make configurable
numParallelDeliveries = 10
-- | process the pending relay inbox of incoming posts from the internal queue:
-- Look up responsible relay node for given hashtag and forward post to it
processIncomingPosts :: DHT d => PostService d -> IO ()
processIncomingPosts serv = forever $ do
-- blocks until available
-- TODO: process multiple in parallel
(tag, pID, pContent) <- atomically . readTQueue $ relayInQueue serv
deliveriesToProcess <- atomically $ do
readResult <- readUpToTQueue numParallelDeliveries $ relayInQueue serv
if null readResult
then retry
else pure readResult
runningJobs <- forM deliveriesToProcess $ \(tag, pID, pContent) -> async $ do
let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
case lookupRes of
-- no vserver active => wait and retry
Nothing -> threadDelay $ 10 * 10^6
Nothing -> threadDelay (10 * 10^6) >> pure (Left "no vserver active")
Just (responsibleHost, responsiblePort) -> do
resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
case resp of
Left err -> do
putStrLn $ "Error: " <> show err
-- 410 error indicates outdated responsibility mapping
-- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post
-- TODO: keep track of maximum retries
_ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
pure . Left $ "Error: " <> show err
Right _ -> do
-- TODO: stats
-- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
now <- getPOSIXTime
subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
@ -602,6 +632,13 @@ processIncomingPosts serv = forever $ do
when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $
void $ clientSubscribeTo serv tag
-- for evaluation, return the tag of the successfully forwarded post
pure $ Right tag
-- collect async results
results <- mapM waitCatch runningJobs
-- TODO: statistics
-- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID
fetchTagPosts :: DHT d => PostService d -> IO ()
@ -626,10 +663,6 @@ fetchTagPosts serv = forever $ do
pure ()
-- TODO: make configurable
numParallelDeliveries = 10
-- TODO: paralellelisation
relayWorker :: PostService d -> IO ()
relayWorker serv = forever $ do
-- atomically (to be able to retry) fold a list of due delivery actions
@ -638,7 +671,7 @@ relayWorker serv = forever $ do
jobList <- D.toList <$> foldM (\jobAcc (subscriberMapSTM, _, tag) -> do
subscriberMap <- readTVar subscriberMapSTM
foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do
postsToDeliver <- readUpTo 500 postChan
postsToDeliver <- readUpToTChan 500 postChan
-- append fetch job to job list
pure $ if not (null postsToDeliver)
then jobAcc' `D.snoc` runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
@ -658,16 +691,6 @@ relayWorker serv = forever $ do
-- TODO: stats
pure ()
where
readUpTo :: Int -> TChan a -> STM [a]
readUpTo 0 _ = pure []
readUpTo n chan = do
readFromChan <- tryReadTChan chan
case readFromChan of
Nothing -> pure []
Just val -> do
moreReads <- readUpTo (pred n) chan
pure (val:moreReads)
-- ======= statistics/measurement and logging =======