report post fetches to statistics

This commit is contained in:
Trolli Schmittlauch 2020-09-09 19:55:34 +02:00
parent 620e998876
commit f8d30d0cc4

View file

@ -251,7 +251,9 @@ postFetch serv postID = do
postSet <- liftIO . readTVarIO . ownPosts $ serv postSet <- liftIO . readTVarIO . ownPosts $ serv
if HSet.member postID postSet if HSet.member postID postSet
-- decision: always return the same placeholder post -- decision: always return the same placeholder post
then pure placeholderPost then do
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent 1 0 -- tag fetched for is irrelevant
pure placeholderPost
else throwError $ err404 { errBody = "No post found with this ID" } else throwError $ err404 { errBody = "No post found with this ID" }
@ -261,11 +263,14 @@ postMultiFetch serv postIDs = do
let idList = Txt.lines postIDs let idList = Txt.lines postIDs
postSet <- liftIO . readTVarIO . ownPosts $ serv postSet <- liftIO . readTVarIO . ownPosts $ serv
-- look up existence of all given post IDs, fail if even one is missing -- look up existence of all given post IDs, fail if even one is missing
foldM (\response postID -> response <- foldM (\response postID ->
if HSet.member postID postSet if HSet.member postID postSet
then pure $ placeholderPost <> "\n" <> response then pure $ placeholderPost <> "\n" <> response
else throwError $ err404 { errBody = "No post found with this ID" } else throwError $ err404 { errBody = "No post found with this ID" }
) "" idList ) "" idList
-- this shouldn't be reached in case of error
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent (length idList) 0 -- tag fetched for is irrelevant
pure response
-- | delivery endpoint: inbox for initially publishing a post at an instance -- | delivery endpoint: inbox for initially publishing a post at an instance
@ -677,9 +682,18 @@ relayWorker serv = forever $ do
subscriberMap <- readTVar subscriberMapSTM subscriberMap <- readTVar subscriberMapSTM
foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do
postsToDeliver <- readUpToTChan 500 postChan postsToDeliver <- readUpToTChan 500 postChan
-- append fetch job to job list let postDeliveryAction = runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
-- append relay push job to job list
pure $ if not (null postsToDeliver) pure $ if not (null postsToDeliver)
then jobAcc' `D.snoc` runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) "")) then jobAcc' `D.snoc` (do
deliveryResult <- postDeliveryAction
either
(const $ pure ())
-- on successful push, record that event for statistics
(const . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayDeliveryEvent (length postsToDeliver) (hashtagToId tag))
deliveryResult
pure deliveryResult
)
else jobAcc' else jobAcc'
) jobAcc $ HMap.toList subscriberMap ) jobAcc $ HMap.toList subscriberMap
) D.empty subscriptionMap ) D.empty subscriptionMap
@ -693,7 +707,6 @@ relayWorker serv = forever $ do
runningJobs <- mapM async jobset runningJobs <- mapM async jobset
-- so far just dropping failed attempts, TODO: retry mechanism -- so far just dropping failed attempts, TODO: retry mechanism
successfulResults <- rights <$> mapM waitCatch runningJobs successfulResults <- rights <$> mapM waitCatch runningJobs
-- TODO: stats
pure () pure ()