diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index f9b2fc4..608551f 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -251,7 +251,9 @@ postFetch serv postID = do postSet <- liftIO . readTVarIO . ownPosts $ serv if HSet.member postID postSet -- decision: always return the same placeholder post - then pure placeholderPost + then do + liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent 1 0 -- tag fetched for is irrelevant + pure placeholderPost else throwError $ err404 { errBody = "No post found with this ID" } @@ -261,11 +263,14 @@ postMultiFetch serv postIDs = do let idList = Txt.lines postIDs postSet <- liftIO . readTVarIO . ownPosts $ serv -- look up existence of all given post IDs, fail if even one is missing - foldM (\response postID -> + response <- foldM (\response postID -> if HSet.member postID postSet then pure $ placeholderPost <> "\n" <> response else throwError $ err404 { errBody = "No post found with this ID" } ) "" idList + -- this shouldn't be reached in case of error + liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent (length idList) 0 -- tag fetched for is irrelevant + pure response -- | delivery endpoint: inbox for initially publishing a post at an instance @@ -677,9 +682,18 @@ relayWorker serv = forever $ do subscriberMap <- readTVar subscriberMapSTM foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do postsToDeliver <- readUpToTChan 500 postChan - -- append fetch job to job list + let postDeliveryAction = runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) "")) + -- append relay push job to job list pure $ if not (null postsToDeliver) - then jobAcc' `D.snoc` runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) "")) + then jobAcc' `D.snoc` (do + deliveryResult <- postDeliveryAction + either + (const $ pure ()) + -- on successful push, record that event for statistics + (const . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayDeliveryEvent (length postsToDeliver) (hashtagToId tag)) + deliveryResult + pure deliveryResult + ) else jobAcc' ) jobAcc $ HMap.toList subscriberMap ) D.empty subscriptionMap @@ -693,7 +707,6 @@ relayWorker serv = forever $ do runningJobs <- mapM async jobset -- so far just dropping failed attempts, TODO: retry mechanism successfulResults <- rights <$> mapM waitCatch runningJobs - -- TODO: stats pure ()