scale request timeout with speedup and pass it directly to function
This commit is contained in:
parent
4f08d33d2e
commit
c9b0e66110
|
@ -65,6 +65,8 @@ readConfig = do
|
||||||
, confJoinAttemptsInterval = 60 * 10^6 `div` speedup
|
, confJoinAttemptsInterval = 60 * 10^6 `div` speedup
|
||||||
, confMaxNodeCacheAge = 600 / fromIntegral speedup
|
, confMaxNodeCacheAge = 600 / fromIntegral speedup
|
||||||
, confResponsePurgeAge = 60 / fromIntegral speedup
|
, confResponsePurgeAge = 60 / fromIntegral speedup
|
||||||
|
, confRequestTimeout = 5 * 10^6 `div` speedup
|
||||||
|
, confRequestRetries = 3
|
||||||
}
|
}
|
||||||
sConf = ServiceConf {
|
sConf = ServiceConf {
|
||||||
confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` speedup
|
confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` speedup
|
||||||
|
|
|
@ -488,10 +488,11 @@ requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ cu
|
||||||
requestJoin toJoinOn ownStateSTM = do
|
requestJoin toJoinOn ownStateSTM = do
|
||||||
ownState <- readTVarIO ownStateSTM
|
ownState <- readTVarIO ownStateSTM
|
||||||
prn <- readTVarIO $ parentRealNode ownState
|
prn <- readTVarIO $ parentRealNode ownState
|
||||||
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ownState)
|
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ownState)
|
||||||
|
let srcAddr = confIP nodeConf
|
||||||
bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
|
bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
|
||||||
-- extract own state for getting request information
|
-- extract own state for getting request information
|
||||||
responses <- sendRequestTo (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
|
responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
|
||||||
(cacheInsertQ, joinedState) <- atomically $ do
|
(cacheInsertQ, joinedState) <- atomically $ do
|
||||||
stateSnap <- readTVar ownStateSTM
|
stateSnap <- readTVar ownStateSTM
|
||||||
let
|
let
|
||||||
|
@ -584,10 +585,11 @@ sendQueryIdMessages targetID ns lParam targets = do
|
||||||
|
|
||||||
-- create connected sockets to all query targets and use them for request handling
|
-- create connected sockets to all query targets and use them for request handling
|
||||||
|
|
||||||
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
|
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
||||||
|
let srcAddr = confIP nodeConf
|
||||||
-- ToDo: make attempts and timeout configurable
|
-- ToDo: make attempts and timeout configurable
|
||||||
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
|
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
|
||||||
sendRequestTo (lookupMessage targetID ns Nothing)
|
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
|
||||||
)) targets
|
)) targets
|
||||||
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||||
-- ToDo: exception handling, maybe log them
|
-- ToDo: exception handling, maybe log them
|
||||||
|
@ -635,8 +637,9 @@ requestStabilise :: LocalNodeState s -- ^ sending node
|
||||||
-> RemoteNodeState -- ^ neighbour node to send to
|
-> RemoteNodeState -- ^ neighbour node to send to
|
||||||
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
|
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
|
||||||
requestStabilise ns neighbour = do
|
requestStabilise ns neighbour = do
|
||||||
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
|
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
||||||
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (\rid ->
|
let srcAddr = confIP nodeConf
|
||||||
|
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
|
||||||
Request {
|
Request {
|
||||||
requestID = rid
|
requestID = rid
|
||||||
, sender = toRemoteNodeState ns
|
, sender = toRemoteNodeState ns
|
||||||
|
@ -673,13 +676,14 @@ requestLeave :: LocalNodeState s
|
||||||
-> RemoteNodeState -- target node
|
-> RemoteNodeState -- target node
|
||||||
-> IO (Either String ()) -- error or success
|
-> IO (Either String ()) -- error or success
|
||||||
requestLeave ns doMigration target = do
|
requestLeave ns doMigration target = do
|
||||||
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
|
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
||||||
let leavePayload = LeaveRequestPayload {
|
let srcAddr = confIP nodeConf
|
||||||
|
leavePayload = LeaveRequestPayload {
|
||||||
leaveSuccessors = successors ns
|
leaveSuccessors = successors ns
|
||||||
, leavePredecessors = predecessors ns
|
, leavePredecessors = predecessors ns
|
||||||
, leaveDoMigration = doMigration
|
, leaveDoMigration = doMigration
|
||||||
}
|
}
|
||||||
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (\rid ->
|
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
|
||||||
Request {
|
Request {
|
||||||
requestID = rid
|
requestID = rid
|
||||||
, sender = toRemoteNodeState ns
|
, sender = toRemoteNodeState ns
|
||||||
|
@ -701,10 +705,11 @@ requestPing :: LocalNodeState s -- ^ sending node
|
||||||
-> RemoteNodeState -- ^ node to be PINGed
|
-> RemoteNodeState -- ^ node to be PINGed
|
||||||
-> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
|
-> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
|
||||||
requestPing ns target = do
|
requestPing ns target = do
|
||||||
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
|
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
||||||
|
let srcAddr = confIP nodeConf
|
||||||
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
|
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
|
||||||
(\sock -> do
|
(\sock -> do
|
||||||
resp <- sendRequestTo (\rid ->
|
resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
|
||||||
Request {
|
Request {
|
||||||
requestID = rid
|
requestID = rid
|
||||||
, sender = toRemoteNodeState ns
|
, sender = toRemoteNodeState ns
|
||||||
|
@ -740,22 +745,14 @@ requestPing ns target = do
|
||||||
) responses
|
) responses
|
||||||
|
|
||||||
|
|
||||||
-- | 'sendRequestToWithParams' with default timeout and retries already specified.
|
|
||||||
-- Generic function for sending a request over a connected socket and collecting the response.
|
|
||||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a default timeout.
|
|
||||||
sendRequestTo :: (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
|
|
||||||
-> Socket -- ^ connected socket to use for sending
|
|
||||||
-> IO (Set.Set FediChordMessage) -- ^ responses
|
|
||||||
sendRequestTo = sendRequestToWithParams 5000 3
|
|
||||||
|
|
||||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
-- | Generic function for sending a request over a connected socket and collecting the response.
|
||||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
||||||
sendRequestToWithParams :: Int -- ^ timeout in milliseconds
|
sendRequestTo :: Int -- ^ timeout in milliseconds
|
||||||
-> Int -- ^ number of retries
|
-> Int -- ^ number of retries
|
||||||
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
|
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
|
||||||
-> Socket -- ^ connected socket to use for sending
|
-> Socket -- ^ connected socket to use for sending
|
||||||
-> IO (Set.Set FediChordMessage) -- ^ responses
|
-> IO (Set.Set FediChordMessage) -- ^ responses
|
||||||
sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
|
sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
||||||
-- give the message a random request ID
|
-- give the message a random request ID
|
||||||
randomID <- randomRIO (0, 2^32-1)
|
randomID <- randomRIO (0, 2^32-1)
|
||||||
let
|
let
|
||||||
|
@ -764,7 +761,7 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
|
||||||
-- create a queue for passing received response messages back, even after a timeout
|
-- create a queue for passing received response messages back, even after a timeout
|
||||||
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
|
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
|
||||||
-- start sendAndAck with timeout
|
-- start sendAndAck with timeout
|
||||||
attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests
|
_ <- attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests
|
||||||
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
|
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
|
||||||
recvdParts <- atomically $ flushTBQueue responseQ
|
recvdParts <- atomically $ flushTBQueue responseQ
|
||||||
pure $ Set.fromList recvdParts
|
pure $ Set.fromList recvdParts
|
||||||
|
|
|
@ -223,10 +223,11 @@ tryBootstrapJoining nsSTM = do
|
||||||
bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
|
bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
|
||||||
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
|
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
|
||||||
ns <- readTVarIO nsSTM
|
ns <- readTVarIO nsSTM
|
||||||
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
|
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
||||||
|
let srcAddr = confIP nodeConf
|
||||||
bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close (
|
bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close (
|
||||||
-- Initialise an empty cache only with the responses from a bootstrapping node
|
-- Initialise an empty cache only with the responses from a bootstrapping node
|
||||||
fmap Right . sendRequestTo (lookupMessage targetID ns Nothing)
|
fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
|
||||||
)
|
)
|
||||||
`catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
|
`catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
|
||||||
|
|
||||||
|
|
|
@ -423,6 +423,10 @@ data FediChordConf = FediChordConf
|
||||||
-- ^ maximum age of entries in the node cache, in milliseconds
|
-- ^ maximum age of entries in the node cache, in milliseconds
|
||||||
, confResponsePurgeAge :: POSIXTime
|
, confResponsePurgeAge :: POSIXTime
|
||||||
-- ^ maximum age of message parts in response part cache, in seconds
|
-- ^ maximum age of message parts in response part cache, in seconds
|
||||||
|
, confRequestTimeout :: Int
|
||||||
|
-- ^ how long to wait until response has arrived, in milliseconds
|
||||||
|
, confRequestRetries :: Int
|
||||||
|
-- ^ how often re-sending a timed-out request can be retried
|
||||||
}
|
}
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue