diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index fba3f20..fb50c98 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -177,7 +177,7 @@ requestJoin :: NodeState a => a -- ^ currently responsible node to b -> IO (Either String LocalNodeState) -- ^ node after join with all its new information requestJoin toJoinOn ownState = bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do - responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 1 Join (Just JoinRequestPayload)) sock + responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock joinedStateUnsorted <- foldM (\nsAcc msg -> case payload msg of Nothing -> pure nsAcc @@ -259,7 +259,7 @@ sendQueryIdMessage :: NodeID -- ^ target key ID to look u -> IO (Set.Set FediChordMessage) -- ^ responses sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns) where - lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 1 QueryID (Just $ pl ns targetID) + lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID) pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns } -- | Generic function for sending a request over a connected socket and collecting the response. @@ -281,7 +281,6 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do recvdParts <- atomically $ flushTBQueue responseQ pure $ Set.fromList recvdParts where - -- state reingeben: state = noch nicht geackte messages, result = responses sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses -> Socket -- ^ the socket used for sending and receiving for this particular remote node -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts @@ -289,27 +288,28 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do sendAndAck responseQueue sock remainingSends = do sendMany sock $ Map.elems remainingSends -- if all requests have been acked/ responded to, return prematurely - recvLoop responseQueue remainingSends Set.empty + recvLoop responseQueue remainingSends Set.empty Nothing recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Set.Set Integer -- ^ already received response part numbers + -> Maybe Integer -- ^ total number of response parts if already known -> IO () - recvLoop responseQueue remainingSends' receivedPartNums = do + recvLoop responseQueue remainingSends' receivedPartNums totalParts = do -- 65535 is maximum length of UDP packets, as long as -- no IPv6 jumbograms are used response <- deserialiseMessage <$> recv sock 65535 case response of - -- drop errors - Left _ -> recvLoop responseQueue remainingSends' receivedPartNums - Right msg -> do + Right msg@Response{} -> do atomically $ writeTBQueue responseQueue msg let + newTotalParts = if isFinalPart msg then Just (part msg) else totalParts newRemaining = Map.delete (part msg) remainingSends' newReceivedParts = Set.insert (part msg) receivedPartNums - -- ToDo: handle responses with more parts than the request - if Map.null newRemaining && Set.size receivedPartNums == fromIntegral (parts msg) + if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts then pure () - else recvLoop responseQueue newRemaining receivedPartNums + else recvLoop responseQueue newRemaining receivedPartNums newTotalParts + -- drop errors and invalid messages + Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache diff --git a/test/FediChordSpec.hs b/test/FediChordSpec.hs index 784c14e..629c7c2 100644 --- a/test/FediChordSpec.hs +++ b/test/FediChordSpec.hs @@ -196,16 +196,16 @@ spec = do requestTemplate = Request { requestID = 2342 , sender = exampleNodeState - , parts = 1 , part = 1 + , isFinalPart = True , action = undefined , payload = undefined } responseTemplate = Response { responseTo = 2342 , senderID = nid exampleNodeState - , parts = 1 , part = 1 + , isFinalPart = True , action = undefined , payload = undefined }