Merge branch 'measurement_logging' into mainline

more values are parametrised through FediChordConfig
This commit is contained in:
Trolli Schmittlauch 2020-09-05 12:31:40 +02:00
commit 60f5780742
4 changed files with 75 additions and 66 deletions

View file

@ -45,27 +45,33 @@ main = do
readConfig :: IO (FediChordConf, ServiceConf) readConfig :: IO (FediChordConf, ServiceConf)
readConfig = do readConfig = do
confDomainString : ipString : portString : servicePortString : speedup : remainingArgs <- getArgs confDomainString : ipString : portString : servicePortString : speedupString : remainingArgs <- getArgs
-- allow starting the initial node without bootstrapping info to avoid -- allow starting the initial node without bootstrapping info to avoid
-- waiting for timeout -- waiting for timeout
let let
speedup = read speedupString
confBootstrapNodes' = case remainingArgs of confBootstrapNodes' = case remainingArgs of
bootstrapHost : bootstrapPortString : _ -> bootstrapHost : bootstrapPortString : _ ->
[(bootstrapHost, read bootstrapPortString)] [(bootstrapHost, read bootstrapPortString)]
_ -> [] _ -> []
fConf = FediChordConf { fConf = FediChordConf {
confDomain = confDomainString confDomain = confDomainString
, confIP = toHostAddress6 . read $ ipString , confIP = toHostAddress6 . read $ ipString
, confDhtPort = read portString , confDhtPort = read portString
, confBootstrapNodes = confBootstrapNodes' , confBootstrapNodes = confBootstrapNodes'
--, confStabiliseInterval = 60 , confStabiliseInterval = 60 * 10^6
, confBootstrapSamplingInterval = 180 , confBootstrapSamplingInterval = 180 * 10^6 `div` speedup
, confMaxLookupCacheAge = 300 , confMaxLookupCacheAge = 300 / fromIntegral speedup
, confJoinAttemptsInterval = 60 * 10^6 `div` speedup
, confMaxNodeCacheAge = 600 / fromIntegral speedup
, confResponsePurgeAge = 60 / fromIntegral speedup
, confRequestTimeout = 5 * 10^6 `div` speedup
, confRequestRetries = 3
} }
sConf = ServiceConf { sConf = ServiceConf {
confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` (read speedup :: Integer) confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` speedup
, confServicePort = read servicePortString , confServicePort = read servicePortString
, confServiceHost = confDomainString , confServiceHost = confDomainString
} }
pure (fConf, sConf) pure (fConf, sConf)

View file

@ -488,10 +488,11 @@ requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ cu
requestJoin toJoinOn ownStateSTM = do requestJoin toJoinOn ownStateSTM = do
ownState <- readTVarIO ownStateSTM ownState <- readTVarIO ownStateSTM
prn <- readTVarIO $ parentRealNode ownState prn <- readTVarIO $ parentRealNode ownState
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ownState) nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ownState)
let srcAddr = confIP nodeConf
bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
-- extract own state for getting request information -- extract own state for getting request information
responses <- sendRequestTo (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
(cacheInsertQ, joinedState) <- atomically $ do (cacheInsertQ, joinedState) <- atomically $ do
stateSnap <- readTVar ownStateSTM stateSnap <- readTVar ownStateSTM
let let
@ -584,10 +585,11 @@ sendQueryIdMessages targetID ns lParam targets = do
-- create connected sockets to all query targets and use them for request handling -- create connected sockets to all query targets and use them for request handling
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
-- ToDo: make attempts and timeout configurable -- attempts and timeout are taken from the node's FediChordConf
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
sendRequestTo (lookupMessage targetID ns Nothing) sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
)) targets )) targets
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
-- ToDo: exception handling, maybe log them -- ToDo: exception handling, maybe log them
@ -635,8 +637,9 @@ requestStabilise :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ neighbour node to send to -> RemoteNodeState -- ^ neighbour node to send to
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
requestStabilise ns neighbour = do requestStabilise ns neighbour = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (\rid -> let srcAddr = confIP nodeConf
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request { Request {
requestID = rid requestID = rid
, sender = toRemoteNodeState ns , sender = toRemoteNodeState ns
@ -673,13 +676,14 @@ requestLeave :: LocalNodeState s
-> RemoteNodeState -- target node -> RemoteNodeState -- target node
-> IO (Either String ()) -- error or success -> IO (Either String ()) -- error or success
requestLeave ns doMigration target = do requestLeave ns doMigration target = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let leavePayload = LeaveRequestPayload { let srcAddr = confIP nodeConf
leavePayload = LeaveRequestPayload {
leaveSuccessors = successors ns leaveSuccessors = successors ns
, leavePredecessors = predecessors ns , leavePredecessors = predecessors ns
, leaveDoMigration = doMigration , leaveDoMigration = doMigration
} }
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (\rid -> responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request { Request {
requestID = rid requestID = rid
, sender = toRemoteNodeState ns , sender = toRemoteNodeState ns
@ -701,10 +705,11 @@ requestPing :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ node to be PINGed -> RemoteNodeState -- ^ node to be PINGed
-> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node -> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
requestPing ns target = do requestPing ns target = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
(\sock -> do (\sock -> do
resp <- sendRequestTo (\rid -> resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request { Request {
requestID = rid requestID = rid
, sender = toRemoteNodeState ns , sender = toRemoteNodeState ns
@ -740,22 +745,14 @@ requestPing ns target = do
) responses ) responses
-- | 'sendRequestToWithParams' with default timeout and retries already specified.
-- Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a default timeout.
sendRequestTo :: (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
-> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestTo = sendRequestToWithParams 5000 3
-- | Generic function for sending a request over a connected socket and collecting the response. -- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
sendRequestToWithParams :: Int -- ^ timeout in milliseconds sendRequestTo :: Int -- ^ timeout in milliseconds
-> Int -- ^ number of retries -> Int -- ^ number of retries
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
-> Socket -- ^ connected socket to use for sending -> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses -> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
-- give the message a random request ID -- give the message a random request ID
randomID <- randomRIO (0, 2^32-1) randomID <- randomRIO (0, 2^32-1)
let let
@ -764,7 +761,7 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
-- create a queue for passing received response messages back, even after a timeout -- create a queue for passing received response messages back, even after a timeout
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
-- start sendAndAck with timeout -- start sendAndAck with timeout
attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests _ <- attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests
-- after timeout, check received responses, delete them from unacked message set/ map and rerun sendAndAck with that if necessary. -- after timeout, check received responses, delete them from unacked message set/ map and rerun sendAndAck with that if necessary.
recvdParts <- atomically $ flushTBQueue responseQ recvdParts <- atomically $ flushTBQueue responseQ
pure $ Set.fromList recvdParts pure $ Set.fromList recvdParts

View file

@ -199,7 +199,7 @@ convergenceSampleThread nsSTM = forever $ do
-- unjoined node: try joining through all bootstrapping nodes -- unjoined node: try joining through all bootstrapping nodes
else tryBootstrapJoining nsSTM >> pure () else tryBootstrapJoining nsSTM >> pure ()
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
threadDelay $ delaySecs * 10^6 threadDelay delaySecs
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
@ -223,10 +223,11 @@ tryBootstrapJoining nsSTM = do
bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState) bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
ns <- readTVarIO nsSTM ns <- readTVarIO nsSTM
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close ( bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close (
-- Initialise an empty cache only with the responses from a bootstrapping node -- Initialise an empty cache only with the responses from a bootstrapping node
fmap Right . sendRequestTo (lookupMessage targetID ns Nothing) fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
) )
`catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
@ -310,12 +311,13 @@ joinOnNewEntriesThread nsSTM = loop
where where
loop = do loop = do
nsSnap <- readTVarIO nsSTM nsSnap <- readTVarIO nsSTM
(lookupResult, cache) <- atomically $ do (lookupResult, parentNode) <- atomically $ do
cache <- readTVar $ nodeCacheSTM nsSnap cache <- readTVar $ nodeCacheSTM nsSnap
parentNode <- readTVar $ parentRealNode nsSnap
case queryLocalCache nsSnap cache 1 (getNid nsSnap) of case queryLocalCache nsSnap cache 1 (getNid nsSnap) of
-- empty cache, block until cache changes and then retry -- empty cache, block until cache changes and then retry
(FORWARD s) | Set.null s -> retry (FORWARD s) | Set.null s -> retry
result -> pure (result, cache) result -> pure (result, parentNode)
case lookupResult of case lookupResult of
-- already joined -- already joined
FOUND _ -> FOUND _ ->
@ -325,8 +327,7 @@ joinOnNewEntriesThread nsSTM = loop
joinResult <- runExceptT $ fediChordVserverJoin nsSTM joinResult <- runExceptT $ fediChordVserverJoin nsSTM
either either
-- on join failure, sleep and retry -- on join failure, sleep and retry
-- TODO: make delay configurable (const $ threadDelay (confJoinAttemptsInterval . nodeConfig $ parentNode) >> loop)
(const $ threadDelay (30 * 10^6) >> loop)
(const $ pure ()) (const $ pure ())
joinResult joinResult
@ -341,20 +342,16 @@ nodeCacheWriter nsSTM =
modifyTVar' (nodeCacheSTM ns) cacheModifier modifyTVar' (nodeCacheSTM ns) cacheModifier
-- TODO: make max entry age configurable
maxEntryAge :: POSIXTime
maxEntryAge = 600
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones -- | Periodically iterate through cache, clean up expired entries and verify unverified ones
nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO () nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
nodeCacheVerifyThread nsSTM = forever $ do nodeCacheVerifyThread nsSTM = forever $ do
putStrLn "cache verify run: begin" putStrLn "cache verify run: begin"
-- get cache -- get cache
(ns, cache) <- atomically $ do (ns, cache, maxEntryAge) <- atomically $ do
ns <- readTVar nsSTM ns <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM ns cache <- readTVar $ nodeCacheSTM ns
pure (ns, cache) maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns)
pure (ns, cache, maxEntryAge)
-- iterate entries: -- iterate entries:
-- for avoiding too many time syscalls, get current time before iterating. -- for avoiding too many time syscalls, get current time before iterating.
now <- getPOSIXTime now <- getPOSIXTime
@ -402,7 +399,7 @@ nodeCacheVerifyThread nsSTM = forever $ do
) )
putStrLn "cache verify run: end" putStrLn "cache verify run: end"
threadDelay $ 10^6 * round maxEntryAge `div` 20 threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from picoseconds to microseconds
-- | Checks the invariant of at least @jEntries@ per cache slice. -- | Checks the invariant of at least @jEntries@ per cache slice.
@ -548,8 +545,8 @@ stabiliseThread nsSTM = forever $ do
newPredecessor newPredecessor
putStrLn "stabilise run: end" putStrLn "stabilise run: end"
-- TODO: make delay configurable stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs)
threadDelay (60 * 10^6) threadDelay stabiliseDelay
where where
-- | send a stabilise request to the n-th neighbour -- | send a stabilise request to the n-th neighbour
-- (specified by the provided getter function) and on failure retry -- (specified by the provided getter function) and on failure retry
@ -636,19 +633,15 @@ type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry
data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer) data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer)
POSIXTime POSIXTime
-- TODO: make purge age configurable
-- | periodically clean up old request parts
responsePurgeAge :: POSIXTime
responsePurgeAge = 60 -- seconds
requestMapPurge :: MVar RequestMap -> IO () requestMapPurge :: POSIXTime -> MVar RequestMap -> IO ()
requestMapPurge mapVar = forever $ do requestMapPurge purgeAge mapVar = forever $ do
rMapState <- takeMVar mapVar rMapState <- takeMVar mapVar
now <- getPOSIXTime now <- getPOSIXTime
putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) -> putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) ->
now - ts < responsePurgeAge now - ts < purgeAge
) rMapState ) rMapState
threadDelay $ round responsePurgeAge * 2 * 10^6 threadDelay $ (fromEnum purgeAge * 2) `div` 10^6
-- | Wait for messages, deserialise them, manage parts and acknowledgement status, -- | Wait for messages, deserialise them, manage parts and acknowledgement status,
@ -663,12 +656,13 @@ fediMessageHandler sendQ recvQ nsSTM = do
-- not change. -- not change.
-- Other functions are passed the nsSTM reference and thus can get the latest state. -- Other functions are passed the nsSTM reference and thus can get the latest state.
nsSnap <- readTVarIO nsSTM nsSnap <- readTVarIO nsSTM
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap)
-- handling multipart messages: -- handling multipart messages:
-- Request parts can be inserted into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. -- Request parts can be inserted into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
requestMap <- newMVar (Map.empty :: RequestMap) requestMap <- newMVar (Map.empty :: RequestMap)
-- run receive loop and requestMapPurge concurrently, so that an exception makes -- run receive loop and requestMapPurge concurrently, so that an exception makes
-- both of them fail -- both of them fail
concurrently_ (requestMapPurge requestMap) $ forever $ do concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do
-- wait for incoming messages -- wait for incoming messages
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
let aMsg = deserialiseMessage rawMsg let aMsg = deserialiseMessage rawMsg
@ -807,4 +801,4 @@ lookupCacheCleanup nodeSTM = do
now - ts < confMaxLookupCacheAge (nodeConfig node) now - ts < confMaxLookupCacheAge (nodeConfig node)
) )
) )
threadDelay $ round (confMaxLookupCacheAge $ nodeConfig node) * (10^5) threadDelay $ fromEnum (2 * confMaxLookupCacheAge (nodeConfig node)) `div` 10^6

View file

@ -411,10 +411,22 @@ data FediChordConf = FediChordConf
-- ^ listening port for the FediChord DHT -- ^ listening port for the FediChord DHT
, confBootstrapNodes :: [(String, PortNumber)] , confBootstrapNodes :: [(String, PortNumber)]
-- ^ list of potential bootstrapping nodes -- ^ list of potential bootstrapping nodes
, confStabiliseInterval :: Int
-- ^ pause between stabilise runs, in microseconds
, confBootstrapSamplingInterval :: Int , confBootstrapSamplingInterval :: Int
-- ^ pause between sampling the own ID through bootstrap nodes, in seconds -- ^ pause between sampling the own ID through bootstrap nodes, in microseconds
, confMaxLookupCacheAge :: POSIXTime , confMaxLookupCacheAge :: POSIXTime
-- ^ maximum age of lookup cache entries in seconds -- ^ maximum age of key lookup cache entries in seconds
, confJoinAttemptsInterval :: Int
-- ^ interval between join attempts on newly learned nodes, in microseconds
, confMaxNodeCacheAge :: POSIXTime
-- ^ maximum age of entries in the node cache, in seconds
, confResponsePurgeAge :: POSIXTime
-- ^ maximum age of message parts in response part cache, in seconds
, confRequestTimeout :: Int
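-- ^ how long to wait until response has arrived, in milliseconds (NOTE(review): readConfig sets this to 5 * 10^6 and sendRequestTo multiplies it by 1000 again; confirm the intended unit)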
, confRequestRetries :: Int
-- ^ how often re-sending a timed-out request can be retried
} }
deriving (Show, Eq) deriving (Show, Eq)