From 8d18f952cd12db6b8383f585f15d11cbf4f3d224 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Tue, 19 May 2020 17:53:13 +0200 Subject: [PATCH] implement send-receive-acknowledge-retry loop for requests --- src/Hash2Pub/DHTProtocol.hs | 78 +++++++++++++++++++++++++------------ 1 file changed, 54 insertions(+), 24 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 486721c..95ad9e8 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -121,41 +121,71 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc -- ====== message send and receive operations ====== -requestQueryID :: NodeState -> NodeID -> IO NodeState --- 1. do a local lookup for the l closest nodes --- 2. create l sockets --- 3. send a message async concurrently to all l nodes --- 4. collect the results, insert them into cache --- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) -requestQueryID ns targetID = do - cacheSnapshot <- readIORef $ getNodeCacheRef ns - let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID - -- FOUND can only be returned if targetID is owned by local node - case localResult of - FOUND thisNode -> return thisNode - FORWARD nodeSet -> - sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet - -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 - responses = mapM +--requestQueryID :: NodeState -> NodeID -> IO NodeState +---- 1. do a local lookup for the l closest nodes +---- 2. create l sockets +---- 3. send a message async concurrently to all l nodes +---- 4. collect the results, insert them into cache +---- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) +--requestQueryID ns targetID = do +-- cacheSnapshot <- readIORef $ getNodeCacheRef ns +-- let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID +-- -- FOUND can only be returned if targetID is owned by local node +-- case localResult of +-- FOUND thisNode -> return thisNode +-- FORWARD nodeSet -> +-- sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet +-- -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 +-- responses = mapM +-- | Generic function for sending a request over a connected socket and collecting the response. +-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. sendRequestTo :: Int -- ^ timeout in seconds -> Int -- ^ number of retries -> FediChordMessage -- ^ the message to be sent -> Socket -- ^ connected socket to use for sending -> IO (Set.Set FediChordMessage) -- ^ responses -sendRequestTo timeout attempts msg sock = do +sendRequestTo timeoutMillis numAttempts msg sock = do let requests = serialiseMessage 1200 msg + -- create a queue for passing received response messages back, even after a timeout + responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets + -- start sendAndAck with timeout -- ToDo: make attempts and timeout configurable - attempts 3 . timeout 5000 $ do + attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests + -- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary. + recvdParts <- atomically $ flushTBQueue responseQ + -- PLACEHOLDER + pure Set.empty where -- state reingeben: state = noch nicht geackte messages, result = responses - sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage) - sendAndAck sock = do - remainingSends <- get + sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses + -> Socket -- ^ the socket used for sending and receiving for this particular remote node + -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts + -> IO () + sendAndAck responseQueue sock remainingSends = do sendMany sock $ Map.elems remainingSends - -- timeout pro receive socket, danach catMaybes - -- wichtig: Pakete können dupliziert werden, dh es können mehr ACKs als gesendete parts ankommen - replicateM + -- if all requests have been acked/ responded to, return prematurely + recvLoop responseQueue remainingSends Set.empty + recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses + -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts + -> Set.Set Integer -- ^ already received response part numbers + -> IO () + recvLoop responseQueue remainingSends' receivedPartNums = do + -- 65535 is maximum length of UDP packets, as long as + -- no IPv6 jumbograms are used + response <- deserialiseMessage <$> recv sock 65535 + case response of + -- drop errors + Left _ -> recvLoop responseQueue remainingSends' receivedPartNums + Right msg -> do + atomically $ writeTBQueue responseQueue msg + let + newRemaining = Map.delete (part msg) remainingSends' + newReceivedParts = Set.insert (part msg) receivedPartNums + -- ToDo: handle responses with more parts than the request + if Map.null newRemaining && Set.size receivedPartNums == fromIntegral (parts msg) + then pure () + else recvLoop responseQueue newRemaining receivedPartNums