acknowledge parts when receiving partial requests

This commit is contained in:
Trolli Schmittlauch 2020-05-31 23:21:27 +02:00
parent 88104de9bf
commit 0660bce299
2 changed files with 24 additions and 3 deletions

View file

@ -21,6 +21,7 @@ module Hash2Pub.DHTProtocol
, mkSendSocket , mkSendSocket
, mkServerSocket , mkServerSocket
, handleIncomingRequest , handleIncomingRequest
, ackRequest
) )
where where
@ -150,8 +151,23 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
isJoined_ :: LocalNodeState -> Bool isJoined_ :: LocalNodeState -> Bool
isJoined_ ns = not . all null $ [successors ns, predecessors ns] isJoined_ ns = not . all null $ [successors ns, predecessors ns]
-- | the size limit to be used when serialising messages for sending
sendMessageSize :: Num i => i
sendMessageSize = 1200
-- ====== message send and receive operations ====== -- ====== message send and receive operations ======
-- encode the response to a request that just signals successful receipt
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
responseTo = requestID req
, senderID = ownID
, part = part req
, isFinalPart = False
, action = action req
, payload = Nothing
}
handleIncomingRequest :: LocalNodeState -- ^ the handling node handleIncomingRequest :: LocalNodeState -- ^ the handling node
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue -> TQueue (BS.ByteString, SockAddr) -- ^ send queue
@ -165,6 +181,8 @@ handleIncomingRequest ns sendQ msgSet sourceAddr = do
-- distinguish on whether and how to respond -- distinguish on whether and how to respond
-- create and enqueue ACK -- create and enqueue ACK
-- Idea: only respond with payload on last part (part == parts), problem: need to know partnumber of response from first part on -- Idea: only respond with payload on last part (part == parts), problem: need to know partnumber of response from first part on
-- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash
-- TODO: test case: mixed message types of parts
-- PLACEHOLDER -- PLACEHOLDER
pure () pure ()
@ -273,7 +291,7 @@ sendRequestTo :: Int -- ^ timeout in seconds
sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
-- give the message a random request ID -- give the message a random request ID
randomID <- randomRIO (0, 2^32-1) randomID <- randomRIO (0, 2^32-1)
let requests = serialiseMessage 1200 $ msgIncomplete randomID let requests = serialiseMessage sendMessageSize $ msgIncomplete randomID
-- create a queue for passing received response messages back, even after a timeout -- create a queue for passing received response messages back, even after a timeout
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
-- start sendAndAck with timeout -- start sendAndAck with timeout

View file

@ -64,7 +64,7 @@ import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TQueue
import Control.Monad (forever) import Control.Monad (forM_, forever)
import Crypto.Hash import Crypto.Hash
import qualified Data.ByteArray as BA import qualified Data.ByteArray as BA
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
@ -255,7 +255,7 @@ fediMessageHandler sendQ recvQ ns = do
(\validMsg -> (\validMsg ->
case validMsg of case validMsg of
aRequest@Request{} aRequest@Request{}
-- if not a multipart message, handle immediately. Response is at the same time a confirmation -- if not a multipart message, handle immediately. Response is at the same time an ACK
| part aRequest == 1 && isFinalPart aRequest -> | part aRequest == 1 && isFinalPart aRequest ->
forkIO (handleIncomingRequest ns sendQ (Set.singleton aRequest) sourceAddr) >> pure () forkIO (handleIncomingRequest ns sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
-- otherwise collect all message parts first before handling the whole request -- otherwise collect all message parts first before handling the whole request
@ -276,6 +276,9 @@ fediMessageHandler sendQ recvQ ns = do
rMapState rMapState
-- put map back into MVar, end of critical section -- put map back into MVar, end of critical section
putMVar requestMap newMapState putMVar requestMap newMapState
-- ACK the received part
forM_ (ackRequest (getNid ns) aRequest) $
\msg -> atomically $ writeTQueue sendQ (msg, sourceAddr)
-- if all parts received, then handle request. -- if all parts received, then handle request.
let let
(RequestMapEntry theseParts mayMaxParts _) = fromJust $ Map.lookup thisKey newMapState (RequestMapEntry theseParts mayMaxParts _) = fromJust $ Map.lookup thisKey newMapState