From 0660bce29949b7237bbdf403a146a1910799c28a Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sun, 31 May 2020 23:21:27 +0200 Subject: [PATCH] acknowledge parts when receiving partial requests --- src/Hash2Pub/DHTProtocol.hs | 20 +++++++++++++++++++- src/Hash2Pub/FediChord.hs | 7 +++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index ed1e5d4..5ff87b0 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -21,6 +21,7 @@ module Hash2Pub.DHTProtocol , mkSendSocket , mkServerSocket , handleIncomingRequest + , ackRequest ) where @@ -150,8 +151,23 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc isJoined_ :: LocalNodeState -> Bool isJoined_ ns = not . all null $ [successors ns, predecessors ns] +-- | the size limit to be used when serialising messages for sending +sendMessageSize :: Num i => i +sendMessageSize = 1200 + -- ====== message send and receive operations ====== +-- encode the response to a request that just signals successful receipt +ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString +ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response { + responseTo = requestID req + , senderID = ownID + , part = part req + , isFinalPart = False + , action = action req + , payload = Nothing + } + handleIncomingRequest :: LocalNodeState -- ^ the handling node -> TQueue (BS.ByteString, SockAddr) -- ^ send queue @@ -165,6 +181,8 @@ handleIncomingRequest ns sendQ msgSet sourceAddr = do -- distinguish on whether and how to respond -- create and enqueue ACK -- Idea: only respond with payload on last part (part == parts), problem: need to know partnumber of response from first part on + -- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash + -- TODO: test case: mixed message types of parts -- PLACEHOLDER pure () @@ -273,7 +291,7 @@ sendRequestTo :: Int -- ^ timeout in seconds sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do -- give the message a random request ID randomID <- randomRIO (0, 2^32-1) - let requests = serialiseMessage 1200 $ msgIncomplete randomID + let requests = serialiseMessage sendMessageSize $ msgIncomplete randomID -- create a queue for passing received response messages back, even after a timeout responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets -- start sendAndAck with timeout diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 8a367f2..c8b2b2e 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -64,7 +64,7 @@ import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TQueue -import Control.Monad (forever) +import Control.Monad (forM_, forever) import Crypto.Hash import qualified Data.ByteArray as BA import qualified Data.ByteString as BS @@ -255,7 +255,7 @@ fediMessageHandler sendQ recvQ ns = do (\validMsg -> case validMsg of aRequest@Request{} - -- if not a multipart message, handle immediately. Response is at the same time a confirmation + -- if not a multipart message, handle immediately. Response is at the same time an ACK | part aRequest == 1 && isFinalPart aRequest -> forkIO (handleIncomingRequest ns sendQ (Set.singleton aRequest) sourceAddr) >> pure () -- otherwise collect all message parts first before handling the whole request @@ -276,6 +276,9 @@ fediMessageHandler sendQ recvQ ns = do rMapState -- put map back into MVar, end of critical section putMVar requestMap newMapState + -- ACK the received part + forM_ (ackRequest (getNid ns) aRequest) $ + \msg -> atomically $ writeTQueue sendQ (msg, sourceAddr) -- if all parts received, then handle request. let (RequestMapEntry theseParts mayMaxParts _) = fromJust $ Map.lookup thisKey newMapState