Refactor predecessors and successors to hold RemoteNodeStates
- neighbour nodes need to be contacted reliably - Only holding NodeIDs requires a cache lookup for getting hostname and port. This is brittle as the entry could've been purged from cache. - refactored ASN.1 scheme, types and add/ sort/ remove implementations closes #46
This commit is contained in:
parent
67cba1b69b
commit
f15d83baff
|
@ -62,8 +62,8 @@ NodeCache ::= SEQUENCE OF CacheEntry
|
|||
JoinRequestPayload ::= NULL
|
||||
|
||||
JoinResponsePayload ::= SEQUENCE {
|
||||
successors SEQUENCE OF NodeID,
|
||||
predecessors SEQUENCE OF NodeID,
|
||||
successors SEQUENCE OF NodeState,
|
||||
predecessors SEQUENCE OF NodeState,
|
||||
cache NodeCache
|
||||
}
|
||||
|
||||
|
@ -82,14 +82,14 @@ QueryIDResponsePayload ::= SEQUENCE {
|
|||
StabiliseRequestPayload ::= NULL
|
||||
|
||||
StabiliseResponsePayload ::= SEQUENCE {
|
||||
successors SEQUENCE OF NodeID,
|
||||
predecessors SEQUENCE OF NodeID
|
||||
successors SEQUENCE OF NodeState,
|
||||
predecessors SEQUENCE OF NodeState
|
||||
-- ToDo: transfer of handled key data, if newly responsible for it
|
||||
}
|
||||
|
||||
LeaveRequestPayload ::= SEQUENCE {
|
||||
successors SEQUENCE OF NodeID,
|
||||
predecessors SEQUENCE OF NodeID
|
||||
successors SEQUENCE OF NodeState,
|
||||
predecessors SEQUENCE OF NodeState
|
||||
-- ToDo: transfer of own data to newly responsible node
|
||||
}
|
||||
|
||||
|
|
|
@ -130,20 +130,20 @@ encodePayload LeaveResponsePayload = [Null]
|
|||
encodePayload payload'@LeaveRequestPayload{} =
|
||||
Start Sequence
|
||||
: Start Sequence
|
||||
: fmap (IntVal . getNodeID) (leaveSuccessors payload')
|
||||
: concatMap encodeNodeState (leaveSuccessors payload')
|
||||
<> [End Sequence
|
||||
, Start Sequence]
|
||||
<> fmap (IntVal . getNodeID) (leavePredecessors payload')
|
||||
<> concatMap encodeNodeState (leavePredecessors payload')
|
||||
<> [End Sequence
|
||||
, End Sequence]
|
||||
-- currently StabiliseResponsePayload and LeaveRequestPayload are equal
|
||||
encodePayload payload'@StabiliseResponsePayload{} =
|
||||
Start Sequence
|
||||
: Start Sequence
|
||||
: fmap (IntVal . getNodeID) (stabiliseSuccessors payload')
|
||||
: concatMap encodeNodeState (stabiliseSuccessors payload')
|
||||
<> [End Sequence
|
||||
, Start Sequence]
|
||||
<> fmap (IntVal . getNodeID) (stabilisePredecessors payload')
|
||||
<> concatMap encodeNodeState (stabilisePredecessors payload')
|
||||
<> [End Sequence
|
||||
, End Sequence]
|
||||
encodePayload payload'@StabiliseRequestPayload = [Null]
|
||||
|
@ -170,10 +170,10 @@ encodePayload payload'@QueryIDRequestPayload{} = [
|
|||
encodePayload payload'@JoinResponsePayload{} =
|
||||
Start Sequence
|
||||
: Start Sequence
|
||||
: fmap (IntVal . getNodeID) (joinSuccessors payload')
|
||||
: concatMap encodeNodeState (joinSuccessors payload')
|
||||
<> [End Sequence
|
||||
, Start Sequence]
|
||||
<> fmap (IntVal . getNodeID) (joinPredecessors payload')
|
||||
<> concatMap encodeNodeState (joinPredecessors payload')
|
||||
<> [End Sequence
|
||||
, Start Sequence]
|
||||
<> concatMap encodeCacheEntry (joinCache payload')
|
||||
|
@ -368,8 +368,8 @@ parseJoinRequest = do
|
|||
|
||||
parseJoinResponse :: ParseASN1 ActionPayload
|
||||
parseJoinResponse = onNextContainer Sequence $ do
|
||||
succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
|
||||
pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
|
||||
succ' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
pred' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
cache <- parseNodeCache
|
||||
pure $ JoinResponsePayload {
|
||||
joinSuccessors = succ'
|
||||
|
@ -404,8 +404,8 @@ parseStabiliseRequest = do
|
|||
|
||||
parseStabiliseResponse :: ParseASN1 ActionPayload
|
||||
parseStabiliseResponse = onNextContainer Sequence $ do
|
||||
succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
|
||||
pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
|
||||
succ' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
pred' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
pure $ StabiliseResponsePayload {
|
||||
stabiliseSuccessors = succ'
|
||||
, stabilisePredecessors = pred'
|
||||
|
@ -413,8 +413,8 @@ parseStabiliseResponse = onNextContainer Sequence $ do
|
|||
|
||||
parseLeaveRequest :: ParseASN1 ActionPayload
|
||||
parseLeaveRequest = onNextContainer Sequence $ do
|
||||
succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
|
||||
pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
|
||||
succ' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
pred' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
pure $ LeaveRequestPayload {
|
||||
leaveSuccessors = succ'
|
||||
, leavePredecessors = pred'
|
||||
|
|
|
@ -73,7 +73,7 @@ import Debug.Trace (trace)
|
|||
queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse
|
||||
queryLocalCache ownState nCache lBestNodes targetID
|
||||
-- as target ID falls between own ID and first predecessor, it is handled by this node
|
||||
| (targetID `localCompare` ownID) `elem` [LT, EQ] && maybe False (\p -> targetID `localCompare` p == GT) (headMay preds) = FOUND . toRemoteNodeState $ ownState
|
||||
| (targetID `localCompare` ownID) `elem` [LT, EQ] && maybe False (\p -> targetID `localCompare` p == GT) (getNid <$> headMay preds) = FOUND . toRemoteNodeState $ ownState
|
||||
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
|
||||
-- the closest succeeding node (like with the p initiated parallel queries
|
||||
| otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors
|
||||
|
@ -264,8 +264,8 @@ respondLeave nsSTM msgSet = do
|
|||
writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry senderID
|
||||
writeTVar nsSTM $
|
||||
-- add predecessors and successors of leaving node to own lists
|
||||
setPredecessors (delete senderID $ requestPreds <> predecessors nsSnap)
|
||||
. setSuccessors (delete senderID $ requestSuccs <> successors nsSnap) $ nsSnap
|
||||
setPredecessors (filter ((/=) senderID . getNid) $ requestPreds <> predecessors nsSnap)
|
||||
. setSuccessors (filter ((/=) senderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap
|
||||
-- TODO: handle handover of key data
|
||||
let leaveResponse = Response {
|
||||
requestID = requestID aRequestPart
|
||||
|
@ -337,7 +337,7 @@ respondJoin nsSTM msgSet = do
|
|||
then do
|
||||
-- if yes, adjust own predecessors/ successors and return those in a response
|
||||
let
|
||||
newPreds = getNid senderNS:predecessors nsSnap
|
||||
newPreds = senderNS:predecessors nsSnap
|
||||
joinedNS = setPredecessors newPreds nsSnap
|
||||
responsePayload = JoinResponsePayload {
|
||||
joinSuccessors = successors joinedNS
|
||||
|
@ -381,28 +381,28 @@ requestJoin toJoinOn ownStateSTM =
|
|||
(cacheInsertQ, joinedState) <- atomically $ do
|
||||
stateSnap <- readTVar ownStateSTM
|
||||
let
|
||||
(cacheInsertQ, joinedStateUnsorted) = foldl'
|
||||
(\(insertQ, nsAcc) msg ->
|
||||
(cacheInsertQ, predAccSet, succAccSet) = foldl'
|
||||
(\(insertQ, predAccSet', succAccSet') msg ->
|
||||
let
|
||||
insertQ' = maybe insertQ (\msgPl ->
|
||||
-- collect list of insertion statements into global cache
|
||||
queueAddEntries (joinCache msgPl) : insertQ
|
||||
) $ payload msg
|
||||
-- add received predecessors and successors
|
||||
addPreds ns' = maybe ns' (\msgPl ->
|
||||
setPredecessors (foldr' (:) (predecessors ns') (joinPredecessors msgPl)) ns'
|
||||
) $ payload msg
|
||||
addSuccs ns' = maybe ns' (\msgPl ->
|
||||
setSuccessors (foldr' (:) (successors ns') (joinSuccessors msgPl)) ns'
|
||||
) $ payload msg
|
||||
-- collect received predecessors and successors
|
||||
predAccSet'' = maybe predAccSet' (
|
||||
foldr' Set.insert predAccSet' . joinPredecessors
|
||||
) $ payload msg
|
||||
succAccSet'' = maybe succAccSet' (
|
||||
foldr' Set.insert succAccSet' . joinSuccessors
|
||||
) $ payload msg
|
||||
in
|
||||
(insertQ', addSuccs . addPreds $ nsAcc)
|
||||
(insertQ', predAccSet'', succAccSet'')
|
||||
)
|
||||
-- reset predecessors and successors
|
||||
([], setPredecessors [] . setSuccessors [] $ ownState)
|
||||
([], Set.empty, Set.empty)
|
||||
responses
|
||||
-- sort successors and predecessors
|
||||
newState = setSuccessors (take (kNeighbours joinedStateUnsorted) . sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (take (kNeighbours joinedStateUnsorted) . sortBy (flip localCompare) $ predecessors joinedStateUnsorted) $ joinedStateUnsorted
|
||||
-- sort, slice and set the accumulated successors and predecessors
|
||||
newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap
|
||||
writeTVar ownStateSTM newState
|
||||
pure (cacheInsertQ, newState)
|
||||
-- execute the cache insertions
|
||||
|
|
|
@ -32,6 +32,7 @@ module Hash2Pub.FediChordTypes (
|
|||
) where
|
||||
|
||||
import Control.Exception
|
||||
import Data.Function (on)
|
||||
import Data.List (delete, nub, sortBy)
|
||||
import qualified Data.Map.Strict as Map
|
||||
import Data.Maybe (fromMaybe, isJust, mapMaybe)
|
||||
|
@ -123,6 +124,9 @@ data RemoteNodeState = RemoteNodeState
|
|||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
instance Ord RemoteNodeState where
|
||||
a `compare` b = nid a `compare` nid b
|
||||
|
||||
-- | represents a node and encapsulates all data and parameters that are not present for remote nodes
|
||||
data LocalNodeState = LocalNodeState
|
||||
{ nodeState :: RemoteNodeState
|
||||
|
@ -131,9 +135,9 @@ data LocalNodeState = LocalNodeState
|
|||
-- ^ EpiChord node cache with expiry times for nodes
|
||||
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
||||
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
|
||||
, successors :: [NodeID] -- could be a set instead as these are ordered as well
|
||||
, successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
|
||||
-- ^ successor nodes in ascending order by distance
|
||||
, predecessors :: [NodeID]
|
||||
, predecessors :: [RemoteNodeState]
|
||||
-- ^ predecessor nodes in ascending order by distance
|
||||
, kNeighbours :: Int
|
||||
-- ^ desired length of predecessor and successor list
|
||||
|
@ -213,12 +217,12 @@ instance Typeable a => Show (TQueue a) where
|
|||
show x = show (typeOf x)
|
||||
|
||||
-- | convenience function that updates the successors of a 'LocalNodeState'
|
||||
setSuccessors :: [NodeID] -> LocalNodeState -> LocalNodeState
|
||||
setSuccessors succ' ns = ns {successors = take (kNeighbours ns) . nub . sortBy localCompare $ succ'}
|
||||
setSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
|
||||
setSuccessors succ' ns = ns {successors = take (kNeighbours ns) . nub . sortBy (localCompare `on` getNid) $ succ'}
|
||||
|
||||
-- | convenience function that updates the predecessors of a 'LocalNodeState'
|
||||
setPredecessors :: [NodeID] -> LocalNodeState -> LocalNodeState
|
||||
setPredecessors pred' ns = ns {predecessors = take (kNeighbours ns) . nub . sortBy (flip localCompare) $ pred'}
|
||||
setPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState
|
||||
setPredecessors pred' ns = ns {predecessors = take (kNeighbours ns) . nub . sortBy (flip (localCompare `on` getNid)) $ pred'}
|
||||
|
||||
type NodeCache = Map.Map NodeID CacheEntry
|
||||
|
||||
|
|
|
@ -53,8 +53,8 @@ data ActionPayload = QueryIDRequestPayload
|
|||
}
|
||||
| JoinRequestPayload
|
||||
| LeaveRequestPayload
|
||||
{ leaveSuccessors :: [NodeID]
|
||||
, leavePredecessors :: [NodeID]
|
||||
{ leaveSuccessors :: [RemoteNodeState]
|
||||
, leavePredecessors :: [RemoteNodeState]
|
||||
}
|
||||
| StabiliseRequestPayload
|
||||
| PingRequestPayload
|
||||
|
@ -62,14 +62,14 @@ data ActionPayload = QueryIDRequestPayload
|
|||
{ queryResult :: QueryResponse
|
||||
}
|
||||
| JoinResponsePayload
|
||||
{ joinSuccessors :: [NodeID]
|
||||
, joinPredecessors :: [NodeID]
|
||||
{ joinSuccessors :: [RemoteNodeState]
|
||||
, joinPredecessors :: [RemoteNodeState]
|
||||
, joinCache :: [RemoteCacheEntry]
|
||||
}
|
||||
| LeaveResponsePayload
|
||||
| StabiliseResponsePayload
|
||||
{ stabiliseSuccessors :: [NodeID]
|
||||
, stabilisePredecessors :: [NodeID]
|
||||
{ stabiliseSuccessors :: [RemoteNodeState]
|
||||
, stabilisePredecessors :: [RemoteNodeState]
|
||||
}
|
||||
| PingResponsePayload
|
||||
{ pingNodeStates :: [RemoteNodeState]
|
||||
|
|
|
@ -119,7 +119,7 @@ spec = do
|
|||
let
|
||||
emptyCache = initCache
|
||||
nid1 = toNodeID 2^(23::Integer)+1
|
||||
node1 = setPredecessors [nid4] . setNid nid1 <$> exampleLocalNode
|
||||
node1 = setPredecessors [node4] . setNid nid1 <$> exampleLocalNode
|
||||
nid2 = toNodeID 2^(230::Integer)+12
|
||||
node2 = exampleNodeState { nid = nid2}
|
||||
nid3 = toNodeID 2^(25::Integer)+10
|
||||
|
@ -152,15 +152,15 @@ spec = do
|
|||
describe "Messages can be encoded to and decoded from ASN.1" $ do
|
||||
-- define test messages
|
||||
let
|
||||
someNodeIDs = fmap fromInteger [3..12]
|
||||
someNodes = fmap (flip setNid exampleNodeState . fromInteger) [3..12]
|
||||
qidReqPayload = QueryIDRequestPayload {
|
||||
queryTargetID = nid exampleNodeState
|
||||
, queryLBestNodes = 3
|
||||
}
|
||||
jReqPayload = JoinRequestPayload
|
||||
lReqPayload = LeaveRequestPayload {
|
||||
leaveSuccessors = someNodeIDs
|
||||
, leavePredecessors = someNodeIDs
|
||||
leaveSuccessors = someNodes
|
||||
, leavePredecessors = someNodes
|
||||
}
|
||||
stabReqPayload = StabiliseRequestPayload
|
||||
pingReqPayload = PingRequestPayload
|
||||
|
@ -174,8 +174,8 @@ spec = do
|
|||
]
|
||||
}
|
||||
jResPayload = JoinResponsePayload {
|
||||
joinSuccessors = someNodeIDs
|
||||
, joinPredecessors = someNodeIDs
|
||||
joinSuccessors = someNodes
|
||||
, joinPredecessors = someNodes
|
||||
, joinCache = [
|
||||
RemoteCacheEntry exampleNodeState (toEnum 23420001)
|
||||
, RemoteCacheEntry (exampleNodeState {nid = fromInteger (-5)}) (toEnum 0)
|
||||
|
@ -183,7 +183,7 @@ spec = do
|
|||
}
|
||||
lResPayload = LeaveResponsePayload
|
||||
stabResPayload = StabiliseResponsePayload {
|
||||
stabiliseSuccessors = someNodeIDs
|
||||
stabiliseSuccessors = someNodes
|
||||
, stabilisePredecessors = []
|
||||
}
|
||||
pingResPayload = PingResponsePayload {
|
||||
|
@ -213,8 +213,8 @@ spec = do
|
|||
|
||||
encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg
|
||||
largeMessage = responseWith Join $ JoinResponsePayload {
|
||||
joinSuccessors = fromInteger <$> [-20..150]
|
||||
, joinPredecessors = fromInteger <$> [5..11]
|
||||
joinSuccessors = flip setNid exampleNodeState . fromInteger <$> [-20..150]
|
||||
, joinPredecessors = flip setNid exampleNodeState . fromInteger <$> [5..11]
|
||||
, joinCache = [ RemoteCacheEntry (exampleNodeState {nid = node}) 290001 | node <- [50602,506011..60000]]
|
||||
}
|
||||
|
||||
|
@ -235,7 +235,7 @@ spec = do
|
|||
it "messages too large for a single packet can (often) be split into multiple parts" $ do
|
||||
-- TODO: once splitting works more efficient, test for exact number or payload, see #18
|
||||
length (serialiseMessage 600 largeMessage) > 1 `shouldBe` True
|
||||
length (serialiseMessage 6000 largeMessage) `shouldBe` 1
|
||||
length (serialiseMessage 60000 largeMessage) `shouldBe` 1
|
||||
it "message part numbering starts at the submitted part number" $ do
|
||||
isJust (Map.lookup 1 (serialiseMessage 600 largeMessage)) `shouldBe` True
|
||||
let startAt5 = serialiseMessage 600 (largeMessage {part = 5})
|
||||
|
|
Loading…
Reference in a new issue