Compare commits

...

2 commits

Author SHA1 Message Date
Trolli Schmittlauch 2542091379 adjust rest of code to new message structure 2020-05-30 13:52:06 +02:00
Trolli Schmittlauch f8d444d5b6 FediChordMessage: last part has falg instead of parts number in each msg
Motivation: Including the number of parts in each message part requires
the total number of parts to be known in advance, making dynamic
responses based on the received data difficult
2020-05-30 13:07:28 +02:00
5 changed files with 38 additions and 30 deletions

View file

@ -4,14 +4,16 @@ NodeID ::= INTEGER (0..115792089237316195423570985008687907853269984665640564039
Domain ::= VisibleString Domain ::= VisibleString
Partnum ::= INTEGER (0..150)
Action ::= ENUMERATED {queryID, join, leave, stabilise, ping} Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
Request ::= SEQUENCE { Request ::= SEQUENCE {
action Action, action Action,
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
sender NodeState, sender NodeState,
parts INTEGER (1..150), -- number of message parts part Partnum, -- part number of this message, starts at 1
part INTEGER (1..150), -- part number of this message, starts at 1 finalPart BOOLEAN, -- flag indicating this `part` to be the last of this reuest
actionPayload CHOICE { actionPayload CHOICE {
queryIDRequestPayload QueryIDRequestPayload, queryIDRequestPayload QueryIDRequestPayload,
joinRequestPayload JoinRequestPayload, joinRequestPayload JoinRequestPayload,
@ -27,8 +29,8 @@ Request ::= SEQUENCE {
Response ::= SEQUENCE { Response ::= SEQUENCE {
responseTo INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer responseTo INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
senderID NodeID, senderID NodeID,
parts INTEGER (0..150), part Partnum,
part INTEGER (0..150), finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response
action Action, action Action,
actionPayload CHOICE { actionPayload CHOICE {
queryIDResponsePayload QueryIDResponsePayload, queryIDResponsePayload QueryIDResponsePayload,

View file

@ -103,7 +103,7 @@ serialiseMessage maxBytesLength msg =
modifyMessage i (partNum, pl) pls = (partNum, msg { modifyMessage i (partNum, pl) pls = (partNum, msg {
part = partNum part = partNum
, payload = Just pl , payload = Just pl
, parts = fromIntegral i , isFinalPart = partNum == fromIntegral i
}):pls }):pls
-- part starts at 1 -- part starts at 1
payloadParts :: Int -> Maybe [(Integer, ActionPayload)] payloadParts :: Int -> Maybe [(Integer, ActionPayload)]
@ -216,23 +216,22 @@ encodeQueryResult FORWARD{} = Enumerated 1
encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded
-> [ASN1] -> [ASN1]
encodeMessage encodeMessage
(Request requestID sender parts part action requestPayload) = (Request requestID sender part isFinalPart action requestPayload) =
Start Sequence Start Sequence
: (Enumerated . fromIntegral . fromEnum $ action) : (Enumerated . fromIntegral . fromEnum $ action)
: IntVal requestID : IntVal requestID
: encodeNodeState sender : encodeNodeState sender
<> [ <> [IntVal part
IntVal parts , Boolean isFinalPart]
, IntVal part ]
<> maybe [] encodePayload requestPayload <> maybe [] encodePayload requestPayload
<> [End Sequence] <> [End Sequence]
encodeMessage encodeMessage
(Response responseTo senderID parts part action responsePayload) = [ (Response responseTo senderID part isFinalPart action responsePayload) = [
Start Sequence Start Sequence
, IntVal responseTo , IntVal responseTo
, IntVal . getNodeID $ senderID , IntVal . getNodeID $ senderID
, IntVal parts
, IntVal part , IntVal part
, Boolean isFinalPart
, Enumerated . fromIntegral . fromEnum $ action] , Enumerated . fromIntegral . fromEnum $ action]
<> maybe [] encodePayload responsePayload <> maybe [] encodePayload responsePayload
<> [End Sequence] <> [End Sequence]
@ -265,8 +264,8 @@ parseRequest :: Action -> ParseASN1 FediChordMessage
parseRequest action = do parseRequest action = do
requestID <- parseInteger requestID <- parseInteger
sender <- parseNodeState sender <- parseNodeState
parts <- parseInteger
part <- parseInteger part <- parseInteger
isFinalPart <- parseBool
hasPayload <- hasNext hasPayload <- hasNext
payload <- if not hasPayload then pure Nothing else Just <$> case action of payload <- if not hasPayload then pure Nothing else Just <$> case action of
QueryID -> parseQueryIDRequest QueryID -> parseQueryIDRequest
@ -275,13 +274,13 @@ parseRequest action = do
Stabilise -> parseStabiliseRequest Stabilise -> parseStabiliseRequest
Ping -> parsePingRequest Ping -> parsePingRequest
pure $ Request requestID sender parts part action payload pure $ Request requestID sender part isFinalPart action payload
parseResponse :: Integer -> ParseASN1 FediChordMessage parseResponse :: Integer -> ParseASN1 FediChordMessage
parseResponse responseTo = do parseResponse responseTo = do
senderID <- fromInteger <$> parseInteger :: ParseASN1 NodeID senderID <- fromInteger <$> parseInteger :: ParseASN1 NodeID
parts <- parseInteger
part <- parseInteger part <- parseInteger
isFinalPart <- parseBool
action <- parseEnum :: ParseASN1 Action action <- parseEnum :: ParseASN1 Action
hasPayload <- hasNext hasPayload <- hasNext
payload <- if not hasPayload then pure Nothing else Just <$> case action of payload <- if not hasPayload then pure Nothing else Just <$> case action of
@ -291,7 +290,14 @@ parseResponse responseTo = do
Stabilise -> parseStabiliseResponse Stabilise -> parseStabiliseResponse
Ping -> parsePingResponse Ping -> parsePingResponse
pure $ Response responseTo senderID parts part action payload pure $ Response responseTo senderID part isFinalPart action payload
parseBool :: ParseASN1 Bool
parseBool = do
i <- getNext
case i of
Boolean parsed -> pure parsed
x -> throwParseError $ "Expected Boolean but got " <> show x
parseInteger :: ParseASN1 Integer parseInteger :: ParseASN1 Integer
parseInteger = do parseInteger = do

View file

@ -177,7 +177,7 @@ requestJoin :: NodeState a => a -- ^ currently responsible node to b
-> IO (Either String LocalNodeState) -- ^ node after join with all its new information -> IO (Either String LocalNodeState) -- ^ node after join with all its new information
requestJoin toJoinOn ownState = requestJoin toJoinOn ownState =
bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 1 Join (Just JoinRequestPayload)) sock responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
joinedStateUnsorted <- foldM joinedStateUnsorted <- foldM
(\nsAcc msg -> case payload msg of (\nsAcc msg -> case payload msg of
Nothing -> pure nsAcc Nothing -> pure nsAcc
@ -259,7 +259,7 @@ sendQueryIdMessage :: NodeID -- ^ target key ID to look u
-> IO (Set.Set FediChordMessage) -- ^ responses -> IO (Set.Set FediChordMessage) -- ^ responses
sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns) sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns)
where where
lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 1 QueryID (Just $ pl ns targetID) lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns } pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns }
-- | Generic function for sending a request over a connected socket and collecting the response. -- | Generic function for sending a request over a connected socket and collecting the response.
@ -281,7 +281,6 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
recvdParts <- atomically $ flushTBQueue responseQ recvdParts <- atomically $ flushTBQueue responseQ
pure $ Set.fromList recvdParts pure $ Set.fromList recvdParts
where where
-- state reingeben: state = noch nicht geackte messages, result = responses
sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-> Socket -- ^ the socket used for sending and receiving for this particular remote node -> Socket -- ^ the socket used for sending and receiving for this particular remote node
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
@ -289,27 +288,28 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
sendAndAck responseQueue sock remainingSends = do sendAndAck responseQueue sock remainingSends = do
sendMany sock $ Map.elems remainingSends sendMany sock $ Map.elems remainingSends
-- if all requests have been acked/ responded to, return prematurely -- if all requests have been acked/ responded to, return prematurely
recvLoop responseQueue remainingSends Set.empty recvLoop responseQueue remainingSends Set.empty Nothing
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> Set.Set Integer -- ^ already received response part numbers -> Set.Set Integer -- ^ already received response part numbers
-> Maybe Integer -- ^ total number of response parts if already known
-> IO () -> IO ()
recvLoop responseQueue remainingSends' receivedPartNums = do recvLoop responseQueue remainingSends' receivedPartNums totalParts = do
-- 65535 is maximum length of UDP packets, as long as -- 65535 is maximum length of UDP packets, as long as
-- no IPv6 jumbograms are used -- no IPv6 jumbograms are used
response <- deserialiseMessage <$> recv sock 65535 response <- deserialiseMessage <$> recv sock 65535
case response of case response of
-- drop errors Right msg@Response{} -> do
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums
Right msg -> do
atomically $ writeTBQueue responseQueue msg atomically $ writeTBQueue responseQueue msg
let let
newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
newRemaining = Map.delete (part msg) remainingSends' newRemaining = Map.delete (part msg) remainingSends'
newReceivedParts = Set.insert (part msg) receivedPartNums newReceivedParts = Set.insert (part msg) receivedPartNums
-- ToDo: handle responses with more parts than the request if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
if Map.null newRemaining && Set.size receivedPartNums == fromIntegral (parts msg)
then pure () then pure ()
else recvLoop responseQueue newRemaining receivedPartNums else recvLoop responseQueue newRemaining receivedPartNums newTotalParts
-- drop errors and invalid messages
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache

View file

@ -21,8 +21,8 @@ data Action = QueryID
data FediChordMessage = Request data FediChordMessage = Request
{ requestID :: Integer { requestID :: Integer
, sender :: RemoteNodeState , sender :: RemoteNodeState
, parts :: Integer
, part :: Integer , part :: Integer
, isFinalPart :: Bool
-- ^ part starts at 1 -- ^ part starts at 1
, action :: Action , action :: Action
, payload :: Maybe ActionPayload , payload :: Maybe ActionPayload
@ -30,8 +30,8 @@ data FediChordMessage = Request
| Response | Response
{ responseTo :: Integer { responseTo :: Integer
, senderID :: NodeID , senderID :: NodeID
, parts :: Integer
, part :: Integer , part :: Integer
, isFinalPart :: Bool
, action :: Action , action :: Action
, payload :: Maybe ActionPayload , payload :: Maybe ActionPayload
} }

View file

@ -196,16 +196,16 @@ spec = do
requestTemplate = Request { requestTemplate = Request {
requestID = 2342 requestID = 2342
, sender = exampleNodeState , sender = exampleNodeState
, parts = 1
, part = 1 , part = 1
, isFinalPart = True
, action = undefined , action = undefined
, payload = undefined , payload = undefined
} }
responseTemplate = Response { responseTemplate = Response {
responseTo = 2342 responseTo = 2342
, senderID = nid exampleNodeState , senderID = nid exampleNodeState
, parts = 1
, part = 1 , part = 1
, isFinalPart = True
, action = undefined , action = undefined
, payload = undefined , payload = undefined
} }