Compare commits

..

2 commits

View file

@ -742,7 +742,7 @@ sendRequestTo = sendRequestToWithParams 5000 3
-- | Generic function for sending a request over a connected socket and collecting the response. -- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
sendRequestToWithParams :: Int -- ^ timeout in seconds sendRequestToWithParams :: Int -- ^ timeout in milliseconds
-> Int -- ^ number of retries -> Int -- ^ number of retries
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
-> Socket -- ^ connected socket to use for sending -> Socket -- ^ connected socket to use for sending
@ -756,7 +756,7 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
-- create a queue for passing received response messages back, even after a timeout -- create a queue for passing received response messages back, even after a timeout
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
-- start sendAndAck with timeout -- start sendAndAck with timeout
attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary. -- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
recvdParts <- atomically $ flushTBQueue responseQ recvdParts <- atomically $ flushTBQueue responseQ
pure $ Set.fromList recvdParts pure $ Set.fromList recvdParts
@ -765,19 +765,20 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
-> Socket -- ^ the socket used for sending and receiving for this particular remote node -> Socket -- ^ the socket used for sending and receiving for this particular remote node
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> IO () -> IO ()
sendAndAck responseQueue sock remainingSends = do sendAndAck responseQueue sock' remainingSends = do
sendMany sock $ Map.elems remainingSends sendMany sock $ Map.elems remainingSends
-- if all requests have been acked/ responded to, return prematurely -- if all requests have been acked/ responded to, return prematurely
recvLoop responseQueue remainingSends Set.empty Nothing recvLoop sock' responseQueue remainingSends Set.empty Nothing
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses recvLoop :: Socket
-> TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> Set.Set Integer -- ^ already received response part numbers -> Set.Set Integer -- ^ already received response part numbers
-> Maybe Integer -- ^ total number of response parts if already known -> Maybe Integer -- ^ total number of response parts if already known
-> IO () -> IO ()
recvLoop responseQueue remainingSends' receivedPartNums totalParts = do recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do
-- 65535 is maximum length of UDP packets, as long as -- 65535 is maximum length of UDP packets, as long as
-- no IPv6 jumbograms are used -- no IPv6 jumbograms are used
response <- deserialiseMessage <$> recv sock 65535 response <- deserialiseMessage <$> recv sock' 65535
case response of case response of
Right msg@Response{} -> do Right msg@Response{} -> do
atomically $ writeTBQueue responseQueue msg atomically $ writeTBQueue responseQueue msg
@ -787,9 +788,9 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
newReceivedParts = Set.insert (part msg) receivedPartNums newReceivedParts = Set.insert (part msg) receivedPartNums
if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
then pure () then pure ()
else recvLoop responseQueue newRemaining receivedPartNums newTotalParts else recvLoop sock' responseQueue newRemaining receivedPartNums newTotalParts
-- drop errors and invalid messages -- drop errors and invalid messages
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache