Compare commits

...

6 commits

Author SHA1 Message Date
Trolli Schmittlauch c823e6357a accumulate all statistic/ measurement events to a measurement summary
- RingMap can now be mapped over
2020-09-07 13:00:15 +02:00
Trolli Schmittlauch 4d2d6faf1b Merge pull request 'Improve general readability' (#69) from Hecate/Hash2Pub:readability into mainline
brings several readability enhancements:

- Text is always unqualified
- The Servant API declaration is moved to its own file
- Redundant imports are removed
2020-09-05 16:27:23 +02:00
Hécate 7d833e064b Improve readability 2020-09-05 15:45:47 +02:00
Trolli Schmittlauch fa78c6fc43 clarify different nix-shell environments in readme 2020-09-05 15:01:46 +02:00
Hécate d3e5eac5c5 Unsused imports and syntax error 2020-09-05 12:41:18 +02:00
Trolli Schmittlauch 60f5780742 Merge branch 'measurement_logging' into mainline
more values are parametrised through FediChordConfig
2020-09-05 12:31:40 +02:00
5 changed files with 186 additions and 56 deletions

View file

@ -14,4 +14,7 @@ The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`.
The project and its developent environment are built with [Nix](https://nixos.org/nix/).
The development environment can be entered with `nix-shell`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build"` to do both steps at once.
The development environment can be entered with `nix-shell shell-minimal.nix`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build" shell-minimal.nix` to do both steps at once.
While the `shell-minimal.nix` environment contains everything necessary for building and testing this project, the `shell.nix` additionally contains the Haskell IDE engine *hie* and the documentation for all used Haskell packages for more convenient development.
Be aware that these need to be build from source and can take a very long time to build.

View file

@ -12,16 +12,16 @@ import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever, void,
when)
import Control.Monad (foldM, forM_, forever, void, when)
import Control.Monad.IO.Class (liftIO)
import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (fromMaybe, isJust)
import Data.Maybe (isJust, fromJust)
import Data.String (fromString)
import Data.Text.Lazy (Text)
import qualified Data.Text.Lazy as Txt
import Data.Text.Normalize (NormalizationMode (NFC), normalize)
import Data.Time.Clock.POSIX
@ -36,6 +36,7 @@ import Servant
import Servant.Client
import Hash2Pub.FediChordTypes
import Hash2Pub.PostService.API
import Hash2Pub.RingMap
@ -48,19 +49,20 @@ data PostService d = PostService
-- ^ for each tag store the subscribers + their queue
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
-- ^ tags subscribed by the own node have an assigned lease time
, ownPosts :: TVar (HSet.HashSet Txt.Text)
, ownPosts :: TVar (HSet.HashSet Text)
-- ^ just store the existence of posts for saving memory,
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
, httpMan :: HTTP.Manager
, statsQueue :: TQueue StatsEvent
}
deriving (Typeable)
type Hashtag = Txt.Text
type PostID = Txt.Text
type PostContent = Txt.Text
type Hashtag = Text
type PostID = Text
type PostContent = Text
-- | For each handled tag, store its subscribers and provide a
-- broadcast 'TChan' for enqueuing posts
type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag)
@ -83,9 +85,10 @@ instance DHT d => Service PostService d where
postFetchQueue' <- newTQueueIO
migrationsInProgress' <- newTVarIO HMap.empty
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
statsQueue' <- newTQueueIO
let
thisService = PostService {
serviceConf = conf
thisService = PostService
{ serviceConf = conf
, baseDHT = dht
, serviceThread = threadVar
, subscribers = subscriberVar
@ -95,7 +98,8 @@ instance DHT d => Service PostService d where
, postFetchQueue = postFetchQueue'
, migrationsInProgress = migrationsInProgress'
, httpMan = httpMan'
}
, statsQueue = statsQueue'
}
port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
-- Run 'concurrently_' from another thread to be able to return the
@ -131,38 +135,13 @@ postServiceApplication :: DHT d => PostService d -> Application
postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv
-- | needed for guiding type inference
exposedPostServiceAPI :: Proxy PostServiceAPI
exposedPostServiceAPI = Proxy
-- ========= constants ===========
placeholderPost :: Txt.Text
placeholderPost :: Text
placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
-- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint at responsible relay for delivering posts of $tag for distribution
:<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
-- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- endpoint for fetching multiple posts at once
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
-- delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
-- endpoint for subscribing the instance specified in
-- the Origin header to $hashtag.
-- Returns subscription lease time in seconds.
:<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text
-- endpoint for unsubscribing the instance specified in
-- the Origin header to $hashtag
postServer :: DHT d => PostService d -> Server PostServiceAPI
postServer service = relayInbox service
:<|> subscriptionDelivery service
@ -174,7 +153,7 @@ postServer service = relayInbox service
:<|> tagUnsubscribe service
relayInbox :: DHT d => PostService d -> Hashtag -> Txt.Text -> Handler NoContent
relayInbox :: DHT d => PostService d -> Hashtag -> Text -> Handler NoContent
relayInbox serv tag posts = do
let
-- skip checking whether the post actually contains the tag, just drop full post
@ -202,7 +181,7 @@ newtype UnhandledTagException = UnhandledTagException String
instance Exception UnhandledTagException
subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text
subscriptionDelivery :: DHT d => PostService d -> Integer -> Text -> Handler Text
subscriptionDelivery serv senderID subList = do
let
tagSubs = Txt.lines subList
@ -236,7 +215,7 @@ subscriptionDelivery serv senderID subList = do
Right _ -> pure ""
-- TODO: check and only accept tags in own (future?) responsibility
where
processTag :: TVar RelayTags -> Txt.Text -> STM ()
processTag :: TVar RelayTags -> Text -> STM ()
processTag subscriberSTM tagData = do
let
tag:subText:lease:posts:_ = Txt.splitOn "," tagData
@ -247,7 +226,7 @@ subscriptionDelivery serv senderID subList = do
enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime
postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
postFetch :: PostService d -> Text -> Handler Text
postFetch serv postID = do
postSet <- liftIO . readTVarIO . ownPosts $ serv
if HSet.member postID postSet
@ -256,7 +235,7 @@ postFetch serv postID = do
else throwError $ err404 { errBody = "No post found with this ID" }
postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text
postMultiFetch :: PostService d -> Text -> Handler Text
postMultiFetch serv postIDs = do
let idList = Txt.lines postIDs
postSet <- liftIO . readTVarIO . ownPosts $ serv
@ -268,7 +247,7 @@ postMultiFetch serv postIDs = do
) "" idList
postInbox :: PostService d -> Txt.Text -> Handler NoContent
postInbox :: PostService d -> Text -> Handler NoContent
postInbox serv post = do
-- extract contained hashtags
let
@ -278,13 +257,13 @@ postInbox serv post = do
-- add ID to own posts
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
-- enqueue a relay job for each tag
liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag ->
liftIO $ forM_ (containedTags :: [Text]) (\tag ->
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
)
pure NoContent
tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery :: PostService d -> Text -> Text -> Handler Text
tagDelivery serv hashtag posts = do
let postIDs = Txt.lines posts
subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
@ -295,7 +274,7 @@ tagDelivery serv hashtag posts = do
pure ()
pure $ "Received a postID for tag " <> hashtag
tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
tagSubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Integer
tagSubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
if not responsible
@ -314,7 +293,7 @@ tagSubscribe serv hashtag origin = do
pure $ round leaseTime
tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
tagUnsubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Text
tagUnsubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
if not responsible
@ -334,8 +313,15 @@ tagUnsubscribe serv hashtag origin = do
clientAPI :: Proxy PostServiceAPI
clientAPI = Proxy
relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI
relayInboxClient :: Text -> Text -> ClientM NoContent
relayInboxClient :<|> subscriptionDeliveryClient
:<|> postFetchClient
:<|> postMultiFetchClient
:<|> postInboxClient
:<|> tagDeliveryClient
:<|> tagSubscribeClient
:<|> tagUnsubscribeClient
= client clientAPI
-- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag]
@ -543,7 +529,7 @@ lookupTagSubscriptions tag = rMapLookup (hashtagToId tag)
-- normalise the unicode representation of a string to NFC
normaliseTag :: Txt.Text -> Txt.Text
normaliseTag :: Text -> Text
normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict
@ -601,7 +587,7 @@ fetchTagPosts serv = forever $ do
-- TODO: batching, retry
-- TODO: process multiple in parallel
pIdUri <- atomically . readTQueue $ postFetchQueue serv
fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri
fetchReq <- HTTP.parseRequest . Txt.unpack $ pIdUri
resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
case resp of
Right response ->
@ -616,3 +602,98 @@ fetchTagPosts serv = forever $ do
-- TODO error handling, retry
pure ()
-- ======= statistics/measurement and logging =======
data StatsEventType = PostPublishEvent
-- ^ initial publishing of a post by an instance
| RelayReceiveEvent
-- ^ receiving of posts because of being the responsible relay, for estimation of \tau_t
-- TODO: record for which hashtag
| RelayDeliveryEvent
-- ^ delivering (or at least attempt) a post to a subscriber
| IncomingPostFetchEvent
-- ^ another instance fetches a post from this instance
deriving (Enum, Show, Eq)
-- | Represents measurement event of a 'StatsEventType' with a count relevant for a certain key
data StatsEvent = StatsEvent StatsEventType Int NodeID
-- TODO: make delay configurable
statsAccuDelay = 300000
-- | periodically flush the stats queue and accumulate all events inside
accumulateStatsThread :: TQueue StatsEvent -> IO ()
accumulateStatsThread statsQ = getPOSIXTime >>= flushLoop
where
flushLoop previousRun = do
now <- getPOSIXTime
-- TODO: instead of letting the events accumulate in the queue and allocate linear memory, immediately fold the result
-- but how to achieve the periodicity when blocking on a queue?
-- idea: let another thread periodically exchange the RelayStats, modify it atomically (Konzept "unterm Arsch wegziehen")
threadDelay statsAccuDelay
latestEvents <- atomically $ flushTQueue statsQ
-- accumulate the events
-- and now what? write a log to file, probably as a forkIO
-- persistently store in a TVar so it can be retrieved later by the DHT
flushLoop now
accumulateStats :: POSIXTime -> [StatsEvent] -> RelayStats
accumulateStats timeInterval events =
-- first sum all event numbers, then divide through number of seconds passed to
-- get rate per second
RelayStats
{ relayReceiveRates = mapRMap (/ intervalSeconds) $ relayReceiveRates summedStats
, relayDeliveryRates = mapRMap (/ intervalSeconds) $ relayDeliveryRates summedStats
, postPublishRate = postPublishRate summedStats / intervalSeconds
, postFetchRate = postFetchRate summedStats / intervalSeconds
}
where
intervalSeconds = fromIntegral (fromEnum timeInterval) / 10^12
summedStats = foldl (\stats event -> case event of
StatsEvent PostPublishEvent num _ ->
stats {postPublishRate = fromIntegral num + postPublishRate stats}
StatsEvent RelayReceiveEvent num key ->
stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)}
StatsEvent RelayDeliveryEvent num key ->
stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)}
StatsEvent IncomingPostFetchEvent num _ ->
stats {postFetchRate = fromIntegral num + postFetchRate stats}
)
emptyStats
events
sumIfEntryExists = addRMapEntryWith (\newVal oldVal ->
let toInsert = fromJust $ extractRingEntry newVal
in
case oldVal of
KeyEntry n -> KeyEntry (n + toInsert)
ProxyEntry pointer (Just (KeyEntry n)) -> ProxyEntry pointer (Just (KeyEntry $ n + toInsert))
ProxyEntry pointer Nothing -> ProxyEntry pointer (Just newVal)
_ -> error "RingMap nested too deeply"
)
-- idea: first just sum with foldl, and then map the time division over all values
emptyStats :: RelayStats
emptyStats = RelayStats
{ relayReceiveRates = emptyRMap
, relayDeliveryRates = emptyRMap
, postFetchRate = 0
, postPublishRate = 0
}
-- | measured rates of relay performance
-- TODO: maybe include other metrics in here as well, like number of subscribers?
data RelayStats = RelayStats
{ relayReceiveRates :: RingMap NodeID Double
-- ^ rate of incoming posts in the responsibility of this relay
, relayDeliveryRates :: RingMap NodeID Double
-- ^ rate of relayed outgoing posts
, postFetchRate :: Double -- no need to differentiate between tags
-- ^ number of post-fetches delivered
, postPublishRate :: Double
-- ^ rate of initially publishing posts through this instance
}

View file

@ -0,0 +1,37 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeOperators #-}
module Hash2Pub.PostService.API where
import Data.Text.Lazy (Text)
import Servant
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint at responsible relay for delivering posts of $tag for distribution
:<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Text :> PostNoContent '[PlainText] Text
-- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Text :> Get '[PlainText] Text
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Text :> Post '[PlainText] Text
-- endpoint for fetching multiple posts at once
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance
:<|> "tags" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PostCreated '[PlainText] Text
-- delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Text :> "subscribe" :> Header "Origin" Text :> Get '[PlainText] Integer
-- endpoint for subscribing the instance specified in
-- the Origin header to $hashtag.
-- Returns subscription lease time in seconds.
:<|> "tags" :> Capture "hashtag" Text :> "unsubscribe" :> Header "Origin" Text :> Get '[PlainText] Text
-- endpoint for unsubscribing the instance specified in
-- the Origin header to $hashtag
-- | needed for guiding type inference
exposedPostServiceAPI :: Proxy PostServiceAPI
exposedPostServiceAPI = Proxy

View file

@ -1,7 +1,5 @@
module Hash2Pub.ProtocolTypes where
import qualified Data.Map as Map
import Data.Maybe (mapMaybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX (POSIXTime)

View file

@ -23,7 +23,7 @@ instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where
a == b = getRingMap a == getRingMap b
instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where
show rmap = shows "RingMap " (show $ getRingMap rmap)
show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap)
-- | entry of a 'RingMap' that holds a value and can also
-- wrap around the lookup direction at the edges of the name space.
@ -133,7 +133,7 @@ rMapLookupPred :: (Bounded k, Ord k, Num k)
rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
addRMapEntryWith :: (Bounded k, Ord k)
=> (RingEntry k a -> RingEntry k a -> RingEntry k a)
=> (RingEntry k a -> RingEntry k a -> RingEntry k a) -- ^ f new_value mold_value
-> k -- ^ key
-> a -- ^ value
-> RingMap k a
@ -207,7 +207,7 @@ takeEntriesUntil_ :: (Integral i, Bounded k, Ord k)
-> Maybe i -- possible number limit
-> [a]
-> [a]
takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry (Just remaining) takeAcc
takeEntriesUntil_ _rmap' _getterFunc' _havingReached _previousEntry (Just remaining) takeAcc
-- length limit reached
| remaining <= 0 = takeAcc
takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry numLimit takeAcc =
@ -247,3 +247,14 @@ takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k)
-> RingMap k a
-> [a]
takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []
-- | map a function over all payload values of a 'RingMap'
mapRMap :: (Bounded k, Ord k, Num k)
=> (a -> b) -> RingMap k a -> RingMap k b
mapRMap f = RingMap . Map.map traversingF . getRingMap
where
--traversingF :: RingEntry k a -> RingEntry k b
traversingF (KeyEntry a) = KeyEntry (f a)
traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry)
traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing