Compare commits
No commits in common. "3bd4cb667db2ee4c94c2537e958cfd29e6819f72" and "cd8ea0760007602e58e3d4b6aff253fec5d235a9" have entirely different histories.
3bd4cb667d
...
cd8ea07600
|
@ -742,7 +742,7 @@ sendRequestTo = sendRequestToWithParams 5000 3
|
||||||
|
|
||||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
-- | Generic function for sending a request over a connected socket and collecting the response.
|
||||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
||||||
sendRequestToWithParams :: Int -- ^ timeout in milliseconds
|
sendRequestToWithParams :: Int -- ^ timeout in seconds
|
||||||
-> Int -- ^ number of retries
|
-> Int -- ^ number of retries
|
||||||
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
|
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
|
||||||
-> Socket -- ^ connected socket to use for sending
|
-> Socket -- ^ connected socket to use for sending
|
||||||
|
@ -756,7 +756,7 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
|
||||||
-- create a queue for passing received response messages back, even after a timeout
|
-- create a queue for passing received response messages back, even after a timeout
|
||||||
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
|
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
|
||||||
-- start sendAndAck with timeout
|
-- start sendAndAck with timeout
|
||||||
attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests
|
attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
|
||||||
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
|
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
|
||||||
recvdParts <- atomically $ flushTBQueue responseQ
|
recvdParts <- atomically $ flushTBQueue responseQ
|
||||||
pure $ Set.fromList recvdParts
|
pure $ Set.fromList recvdParts
|
||||||
|
@ -765,20 +765,19 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
|
||||||
-> Socket -- ^ the socket used for sending and receiving for this particular remote node
|
-> Socket -- ^ the socket used for sending and receiving for this particular remote node
|
||||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||||
-> IO ()
|
-> IO ()
|
||||||
sendAndAck responseQueue sock' remainingSends = do
|
sendAndAck responseQueue sock remainingSends = do
|
||||||
sendMany sock $ Map.elems remainingSends
|
sendMany sock $ Map.elems remainingSends
|
||||||
-- if all requests have been acked/ responded to, return prematurely
|
-- if all requests have been acked/ responded to, return prematurely
|
||||||
recvLoop sock' responseQueue remainingSends Set.empty Nothing
|
recvLoop responseQueue remainingSends Set.empty Nothing
|
||||||
recvLoop :: Socket
|
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
||||||
-> TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
|
||||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||||
-> Set.Set Integer -- ^ already received response part numbers
|
-> Set.Set Integer -- ^ already received response part numbers
|
||||||
-> Maybe Integer -- ^ total number of response parts if already known
|
-> Maybe Integer -- ^ total number of response parts if already known
|
||||||
-> IO ()
|
-> IO ()
|
||||||
recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do
|
recvLoop responseQueue remainingSends' receivedPartNums totalParts = do
|
||||||
-- 65535 is maximum length of UDP packets, as long as
|
-- 65535 is maximum length of UDP packets, as long as
|
||||||
-- no IPv6 jumbograms are used
|
-- no IPv6 jumbograms are used
|
||||||
response <- deserialiseMessage <$> recv sock' 65535
|
response <- deserialiseMessage <$> recv sock 65535
|
||||||
case response of
|
case response of
|
||||||
Right msg@Response{} -> do
|
Right msg@Response{} -> do
|
||||||
atomically $ writeTBQueue responseQueue msg
|
atomically $ writeTBQueue responseQueue msg
|
||||||
|
@ -788,9 +787,9 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
|
||||||
newReceivedParts = Set.insert (part msg) receivedPartNums
|
newReceivedParts = Set.insert (part msg) receivedPartNums
|
||||||
if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
|
if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
|
||||||
then pure ()
|
then pure ()
|
||||||
else recvLoop sock' responseQueue newRemaining receivedPartNums newTotalParts
|
else recvLoop responseQueue newRemaining receivedPartNums newTotalParts
|
||||||
-- drop errors and invalid messages
|
-- drop errors and invalid messages
|
||||||
Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts
|
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts
|
||||||
|
|
||||||
|
|
||||||
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
||||||
|
|
Loading…
Reference in a new issue