diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 75bdd33..ffeef17 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -12,14 +12,14 @@ import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception (Exception (..), try) -import Control.Monad (foldM, forM, forM_, forever, void, - when) +import Control.Monad (foldM, forM, forM_, forever, unless, + void, when) import Control.Monad.IO.Class (liftIO) import Data.Bifunctor import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.UTF8 as BSU import qualified Data.DList as D -import Data.Either (rights) +import Data.Either (lefts, rights) import qualified Data.HashMap.Strict as HMap import qualified Data.HashSet as HSet import Data.Maybe (fromJust, isJust) @@ -57,8 +57,6 @@ data PostService d = PostService -- ^ for each tag store the subscribers + their queue , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) -- ^ tags subscribed by the own node have an assigned lease time - --, ownPosts :: TVar (HSet.HashSet Text) - -- ^ just store the existence of posts for saving memory, , relayInQueue :: TQueue (Hashtag, PostID, PostContent) -- ^ Queue for processing incoming posts of own instance asynchronously , postFetchQueue :: TQueue PostID @@ -325,6 +323,7 @@ tagSubscribe serv hashtag origin = do let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv) -- setup subscription entry _ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime + --liftIO . putStrLn $ "just got a subscription to " <> Txt.unpack hashtag pure $ round leaseTime @@ -427,10 +426,12 @@ clientSubscribeTo serv tag = do Left (FailureResponse _ fresp) |(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) + --putStrLn $ "failed subscribing to " <> Txt.unpack tag <> " on " <> foundHost doSubscribe newRes False Left err -> pure . Left . show $ err Right lease -> do atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease) + --putStrLn $ "just subscribed to " <> Txt.unpack tag <> " on " <> foundHost pure . Right $ lease ) lookupResponse @@ -735,7 +736,11 @@ relayWorker serv = forever $ do forM_ (chunksOf numParallelDeliveries jobsToProcess) $ \jobset -> do runningJobs <- mapM async jobset -- so far just dropping failed attempts, TODO: retry mechanism - successfulResults <- rights <$> mapM waitCatch runningJobs + results <- mapM waitCatch runningJobs + let + successfulResults = rights results + unsuccessfulResults = lefts results + unless (null unsuccessfulResults) $ putStrLn ("ERR: " <> show (length unsuccessfulResults) <> " failed deliveries!") putStrLn $ "successfully relayed " <> show (length successfulResults) pure () @@ -829,7 +834,7 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop -- evaluate stats rate and replace server stats -- persistently store in a TVar so it can be retrieved later by the DHT let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv) - let rateStats = evaluateStats timePassed summedStats + rateStats = evaluateStats timePassed summedStats atomically $ writeTVar (loadStats serv) rateStats -- and now what? write a log to file -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum