Compare commits
2 commits
fea9660f80
...
2542091379
Author | SHA1 | Date | |
---|---|---|---|
|
2542091379 | ||
|
f8d444d5b6 |
|
@ -4,14 +4,16 @@ NodeID ::= INTEGER (0..115792089237316195423570985008687907853269984665640564039
|
||||||
|
|
||||||
Domain ::= VisibleString
|
Domain ::= VisibleString
|
||||||
|
|
||||||
|
Partnum ::= INTEGER (0..150)
|
||||||
|
|
||||||
Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
|
Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
|
||||||
|
|
||||||
Request ::= SEQUENCE {
|
Request ::= SEQUENCE {
|
||||||
action Action,
|
action Action,
|
||||||
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
|
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
|
||||||
sender NodeState,
|
sender NodeState,
|
||||||
parts INTEGER (1..150), -- number of message parts
|
part Partnum, -- part number of this message, starts at 1
|
||||||
part INTEGER (1..150), -- part number of this message, starts at 1
|
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this reuest
|
||||||
actionPayload CHOICE {
|
actionPayload CHOICE {
|
||||||
queryIDRequestPayload QueryIDRequestPayload,
|
queryIDRequestPayload QueryIDRequestPayload,
|
||||||
joinRequestPayload JoinRequestPayload,
|
joinRequestPayload JoinRequestPayload,
|
||||||
|
@ -27,8 +29,8 @@ Request ::= SEQUENCE {
|
||||||
Response ::= SEQUENCE {
|
Response ::= SEQUENCE {
|
||||||
responseTo INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
|
responseTo INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
|
||||||
senderID NodeID,
|
senderID NodeID,
|
||||||
parts INTEGER (0..150),
|
part Partnum,
|
||||||
part INTEGER (0..150),
|
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this response
|
||||||
action Action,
|
action Action,
|
||||||
actionPayload CHOICE {
|
actionPayload CHOICE {
|
||||||
queryIDResponsePayload QueryIDResponsePayload,
|
queryIDResponsePayload QueryIDResponsePayload,
|
||||||
|
|
|
@ -103,7 +103,7 @@ serialiseMessage maxBytesLength msg =
|
||||||
modifyMessage i (partNum, pl) pls = (partNum, msg {
|
modifyMessage i (partNum, pl) pls = (partNum, msg {
|
||||||
part = partNum
|
part = partNum
|
||||||
, payload = Just pl
|
, payload = Just pl
|
||||||
, parts = fromIntegral i
|
, isFinalPart = partNum == fromIntegral i
|
||||||
}):pls
|
}):pls
|
||||||
-- part starts at 1
|
-- part starts at 1
|
||||||
payloadParts :: Int -> Maybe [(Integer, ActionPayload)]
|
payloadParts :: Int -> Maybe [(Integer, ActionPayload)]
|
||||||
|
@ -216,23 +216,22 @@ encodeQueryResult FORWARD{} = Enumerated 1
|
||||||
encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded
|
encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded
|
||||||
-> [ASN1]
|
-> [ASN1]
|
||||||
encodeMessage
|
encodeMessage
|
||||||
(Request requestID sender parts part action requestPayload) =
|
(Request requestID sender part isFinalPart action requestPayload) =
|
||||||
Start Sequence
|
Start Sequence
|
||||||
: (Enumerated . fromIntegral . fromEnum $ action)
|
: (Enumerated . fromIntegral . fromEnum $ action)
|
||||||
: IntVal requestID
|
: IntVal requestID
|
||||||
: encodeNodeState sender
|
: encodeNodeState sender
|
||||||
<> [
|
<> [IntVal part
|
||||||
IntVal parts
|
, Boolean isFinalPart]
|
||||||
, IntVal part ]
|
|
||||||
<> maybe [] encodePayload requestPayload
|
<> maybe [] encodePayload requestPayload
|
||||||
<> [End Sequence]
|
<> [End Sequence]
|
||||||
encodeMessage
|
encodeMessage
|
||||||
(Response responseTo senderID parts part action responsePayload) = [
|
(Response responseTo senderID part isFinalPart action responsePayload) = [
|
||||||
Start Sequence
|
Start Sequence
|
||||||
, IntVal responseTo
|
, IntVal responseTo
|
||||||
, IntVal . getNodeID $ senderID
|
, IntVal . getNodeID $ senderID
|
||||||
, IntVal parts
|
|
||||||
, IntVal part
|
, IntVal part
|
||||||
|
, Boolean isFinalPart
|
||||||
, Enumerated . fromIntegral . fromEnum $ action]
|
, Enumerated . fromIntegral . fromEnum $ action]
|
||||||
<> maybe [] encodePayload responsePayload
|
<> maybe [] encodePayload responsePayload
|
||||||
<> [End Sequence]
|
<> [End Sequence]
|
||||||
|
@ -265,8 +264,8 @@ parseRequest :: Action -> ParseASN1 FediChordMessage
|
||||||
parseRequest action = do
|
parseRequest action = do
|
||||||
requestID <- parseInteger
|
requestID <- parseInteger
|
||||||
sender <- parseNodeState
|
sender <- parseNodeState
|
||||||
parts <- parseInteger
|
|
||||||
part <- parseInteger
|
part <- parseInteger
|
||||||
|
isFinalPart <- parseBool
|
||||||
hasPayload <- hasNext
|
hasPayload <- hasNext
|
||||||
payload <- if not hasPayload then pure Nothing else Just <$> case action of
|
payload <- if not hasPayload then pure Nothing else Just <$> case action of
|
||||||
QueryID -> parseQueryIDRequest
|
QueryID -> parseQueryIDRequest
|
||||||
|
@ -275,13 +274,13 @@ parseRequest action = do
|
||||||
Stabilise -> parseStabiliseRequest
|
Stabilise -> parseStabiliseRequest
|
||||||
Ping -> parsePingRequest
|
Ping -> parsePingRequest
|
||||||
|
|
||||||
pure $ Request requestID sender parts part action payload
|
pure $ Request requestID sender part isFinalPart action payload
|
||||||
|
|
||||||
parseResponse :: Integer -> ParseASN1 FediChordMessage
|
parseResponse :: Integer -> ParseASN1 FediChordMessage
|
||||||
parseResponse responseTo = do
|
parseResponse responseTo = do
|
||||||
senderID <- fromInteger <$> parseInteger :: ParseASN1 NodeID
|
senderID <- fromInteger <$> parseInteger :: ParseASN1 NodeID
|
||||||
parts <- parseInteger
|
|
||||||
part <- parseInteger
|
part <- parseInteger
|
||||||
|
isFinalPart <- parseBool
|
||||||
action <- parseEnum :: ParseASN1 Action
|
action <- parseEnum :: ParseASN1 Action
|
||||||
hasPayload <- hasNext
|
hasPayload <- hasNext
|
||||||
payload <- if not hasPayload then pure Nothing else Just <$> case action of
|
payload <- if not hasPayload then pure Nothing else Just <$> case action of
|
||||||
|
@ -291,7 +290,14 @@ parseResponse responseTo = do
|
||||||
Stabilise -> parseStabiliseResponse
|
Stabilise -> parseStabiliseResponse
|
||||||
Ping -> parsePingResponse
|
Ping -> parsePingResponse
|
||||||
|
|
||||||
pure $ Response responseTo senderID parts part action payload
|
pure $ Response responseTo senderID part isFinalPart action payload
|
||||||
|
|
||||||
|
parseBool :: ParseASN1 Bool
|
||||||
|
parseBool = do
|
||||||
|
i <- getNext
|
||||||
|
case i of
|
||||||
|
Boolean parsed -> pure parsed
|
||||||
|
x -> throwParseError $ "Expected Boolean but got " <> show x
|
||||||
|
|
||||||
parseInteger :: ParseASN1 Integer
|
parseInteger :: ParseASN1 Integer
|
||||||
parseInteger = do
|
parseInteger = do
|
||||||
|
|
|
@ -177,7 +177,7 @@ requestJoin :: NodeState a => a -- ^ currently responsible node to b
|
||||||
-> IO (Either String LocalNodeState) -- ^ node after join with all its new information
|
-> IO (Either String LocalNodeState) -- ^ node after join with all its new information
|
||||||
requestJoin toJoinOn ownState =
|
requestJoin toJoinOn ownState =
|
||||||
bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
|
bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
|
||||||
responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 1 Join (Just JoinRequestPayload)) sock
|
responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
|
||||||
joinedStateUnsorted <- foldM
|
joinedStateUnsorted <- foldM
|
||||||
(\nsAcc msg -> case payload msg of
|
(\nsAcc msg -> case payload msg of
|
||||||
Nothing -> pure nsAcc
|
Nothing -> pure nsAcc
|
||||||
|
@ -259,7 +259,7 @@ sendQueryIdMessage :: NodeID -- ^ target key ID to look u
|
||||||
-> IO (Set.Set FediChordMessage) -- ^ responses
|
-> IO (Set.Set FediChordMessage) -- ^ responses
|
||||||
sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns)
|
sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns)
|
||||||
where
|
where
|
||||||
lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 1 QueryID (Just $ pl ns targetID)
|
lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
|
||||||
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns }
|
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns }
|
||||||
|
|
||||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
-- | Generic function for sending a request over a connected socket and collecting the response.
|
||||||
|
@ -281,7 +281,6 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
||||||
recvdParts <- atomically $ flushTBQueue responseQ
|
recvdParts <- atomically $ flushTBQueue responseQ
|
||||||
pure $ Set.fromList recvdParts
|
pure $ Set.fromList recvdParts
|
||||||
where
|
where
|
||||||
-- state reingeben: state = noch nicht geackte messages, result = responses
|
|
||||||
sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
||||||
-> Socket -- ^ the socket used for sending and receiving for this particular remote node
|
-> Socket -- ^ the socket used for sending and receiving for this particular remote node
|
||||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||||
|
@ -289,27 +288,28 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
||||||
sendAndAck responseQueue sock remainingSends = do
|
sendAndAck responseQueue sock remainingSends = do
|
||||||
sendMany sock $ Map.elems remainingSends
|
sendMany sock $ Map.elems remainingSends
|
||||||
-- if all requests have been acked/ responded to, return prematurely
|
-- if all requests have been acked/ responded to, return prematurely
|
||||||
recvLoop responseQueue remainingSends Set.empty
|
recvLoop responseQueue remainingSends Set.empty Nothing
|
||||||
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
||||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||||
-> Set.Set Integer -- ^ already received response part numbers
|
-> Set.Set Integer -- ^ already received response part numbers
|
||||||
|
-> Maybe Integer -- ^ total number of response parts if already known
|
||||||
-> IO ()
|
-> IO ()
|
||||||
recvLoop responseQueue remainingSends' receivedPartNums = do
|
recvLoop responseQueue remainingSends' receivedPartNums totalParts = do
|
||||||
-- 65535 is maximum length of UDP packets, as long as
|
-- 65535 is maximum length of UDP packets, as long as
|
||||||
-- no IPv6 jumbograms are used
|
-- no IPv6 jumbograms are used
|
||||||
response <- deserialiseMessage <$> recv sock 65535
|
response <- deserialiseMessage <$> recv sock 65535
|
||||||
case response of
|
case response of
|
||||||
-- drop errors
|
Right msg@Response{} -> do
|
||||||
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums
|
|
||||||
Right msg -> do
|
|
||||||
atomically $ writeTBQueue responseQueue msg
|
atomically $ writeTBQueue responseQueue msg
|
||||||
let
|
let
|
||||||
|
newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
|
||||||
newRemaining = Map.delete (part msg) remainingSends'
|
newRemaining = Map.delete (part msg) remainingSends'
|
||||||
newReceivedParts = Set.insert (part msg) receivedPartNums
|
newReceivedParts = Set.insert (part msg) receivedPartNums
|
||||||
-- ToDo: handle responses with more parts than the request
|
if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
|
||||||
if Map.null newRemaining && Set.size receivedPartNums == fromIntegral (parts msg)
|
|
||||||
then pure ()
|
then pure ()
|
||||||
else recvLoop responseQueue newRemaining receivedPartNums
|
else recvLoop responseQueue newRemaining receivedPartNums newTotalParts
|
||||||
|
-- drop errors and invalid messages
|
||||||
|
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts
|
||||||
|
|
||||||
|
|
||||||
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
||||||
|
|
|
@ -21,8 +21,8 @@ data Action = QueryID
|
||||||
data FediChordMessage = Request
|
data FediChordMessage = Request
|
||||||
{ requestID :: Integer
|
{ requestID :: Integer
|
||||||
, sender :: RemoteNodeState
|
, sender :: RemoteNodeState
|
||||||
, parts :: Integer
|
|
||||||
, part :: Integer
|
, part :: Integer
|
||||||
|
, isFinalPart :: Bool
|
||||||
-- ^ part starts at 1
|
-- ^ part starts at 1
|
||||||
, action :: Action
|
, action :: Action
|
||||||
, payload :: Maybe ActionPayload
|
, payload :: Maybe ActionPayload
|
||||||
|
@ -30,8 +30,8 @@ data FediChordMessage = Request
|
||||||
| Response
|
| Response
|
||||||
{ responseTo :: Integer
|
{ responseTo :: Integer
|
||||||
, senderID :: NodeID
|
, senderID :: NodeID
|
||||||
, parts :: Integer
|
|
||||||
, part :: Integer
|
, part :: Integer
|
||||||
|
, isFinalPart :: Bool
|
||||||
, action :: Action
|
, action :: Action
|
||||||
, payload :: Maybe ActionPayload
|
, payload :: Maybe ActionPayload
|
||||||
}
|
}
|
||||||
|
|
|
@ -196,16 +196,16 @@ spec = do
|
||||||
requestTemplate = Request {
|
requestTemplate = Request {
|
||||||
requestID = 2342
|
requestID = 2342
|
||||||
, sender = exampleNodeState
|
, sender = exampleNodeState
|
||||||
, parts = 1
|
|
||||||
, part = 1
|
, part = 1
|
||||||
|
, isFinalPart = True
|
||||||
, action = undefined
|
, action = undefined
|
||||||
, payload = undefined
|
, payload = undefined
|
||||||
}
|
}
|
||||||
responseTemplate = Response {
|
responseTemplate = Response {
|
||||||
responseTo = 2342
|
responseTo = 2342
|
||||||
, senderID = nid exampleNodeState
|
, senderID = nid exampleNodeState
|
||||||
, parts = 1
|
|
||||||
, part = 1
|
, part = 1
|
||||||
|
, isFinalPart = True
|
||||||
, action = undefined
|
, action = undefined
|
||||||
, payload = undefined
|
, payload = undefined
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue