From f8d444d5b66956fdefbdad2c8ed485df8daa7693 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sat, 30 May 2020 13:07:28 +0200 Subject: [PATCH 1/2] FediChordMessage: last part has falg instead of parts number in each msg Motivation: Including the number of parts in each message part requires the total number of parts to be known in advance, making dynamic responses based on the received data difficult --- FediChord.asn1 | 10 ++++++---- src/Hash2Pub/ASN1Coding.hs | 28 +++++++++++++++++----------- src/Hash2Pub/ProtocolTypes.hs | 4 ++-- 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/FediChord.asn1 b/FediChord.asn1 index 254fc95..a907bb1 100644 --- a/FediChord.asn1 +++ b/FediChord.asn1 @@ -4,14 +4,16 @@ NodeID ::= INTEGER (0..115792089237316195423570985008687907853269984665640564039 Domain ::= VisibleString +Partnum ::= INTEGER (0..150) + Action ::= ENUMERATED {queryID, join, leave, stabilise, ping} Request ::= SEQUENCE { action Action, requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer sender NodeState, - parts INTEGER (1..150), -- number of message parts - part INTEGER (1..150), -- part number of this message, starts at 1 + part Partnum, -- part number of this message, starts at 1 + finalPart BOOLEAN, -- flag indicating this `part` to be the last of this reuest actionPayload CHOICE { queryIDRequestPayload QueryIDRequestPayload, joinRequestPayload JoinRequestPayload, @@ -27,8 +29,8 @@ Request ::= SEQUENCE { Response ::= SEQUENCE { responseTo INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer senderID NodeID, - parts INTEGER (0..150), - part INTEGER (0..150), + part Partnum, + finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response action Action, actionPayload CHOICE { queryIDResponsePayload QueryIDResponsePayload, diff --git a/src/Hash2Pub/ASN1Coding.hs b/src/Hash2Pub/ASN1Coding.hs index abf749b..6bbb9df 100644 --- a/src/Hash2Pub/ASN1Coding.hs +++ b/src/Hash2Pub/ASN1Coding.hs @@ -103,7 +103,7 @@ serialiseMessage maxBytesLength msg = modifyMessage i (partNum, pl) pls = (partNum, msg { part = partNum , payload = Just pl - , parts = fromIntegral i + , isFinalPart = partNum == fromIntegral i }):pls -- part starts at 1 payloadParts :: Int -> Maybe [(Integer, ActionPayload)] @@ -216,23 +216,22 @@ encodeQueryResult FORWARD{} = Enumerated 1 encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded -> [ASN1] encodeMessage - (Request requestID sender parts part action requestPayload) = + (Request requestID sender part isFinalPart action requestPayload) = Start Sequence : (Enumerated . fromIntegral . fromEnum $ action) : IntVal requestID : encodeNodeState sender - <> [ - IntVal parts - , IntVal part ] + <> [IntVal part + , Boolean isFinalPart] <> maybe [] encodePayload requestPayload <> [End Sequence] encodeMessage - (Response responseTo senderID parts part action responsePayload) = [ + (Response responseTo senderID part isFinalPart action responsePayload) = [ Start Sequence , IntVal responseTo , IntVal . getNodeID $ senderID - , IntVal parts , IntVal part + , Boolean isFinalPart , Enumerated . fromIntegral . fromEnum $ action] <> maybe [] encodePayload responsePayload <> [End Sequence] @@ -265,8 +264,8 @@ parseRequest :: Action -> ParseASN1 FediChordMessage parseRequest action = do requestID <- parseInteger sender <- parseNodeState - parts <- parseInteger part <- parseInteger + isFinalPart <- parseBool hasPayload <- hasNext payload <- if not hasPayload then pure Nothing else Just <$> case action of QueryID -> parseQueryIDRequest @@ -275,13 +274,13 @@ parseRequest action = do Stabilise -> parseStabiliseRequest Ping -> parsePingRequest - pure $ Request requestID sender parts part action payload + pure $ Request requestID sender part isFinalPart action payload parseResponse :: Integer -> ParseASN1 FediChordMessage parseResponse responseTo = do senderID <- fromInteger <$> parseInteger :: ParseASN1 NodeID - parts <- parseInteger part <- parseInteger + isFinalPart <- parseBool action <- parseEnum :: ParseASN1 Action hasPayload <- hasNext payload <- if not hasPayload then pure Nothing else Just <$> case action of @@ -291,7 +290,14 @@ parseResponse responseTo = do Stabilise -> parseStabiliseResponse Ping -> parsePingResponse - pure $ Response responseTo senderID parts part action payload + pure $ Response responseTo senderID part isFinalPart action payload + +parseBool :: ParseASN1 Bool +parseBool = do + i <- getNext + case i of + Boolean parsed -> pure parsed + x -> throwParseError $ "Expected Boolean but got " <> show x parseInteger :: ParseASN1 Integer parseInteger = do diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs index 9203bdd..bab3866 100644 --- a/src/Hash2Pub/ProtocolTypes.hs +++ b/src/Hash2Pub/ProtocolTypes.hs @@ -21,8 +21,8 @@ data Action = QueryID data FediChordMessage = Request { requestID :: Integer , sender :: RemoteNodeState - , parts :: Integer , part :: Integer + , isFinalPart :: Bool -- ^ part starts at 1 , action :: Action , payload :: Maybe ActionPayload @@ -30,8 +30,8 @@ data FediChordMessage = Request | Response { responseTo :: Integer , senderID :: NodeID - , parts :: Integer , part :: Integer + , isFinalPart :: Bool , action :: Action , payload :: Maybe ActionPayload } From 254209137999da5265e9f4d3ff46b5bb7d373545 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sat, 30 May 2020 13:52:06 +0200 Subject: [PATCH 2/2] adjust rest of code to new message structure --- src/Hash2Pub/DHTProtocol.hs | 22 +++++++++++----------- test/FediChordSpec.hs | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index fba3f20..fb50c98 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -177,7 +177,7 @@ requestJoin :: NodeState a => a -- ^ currently responsible node to b -> IO (Either String LocalNodeState) -- ^ node after join with all its new information requestJoin toJoinOn ownState = bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do - responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 1 Join (Just JoinRequestPayload)) sock + responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock joinedStateUnsorted <- foldM (\nsAcc msg -> case payload msg of Nothing -> pure nsAcc @@ -259,7 +259,7 @@ sendQueryIdMessage :: NodeID -- ^ target key ID to look u -> IO (Set.Set FediChordMessage) -- ^ responses sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns) where - lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 1 QueryID (Just $ pl ns targetID) + lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID) pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns } -- | Generic function for sending a request over a connected socket and collecting the response. @@ -281,7 +281,6 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do recvdParts <- atomically $ flushTBQueue responseQ pure $ Set.fromList recvdParts where - -- state reingeben: state = noch nicht geackte messages, result = responses sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses -> Socket -- ^ the socket used for sending and receiving for this particular remote node -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts @@ -289,27 +288,28 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do sendAndAck responseQueue sock remainingSends = do sendMany sock $ Map.elems remainingSends -- if all requests have been acked/ responded to, return prematurely - recvLoop responseQueue remainingSends Set.empty + recvLoop responseQueue remainingSends Set.empty Nothing recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Set.Set Integer -- ^ already received response part numbers + -> Maybe Integer -- ^ total number of response parts if already known -> IO () - recvLoop responseQueue remainingSends' receivedPartNums = do + recvLoop responseQueue remainingSends' receivedPartNums totalParts = do -- 65535 is maximum length of UDP packets, as long as -- no IPv6 jumbograms are used response <- deserialiseMessage <$> recv sock 65535 case response of - -- drop errors - Left _ -> recvLoop responseQueue remainingSends' receivedPartNums - Right msg -> do + Right msg@Response{} -> do atomically $ writeTBQueue responseQueue msg let + newTotalParts = if isFinalPart msg then Just (part msg) else totalParts newRemaining = Map.delete (part msg) remainingSends' newReceivedParts = Set.insert (part msg) receivedPartNums - -- ToDo: handle responses with more parts than the request - if Map.null newRemaining && Set.size receivedPartNums == fromIntegral (parts msg) + if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts then pure () - else recvLoop responseQueue newRemaining receivedPartNums + else recvLoop responseQueue newRemaining receivedPartNums newTotalParts + -- drop errors and invalid messages + Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache diff --git a/test/FediChordSpec.hs b/test/FediChordSpec.hs index 784c14e..629c7c2 100644 --- a/test/FediChordSpec.hs +++ b/test/FediChordSpec.hs @@ -196,16 +196,16 @@ spec = do requestTemplate = Request { requestID = 2342 , sender = exampleNodeState - , parts = 1 , part = 1 + , isFinalPart = True , action = undefined , payload = undefined } responseTemplate = Response { responseTo = 2342 , senderID = nid exampleNodeState - , parts = 1 , part = 1 + , isFinalPart = True , action = undefined , payload = undefined }