diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 608551f..c6bac4a 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -185,10 +185,8 @@ relayInbox serv tag posts = do -- if noone subscribed to the tag, nothing needs to be done (pure ()) -- otherwise enqueue posts into broadcast queue of the tag - (\queue -> do + (\queue -> liftIO $ forM_ postIDs (atomically . writeTChan queue) - -- report the received post for statistic purposes - liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayReceiveEvent (length postIDs) (hashtagToId tag) ) broadcastChan pure NoContent @@ -251,9 +249,7 @@ postFetch serv postID = do postSet <- liftIO . readTVarIO . ownPosts $ serv if HSet.member postID postSet -- decision: always return the same placeholder post - then do - liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent 1 0 -- tag fetched for is irrelevant - pure placeholderPost + then pure placeholderPost else throwError $ err404 { errBody = "No post found with this ID" } @@ -263,14 +259,11 @@ postMultiFetch serv postIDs = do let idList = Txt.lines postIDs postSet <- liftIO . readTVarIO . ownPosts $ serv -- look up existence of all given post IDs, fail if even one is missing - response <- foldM (\response postID -> + foldM (\response postID -> if HSet.member postID postSet then pure $ placeholderPost <> "\n" <> response else throwError $ err404 { errBody = "No post found with this ID" } ) "" idList - -- this shouldn't be reached in case of error - liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent (length idList) 0 -- tag fetched for is irrelevant - pure response -- | delivery endpoint: inbox for initially publishing a post at an instance @@ -682,18 +675,9 @@ relayWorker serv = forever $ do subscriberMap <- readTVar subscriberMapSTM foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do postsToDeliver <- readUpToTChan 500 postChan - let postDeliveryAction = runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) "")) - -- append relay push job to job list + -- append fetch job to job list pure $ if not (null postsToDeliver) - then jobAcc' `D.snoc` (do - deliveryResult <- postDeliveryAction - either - (const $ pure ()) - -- on successful push, record that event for statistics - (const . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayDeliveryEvent (length postsToDeliver) (hashtagToId tag)) - deliveryResult - pure deliveryResult - ) + then jobAcc' `D.snoc` runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) "")) else jobAcc' ) jobAcc $ HMap.toList subscriberMap ) D.empty subscriptionMap @@ -707,6 +691,7 @@ relayWorker serv = forever $ do runningJobs <- mapM async jobset -- so far just dropping failed attempts, TODO: retry mechanism successfulResults <- rights <$> mapM waitCatch runningJobs + -- TODO: stats pure ()