Compare commits

..

No commits in common. "9bf7365a2c4864b5bc8888652cdc8e2e71f2a48d" and "30bf0529ed2d972fc0aa672bfa9dde874bbf06ff" have entirely different histories.

5 changed files with 85 additions and 66 deletions

View file

@ -11,7 +11,6 @@ Action ::= ENUMERATED {queryID, join, leave, stabilise, ping, queryLoad}
Request ::= SEQUENCE { Request ::= SEQUENCE {
action Action, action Action,
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer
receiverID NodeID,
sender NodeState, sender NodeState,
part Partnum, -- part number of this message, starts at 1 part Partnum, -- part number of this message, starts at 1
finalPart BOOLEAN, -- flag indicating this `part` to be the last of this reuest finalPart BOOLEAN, -- flag indicating this `part` to be the last of this reuest
@ -105,13 +104,13 @@ PingRequestPayload ::= NULL -- do not include a node/ vserver ID, so that
PingResponsePayload ::= SEQUENCE OF NodeState PingResponsePayload ::= SEQUENCE OF NodeState
LoadRequestPayload ::= SEQUENCE { LoadRequestPayload ::= SEQUENCE {
upperSegmentBound NodeID lowerBound NodeID,
upperBound NodeID
} }
LoadResponsePayload ::= SEQUENCE { LoadResponsePayload ::= SEQUENCE {
loadSum REAL, loadSum REAL,
remainingLoadTarget REAL, remainingLoadTarget REAL
lowerBound NodeID
} }
END END

View file

@ -186,14 +186,14 @@ encodePayload payload'@PingResponsePayload{} =
<> [End Sequence] <> [End Sequence]
encodePayload payload'@LoadRequestPayload{} = encodePayload payload'@LoadRequestPayload{} =
[ Start Sequence [ Start Sequence
, IntVal . getNodeID $ loadSegmentUpperBound payload' , IntVal . getNodeID $ loadLowerBound payload'
, IntVal . getNodeID $ loadUpperBound payload'
, End Sequence , End Sequence
] ]
encodePayload payload'@LoadResponsePayload{} = encodePayload payload'@LoadResponsePayload{} =
[ Start Sequence [ Start Sequence
, Real $ loadSum payload' , Real $ loadSum payload'
, Real $ loadRemainingTarget payload' , Real $ loadRemainingTarget payload'
, IntVal . getNodeID $ loadSegmentLowerBound payload'
, End Sequence , End Sequence
] ]
@ -227,11 +227,10 @@ encodeQueryResult FORWARD{} = Enumerated 1
encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded encodeMessage :: FediChordMessage -- ^ the 'FediChordMessage to be encoded
-> [ASN1] -> [ASN1]
encodeMessage encodeMessage
(Request requestID receiverID sender part isFinalPart action requestPayload) = (Request requestID sender part isFinalPart action requestPayload) =
Start Sequence Start Sequence
: (Enumerated . fromIntegral . fromEnum $ action) : (Enumerated . fromIntegral . fromEnum $ action)
: IntVal requestID : IntVal requestID
: (IntVal . getNodeID $ receiverID)
: encodeNodeState sender : encodeNodeState sender
<> [IntVal part <> [IntVal part
, Boolean isFinalPart] , Boolean isFinalPart]
@ -275,7 +274,6 @@ parseMessage = do
parseRequest :: Action -> ParseASN1 FediChordMessage parseRequest :: Action -> ParseASN1 FediChordMessage
parseRequest action = do parseRequest action = do
requestID <- parseInteger requestID <- parseInteger
receiverID' <- fromInteger <$> parseInteger
sender <- parseNodeState sender <- parseNodeState
part <- parseInteger part <- parseInteger
isFinalPart <- parseBool isFinalPart <- parseBool
@ -288,7 +286,7 @@ parseRequest action = do
Ping -> parsePingRequestPayload Ping -> parsePingRequestPayload
QueryLoad -> parseLoadRequestPayload QueryLoad -> parseLoadRequestPayload
pure $ Request requestID receiverID' sender part isFinalPart action payload pure $ Request requestID sender part isFinalPart action payload
parseResponse :: Integer -> ParseASN1 FediChordMessage parseResponse :: Integer -> ParseASN1 FediChordMessage
parseResponse requestID = do parseResponse requestID = do
@ -463,19 +461,19 @@ parsePingResponsePayload = onNextContainer Sequence $ do
parseLoadRequestPayload :: ParseASN1 ActionPayload parseLoadRequestPayload :: ParseASN1 ActionPayload
parseLoadRequestPayload = onNextContainer Sequence $ do parseLoadRequestPayload = onNextContainer Sequence $ do
loadLowerBound' <- fromInteger <$> parseInteger
loadUpperBound' <- fromInteger <$> parseInteger loadUpperBound' <- fromInteger <$> parseInteger
pure LoadRequestPayload pure LoadRequestPayload
{ loadSegmentUpperBound = loadUpperBound' { loadLowerBound = loadLowerBound'
, loadUpperBound = loadUpperBound'
} }
parseLoadResponsePayload :: ParseASN1 ActionPayload parseLoadResponsePayload :: ParseASN1 ActionPayload
parseLoadResponsePayload = onNextContainer Sequence $ do parseLoadResponsePayload = onNextContainer Sequence $ do
loadSum' <- parseReal loadSum' <- parseReal
loadRemainingTarget' <- parseReal loadRemainingTarget' <- parseReal
loadSegmentLowerBound' <- fromInteger <$> parseInteger
pure LoadResponsePayload pure LoadResponsePayload
{ loadSum = loadSum' { loadSum = loadSum'
, loadRemainingTarget = loadRemainingTarget' , loadRemainingTarget = loadRemainingTarget'
, loadSegmentLowerBound = loadSegmentLowerBound'
} }

View file

@ -15,7 +15,6 @@ module Hash2Pub.DHTProtocol
, Action(..) , Action(..)
, ActionPayload(..) , ActionPayload(..)
, FediChordMessage(..) , FediChordMessage(..)
, mkRequest
, maximumParts , maximumParts
, sendQueryIdMessages , sendQueryIdMessages
, requestQueryID , requestQueryID
@ -485,21 +484,6 @@ respondJoin nsSTM msgSet = do
-- ....... request sending ....... -- ....... request sending .......
-- | defautl constructor for request messages, fills standard values like
-- part number to avoid code repition
mkRequest :: LocalNodeState s -> NodeID -> Action -> Maybe ActionPayload -> (Integer -> FediChordMessage)
mkRequest ns targetID action pl rid = Request
{ requestID = rid
, receiverID = targetID
, sender = toRemoteNodeState ns
-- part number and final flag can be changed by ASN1 encoder to make packet
-- fit the MTU restrictions
, part = 1
, isFinalPart = True
, action = action
, payload = pl
}
-- | send a join request and return the joined 'LocalNodeState' including neighbours -- | send a join request and return the joined 'LocalNodeState' including neighbours
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
-> LocalNodeStateSTM s -- ^ joining NodeState -> LocalNodeStateSTM s -- ^ joining NodeState
@ -511,7 +495,7 @@ requestJoin toJoinOn ownStateSTM = do
let srcAddr = confIP nodeConf let srcAddr = confIP nodeConf
bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
-- extract own state for getting request information -- extract own state for getting request information
responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ownState (getNid toJoinOn) Join (Just JoinRequestPayload)) sock responses <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
(cacheInsertQ, joinedState) <- atomically $ do (cacheInsertQ, joinedState) <- atomically $ do
stateSnap <- readTVar ownStateSTM stateSnap <- readTVar ownStateSTM
let let
@ -644,7 +628,7 @@ lookupMessage :: Integral i
-> LocalNodeState s -- ^ sender node state -> LocalNodeState s -- ^ sender node state
-> Maybe i -- ^ optionally provide a different l parameter -> Maybe i -- ^ optionally provide a different l parameter
-> (Integer -> FediChordMessage) -> (Integer -> FediChordMessage)
lookupMessage targetID ns lParam = mkRequest ns targetID QueryID (Just $ pl ns targetID) lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
where where
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam } pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam }
@ -657,7 +641,16 @@ requestStabilise :: LocalNodeState s -- ^ sending node
requestStabilise ns neighbour = do requestStabilise ns neighbour = do
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf let srcAddr = confIP nodeConf
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid neighbour) Stabilise (Just StabiliseRequestPayload)) responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = Stabilise
, payload = Just StabiliseRequestPayload
}
)
) `catch` (\e -> pure . Left $ displayException (e :: IOException)) ) `catch` (\e -> pure . Left $ displayException (e :: IOException))
either either
-- forward IO error messages -- forward IO error messages
@ -692,11 +685,16 @@ requestLeave ns doMigration target = do
, leavePredecessors = predecessors ns , leavePredecessors = predecessors ns
, leaveDoMigration = doMigration , leaveDoMigration = doMigration
} }
responses <- bracket responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
(mkSendSocket srcAddr (getDomain target) (getDhtPort target)) Request {
close requestID = rid
(fmap Right , sender = toRemoteNodeState ns
. sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Leave (Just leavePayload)) , part = 1
, isFinalPart = False
, action = Leave
, payload = Just leavePayload
}
)
) `catch` (\e -> pure . Left $ displayException (e :: IOException)) ) `catch` (\e -> pure . Left $ displayException (e :: IOException))
either either
-- forward IO error messages -- forward IO error messages
@ -713,7 +711,16 @@ requestPing ns target = do
let srcAddr = confIP nodeConf let srcAddr = confIP nodeConf
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
(\sock -> do (\sock -> do
resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) Ping (Just PingRequestPayload)) sock resp <- sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = Ping
, payload = Just PingRequestPayload
}
) sock
(SockAddrInet6 _ _ peerAddr _) <- getPeerName sock (SockAddrInet6 _ _ peerAddr _) <- getPeerName sock
pure $ Right (peerAddr, resp) pure $ Right (peerAddr, resp)
) `catch` (\e -> pure . Left $ displayException (e :: IOException)) ) `catch` (\e -> pure . Left $ displayException (e :: IOException))
@ -744,21 +751,32 @@ requestPing ns target = do
requestQueryLoad :: (MonadError String m, MonadIO m) requestQueryLoad :: (MonadError String m, MonadIO m)
=> LocalNodeState s => LocalNodeState s
-> NodeID -> NodeID
-> NodeID
-> RemoteNodeState -> RemoteNodeState
-> m SegmentLoadStats -> m SegmentLoadStats
requestQueryLoad ns upperIdBound target = do requestQueryLoad ns lowerIdBound upperIdBound target = do
nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns) nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns)
let let
srcAddr = confIP nodeConf srcAddr = confIP nodeConf
loadReqPl = LoadRequestPayload loadPl = LoadRequestPayload
{ loadSegmentUpperBound = upperIdBound { loadLowerBound = lowerIdBound
, loadUpperBound = upperIdBound
} }
responses <- liftIO $ bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close responses <- liftIO $ bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
(fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (mkRequest ns (getNid target) QueryLoad (Just loadReqPl)) (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = QueryLoad
, payload = Just loadPl
}
)
) `catch` (\e -> pure . Left $ displayException (e :: IOException)) ) `catch` (\e -> pure . Left $ displayException (e :: IOException))
responseMsgSet <- liftEither responses responseMsgSet <- liftEither responses
-- throws an error if an exception happened -- throws an error if an exception happened
loadResPl <- maybe (throwError "no load response payload found") pure loadPl <- maybe (throwError "no load response payload found") pure
(foldr' (\msg acc -> case payload msg of (foldr' (\msg acc -> case payload msg of
-- just extract the first found LoadResponsePayload -- just extract the first found LoadResponsePayload
Just pl@LoadResponsePayload{} | isNothing acc -> Just pl Just pl@LoadResponsePayload{} | isNothing acc -> Just pl
@ -768,10 +786,10 @@ requestQueryLoad ns upperIdBound target = do
responseMsgSet responseMsgSet
) )
pure SegmentLoadStats pure SegmentLoadStats
{ segmentLowerKeyBound = loadSegmentLowerBound loadResPl { segmentLowerKeyBound = lowerIdBound
, segmentUpperKeyBound = upperIdBound , segmentUpperKeyBound = upperIdBound
, segmentLoad = loadSum loadResPl , segmentLoad = loadSum loadPl
, segmentOwnerLoadTarget = loadRemainingTarget loadResPl , segmentOwnerLoadTarget = loadRemainingTarget loadPl
} }

View file

@ -21,7 +21,6 @@ data Action = QueryID
data FediChordMessage = Request data FediChordMessage = Request
{ requestID :: Integer { requestID :: Integer
, receiverID :: NodeID
, sender :: RemoteNodeState , sender :: RemoteNodeState
, part :: Integer , part :: Integer
, isFinalPart :: Bool , isFinalPart :: Bool
@ -59,10 +58,6 @@ data ActionPayload = QueryIDRequestPayload
} }
| StabiliseRequestPayload | StabiliseRequestPayload
| PingRequestPayload | PingRequestPayload
| LoadRequestPayload
{ loadSegmentUpperBound :: NodeID
-- ^ upper bound of segment interested in,
}
| QueryIDResponsePayload | QueryIDResponsePayload
{ queryResult :: QueryResponse { queryResult :: QueryResponse
} }
@ -79,10 +74,13 @@ data ActionPayload = QueryIDRequestPayload
| PingResponsePayload | PingResponsePayload
{ pingNodeStates :: [RemoteNodeState] { pingNodeStates :: [RemoteNodeState]
} }
| LoadRequestPayload
{ loadLowerBound :: NodeID
, loadUpperBound :: NodeID
}
| LoadResponsePayload | LoadResponsePayload
{ loadSum :: Double { loadSum :: Double
, loadRemainingTarget :: Double , loadRemainingTarget :: Double
, loadSegmentLowerBound :: NodeID
} }
deriving (Show, Eq) deriving (Show, Eq)

View file

@ -222,14 +222,21 @@ spec = do
] ]
} }
qLoadReqPayload = LoadRequestPayload qLoadReqPayload = LoadRequestPayload
{ loadSegmentUpperBound = 1025 { loadLowerBound = fromInteger 12
, loadUpperBound = fromInteger 1025
} }
qLoadResPayload = LoadResponsePayload qLoadResPayload = LoadResponsePayload
{ loadSum = 3.141 { loadSum = 3.141
, loadRemainingTarget = -1.337 , loadRemainingTarget = -1.337
, loadSegmentLowerBound = 12
} }
requestTemplate = Request {
requestID = 2342
, sender = exampleNodeState
, part = 1
, isFinalPart = True
, action = undefined
, payload = undefined
}
responseTemplate = Response { responseTemplate = Response {
requestID = 2342 requestID = 2342
, senderID = nid exampleNodeState , senderID = nid exampleNodeState
@ -238,7 +245,7 @@ spec = do
, action = undefined , action = undefined
, payload = undefined , payload = undefined
} }
requestWith senderNode a pa = mkRequest senderNode 4545 a (Just pa) $ 2342 requestWith a pa = requestTemplate {action = a, payload = Just pa}
responseWith a pa = responseTemplate {action = a, payload = Just pa} responseWith a pa = responseTemplate {action = a, payload = Just pa}
encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg
@ -249,19 +256,18 @@ spec = do
} }
it "messages are encoded and decoded correctly from and to ASN1" $ do it "messages are encoded and decoded correctly from and to ASN1" $ do
localNS <- exampleLocalNode encodeDecodeAndCheck $ requestWith QueryID qidReqPayload
encodeDecodeAndCheck $ requestWith localNS QueryID qidReqPayload encodeDecodeAndCheck $ requestWith Join jReqPayload
encodeDecodeAndCheck $ requestWith localNS Join jReqPayload encodeDecodeAndCheck $ requestWith Leave lReqPayload
encodeDecodeAndCheck $ requestWith localNS Leave lReqPayload encodeDecodeAndCheck $ requestWith Stabilise stabReqPayload
encodeDecodeAndCheck $ requestWith localNS Stabilise stabReqPayload encodeDecodeAndCheck $ requestWith Ping pingReqPayload
encodeDecodeAndCheck $ requestWith localNS Ping pingReqPayload
encodeDecodeAndCheck $ requestWith localNS QueryLoad qLoadReqPayload
encodeDecodeAndCheck $ responseWith QueryID qidResPayload1 encodeDecodeAndCheck $ responseWith QueryID qidResPayload1
encodeDecodeAndCheck $ responseWith QueryID qidResPayload2 encodeDecodeAndCheck $ responseWith QueryID qidResPayload2
encodeDecodeAndCheck $ responseWith Join jResPayload encodeDecodeAndCheck $ responseWith Join jResPayload
encodeDecodeAndCheck $ responseWith Leave lResPayload encodeDecodeAndCheck $ responseWith Leave lResPayload
encodeDecodeAndCheck $ responseWith Stabilise stabResPayload encodeDecodeAndCheck $ responseWith Stabilise stabResPayload
encodeDecodeAndCheck $ responseWith Ping pingResPayload encodeDecodeAndCheck $ responseWith Ping pingResPayload
encodeDecodeAndCheck $ requestWith QueryLoad qLoadReqPayload
encodeDecodeAndCheck $ responseWith QueryLoad qLoadResPayload encodeDecodeAndCheck $ responseWith QueryLoad qLoadResPayload
it "messages are encoded and decoded to ASN.1 DER properly" $ it "messages are encoded and decoded to ASN.1 DER properly" $
deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload) deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload)