Compare commits
2 commits
cd8ea07600
...
3bd4cb667d
Author | SHA1 | Date | |
---|---|---|---|
|
3bd4cb667d | ||
|
4ba592d8a2 |
|
@ -742,7 +742,7 @@ sendRequestTo = sendRequestToWithParams 5000 3
|
|||
|
||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
||||
sendRequestToWithParams :: Int -- ^ timeout in seconds
|
||||
sendRequestToWithParams :: Int -- ^ timeout in milliseconds
|
||||
-> Int -- ^ number of retries
|
||||
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
|
||||
-> Socket -- ^ connected socket to use for sending
|
||||
|
@ -756,7 +756,7 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
|
|||
-- create a queue for passing received response messages back, even after a timeout
|
||||
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
|
||||
-- start sendAndAck with timeout
|
||||
attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
|
||||
attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests
|
||||
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
|
||||
recvdParts <- atomically $ flushTBQueue responseQ
|
||||
pure $ Set.fromList recvdParts
|
||||
|
@ -765,19 +765,20 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
|
|||
-> Socket -- ^ the socket used for sending and receiving for this particular remote node
|
||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||
-> IO ()
|
||||
sendAndAck responseQueue sock remainingSends = do
|
||||
sendAndAck responseQueue sock' remainingSends = do
|
||||
sendMany sock $ Map.elems remainingSends
|
||||
-- if all requests have been acked/ responded to, return prematurely
|
||||
recvLoop responseQueue remainingSends Set.empty Nothing
|
||||
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
||||
recvLoop sock' responseQueue remainingSends Set.empty Nothing
|
||||
recvLoop :: Socket
|
||||
-> TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||
-> Set.Set Integer -- ^ already received response part numbers
|
||||
-> Maybe Integer -- ^ total number of response parts if already known
|
||||
-> IO ()
|
||||
recvLoop responseQueue remainingSends' receivedPartNums totalParts = do
|
||||
recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do
|
||||
-- 65535 is maximum length of UDP packets, as long as
|
||||
-- no IPv6 jumbograms are used
|
||||
response <- deserialiseMessage <$> recv sock 65535
|
||||
response <- deserialiseMessage <$> recv sock' 65535
|
||||
case response of
|
||||
Right msg@Response{} -> do
|
||||
atomically $ writeTBQueue responseQueue msg
|
||||
|
@ -787,9 +788,9 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
|
|||
newReceivedParts = Set.insert (part msg) receivedPartNums
|
||||
if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
|
||||
then pure ()
|
||||
else recvLoop responseQueue newRemaining receivedPartNums newTotalParts
|
||||
else recvLoop sock' responseQueue newRemaining receivedPartNums newTotalParts
|
||||
-- drop errors and invalid messages
|
||||
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts
|
||||
Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts
|
||||
|
||||
|
||||
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
||||
|
|
Loading…
Reference in a new issue