diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index c1ea936..baa2b70 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -19,7 +19,7 @@ import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.UTF8 as BSU import qualified Data.HashMap.Strict as HMap import qualified Data.HashSet as HSet -import Data.Maybe (fromJust, isJust) +import Data.Maybe (isJust, fromJust) import Data.String (fromString) import Data.Text.Lazy (Text) import qualified Data.Text.Lazy as Txt @@ -54,11 +54,9 @@ data PostService d = PostService , relayInQueue :: TQueue (Hashtag, PostID, PostContent) -- ^ Queue for processing incoming posts of own instance asynchronously , postFetchQueue :: TQueue PostID - -- ^ queue of posts to be fetched , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ())) , httpMan :: HTTP.Manager , statsQueue :: TQueue StatsEvent - , loadStats :: TVar RelayStats } deriving (Typeable) @@ -88,7 +86,6 @@ instance DHT d => Service PostService d where migrationsInProgress' <- newTVarIO HMap.empty httpMan' <- HTTP.newManager HTTP.defaultManagerSettings statsQueue' <- newTQueueIO - loadStats' <- newTVarIO emptyStats let thisService = PostService { serviceConf = conf @@ -102,7 +99,6 @@ instance DHT d => Service PostService d where , migrationsInProgress = migrationsInProgress' , httpMan = httpMan' , statsQueue = statsQueue' - , loadStats = loadStats' } port' = fromIntegral (confServicePort conf) warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings @@ -113,11 +109,7 @@ instance DHT d => Service PostService d where concurrently_ -- web server (Warp.runSettings warpSettings $ postServiceApplication thisService) - $ concurrently - -- post queue processing - (processIncomingPosts thisService) - -- statistics/ measurements - (launchStatsThreads thisService) + (processIncomingPosts thisService) -- update thread ID after fork atomically $ writeTVar threadVar servThreadID pure thisService @@ -321,15 +313,15 @@ tagUnsubscribe serv hashtag origin = do clientAPI :: Proxy PostServiceAPI clientAPI = Proxy -relayInboxClient - :<|> subscriptionDeliveryClient - :<|> postFetchClient - :<|> postMultiFetchClient - :<|> postInboxClient - :<|> tagDeliveryClient - :<|> tagSubscribeClient - :<|> tagUnsubscribeClient - = client clientAPI +relayInboxClient :: Text -> Text -> ClientM NoContent +relayInboxClient :<|> subscriptionDeliveryClient + :<|> postFetchClient + :<|> postMultiFetchClient + :<|> postInboxClient + :<|> tagDeliveryClient + :<|> tagSubscribeClient + :<|> tagUnsubscribeClient + = client clientAPI -- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] @@ -614,64 +606,63 @@ fetchTagPosts serv = forever $ do -- ======= statistics/measurement and logging ======= data StatsEventType = PostPublishEvent - | RelayReceiveEvent - | RelayDeliveryEvent - | IncomingPostFetchEvent - deriving (Enum, Show, Eq) + -- ^ initial publishing of a post by an instance + | RelayReceiveEvent + -- ^ receiving of posts because of being the responsible relay, for estimation of \tau_t + -- TODO: record for which hashtag + | RelayDeliveryEvent + -- ^ delivering (or at least attempt) a post to a subscriber + | IncomingPostFetchEvent + -- ^ another instance fetches a post from this instance + deriving (Enum, Show, Eq) -- | Represents measurement event of a 'StatsEventType' with a count relevant for a certain key data StatsEvent = StatsEvent StatsEventType Int NodeID - --- | measured rates of relay performance --- TODO: maybe include other metrics in here as well, like number of subscribers? -data RelayStats = RelayStats - { relayReceiveRates :: RingMap NodeID Double - -- ^ rate of incoming posts in the responsibility of this relay - , relayDeliveryRates :: RingMap NodeID Double - -- ^ rate of relayed outgoing posts - , postFetchRate :: Double -- no need to differentiate between tags - -- ^ number of post-fetches delivered - , postPublishRate :: Double - -- ^ rate of initially publishing posts through this instance - } - - -- TODO: make delay configurable -statsEvalDelay = 300000 +statsAccuDelay = 300000 - -launchStatsThreads :: PostService d -> IO () -launchStatsThreads serv = do - -- create shared accumulator - sharedAccum <- newTVarIO emptyStats - concurrently_ - (accumulateStatsThread sharedAccum $ statsQueue serv) - (evaluateStatsThread serv sharedAccum) - - --- | Read stats events from queue and add them to a shared accumulator. --- Instead of letting the events accumulate in the queue and allocate linear memory, immediately fold the result. -accumulateStatsThread :: TVar RelayStats -> TQueue StatsEvent -> IO () -accumulateStatsThread statsAccumulator statsQ = forever $ do - -- blocks until stats event arrives - event <- atomically $ readTQueue statsQ - -- add the event number to current accumulator - atomically $ modifyTVar' statsAccumulator $ statsAdder event - - --- | add incoming stats events to accumulator value -statsAdder :: StatsEvent -> RelayStats -> RelayStats -statsAdder event stats = case event of - StatsEvent PostPublishEvent num _ -> - stats {postPublishRate = fromIntegral num + postPublishRate stats} - StatsEvent RelayReceiveEvent num key -> - stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)} - StatsEvent RelayDeliveryEvent num key -> - stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)} - StatsEvent IncomingPostFetchEvent num _ -> - stats {postFetchRate = fromIntegral num + postFetchRate stats} +-- | periodically flush the stats queue and accumulate all events inside +accumulateStatsThread :: TQueue StatsEvent -> IO () +accumulateStatsThread statsQ = getPOSIXTime >>= flushLoop where + flushLoop previousRun = do + now <- getPOSIXTime + -- TODO: instead of letting the events accumulate in the queue and allocate linear memory, immediately fold the result + -- but how to achieve the periodicity when blocking on a queue? + -- idea: let another thread periodically exchange the RelayStats, modify it atomically (Konzept "unterm Arsch wegziehen") + threadDelay statsAccuDelay + latestEvents <- atomically $ flushTQueue statsQ + -- accumulate the events + -- and now what? write a log to file, probably as a forkIO + -- persistently store in a TVar so it can be retrieved later by the DHT + flushLoop now + + +accumulateStats :: POSIXTime -> [StatsEvent] -> RelayStats +accumulateStats timeInterval events = + -- first sum all event numbers, then divide through number of seconds passed to + -- get rate per second + RelayStats + { relayReceiveRates = mapRMap (/ intervalSeconds) $ relayReceiveRates summedStats + , relayDeliveryRates = mapRMap (/ intervalSeconds) $ relayDeliveryRates summedStats + , postPublishRate = postPublishRate summedStats / intervalSeconds + , postFetchRate = postFetchRate summedStats / intervalSeconds + } + where + intervalSeconds = fromIntegral (fromEnum timeInterval) / 10^12 + summedStats = foldl (\stats event -> case event of + StatsEvent PostPublishEvent num _ -> + stats {postPublishRate = fromIntegral num + postPublishRate stats} + StatsEvent RelayReceiveEvent num key -> + stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)} + StatsEvent RelayDeliveryEvent num key -> + stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)} + StatsEvent IncomingPostFetchEvent num _ -> + stats {postFetchRate = fromIntegral num + postFetchRate stats} + ) + emptyStats + events sumIfEntryExists = addRMapEntryWith (\newVal oldVal -> let toInsert = fromJust $ extractRingEntry newVal in @@ -682,45 +673,8 @@ statsAdder event stats = case event of _ -> error "RingMap nested too deeply" ) +-- idea: first just sum with foldl, and then map the time division over all values --- Periodically exchange the accumulated statistics with empty ones, evaluate them --- and make them the current statistics of the service. -evaluateStatsThread :: PostService d -> TVar RelayStats -> IO () -evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop - where - loop previousTs = do - threadDelay statsEvalDelay - -- get and reset the stats accumulator - summedStats <- atomically $ do - stats <- readTVar statsAcc - writeTVar statsAcc emptyStats - pure stats - -- as the transaction might retry several times, current time needs to - -- be read afterwards - now <- getPOSIXTime - -- evaluate stats rate and replace server stats - atomically . writeTVar (loadStats serv) . evaluateStats (now - previousTs) $ summedStats - -- idea: let another thread periodically exchange the RelayStats, modify it atomically (Konzept "unterm Arsch wegziehen") - -- and now what? write a log to file, probably as a forkIO - -- persistently store in a TVar so it can be retrieved later by the DHT - loop now - - --- | Evaluate the accumulated statistic events: Currently mostly calculates the event --- rates by dividing through the collection time frame -evaluateStats :: POSIXTime -> RelayStats -> RelayStats -evaluateStats timeInterval summedStats = - -- first sum all event numbers, then divide through number of seconds passed to - -- get rate per second - RelayStats - { relayReceiveRates = mapRMap (/ intervalSeconds) $ relayReceiveRates summedStats - , relayDeliveryRates = mapRMap (/ intervalSeconds) $ relayDeliveryRates summedStats - , postPublishRate = postPublishRate summedStats / intervalSeconds - , postFetchRate = postFetchRate summedStats / intervalSeconds - } - where - -- TODO: take speedup into account - intervalSeconds = fromIntegral (fromEnum timeInterval) / 10^12 emptyStats :: RelayStats @@ -730,3 +684,16 @@ emptyStats = RelayStats , postFetchRate = 0 , postPublishRate = 0 } + +-- | measured rates of relay performance +-- TODO: maybe include other metrics in here as well, like number of subscribers? +data RelayStats = RelayStats + { relayReceiveRates :: RingMap NodeID Double + -- ^ rate of incoming posts in the responsibility of this relay + , relayDeliveryRates :: RingMap NodeID Double + -- ^ rate of relayed outgoing posts + , postFetchRate :: Double -- no need to differentiate between tags + -- ^ number of post-fetches delivered + , postPublishRate :: Double + -- ^ rate of initially publishing posts through this instance + }