Compare commits
2 commits
30bf0529ed
...
9bf7365a2c
Author | SHA1 | Date | |
---|---|---|---|
9bf7365a2c | |||
5e745cd035 |
5 changed files with 66 additions and 85 deletions
|
@ -11,6 +11,7 @@ Action ::= ENUMERATED {queryID, join, leave, stabilise, ping, queryLoad}
|
|||
Request ::= SEQUENCE {
|
||||
action Action,
|
||||
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
|
||||
receiverID NodeID,
|
||||
sender NodeState,
|
||||
part Partnum, -- part number of this message, starts at 1
|
||||
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this reuest
|
||||
|
@ -104,13 +105,13 @@ PingRequestPayload ::= NULL -- do not include a node/ vserver ID, so that
|
|||
PingResponsePayload ::= SEQUENCE OF NodeState
|
||||
|
||||
LoadRequestPayload ::= SEQUENCE {
|
||||
lowerBound NodeID,
|
||||
upperBound NodeID
|
||||
upperSegmentBound NodeID
|
||||
}
|
||||
|
||||
LoadResponsePayload ::= SEQUENCE {
|
||||
loadSum REAL,
|
||||
remainingLoadTarget REAL
|
||||
remainingLoadTarget REAL,
|
||||
lowerBound NodeID
|
||||
}
|
||||
|
||||
END
|
||||
|
|
|
@ -186,14 +186,14 @@ encodePayload payload'@PingResponsePayload{} =
|
|||
<> [End Sequence]
|
||||
encodePayload payload'@LoadRequestPayload{} =
|
||||
[ Start Sequence
|
||||
, IntVal . getNodeID $ loadLowerBound payload'
|
||||
, IntVal . getNodeID $ loadUpperBound payload'
|
||||
, IntVal . getNodeID $ loadSegmentUpperBound payload'
|
||||
, End Sequence
|
||||
]
|
||||
encodePayload payload'@LoadResponsePayload{} =
|
||||
[ Start Sequence
|
||||
, Real $ loadSum payload'
|
||||
, Real $ loadRemainingTarget payload'
|
||||
, IntVal . getNodeID $ loadSegmentLowerBound payload'
|
||||
, End Sequence
|
||||
]
|
||||
|
||||
|
@ -227,10 +227,11 @@ encodeQueryResult FORWARD{} = Enumerated 1
|
|||
encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded
|
||||
-> [ASN1]
|
||||
encodeMessage
|
||||
(Request requestID sender part isFinalPart action requestPayload) =
|
||||
(Request requestID receiverID sender part isFinalPart action requestPayload) =
|
||||
Start Sequence
|
||||
: (Enumerated . fromIntegral . fromEnum $ action)
|
||||
: IntVal requestID
|
||||
: (IntVal . getNodeID $ receiverID)
|
||||
: encodeNodeState sender
|
||||
<> [IntVal part
|
||||
, Boolean isFinalPart]
|
||||
|
@ -274,6 +275,7 @@ parseMessage = do
|
|||
parseRequest :: Action -> ParseASN1 FediChordMessage
|
||||
parseRequest action = do
|
||||
requestID <- parseInteger
|
||||
receiverID' <- fromInteger <$> parseInteger
|
||||
sender <- parseNodeState
|
||||
part <- parseInteger
|
||||
isFinalPart <- parseBool
|
||||
|
@ -286,7 +288,7 @@ parseRequest action = do
|
|||
Ping -> parsePingRequestPayload
|
||||
QueryLoad -> parseLoadRequestPayload
|
||||
|
||||
pure $ Request requestID sender part isFinalPart action payload
|
||||
pure $ Request requestID receiverID' sender part isFinalPart action payload
|
||||
|
||||
parseResponse :: Integer -> ParseASN1 FediChordMessage
|
||||
parseResponse requestID = do
|
||||
|
@ -461,19 +463,19 @@ parsePingResponsePayload = onNextContainer Sequence $ do
|
|||
|
||||
parseLoadRequestPayload :: ParseASN1 ActionPayload
|
||||
parseLoadRequestPayload = onNextContainer Sequence $ do
|
||||
loadLowerBound' <- fromInteger <$> parseInteger
|
||||
loadUpperBound' <- fromInteger <$> parseInteger
|
||||
pure LoadRequestPayload
|
||||
{ loadLowerBound = loadLowerBound'
|
||||
, loadUpperBound = loadUpperBound'
|
||||
{ loadSegmentUpperBound = loadUpperBound'
|
||||
}
|
||||
|
||||
parseLoadResponsePayload :: ParseASN1 ActionPayload
|
||||
parseLoadResponsePayload = onNextContainer Sequence $ do
|
||||
loadSum' <- parseReal
|
||||
loadRemainingTarget' <- parseReal
|
||||
loadSegmentLowerBound' <- fromInteger <$> parseInteger
|
||||
pure LoadResponsePayload
|
||||
{ loadSum = loadSum'
|
||||
, loadRemainingTarget = loadRemainingTarget'
|
||||
, loadSegmentLowerBound = loadSegmentLowerBound'
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ module Hash2Pub.DHTProtocol
|
|||
, Action(..)
|
||||
, ActionPayload(..)
|
||||
, FediChordMessage(..)
|
||||
, mkRequest
|
||||
, maximumParts
|
||||
, sendQueryIdMessages
|
||||
, requestQueryID
|
||||
|
@ -484,6 +485,21 @@ respondJoin nsSTM msgSet = do
|
|||
|
||||
-- ....... request sending .......
|
||||
|
||||
-- | defautl constructor for request messages, fills standard values like
|
||||
-- part number to avoid code repition
|
||||
mkRequest :: LocalNodeState s -> NodeID -> Action -> Maybe ActionPayload -> (Integer -> FediChordMessage)
|
||||
mkRequest ns targetID action pl rid = Request
|
||||
{ requestID = rid
|
||||
, receiverID = targetID
|
||||
, sender = toRemoteNodeState ns
|
||||
-- part number and final flag can be changed by ASN1 encoder to make packet
|
||||
-- fit the MTU restrictions
|
||||
, part = 1
|
||||
, isFinalPart = True
|
||||
, action = action
|
||||
, payload = pl
|
||||
}
|
||||
|
||||
-- | send a join request and return the joined 'LocalNodeState' including neighbours
|
||||
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
|
||||
-> LocalNodeStateSTM s -- ^ joining NodeState
|
||||
|
@ -495,7 +511,7 @@ requestJoin toJoinOn ownStateSTM = do
|
|||
let srcAddr = confIP nodeConf
|
||||
bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
|
||||
-- extract own state for getting request information
|
||||
responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
|
||||
responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ownState (getNid toJoinOn) Join (Just JoinRequestPayload)) sock
|
||||
(cacheInsertQ, joinedState) <- atomically $ do
|
||||
stateSnap <- readTVar ownStateSTM
|
||||
let
|
||||
|
@ -628,7 +644,7 @@ lookupMessage :: Integral i
|
|||
-> LocalNodeState s -- ^ sender node state
|
||||
-> Maybe i -- ^ optionally provide a different l parameter
|
||||
-> (Integer -> FediChordMessage)
|
||||
lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
|
||||
lookupMessage targetID ns lParam = mkRequest ns targetID QueryID (Just $ pl ns targetID)
|
||||
where
|
||||
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam }
|
||||
|
||||
|
@ -641,16 +657,7 @@ requestStabilise :: LocalNodeState s -- ^ sending node
|
|||
requestStabilise ns neighbour = do
|
||||
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
||||
let srcAddr = confIP nodeConf
|
||||
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
|
||||
Request {
|
||||
requestID = rid
|
||||
, sender = toRemoteNodeState ns
|
||||
, part = 1
|
||||
, isFinalPart = False
|
||||
, action = Stabilise
|
||||
, payload = Just StabiliseRequestPayload
|
||||
}
|
||||
)
|
||||
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid neighbour) Stabilise (Just StabiliseRequestPayload))
|
||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
either
|
||||
-- forward IO error messages
|
||||
|
@ -685,17 +692,12 @@ requestLeave ns doMigration target = do
|
|||
, leavePredecessors = predecessors ns
|
||||
, leaveDoMigration = doMigration
|
||||
}
|
||||
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
|
||||
Request {
|
||||
requestID = rid
|
||||
, sender = toRemoteNodeState ns
|
||||
, part = 1
|
||||
, isFinalPart = False
|
||||
, action = Leave
|
||||
, payload = Just leavePayload
|
||||
}
|
||||
)
|
||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
responses <- bracket
|
||||
(mkSendSocket srcAddr (getDomain target) (getDhtPort target))
|
||||
close
|
||||
(fmap Right
|
||||
. sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Leave (Just leavePayload))
|
||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
either
|
||||
-- forward IO error messages
|
||||
(pure . Left)
|
||||
|
@ -711,16 +713,7 @@ requestPing ns target = do
|
|||
let srcAddr = confIP nodeConf
|
||||
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
|
||||
(\sock -> do
|
||||
resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
|
||||
Request {
|
||||
requestID = rid
|
||||
, sender = toRemoteNodeState ns
|
||||
, part = 1
|
||||
, isFinalPart = False
|
||||
, action = Ping
|
||||
, payload = Just PingRequestPayload
|
||||
}
|
||||
) sock
|
||||
resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Ping (Just PingRequestPayload)) sock
|
||||
(SockAddrInet6 _ _ peerAddr _) <- getPeerName sock
|
||||
pure $ Right (peerAddr, resp)
|
||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
|
@ -751,32 +744,21 @@ requestPing ns target = do
|
|||
requestQueryLoad :: (MonadError String m, MonadIO m)
|
||||
=> LocalNodeState s
|
||||
-> NodeID
|
||||
-> NodeID
|
||||
-> RemoteNodeState
|
||||
-> m SegmentLoadStats
|
||||
requestQueryLoad ns lowerIdBound upperIdBound target = do
|
||||
requestQueryLoad ns upperIdBound target = do
|
||||
nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns)
|
||||
let
|
||||
srcAddr = confIP nodeConf
|
||||
loadPl = LoadRequestPayload
|
||||
{ loadLowerBound = lowerIdBound
|
||||
, loadUpperBound = upperIdBound
|
||||
loadReqPl = LoadRequestPayload
|
||||
{ loadSegmentUpperBound = upperIdBound
|
||||
}
|
||||
responses <- liftIO $ bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
|
||||
(fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
|
||||
Request {
|
||||
requestID = rid
|
||||
, sender = toRemoteNodeState ns
|
||||
, part = 1
|
||||
, isFinalPart = False
|
||||
, action = QueryLoad
|
||||
, payload = Just loadPl
|
||||
}
|
||||
)
|
||||
(fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) QueryLoad (Just loadReqPl))
|
||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
responseMsgSet <- liftEither responses
|
||||
-- throws an error if an exception happened
|
||||
loadPl <- maybe (throwError "no load response payload found") pure
|
||||
loadResPl <- maybe (throwError "no load response payload found") pure
|
||||
(foldr' (\msg acc -> case payload msg of
|
||||
-- just extract the first found LoadResponsePayload
|
||||
Just pl@LoadResponsePayload{} | isNothing acc -> Just pl
|
||||
|
@ -786,10 +768,10 @@ requestQueryLoad ns lowerIdBound upperIdBound target = do
|
|||
responseMsgSet
|
||||
)
|
||||
pure SegmentLoadStats
|
||||
{ segmentLowerKeyBound = lowerIdBound
|
||||
{ segmentLowerKeyBound = loadSegmentLowerBound loadResPl
|
||||
, segmentUpperKeyBound = upperIdBound
|
||||
, segmentLoad = loadSum loadPl
|
||||
, segmentOwnerLoadTarget = loadRemainingTarget loadPl
|
||||
, segmentLoad = loadSum loadResPl
|
||||
, segmentOwnerLoadTarget = loadRemainingTarget loadResPl
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ data Action = QueryID
|
|||
|
||||
data FediChordMessage = Request
|
||||
{ requestID :: Integer
|
||||
, receiverID :: NodeID
|
||||
, sender :: RemoteNodeState
|
||||
, part :: Integer
|
||||
, isFinalPart :: Bool
|
||||
|
@ -58,6 +59,10 @@ data ActionPayload = QueryIDRequestPayload
|
|||
}
|
||||
| StabiliseRequestPayload
|
||||
| PingRequestPayload
|
||||
| LoadRequestPayload
|
||||
{ loadSegmentUpperBound :: NodeID
|
||||
-- ^ upper bound of segment interested in,
|
||||
}
|
||||
| QueryIDResponsePayload
|
||||
{ queryResult :: QueryResponse
|
||||
}
|
||||
|
@ -74,13 +79,10 @@ data ActionPayload = QueryIDRequestPayload
|
|||
| PingResponsePayload
|
||||
{ pingNodeStates :: [RemoteNodeState]
|
||||
}
|
||||
| LoadRequestPayload
|
||||
{ loadLowerBound :: NodeID
|
||||
, loadUpperBound :: NodeID
|
||||
}
|
||||
| LoadResponsePayload
|
||||
{ loadSum :: Double
|
||||
, loadRemainingTarget :: Double
|
||||
{ loadSum :: Double
|
||||
, loadRemainingTarget :: Double
|
||||
, loadSegmentLowerBound :: NodeID
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
|
|
|
@ -222,21 +222,14 @@ spec = do
|
|||
]
|
||||
}
|
||||
qLoadReqPayload = LoadRequestPayload
|
||||
{ loadLowerBound = fromInteger 12
|
||||
, loadUpperBound = fromInteger 1025
|
||||
{ loadSegmentUpperBound = 1025
|
||||
}
|
||||
qLoadResPayload = LoadResponsePayload
|
||||
{ loadSum = 3.141
|
||||
, loadRemainingTarget = -1.337
|
||||
, loadSegmentLowerBound = 12
|
||||
}
|
||||
requestTemplate = Request {
|
||||
requestID = 2342
|
||||
, sender = exampleNodeState
|
||||
, part = 1
|
||||
, isFinalPart = True
|
||||
, action = undefined
|
||||
, payload = undefined
|
||||
}
|
||||
|
||||
responseTemplate = Response {
|
||||
requestID = 2342
|
||||
, senderID = nid exampleNodeState
|
||||
|
@ -245,7 +238,7 @@ spec = do
|
|||
, action = undefined
|
||||
, payload = undefined
|
||||
}
|
||||
requestWith a pa = requestTemplate {action = a, payload = Just pa}
|
||||
requestWith senderNode a pa = mkRequest senderNode 4545 a (Just pa) $ 2342
|
||||
responseWith a pa = responseTemplate {action = a, payload = Just pa}
|
||||
|
||||
encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg
|
||||
|
@ -256,18 +249,19 @@ spec = do
|
|||
}
|
||||
|
||||
it "messages are encoded and decoded correctly from and to ASN1" $ do
|
||||
encodeDecodeAndCheck $ requestWith QueryID qidReqPayload
|
||||
encodeDecodeAndCheck $ requestWith Join jReqPayload
|
||||
encodeDecodeAndCheck $ requestWith Leave lReqPayload
|
||||
encodeDecodeAndCheck $ requestWith Stabilise stabReqPayload
|
||||
encodeDecodeAndCheck $ requestWith Ping pingReqPayload
|
||||
localNS <- exampleLocalNode
|
||||
encodeDecodeAndCheck $ requestWith localNS QueryID qidReqPayload
|
||||
encodeDecodeAndCheck $ requestWith localNS Join jReqPayload
|
||||
encodeDecodeAndCheck $ requestWith localNS Leave lReqPayload
|
||||
encodeDecodeAndCheck $ requestWith localNS Stabilise stabReqPayload
|
||||
encodeDecodeAndCheck $ requestWith localNS Ping pingReqPayload
|
||||
encodeDecodeAndCheck $ requestWith localNS QueryLoad qLoadReqPayload
|
||||
encodeDecodeAndCheck $ responseWith QueryID qidResPayload1
|
||||
encodeDecodeAndCheck $ responseWith QueryID qidResPayload2
|
||||
encodeDecodeAndCheck $ responseWith Join jResPayload
|
||||
encodeDecodeAndCheck $ responseWith Leave lResPayload
|
||||
encodeDecodeAndCheck $ responseWith Stabilise stabResPayload
|
||||
encodeDecodeAndCheck $ responseWith Ping pingResPayload
|
||||
encodeDecodeAndCheck $ requestWith QueryLoad qLoadReqPayload
|
||||
encodeDecodeAndCheck $ responseWith QueryLoad qLoadResPayload
|
||||
it "messages are encoded and decoded to ASN.1 DER properly" $
|
||||
deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue