diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 89c14d2..baa2b70 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -19,7 +19,7 @@ import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.UTF8 as BSU import qualified Data.HashMap.Strict as HMap import qualified Data.HashSet as HSet -import Data.Maybe (isJust) +import Data.Maybe (isJust, fromJust) import Data.String (fromString) import Data.Text.Lazy (Text) import qualified Data.Text.Lazy as Txt @@ -56,6 +56,7 @@ data PostService d = PostService , postFetchQueue :: TQueue PostID , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ())) , httpMan :: HTTP.Manager + , statsQueue :: TQueue StatsEvent } deriving (Typeable) @@ -84,9 +85,10 @@ instance DHT d => Service PostService d where postFetchQueue' <- newTQueueIO migrationsInProgress' <- newTVarIO HMap.empty httpMan' <- HTTP.newManager HTTP.defaultManagerSettings + statsQueue' <- newTQueueIO let - thisService = PostService { - serviceConf = conf + thisService = PostService + { serviceConf = conf , baseDHT = dht , serviceThread = threadVar , subscribers = subscriberVar @@ -96,7 +98,8 @@ instance DHT d => Service PostService d where , postFetchQueue = postFetchQueue' , migrationsInProgress = migrationsInProgress' , httpMan = httpMan' - } + , statsQueue = statsQueue' + } port' = fromIntegral (confServicePort conf) warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings -- Run 'concurrently_' from another thread to be able to return the @@ -599,3 +602,98 @@ fetchTagPosts serv = forever $ do -- TODO error handling, retry pure () + +-- ======= statistics/measurement and logging ======= + +data StatsEventType = PostPublishEvent + -- ^ initial publishing of a post by an instance + | RelayReceiveEvent + -- ^ receiving of posts because of being the responsible relay, for estimation of \tau_t + -- TODO: record for which hashtag + | RelayDeliveryEvent + -- ^ delivering (or at least attempt) a post to a subscriber + | IncomingPostFetchEvent + -- ^ another instance fetches a post from this instance + deriving (Enum, Show, Eq) + +-- | Represents measurement event of a 'StatsEventType' with a count relevant for a certain key +data StatsEvent = StatsEvent StatsEventType Int NodeID + +-- TODO: make delay configurable +statsAccuDelay = 300000 + +-- | periodically flush the stats queue and accumulate all events inside +accumulateStatsThread :: TQueue StatsEvent -> IO () +accumulateStatsThread statsQ = getPOSIXTime >>= flushLoop + where + flushLoop previousRun = do + now <- getPOSIXTime + -- TODO: instead of letting the events accumulate in the queue and allocate linear memory, immediately fold the result + -- but how to achieve the periodicity when blocking on a queue? + -- idea: let another thread periodically exchange the RelayStats, modify it atomically (Konzept "unterm Arsch wegziehen") + threadDelay statsAccuDelay + latestEvents <- atomically $ flushTQueue statsQ + -- accumulate the events + -- and now what? write a log to file, probably as a forkIO + -- persistently store in a TVar so it can be retrieved later by the DHT + flushLoop now + + +accumulateStats :: POSIXTime -> [StatsEvent] -> RelayStats +accumulateStats timeInterval events = + -- first sum all event numbers, then divide through number of seconds passed to + -- get rate per second + RelayStats + { relayReceiveRates = mapRMap (/ intervalSeconds) $ relayReceiveRates summedStats + , relayDeliveryRates = mapRMap (/ intervalSeconds) $ relayDeliveryRates summedStats + , postPublishRate = postPublishRate summedStats / intervalSeconds + , postFetchRate = postFetchRate summedStats / intervalSeconds + } + where + intervalSeconds = fromIntegral (fromEnum timeInterval) / 10^12 + summedStats = foldl (\stats event -> case event of + StatsEvent PostPublishEvent num _ -> + stats {postPublishRate = fromIntegral num + postPublishRate stats} + StatsEvent RelayReceiveEvent num key -> + stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)} + StatsEvent RelayDeliveryEvent num key -> + stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)} + StatsEvent IncomingPostFetchEvent num _ -> + stats {postFetchRate = fromIntegral num + postFetchRate stats} + ) + emptyStats + events + sumIfEntryExists = addRMapEntryWith (\newVal oldVal -> + let toInsert = fromJust $ extractRingEntry newVal + in + case oldVal of + KeyEntry n -> KeyEntry (n + toInsert) + ProxyEntry pointer (Just (KeyEntry n)) -> ProxyEntry pointer (Just (KeyEntry $ n + toInsert)) + ProxyEntry pointer Nothing -> ProxyEntry pointer (Just newVal) + _ -> error "RingMap nested too deeply" + ) + +-- idea: first just sum with foldl, and then map the time division over all values + + + +emptyStats :: RelayStats +emptyStats = RelayStats + { relayReceiveRates = emptyRMap + , relayDeliveryRates = emptyRMap + , postFetchRate = 0 + , postPublishRate = 0 + } + +-- | measured rates of relay performance +-- TODO: maybe include other metrics in here as well, like number of subscribers? +data RelayStats = RelayStats + { relayReceiveRates :: RingMap NodeID Double + -- ^ rate of incoming posts in the responsibility of this relay + , relayDeliveryRates :: RingMap NodeID Double + -- ^ rate of relayed outgoing posts + , postFetchRate :: Double -- no need to differentiate between tags + -- ^ number of post-fetches delivered + , postPublishRate :: Double + -- ^ rate of initially publishing posts through this instance + } diff --git a/src/Hash2Pub/RingMap.hs b/src/Hash2Pub/RingMap.hs index ae1ec15..8416278 100644 --- a/src/Hash2Pub/RingMap.hs +++ b/src/Hash2Pub/RingMap.hs @@ -133,7 +133,7 @@ rMapLookupPred :: (Bounded k, Ord k, Num k) rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards addRMapEntryWith :: (Bounded k, Ord k) - => (RingEntry k a -> RingEntry k a -> RingEntry k a) + => (RingEntry k a -> RingEntry k a -> RingEntry k a) -- ^ f new_value mold_value -> k -- ^ key -> a -- ^ value -> RingMap k a @@ -247,3 +247,14 @@ takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k) -> RingMap k a -> [a] takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing [] + + +-- | map a function over all payload values of a 'RingMap' +mapRMap :: (Bounded k, Ord k, Num k) + => (a -> b) -> RingMap k a -> RingMap k b +mapRMap f = RingMap . Map.map traversingF . getRingMap + where + --traversingF :: RingEntry k a -> RingEntry k b + traversingF (KeyEntry a) = KeyEntry (f a) + traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry) + traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing