explicitly pass socket in send-receive-loop
This commit is contained in:
parent
4ba592d8a2
commit
3bd4cb667d
|
@ -742,7 +742,7 @@ sendRequestTo = sendRequestToWithParams 5000 3
|
|||
|
||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
||||
sendRequestToWithParams :: Int -- ^ timeout in seconds
|
||||
sendRequestToWithParams :: Int -- ^ timeout in milliseconds
|
||||
-> Int -- ^ number of retries
|
||||
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
|
||||
-> Socket -- ^ connected socket to use for sending
|
||||
|
@ -765,19 +765,20 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
|
|||
-> Socket -- ^ the socket used for sending and receiving for this particular remote node
|
||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||
-> IO ()
|
||||
sendAndAck responseQueue sock remainingSends = do
|
||||
sendAndAck responseQueue sock' remainingSends = do
|
||||
sendMany sock $ Map.elems remainingSends
|
||||
-- if all requests have been acked/ responded to, return prematurely
|
||||
recvLoop responseQueue remainingSends Set.empty Nothing
|
||||
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
||||
recvLoop sock' responseQueue remainingSends Set.empty Nothing
|
||||
recvLoop :: Socket
|
||||
-> TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||
-> Set.Set Integer -- ^ already received response part numbers
|
||||
-> Maybe Integer -- ^ total number of response parts if already known
|
||||
-> IO ()
|
||||
recvLoop responseQueue remainingSends' receivedPartNums totalParts = do
|
||||
recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do
|
||||
-- 65535 is maximum length of UDP packets, as long as
|
||||
-- no IPv6 jumbograms are used
|
||||
response <- deserialiseMessage <$> recv sock 65535
|
||||
response <- deserialiseMessage <$> recv sock' 65535
|
||||
case response of
|
||||
Right msg@Response{} -> do
|
||||
atomically $ writeTBQueue responseQueue msg
|
||||
|
@ -787,9 +788,9 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
|
|||
newReceivedParts = Set.insert (part msg) receivedPartNums
|
||||
if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
|
||||
then pure ()
|
||||
else recvLoop responseQueue newRemaining receivedPartNums newTotalParts
|
||||
else recvLoop sock' responseQueue newRemaining receivedPartNums newTotalParts
|
||||
-- drop errors and invalid messages
|
||||
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts
|
||||
Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts
|
||||
|
||||
|
||||
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
||||
|
|
Loading…
Reference in a new issue