Compare commits

..

No commits in common. "f8d30d0cc40b4b13cbcf1c740a33bf37441bd50c" and "85d10f677314bea16cd20479f6cd5f4095fa8ea2" have entirely different histories.

View file

@ -185,10 +185,8 @@ relayInbox serv tag posts = do
-- if noone subscribed to the tag, nothing needs to be done
(pure ())
-- otherwise enqueue posts into broadcast queue of the tag
(\queue -> do
(\queue ->
liftIO $ forM_ postIDs (atomically . writeTChan queue)
-- report the received post for statistic purposes
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayReceiveEvent (length postIDs) (hashtagToId tag)
)
broadcastChan
pure NoContent
@ -251,9 +249,7 @@ postFetch serv postID = do
postSet <- liftIO . readTVarIO . ownPosts $ serv
if HSet.member postID postSet
-- decision: always return the same placeholder post
then do
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent 1 0 -- tag fetched for is irrelevant
pure placeholderPost
then pure placeholderPost
else throwError $ err404 { errBody = "No post found with this ID" }
@ -263,14 +259,11 @@ postMultiFetch serv postIDs = do
let idList = Txt.lines postIDs
postSet <- liftIO . readTVarIO . ownPosts $ serv
-- look up existence of all given post IDs, fail if even one is missing
response <- foldM (\response postID ->
foldM (\response postID ->
if HSet.member postID postSet
then pure $ placeholderPost <> "\n" <> response
else throwError $ err404 { errBody = "No post found with this ID" }
) "" idList
-- this shouldn't be reached in case of error
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent (length idList) 0 -- tag fetched for is irrelevant
pure response
-- | delivery endpoint: inbox for initially publishing a post at an instance
@ -682,18 +675,9 @@ relayWorker serv = forever $ do
subscriberMap <- readTVar subscriberMapSTM
foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do
postsToDeliver <- readUpToTChan 500 postChan
let postDeliveryAction = runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
-- append relay push job to job list
-- append fetch job to job list
pure $ if not (null postsToDeliver)
then jobAcc' `D.snoc` (do
deliveryResult <- postDeliveryAction
either
(const $ pure ())
-- on successful push, record that event for statistics
(const . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayDeliveryEvent (length postsToDeliver) (hashtagToId tag))
deliveryResult
pure deliveryResult
)
then jobAcc' `D.snoc` runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
else jobAcc'
) jobAcc $ HMap.toList subscriberMap
) D.empty subscriptionMap
@ -707,6 +691,7 @@ relayWorker serv = forever $ do
runningJobs <- mapM async jobset
-- so far just dropping failed attempts, TODO: retry mechanism
successfulResults <- rights <$> mapM waitCatch runningJobs
-- TODO: stats
pure ()