implement send-receive-acknowledge-retry loop for requests
This commit is contained in:
parent
0e6f126b3b
commit
8d18f952cd
|
@ -121,41 +121,71 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
|
|||
|
||||
-- ====== message send and receive operations ======
|
||||
|
||||
requestQueryID :: NodeState -> NodeID -> IO NodeState
|
||||
-- 1. do a local lookup for the l closest nodes
|
||||
-- 2. create l sockets
|
||||
-- 3. send a message async concurrently to all l nodes
|
||||
-- 4. collect the results, insert them into cache
|
||||
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
|
||||
requestQueryID ns targetID = do
|
||||
cacheSnapshot <- readIORef $ getNodeCacheRef ns
|
||||
let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
|
||||
-- FOUND can only be returned if targetID is owned by local node
|
||||
case localResult of
|
||||
FOUND thisNode -> return thisNode
|
||||
FORWARD nodeSet ->
|
||||
sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
|
||||
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||
responses = mapM
|
||||
--requestQueryID :: NodeState -> NodeID -> IO NodeState
|
||||
---- 1. do a local lookup for the l closest nodes
|
||||
---- 2. create l sockets
|
||||
---- 3. send a message async concurrently to all l nodes
|
||||
---- 4. collect the results, insert them into cache
|
||||
---- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
|
||||
--requestQueryID ns targetID = do
|
||||
-- cacheSnapshot <- readIORef $ getNodeCacheRef ns
|
||||
-- let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
|
||||
-- -- FOUND can only be returned if targetID is owned by local node
|
||||
-- case localResult of
|
||||
-- FOUND thisNode -> return thisNode
|
||||
-- FORWARD nodeSet ->
|
||||
-- sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
|
||||
-- -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||
-- responses = mapM
|
||||
|
||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
||||
sendRequestTo :: Int -- ^ timeout in seconds
|
||||
-> Int -- ^ number of retries
|
||||
-> FediChordMessage -- ^ the message to be sent
|
||||
-> Socket -- ^ connected socket to use for sending
|
||||
-> IO (Set.Set FediChordMessage) -- ^ responses
|
||||
sendRequestTo timeout attempts msg sock = do
|
||||
sendRequestTo timeoutMillis numAttempts msg sock = do
|
||||
let requests = serialiseMessage 1200 msg
|
||||
-- create a queue for passing received response messages back, even after a timeout
|
||||
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
|
||||
-- start sendAndAck with timeout
|
||||
-- ToDo: make attempts and timeout configurable
|
||||
attempts 3 . timeout 5000 $ do
|
||||
attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
|
||||
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
|
||||
recvdParts <- atomically $ flushTBQueue responseQ
|
||||
-- PLACEHOLDER
|
||||
pure Set.empty
|
||||
where
|
||||
-- state reingeben: state = noch nicht geackte messages, result = responses
|
||||
sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage)
|
||||
sendAndAck sock = do
|
||||
remainingSends <- get
|
||||
sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
||||
-> Socket -- ^ the socket used for sending and receiving for this particular remote node
|
||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||
-> IO ()
|
||||
sendAndAck responseQueue sock remainingSends = do
|
||||
sendMany sock $ Map.elems remainingSends
|
||||
-- timeout pro receive socket, danach catMaybes
|
||||
-- wichtig: Pakete können dupliziert werden, dh es können mehr ACKs als gesendete parts ankommen
|
||||
replicateM
|
||||
-- if all requests have been acked/ responded to, return prematurely
|
||||
recvLoop responseQueue remainingSends Set.empty
|
||||
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||
-> Set.Set Integer -- ^ already received response part numbers
|
||||
-> IO ()
|
||||
recvLoop responseQueue remainingSends' receivedPartNums = do
|
||||
-- 65535 is maximum length of UDP packets, as long as
|
||||
-- no IPv6 jumbograms are used
|
||||
response <- deserialiseMessage <$> recv sock 65535
|
||||
case response of
|
||||
-- drop errors
|
||||
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums
|
||||
Right msg -> do
|
||||
atomically $ writeTBQueue responseQueue msg
|
||||
let
|
||||
newRemaining = Map.delete (part msg) remainingSends'
|
||||
newReceivedParts = Set.insert (part msg) receivedPartNums
|
||||
-- ToDo: handle responses with more parts than the request
|
||||
if Map.null newRemaining && Set.size receivedPartNums == fromIntegral (parts msg)
|
||||
then pure ()
|
||||
else recvLoop responseQueue newRemaining receivedPartNums
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue