Merge branch 'measurement_logging' into mainline

closes #11, implements #60
This commit is contained in:
Trolli Schmittlauch 2020-09-17 02:19:04 +02:00
commit b48b7a7bba
8 changed files with 396 additions and 86 deletions

View file

@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md extra-source-files: CHANGELOG.md
common deps common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist, formatting
ghc-options: -Wall -Wpartial-fields -O2 ghc-options: -Wall -Wpartial-fields -O2

View file

@ -33,7 +33,7 @@ parseSchedule = fmap (parseEntry . Txt.split (== ';')) . Txt.lines
where where
parseEntry [delayT, contactT, tag] = parseEntry [delayT, contactT, tag] =
(read $ Txt.unpack delayT, tag, read $ Txt.unpack contactT) (read $ Txt.unpack delayT, tag, read $ Txt.unpack contactT)
parseEntry _ = error "invalid schedule input format" parseEntry entry = error $ "invalid schedule input format: " <> show entry
executeSchedule :: Int -- ^ speedup factor executeSchedule :: Int -- ^ speedup factor
-> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))] -> [(Int, Hashtag, (String, Int))] -- ^ [(delay in microseconds, hashtag, (hostname, port))]

View file

@ -54,12 +54,12 @@ readConfig = do
bootstrapHost : bootstrapPortString : _ -> bootstrapHost : bootstrapPortString : _ ->
[(bootstrapHost, read bootstrapPortString)] [(bootstrapHost, read bootstrapPortString)]
_ -> [] _ -> []
fConf = FediChordConf { fConf = FediChordConf
confDomain = confDomainString { confDomain = confDomainString
, confIP = toHostAddress6 . read $ ipString , confIP = toHostAddress6 . read $ ipString
, confDhtPort = read portString , confDhtPort = read portString
, confBootstrapNodes = confBootstrapNodes' , confBootstrapNodes = confBootstrapNodes'
, confStabiliseInterval = 60 * 10^6 , confStabiliseInterval = 80 * 10^6
, confBootstrapSamplingInterval = 180 * 10^6 `div` speedup , confBootstrapSamplingInterval = 180 * 10^6 `div` speedup
, confMaxLookupCacheAge = 300 / fromIntegral speedup , confMaxLookupCacheAge = 300 / fromIntegral speedup
, confJoinAttemptsInterval = 60 * 10^6 `div` speedup , confJoinAttemptsInterval = 60 * 10^6 `div` speedup
@ -68,10 +68,13 @@ readConfig = do
, confRequestTimeout = 5 * 10^6 `div` speedup , confRequestTimeout = 5 * 10^6 `div` speedup
, confRequestRetries = 3 , confRequestRetries = 3
} }
sConf = ServiceConf { sConf = ServiceConf
confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` speedup { confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup
, confServicePort = read servicePortString , confServicePort = read servicePortString
, confServiceHost = confDomainString , confServiceHost = confDomainString
, confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"
, confSpeedupFactor = speedup
, confStatsEvalDelay = 120 * 10^6 `div` speedup
} }
pure (fConf, sConf) pure (fConf, sConf)

View file

@ -587,7 +587,6 @@ sendQueryIdMessages targetID ns lParam targets = do
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf let srcAddr = confIP nodeConf
-- ToDo: make attempts and timeout configurable
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
)) targets )) targets
@ -866,7 +865,7 @@ mkServerSocket ip port = do
sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port) sockAddr <- addrAddress <$> resolve (Just $ show . fromHostAddress6 $ ip) (Just port)
sock <- socket AF_INET6 Datagram defaultProtocol sock <- socket AF_INET6 Datagram defaultProtocol
setSocketOption sock IPv6Only 1 setSocketOption sock IPv6Only 1
bind sock sockAddr bind sock sockAddr `catch` (\e -> putStrLn $ "Caught exception while bind " <> show sock <> " " <> show sockAddr <> ": " <> show (e :: SomeException))
pure sock pure sock
-- | create a UDP datagram socket, connected to a destination. -- | create a UDP datagram socket, connected to a destination.
@ -882,6 +881,6 @@ mkSendSocket srcIp dest destPort = do
setSocketOption sendSock IPv6Only 1 setSocketOption sendSock IPv6Only 1
-- bind to the configured local IP to make sure that outgoing packets are sent from -- bind to the configured local IP to make sure that outgoing packets are sent from
-- this source address -- this source address
bind sendSock srcAddr bind sendSock srcAddr `catch` (\e -> putStrLn $ "Caught exception while mkSendSocket bind " <> show sendSock <> " " <> show srcAddr <> ": " <> show (e :: SomeException))
connect sendSock destAddr connect sendSock destAddr
pure sendSock pure sendSock

View file

@ -345,7 +345,6 @@ nodeCacheWriter nsSTM =
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones -- | Periodically iterate through cache, clean up expired entries and verify unverified ones
nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO () nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
nodeCacheVerifyThread nsSTM = forever $ do nodeCacheVerifyThread nsSTM = forever $ do
putStrLn "cache verify run: begin"
-- get cache -- get cache
(ns, cache, maxEntryAge) <- atomically $ do (ns, cache, maxEntryAge) <- atomically $ do
ns <- readTVar nsSTM ns <- readTVar nsSTM
@ -398,7 +397,6 @@ nodeCacheVerifyThread nsSTM = forever $ do
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
) )
putStrLn "cache verify run: end"
threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to milliseconds threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to milliseconds
@ -465,7 +463,6 @@ stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
stabiliseThread nsSTM = forever $ do stabiliseThread nsSTM = forever $ do
oldNs <- readTVarIO nsSTM oldNs <- readTVarIO nsSTM
putStrLn "stabilise run: begin"
-- iterate through the same snapshot, collect potential new neighbours -- iterate through the same snapshot, collect potential new neighbours
-- and nodes to be deleted, and modify these changes only at the end of -- and nodes to be deleted, and modify these changes only at the end of
@ -544,7 +541,6 @@ stabiliseThread nsSTM = forever $ do
) )
newPredecessor newPredecessor
putStrLn "stabilise run: end"
stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs) stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs)
threadDelay stabiliseDelay threadDelay stabiliseDelay
where where

View file

@ -457,6 +457,12 @@ data ServiceConf = ServiceConf
-- ^ listening port for service -- ^ listening port for service
, confServiceHost :: String , confServiceHost :: String
-- ^ hostname of service -- ^ hostname of service
, confLogfilePath :: String
-- ^ where to store the (measurement) log file
, confStatsEvalDelay :: Int
-- ^ delay between statistic rate measurement samplings, in microseconds
, confSpeedupFactor :: Int
-- While the speedup factor needs to be already included in all
} }
class DHT d where class DHT d where

View file

@ -12,25 +12,31 @@ import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Exception (Exception (..), try) import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM_, forever, void, when) import Control.Monad (foldM, forM, forM_, forever, unless,
void, when)
import Control.Monad.IO.Class (liftIO) import Control.Monad.IO.Class (liftIO)
import Data.Bifunctor import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU import qualified Data.ByteString.UTF8 as BSU
import qualified Data.DList as D
import Data.Either (lefts, rights)
import qualified Data.HashMap.Strict as HMap import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet import qualified Data.HashSet as HSet
import Data.Maybe (isJust) import Data.Maybe (fromJust, isJust)
import Data.String (fromString) import Data.String (fromString)
import Data.Text.Lazy (Text) import Data.Text.Lazy (Text)
import qualified Data.Text.Lazy as Txt import qualified Data.Text.Lazy as Txt
import qualified Data.Text.Lazy.IO as TxtI
import Data.Text.Normalize (NormalizationMode (NFC), normalize) import Data.Text.Normalize (NormalizationMode (NFC), normalize)
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Data.Typeable (Typeable) import Data.Typeable (Typeable)
import qualified Network.HTTP.Client as HTTP import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTPT import qualified Network.HTTP.Types as HTTPT
import System.IO
import System.Random import System.Random
import Text.Read (readEither) import Text.Read (readEither)
import Formatting (fixed, format, int, (%))
import qualified Network.Wai.Handler.Warp as Warp import qualified Network.Wai.Handler.Warp as Warp
import Servant import Servant
import Servant.Client import Servant.Client
@ -38,7 +44,9 @@ import Servant.Client
import Hash2Pub.FediChordTypes import Hash2Pub.FediChordTypes
import Hash2Pub.PostService.API import Hash2Pub.PostService.API
import Hash2Pub.RingMap import Hash2Pub.RingMap
import Hash2Pub.Utils
import Debug.Trace
data PostService d = PostService data PostService d = PostService
{ serviceConf :: ServiceConf { serviceConf :: ServiceConf
@ -49,13 +57,16 @@ data PostService d = PostService
-- ^ for each tag store the subscribers + their queue -- ^ for each tag store the subscribers + their queue
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
-- ^ tags subscribed by the own node have an assigned lease time -- ^ tags subscribed by the own node have an assigned lease time
, ownPosts :: TVar (HSet.HashSet Text)
-- ^ just store the existence of posts for saving memory,
, relayInQueue :: TQueue (Hashtag, PostID, PostContent) , relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously -- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID , postFetchQueue :: TQueue PostID
-- ^ queue of posts to be fetched
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ())) , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
, httpMan :: HTTP.Manager , httpMan :: HTTP.Manager
, statsQueue :: TQueue StatsEvent
, loadStats :: TVar RelayStats
-- ^ current load stats, replaced periodically
, logFileHandle :: Handle
} }
deriving (Typeable) deriving (Typeable)
@ -79,26 +90,38 @@ instance DHT d => Service PostService d where
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
subscriberVar <- newTVarIO emptyRMap subscriberVar <- newTVarIO emptyRMap
ownSubsVar <- newTVarIO HMap.empty ownSubsVar <- newTVarIO HMap.empty
ownPostVar <- newTVarIO HSet.empty --ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO relayInQueue' <- newTQueueIO
postFetchQueue' <- newTQueueIO postFetchQueue' <- newTQueueIO
migrationsInProgress' <- newTVarIO HMap.empty migrationsInProgress' <- newTVarIO HMap.empty
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
statsQueue' <- newTQueueIO
loadStats' <- newTVarIO emptyStats
loggingFile <- openFile (confLogfilePath conf) WriteMode
hSetBuffering loggingFile LineBuffering
let let
thisService = PostService { thisService = PostService
serviceConf = conf { serviceConf = conf
, baseDHT = dht , baseDHT = dht
, serviceThread = threadVar , serviceThread = threadVar
, subscribers = subscriberVar , subscribers = subscriberVar
, ownSubscriptions = ownSubsVar , ownSubscriptions = ownSubsVar
, ownPosts = ownPostVar --, ownPosts = ownPostVar
, relayInQueue = relayInQueue' , relayInQueue = relayInQueue'
, postFetchQueue = postFetchQueue' , postFetchQueue = postFetchQueue'
, migrationsInProgress = migrationsInProgress' , migrationsInProgress = migrationsInProgress'
, httpMan = httpMan' , httpMan = httpMan'
, statsQueue = statsQueue'
, loadStats = loadStats'
, logFileHandle = loggingFile
} }
port' = fromIntegral (confServicePort conf) port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
-- log a start message, this also truncates existing files
TxtI.hPutStrLn loggingFile $ Txt.unlines
[ "# Starting mock relay implementation"
, "#time stamp ; relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate ;total subscriptions"
]
-- Run 'concurrently_' from another thread to be able to return the -- Run 'concurrently_' from another thread to be able to return the
-- 'PostService'. -- 'PostService'.
-- Terminating that parent thread will make all child threads terminate as well. -- Terminating that parent thread will make all child threads terminate as well.
@ -106,7 +129,11 @@ instance DHT d => Service PostService d where
concurrently_ concurrently_
-- web server -- web server
(Warp.runSettings warpSettings $ postServiceApplication thisService) (Warp.runSettings warpSettings $ postServiceApplication thisService)
(processIncomingPosts thisService) $ concurrently
-- background processing workers
(launchWorkerThreads thisService)
-- statistics/ measurements
(launchStatsThreads thisService)
-- update thread ID after fork -- update thread ID after fork
atomically $ writeTVar threadVar servThreadID atomically $ writeTVar threadVar servThreadID
pure thisService pure thisService
@ -150,6 +177,7 @@ postServer service = relayInbox service
:<|> tagUnsubscribe service :<|> tagUnsubscribe service
-- | delivery endpoint: receive posts of a handled tag and enqueue them for relaying
relayInbox :: DHT d => PostService d -> Hashtag -> Text -> Handler NoContent relayInbox :: DHT d => PostService d -> Hashtag -> Text -> Handler NoContent
relayInbox serv tag posts = do relayInbox serv tag posts = do
let let
@ -166,8 +194,10 @@ relayInbox serv tag posts = do
-- if noone subscribed to the tag, nothing needs to be done -- if noone subscribed to the tag, nothing needs to be done
(pure ()) (pure ())
-- otherwise enqueue posts into broadcast queue of the tag -- otherwise enqueue posts into broadcast queue of the tag
(\queue -> (\queue -> do
liftIO $ forM_ postIDs (atomically . writeTChan queue) liftIO $ forM_ postIDs (atomically . writeTChan queue)
-- report the received post for statistic purposes
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayReceiveEvent (length postIDs) (hashtagToId tag)
) )
broadcastChan broadcastChan
pure NoContent pure NoContent
@ -178,6 +208,7 @@ newtype UnhandledTagException = UnhandledTagException String
instance Exception UnhandledTagException instance Exception UnhandledTagException
-- | delivery endpoint: receives a list of subscribers of tags and their outstanding queues for migration
subscriptionDelivery :: DHT d => PostService d -> Integer -> Text -> Handler Text subscriptionDelivery :: DHT d => PostService d -> Integer -> Text -> Handler Text
subscriptionDelivery serv senderID subList = do subscriptionDelivery serv senderID subList = do
let let
@ -223,27 +254,30 @@ subscriptionDelivery serv senderID subList = do
enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime
-- | endpoint for fetching a post by its ID
postFetch :: PostService d -> Text -> Handler Text postFetch :: PostService d -> Text -> Handler Text
postFetch serv postID = do postFetch serv _ = do
postSet <- liftIO . readTVarIO . ownPosts $ serv -- decision: for saving memory do not store published posts, just
if HSet.member postID postSet -- pretend there is a post for each requested ID
-- decision: always return the same placeholder post liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent 1 0 -- tag fetched for is irrelevant
then pure placeholderPost pure placeholderPost
else throwError $ err404 { errBody = "No post found with this ID" }
-- | endpoint for fetching multiple posts of this instance by their IDs
postMultiFetch :: PostService d -> Text -> Handler Text postMultiFetch :: PostService d -> Text -> Handler Text
postMultiFetch serv postIDs = do postMultiFetch serv postIDs = do
let idList = Txt.lines postIDs let
postSet <- liftIO . readTVarIO . ownPosts $ serv idList = Txt.lines postIDs
-- look up existence of all given post IDs, fail if even one is missing -- decision: for saving memory do not store published posts, just
foldM (\response postID -> -- pretend there is a post for each requested ID
if HSet.member postID postSet response = foldl (\response' _ ->
then pure $ placeholderPost <> "\n" <> response placeholderPost <> "\n" <> response'
else throwError $ err404 { errBody = "No post found with this ID" }
) "" idList ) "" idList
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent (length idList) 0 -- tag fetched for is irrelevant
pure response
-- | delivery endpoint: inbox for initially publishing a post at an instance
postInbox :: PostService d -> Text -> Handler NoContent postInbox :: PostService d -> Text -> Handler NoContent
postInbox serv post = do postInbox serv post = do
-- extract contained hashtags -- extract contained hashtags
@ -251,8 +285,7 @@ postInbox serv post = do
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
-- generate post ID -- generate post ID
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer) postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer)
-- add ID to own posts -- decision: for saving memory do not store published post IDs, just deliver a post for any requested ID
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
-- enqueue a relay job for each tag -- enqueue a relay job for each tag
liftIO $ forM_ (containedTags :: [Text]) (\tag -> liftIO $ forM_ (containedTags :: [Text]) (\tag ->
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post) atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
@ -260,6 +293,7 @@ postInbox serv post = do
pure NoContent pure NoContent
-- | delivery endpoint: receive postIDs of a certain subscribed hashtag
tagDelivery :: PostService d -> Text -> Text -> Handler Text tagDelivery :: PostService d -> Text -> Text -> Handler Text
tagDelivery serv hashtag posts = do tagDelivery serv hashtag posts = do
let postIDs = Txt.lines posts let postIDs = Txt.lines posts
@ -271,6 +305,8 @@ tagDelivery serv hashtag posts = do
pure () pure ()
pure $ "Received a postID for tag " <> hashtag pure $ "Received a postID for tag " <> hashtag
-- | receive subscription requests to a handled hashtag
tagSubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Integer tagSubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Integer
tagSubscribe serv hashtag origin = do tagSubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
@ -287,9 +323,11 @@ tagSubscribe serv hashtag origin = do
let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv) let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv)
-- setup subscription entry -- setup subscription entry
_ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime _ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime
--liftIO . putStrLn $ "just got a subscription to " <> Txt.unpack hashtag
pure $ round leaseTime pure $ round leaseTime
-- | receive and handle unsubscription requests regarding a handled tag
tagUnsubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Text tagUnsubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Text
tagUnsubscribe serv hashtag origin = do tagUnsubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
@ -310,8 +348,8 @@ tagUnsubscribe serv hashtag origin = do
clientAPI :: Proxy PostServiceAPI clientAPI :: Proxy PostServiceAPI
clientAPI = Proxy clientAPI = Proxy
relayInboxClient :: Text -> Text -> ClientM NoContent relayInboxClient
relayInboxClient :<|> subscriptionDeliveryClient :<|> subscriptionDeliveryClient
:<|> postFetchClient :<|> postFetchClient
:<|> postMultiFetchClient :<|> postMultiFetchClient
:<|> postInboxClient :<|> postInboxClient
@ -388,10 +426,12 @@ clientSubscribeTo serv tag = do
Left (FailureResponse _ fresp) Left (FailureResponse _ fresp)
|(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup |(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
--putStrLn $ "failed subscribing to " <> Txt.unpack tag <> " on " <> foundHost
doSubscribe newRes False doSubscribe newRes False
Left err -> pure . Left . show $ err Left err -> pure . Left . show $ err
Right lease -> do Right lease -> do
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease) atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease)
--putStrLn $ "just subscribed to " <> Txt.unpack tag <> " on " <> foundHost
pure . Right $ lease pure . Right $ lease
) )
lookupResponse lookupResponse
@ -525,15 +565,37 @@ lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
lookupTagSubscriptions tag = rMapLookup (hashtagToId tag) lookupTagSubscriptions tag = rMapLookup (hashtagToId tag)
-- normalise the unicode representation of a string to NFC -- normalise the unicode representation of a string to NFC and convert to lower case
normaliseTag :: Text -> Text normaliseTag :: Text -> Text
normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict normaliseTag = Txt.toLower . Txt.fromStrict . normalize NFC . Txt.toStrict
-- | convert a hashtag to its representation on the DHT -- | convert a hashtag to its representation on the DHT
hashtagToId :: Hashtag -> NodeID hashtagToId :: Hashtag -> NodeID
hashtagToId = genKeyID . Txt.unpack hashtagToId = genKeyID . Txt.unpack
readUpToTChan :: Int -> TChan a -> STM [a]
readUpToTChan 0 _ = pure []
readUpToTChan n chan = do
readFromChan <- tryReadTChan chan
case readFromChan of
Nothing -> pure []
Just val -> do
moreReads <- readUpToTChan (pred n) chan
pure (val:moreReads)
readUpToTQueue :: Int -> TQueue a -> STM [a]
readUpToTQueue 0 _ = pure []
readUpToTQueue n q = do
readFromQueue <- tryReadTQueue q
case readFromQueue of
Nothing -> pure []
Just val -> do
moreReads <- readUpToTQueue (pred n) q
pure (val:moreReads)
-- | define how to convert all showable types to PlainText -- | define how to convert all showable types to PlainText
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯ -- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap -- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
@ -545,37 +607,79 @@ instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where
-- ====== worker threads ====== -- ====== worker threads ======
-- TODO: make configurable
numParallelDeliveries = 10
launchWorkerThreads :: DHT d => PostService d -> IO ()
launchWorkerThreads serv = concurrently_
(processIncomingPosts serv)
$ concurrently_
(purgeSubscriptionsThread serv)
$ concurrently_
(fetchTagPosts serv)
(relayWorker serv)
-- | periodically remove expired subscription entries from relay subscribers
purgeSubscriptionsThread :: PostService d -> IO ()
purgeSubscriptionsThread serv = forever $ do
-- read config
now <- getPOSIXTime
let
purgeInterval = confSubscriptionExpiryTime (serviceConf serv) / 10
-- no need to atomically lock this, as newly incoming subscriptions do not
-- need to be purged
tagMap <- readTVarIO $ subscribers serv
forM_ tagMap $ \(subscriberMapSTM, _, _) ->
-- but each subscriberMap needs to be modified atomically
atomically . modifyTVar' subscriberMapSTM $ HMap.filter (\(_, ts) -> ts > now)
threadDelay $ fromEnum purgeInterval `div` 10^6
-- | process the pending relay inbox of incoming posts from the internal queue: -- | process the pending relay inbox of incoming posts from the internal queue:
-- Look up responsible relay node for given hashtag and forward post to it -- Look up responsible relay node for given hashtag and forward post to it
processIncomingPosts :: DHT d => PostService d -> IO () processIncomingPosts :: DHT d => PostService d -> IO ()
processIncomingPosts serv = forever $ do processIncomingPosts serv = forever $ do
-- blocks until available -- blocks until available
-- TODO: process multiple in parallel deliveriesToProcess <- atomically $ do
(tag, pID, pContent) <- atomically . readTQueue $ relayInQueue serv readResult <- readUpToTQueue numParallelDeliveries $ relayInQueue serv
if null readResult
then retry
else pure readResult
runningJobs <- forM deliveriesToProcess $ \(tag, pID, pContent) -> async $ do
let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
case lookupRes of case lookupRes of
-- no vserver active => wait and retry -- no vserver active => wait and retry
Nothing -> threadDelay $ 10 * 10^6 Nothing -> threadDelay (10 * 10^6) >> pure (Left "no vserver active")
Just (responsibleHost, responsiblePort) -> do Just (responsibleHost, responsiblePort) -> do
resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
case resp of case resp of
Left err -> do Left err -> do
putStrLn $ "Error: " <> show err
-- 410 error indicates outdated responsibility mapping -- 410 error indicates outdated responsibility mapping
-- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post -- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post
-- TODO: keep track of maximum retries -- TODO: keep track of maximum retries
_ <- forceLookupKey (baseDHT serv) (Txt.unpack tag) _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent) atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
pure . Left $ "Error: " <> show err
Right _ -> do Right _ -> do
-- TODO: stats
-- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags -- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
now <- getPOSIXTime now <- getPOSIXTime
subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv) subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
-- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag -- if not yet subscribed or subscription expires within 5 minutes, (re)subscribe to tag
when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $ when (maybe True (\subLease -> now - subLease < 300) subscriptionStatus) $
void $ clientSubscribeTo serv tag void $ clientSubscribeTo serv tag
-- for evaluation, return the tag of the successfully forwarded post
pure $ Right tag
-- collect async results
results <- mapM waitCatch runningJobs
-- report the count of published posts for statistics
atomically . writeTQueue (statsQueue serv) $ StatsEvent PostPublishEvent (length . rights $ results) 0 -- hashtag published to doesn't matter
pure ()
-- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID -- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID
fetchTagPosts :: DHT d => PostService d -> IO () fetchTagPosts :: DHT d => PostService d -> IO ()
@ -588,14 +692,193 @@ fetchTagPosts serv = forever $ do
resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString)) resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
case resp of case resp of
Right response -> Right response ->
if HTTPT.statusCode (HTTP.responseStatus response) == 200
then
-- success, TODO: statistics
putStrLn "post fetch success"
else
-- TODO error handling, retry -- TODO error handling, retry
--if HTTPT.statusCode (HTTP.responseStatus response) == 200
-- then
-- -- success, TODO: statistics
-- else
pure () pure ()
Left _ -> Left _ ->
-- TODO error handling, retry -- TODO error handling, retry
pure () pure ()
relayWorker :: PostService d -> IO ()
relayWorker serv = forever $ do
-- atomically (to be able to retry) fold a list of due delivery actions
jobsToProcess <- atomically $ do
subscriptionMap <- readTVar $ subscribers serv
jobList <- D.toList <$> foldM (\jobAcc (subscriberMapSTM, _, tag) -> do
subscriberMap <- readTVar subscriberMapSTM
foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do
postsToDeliver <- readUpToTChan 500 postChan
let postDeliveryAction = runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
-- append relay push job to job list
pure $ if not (null postsToDeliver)
then jobAcc' `D.snoc` (do
deliveryResult <- postDeliveryAction
either
(const $ pure ())
-- on successful push, record that event for statistics
(const . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayDeliveryEvent (length postsToDeliver) (hashtagToId tag))
deliveryResult
pure deliveryResult
)
else jobAcc'
) jobAcc $ HMap.toList subscriberMap
) D.empty subscriptionMap
-- if no relay jobs, then retry
if null jobList
then retry
else pure jobList
-- when processing the list, send several deliveries in parallel
forM_ (chunksOf numParallelDeliveries jobsToProcess) $ \jobset -> do
runningJobs <- mapM async jobset
-- so far just dropping failed attempts, TODO: retry mechanism
results <- mapM waitCatch runningJobs
let
successfulResults = rights results
unsuccessfulResults = lefts results
unless (null unsuccessfulResults) $ putStrLn ("ERR: " <> show (length unsuccessfulResults) <> " failed deliveries!")
putStrLn $ "successfully relayed " <> show (length successfulResults)
pure ()
-- ======= statistics/measurement and logging =======
data StatsEventType = PostPublishEvent
| RelayReceiveEvent
| RelayDeliveryEvent
| IncomingPostFetchEvent
deriving (Enum, Show, Eq)
-- | Represents measurement event of a 'StatsEventType' with a count relevant for a certain key
data StatsEvent = StatsEvent StatsEventType Int NodeID
deriving (Show, Eq)
-- | measured rates of relay performance
-- TODO: maybe include other metrics in here as well, like number of subscribers?
data RelayStats = RelayStats
{ relayReceiveRates :: RingMap NodeID Double
-- ^ rate of incoming posts in the responsibility of this relay
, relayDeliveryRates :: RingMap NodeID Double
-- ^ rate of relayed outgoing posts
, postFetchRate :: Double -- no need to differentiate between tags
-- ^ number of post-fetches delivered
, postPublishRate :: Double
-- ^ rate of initially publishing posts through this instance
}
deriving (Show, Eq)
launchStatsThreads :: PostService d -> IO ()
launchStatsThreads serv = do
-- create shared accumulator
sharedAccum <- newTVarIO emptyStats
concurrently_
(accumulateStatsThread sharedAccum $ statsQueue serv)
(evaluateStatsThread serv sharedAccum)
-- | Read stats events from queue and add them to a shared accumulator.
-- Instead of letting the events accumulate in the queue and allocate linear memory, immediately fold the result.
accumulateStatsThread :: TVar RelayStats -> TQueue StatsEvent -> IO ()
accumulateStatsThread statsAccumulator statsQ = forever $ do
-- blocks until stats event arrives
event <- atomically $ readTQueue statsQ
-- add the event number to current accumulator
atomically $ modifyTVar' statsAccumulator $ statsAdder event
-- | add incoming stats events to accumulator value
statsAdder :: StatsEvent -> RelayStats -> RelayStats
statsAdder event stats = case event of
StatsEvent PostPublishEvent num _ ->
stats {postPublishRate = fromIntegral num + postPublishRate stats}
StatsEvent RelayReceiveEvent num key ->
stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)}
StatsEvent RelayDeliveryEvent num key ->
stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)}
StatsEvent IncomingPostFetchEvent num _ ->
stats {postFetchRate = fromIntegral num + postFetchRate stats}
where
sumIfEntryExists = addRMapEntryWith (\newVal oldVal ->
let toInsert = fromJust $ extractRingEntry newVal
in
case oldVal of
KeyEntry n -> KeyEntry (n + toInsert)
ProxyEntry pointer (Just (KeyEntry n)) -> ProxyEntry pointer (Just (KeyEntry $ n + toInsert))
ProxyEntry pointer Nothing -> ProxyEntry pointer (Just newVal)
_ -> error "RingMap nested too deeply"
)
-- Periodically exchange the accumulated statistics with empty ones, evaluate them
-- and make them the current statistics of the service.
evaluateStatsThread :: PostService d -> TVar RelayStats -> IO ()
evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
where
loop previousTs = do
threadDelay $ confStatsEvalDelay (serviceConf serv)
-- get and reset the stats accumulator
summedStats <- atomically $ do
stats <- readTVar statsAcc
writeTVar statsAcc emptyStats
pure stats
-- as the transaction might retry several times, current time needs to
-- be read afterwards
now <- getPOSIXTime
-- evaluate stats rate and replace server stats
-- persistently store in a TVar so it can be retrieved later by the DHT
let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
rateStats = evaluateStats timePassed summedStats
atomically $ writeTVar (loadStats serv) rateStats
-- and now what? write a log to file
-- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
-- later: current (reported) load, target load
subscriberSum <- sumSubscribers
TxtI.hPutStrLn (logFileHandle serv) $
format (fixed 9 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % int )
(realToFrac now :: Double)
(sum . relayReceiveRates $ rateStats)
(sum . relayDeliveryRates $ rateStats)
(postPublishRate rateStats)
(postFetchRate rateStats)
subscriberSum
loop now
sumSubscribers = do
tagMap <- readTVarIO $ subscribers serv
foldM (\subscriberSum (subscriberMapSTM, _, _) -> do
subscriberMap <- readTVarIO subscriberMapSTM
pure $ subscriberSum + HMap.size subscriberMap
)
0 tagMap
-- | Evaluate the accumulated statistic events: Currently mostly calculates the event
-- rates by dividing through the collection time frame
evaluateStats :: POSIXTime -> RelayStats -> RelayStats
evaluateStats timeInterval summedStats =
-- first sum all event numbers, then divide through number of seconds passed to
-- get rate per second
RelayStats
{ relayReceiveRates = (/ intervalSeconds) <$> relayReceiveRates summedStats
, relayDeliveryRates = (/ intervalSeconds) <$> relayDeliveryRates summedStats
, postPublishRate = postPublishRate summedStats / intervalSeconds
, postFetchRate = postFetchRate summedStats / intervalSeconds
}
where
intervalSeconds = realToFrac timeInterval
emptyStats :: RelayStats
emptyStats = RelayStats
{ relayReceiveRates = emptyRMap
, relayDeliveryRates = emptyRMap
, postFetchRate = 0
, postPublishRate = 0
}

View file

@ -25,6 +25,29 @@ instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where
instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where
show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap) show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap)
instance (Bounded k, Ord k) => Functor (RingMap k) where
-- | map a function over all payload values of a 'RingMap'
fmap f = RingMap . Map.map traversingF . getRingMap
where
traversingF (KeyEntry a) = KeyEntry (f a)
traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry)
traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing
instance (Bounded k, Ord k) => Foldable (RingMap k) where
foldr f initVal = Map.foldr traversingFR initVal . getRingMap
where
traversingFR (KeyEntry a) acc = f a acc
traversingFR (ProxyEntry _ Nothing) acc = acc
traversingFR (ProxyEntry _ (Just entry)) acc = traversingFR entry acc
foldl f initVal = Map.foldl traversingFL initVal . getRingMap
where
traversingFL acc (KeyEntry a) = f acc a
traversingFL acc (ProxyEntry _ Nothing) = acc
traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry
-- | entry of a 'RingMap' that holds a value and can also -- | entry of a 'RingMap' that holds a value and can also
-- wrap around the lookup direction at the edges of the name space. -- wrap around the lookup direction at the edges of the name space.
data RingEntry k a = KeyEntry a data RingEntry k a = KeyEntry a
@ -133,7 +156,7 @@ rMapLookupPred :: (Bounded k, Ord k, Num k)
rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
addRMapEntryWith :: (Bounded k, Ord k) addRMapEntryWith :: (Bounded k, Ord k)
=> (RingEntry k a -> RingEntry k a -> RingEntry k a) => (RingEntry k a -> RingEntry k a -> RingEntry k a) -- ^ f new_value mold_value
-> k -- ^ key -> k -- ^ key
-> a -- ^ value -> a -- ^ value
-> RingMap k a -> RingMap k a