Compare commits

..

No commits in common. "f15d83baffe2870d432553c625b42a890d17cbce" and "4e359775ec3d42e43e7640cc8bd8de3305590559" have entirely different histories.

6 changed files with 58 additions and 62 deletions

View file

@ -62,8 +62,8 @@ NodeCache ::= SEQUENCE OF CacheEntry
JoinRequestPayload ::= NULL JoinRequestPayload ::= NULL
JoinResponsePayload ::= SEQUENCE { JoinResponsePayload ::= SEQUENCE {
successors SEQUENCE OF NodeState, successors SEQUENCE OF NodeID,
predecessors SEQUENCE OF NodeState, predecessors SEQUENCE OF NodeID,
cache NodeCache cache NodeCache
} }
@ -82,14 +82,14 @@ QueryIDResponsePayload ::= SEQUENCE {
StabiliseRequestPayload ::= NULL StabiliseRequestPayload ::= NULL
StabiliseResponsePayload ::= SEQUENCE { StabiliseResponsePayload ::= SEQUENCE {
successors SEQUENCE OF NodeState, successors SEQUENCE OF NodeID,
predecessors SEQUENCE OF NodeState predecessors SEQUENCE OF NodeID
-- ToDo: transfer of handled key data, if newly responsible for it -- ToDo: transfer of handled key data, if newly responsible for it
} }
LeaveRequestPayload ::= SEQUENCE { LeaveRequestPayload ::= SEQUENCE {
successors SEQUENCE OF NodeState, successors SEQUENCE OF NodeID,
predecessors SEQUENCE OF NodeState predecessors SEQUENCE OF NodeID
-- ToDo: transfer of own data to newly responsible node -- ToDo: transfer of own data to newly responsible node
} }

View file

@ -130,20 +130,20 @@ encodePayload LeaveResponsePayload = [Null]
encodePayload payload'@LeaveRequestPayload{} = encodePayload payload'@LeaveRequestPayload{} =
Start Sequence Start Sequence
: Start Sequence : Start Sequence
: concatMap encodeNodeState (leaveSuccessors payload') : fmap (IntVal . getNodeID) (leaveSuccessors payload')
<> [End Sequence <> [End Sequence
, Start Sequence] , Start Sequence]
<> concatMap encodeNodeState (leavePredecessors payload') <> fmap (IntVal . getNodeID) (leavePredecessors payload')
<> [End Sequence <> [End Sequence
, End Sequence] , End Sequence]
-- currently StabiliseResponsePayload and LeaveRequestPayload are equal -- currently StabiliseResponsePayload and LeaveRequestPayload are equal
encodePayload payload'@StabiliseResponsePayload{} = encodePayload payload'@StabiliseResponsePayload{} =
Start Sequence Start Sequence
: Start Sequence : Start Sequence
: concatMap encodeNodeState (stabiliseSuccessors payload') : fmap (IntVal . getNodeID) (stabiliseSuccessors payload')
<> [End Sequence <> [End Sequence
, Start Sequence] , Start Sequence]
<> concatMap encodeNodeState (stabilisePredecessors payload') <> fmap (IntVal . getNodeID) (stabilisePredecessors payload')
<> [End Sequence <> [End Sequence
, End Sequence] , End Sequence]
encodePayload payload'@StabiliseRequestPayload = [Null] encodePayload payload'@StabiliseRequestPayload = [Null]
@ -170,10 +170,10 @@ encodePayload payload'@QueryIDRequestPayload{} = [
encodePayload payload'@JoinResponsePayload{} = encodePayload payload'@JoinResponsePayload{} =
Start Sequence Start Sequence
: Start Sequence : Start Sequence
: concatMap encodeNodeState (joinSuccessors payload') : fmap (IntVal . getNodeID) (joinSuccessors payload')
<> [End Sequence <> [End Sequence
, Start Sequence] , Start Sequence]
<> concatMap encodeNodeState (joinPredecessors payload') <> fmap (IntVal . getNodeID) (joinPredecessors payload')
<> [End Sequence <> [End Sequence
, Start Sequence] , Start Sequence]
<> concatMap encodeCacheEntry (joinCache payload') <> concatMap encodeCacheEntry (joinCache payload')
@ -368,8 +368,8 @@ parseJoinRequest = do
parseJoinResponse :: ParseASN1 ActionPayload parseJoinResponse :: ParseASN1 ActionPayload
parseJoinResponse = onNextContainer Sequence $ do parseJoinResponse = onNextContainer Sequence $ do
succ' <- onNextContainer Sequence (getMany parseNodeState) succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pred' <- onNextContainer Sequence (getMany parseNodeState) pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
cache <- parseNodeCache cache <- parseNodeCache
pure $ JoinResponsePayload { pure $ JoinResponsePayload {
joinSuccessors = succ' joinSuccessors = succ'
@ -404,8 +404,8 @@ parseStabiliseRequest = do
parseStabiliseResponse :: ParseASN1 ActionPayload parseStabiliseResponse :: ParseASN1 ActionPayload
parseStabiliseResponse = onNextContainer Sequence $ do parseStabiliseResponse = onNextContainer Sequence $ do
succ' <- onNextContainer Sequence (getMany parseNodeState) succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pred' <- onNextContainer Sequence (getMany parseNodeState) pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pure $ StabiliseResponsePayload { pure $ StabiliseResponsePayload {
stabiliseSuccessors = succ' stabiliseSuccessors = succ'
, stabilisePredecessors = pred' , stabilisePredecessors = pred'
@ -413,8 +413,8 @@ parseStabiliseResponse = onNextContainer Sequence $ do
parseLeaveRequest :: ParseASN1 ActionPayload parseLeaveRequest :: ParseASN1 ActionPayload
parseLeaveRequest = onNextContainer Sequence $ do parseLeaveRequest = onNextContainer Sequence $ do
succ' <- onNextContainer Sequence (getMany parseNodeState) succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pred' <- onNextContainer Sequence (getMany parseNodeState) pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pure $ LeaveRequestPayload { pure $ LeaveRequestPayload {
leaveSuccessors = succ' leaveSuccessors = succ'
, leavePredecessors = pred' , leavePredecessors = pred'

View file

@ -73,7 +73,7 @@ import Debug.Trace (trace)
queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache ownState nCache lBestNodes targetID queryLocalCache ownState nCache lBestNodes targetID
-- as target ID falls between own ID and first predecessor, it is handled by this node -- as target ID falls between own ID and first predecessor, it is handled by this node
| (targetID `localCompare` ownID) `elem` [LT, EQ] && maybe False (\p -> targetID `localCompare` p == GT) (getNid <$> headMay preds) = FOUND . toRemoteNodeState $ ownState | (targetID `localCompare` ownID) `elem` [LT, EQ] && maybe False (\p -> targetID `localCompare` p == GT) (headMay preds) = FOUND . toRemoteNodeState $ ownState
-- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
-- the closest succeeding node (like with the p initiated parallel queries -- the closest succeeding node (like with the p initiated parallel queries
| otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors | otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors
@ -264,8 +264,8 @@ respondLeave nsSTM msgSet = do
writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry senderID writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry senderID
writeTVar nsSTM $ writeTVar nsSTM $
-- add predecessors and successors of leaving node to own lists -- add predecessors and successors of leaving node to own lists
setPredecessors (filter ((/=) senderID . getNid) $ requestPreds <> predecessors nsSnap) setPredecessors (delete senderID $ requestPreds <> predecessors nsSnap)
. setSuccessors (filter ((/=) senderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap . setSuccessors (delete senderID $ requestSuccs <> successors nsSnap) $ nsSnap
-- TODO: handle handover of key data -- TODO: handle handover of key data
let leaveResponse = Response { let leaveResponse = Response {
requestID = requestID aRequestPart requestID = requestID aRequestPart
@ -337,7 +337,7 @@ respondJoin nsSTM msgSet = do
then do then do
-- if yes, adjust own predecessors/ successors and return those in a response -- if yes, adjust own predecessors/ successors and return those in a response
let let
newPreds = senderNS:predecessors nsSnap newPreds = getNid senderNS:predecessors nsSnap
joinedNS = setPredecessors newPreds nsSnap joinedNS = setPredecessors newPreds nsSnap
responsePayload = JoinResponsePayload { responsePayload = JoinResponsePayload {
joinSuccessors = successors joinedNS joinSuccessors = successors joinedNS
@ -381,28 +381,28 @@ requestJoin toJoinOn ownStateSTM =
(cacheInsertQ, joinedState) <- atomically $ do (cacheInsertQ, joinedState) <- atomically $ do
stateSnap <- readTVar ownStateSTM stateSnap <- readTVar ownStateSTM
let let
(cacheInsertQ, predAccSet, succAccSet) = foldl' (cacheInsertQ, joinedStateUnsorted) = foldl'
(\(insertQ, predAccSet', succAccSet') msg -> (\(insertQ, nsAcc) msg ->
let let
insertQ' = maybe insertQ (\msgPl -> insertQ' = maybe insertQ (\msgPl ->
-- collect list of insertion statements into global cache -- collect list of insertion statements into global cache
queueAddEntries (joinCache msgPl) : insertQ queueAddEntries (joinCache msgPl) : insertQ
) $ payload msg ) $ payload msg
-- collect received predecessors and successors -- add received predecessors and successors
predAccSet'' = maybe predAccSet' ( addPreds ns' = maybe ns' (\msgPl ->
foldr' Set.insert predAccSet' . joinPredecessors setPredecessors (foldr' (:) (predecessors ns') (joinPredecessors msgPl)) ns'
) $ payload msg ) $ payload msg
succAccSet'' = maybe succAccSet' ( addSuccs ns' = maybe ns' (\msgPl ->
foldr' Set.insert succAccSet' . joinSuccessors setSuccessors (foldr' (:) (successors ns') (joinSuccessors msgPl)) ns'
) $ payload msg ) $ payload msg
in in
(insertQ', predAccSet'', succAccSet'') (insertQ', addSuccs . addPreds $ nsAcc)
) )
-- reset predecessors and successors -- reset predecessors and successors
([], Set.empty, Set.empty) ([], setPredecessors [] . setSuccessors [] $ ownState)
responses responses
-- sort, slice and set the accumulated successors and predecessors -- sort successors and predecessors
newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap newState = setSuccessors (take (kNeighbours joinedStateUnsorted) . sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (take (kNeighbours joinedStateUnsorted) . sortBy (flip localCompare) $ predecessors joinedStateUnsorted) $ joinedStateUnsorted
writeTVar ownStateSTM newState writeTVar ownStateSTM newState
pure (cacheInsertQ, newState) pure (cacheInsertQ, newState)
-- execute the cache insertions -- execute the cache insertions

View file

@ -32,7 +32,6 @@ module Hash2Pub.FediChordTypes (
) where ) where
import Control.Exception import Control.Exception
import Data.Function (on)
import Data.List (delete, nub, sortBy) import Data.List (delete, nub, sortBy)
import qualified Data.Map.Strict as Map import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe, isJust, mapMaybe) import Data.Maybe (fromMaybe, isJust, mapMaybe)
@ -124,9 +123,6 @@ data RemoteNodeState = RemoteNodeState
} }
deriving (Show, Eq) deriving (Show, Eq)
instance Ord RemoteNodeState where
a `compare` b = nid a `compare` nid b
-- | represents a node and encapsulates all data and parameters that are not present for remote nodes -- | represents a node and encapsulates all data and parameters that are not present for remote nodes
data LocalNodeState = LocalNodeState data LocalNodeState = LocalNodeState
{ nodeState :: RemoteNodeState { nodeState :: RemoteNodeState
@ -135,9 +131,9 @@ data LocalNodeState = LocalNodeState
-- ^ EpiChord node cache with expiry times for nodes -- ^ EpiChord node cache with expiry times for nodes
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache) , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ cache updates are not written directly to the 'nodeCache' but queued and -- ^ cache updates are not written directly to the 'nodeCache' but queued and
, successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well , successors :: [NodeID] -- could be a set instead as these are ordered as well
-- ^ successor nodes in ascending order by distance -- ^ successor nodes in ascending order by distance
, predecessors :: [RemoteNodeState] , predecessors :: [NodeID]
-- ^ predecessor nodes in ascending order by distance -- ^ predecessor nodes in ascending order by distance
, kNeighbours :: Int , kNeighbours :: Int
-- ^ desired length of predecessor and successor list -- ^ desired length of predecessor and successor list
@ -217,12 +213,12 @@ instance Typeable a => Show (TQueue a) where
show x = show (typeOf x) show x = show (typeOf x)
-- | convenience function that updates the successors of a 'LocalNodeState' -- | convenience function that updates the successors of a 'LocalNodeState'
setSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState setSuccessors :: [NodeID] -> LocalNodeState -> LocalNodeState
setSuccessors succ' ns = ns {successors = take (kNeighbours ns) . nub . sortBy (localCompare `on` getNid) $ succ'} setSuccessors succ' ns = ns {successors = take (kNeighbours ns) . nub . sortBy localCompare $ succ'}
-- | convenience function that updates the predecessors of a 'LocalNodeState' -- | convenience function that updates the predecessors of a 'LocalNodeState'
setPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState setPredecessors :: [NodeID] -> LocalNodeState -> LocalNodeState
setPredecessors pred' ns = ns {predecessors = take (kNeighbours ns) . nub . sortBy (flip (localCompare `on` getNid)) $ pred'} setPredecessors pred' ns = ns {predecessors = take (kNeighbours ns) . nub . sortBy (flip localCompare) $ pred'}
type NodeCache = Map.Map NodeID CacheEntry type NodeCache = Map.Map NodeID CacheEntry

View file

@ -30,7 +30,7 @@ data FediChordMessage = Request
, payload :: Maybe ActionPayload , payload :: Maybe ActionPayload
} }
| Response | Response
{ requestID :: Integer { requestID :: Integer
, senderID :: NodeID , senderID :: NodeID
, part :: Integer , part :: Integer
, isFinalPart :: Bool , isFinalPart :: Bool
@ -53,8 +53,8 @@ data ActionPayload = QueryIDRequestPayload
} }
| JoinRequestPayload | JoinRequestPayload
| LeaveRequestPayload | LeaveRequestPayload
{ leaveSuccessors :: [RemoteNodeState] { leaveSuccessors :: [NodeID]
, leavePredecessors :: [RemoteNodeState] , leavePredecessors :: [NodeID]
} }
| StabiliseRequestPayload | StabiliseRequestPayload
| PingRequestPayload | PingRequestPayload
@ -62,14 +62,14 @@ data ActionPayload = QueryIDRequestPayload
{ queryResult :: QueryResponse { queryResult :: QueryResponse
} }
| JoinResponsePayload | JoinResponsePayload
{ joinSuccessors :: [RemoteNodeState] { joinSuccessors :: [NodeID]
, joinPredecessors :: [RemoteNodeState] , joinPredecessors :: [NodeID]
, joinCache :: [RemoteCacheEntry] , joinCache :: [RemoteCacheEntry]
} }
| LeaveResponsePayload | LeaveResponsePayload
| StabiliseResponsePayload | StabiliseResponsePayload
{ stabiliseSuccessors :: [RemoteNodeState] { stabiliseSuccessors :: [NodeID]
, stabilisePredecessors :: [RemoteNodeState] , stabilisePredecessors :: [NodeID]
} }
| PingResponsePayload | PingResponsePayload
{ pingNodeStates :: [RemoteNodeState] { pingNodeStates :: [RemoteNodeState]

View file

@ -119,7 +119,7 @@ spec = do
let let
emptyCache = initCache emptyCache = initCache
nid1 = toNodeID 2^(23::Integer)+1 nid1 = toNodeID 2^(23::Integer)+1
node1 = setPredecessors [node4] . setNid nid1 <$> exampleLocalNode node1 = setPredecessors [nid4] . setNid nid1 <$> exampleLocalNode
nid2 = toNodeID 2^(230::Integer)+12 nid2 = toNodeID 2^(230::Integer)+12
node2 = exampleNodeState { nid = nid2} node2 = exampleNodeState { nid = nid2}
nid3 = toNodeID 2^(25::Integer)+10 nid3 = toNodeID 2^(25::Integer)+10
@ -152,15 +152,15 @@ spec = do
describe "Messages can be encoded to and decoded from ASN.1" $ do describe "Messages can be encoded to and decoded from ASN.1" $ do
-- define test messages -- define test messages
let let
someNodes = fmap (flip setNid exampleNodeState . fromInteger) [3..12] someNodeIDs = fmap fromInteger [3..12]
qidReqPayload = QueryIDRequestPayload { qidReqPayload = QueryIDRequestPayload {
queryTargetID = nid exampleNodeState queryTargetID = nid exampleNodeState
, queryLBestNodes = 3 , queryLBestNodes = 3
} }
jReqPayload = JoinRequestPayload jReqPayload = JoinRequestPayload
lReqPayload = LeaveRequestPayload { lReqPayload = LeaveRequestPayload {
leaveSuccessors = someNodes leaveSuccessors = someNodeIDs
, leavePredecessors = someNodes , leavePredecessors = someNodeIDs
} }
stabReqPayload = StabiliseRequestPayload stabReqPayload = StabiliseRequestPayload
pingReqPayload = PingRequestPayload pingReqPayload = PingRequestPayload
@ -174,8 +174,8 @@ spec = do
] ]
} }
jResPayload = JoinResponsePayload { jResPayload = JoinResponsePayload {
joinSuccessors = someNodes joinSuccessors = someNodeIDs
, joinPredecessors = someNodes , joinPredecessors = someNodeIDs
, joinCache = [ , joinCache = [
RemoteCacheEntry exampleNodeState (toEnum 23420001) RemoteCacheEntry exampleNodeState (toEnum 23420001)
, RemoteCacheEntry (exampleNodeState {nid = fromInteger (-5)}) (toEnum 0) , RemoteCacheEntry (exampleNodeState {nid = fromInteger (-5)}) (toEnum 0)
@ -183,7 +183,7 @@ spec = do
} }
lResPayload = LeaveResponsePayload lResPayload = LeaveResponsePayload
stabResPayload = StabiliseResponsePayload { stabResPayload = StabiliseResponsePayload {
stabiliseSuccessors = someNodes stabiliseSuccessors = someNodeIDs
, stabilisePredecessors = [] , stabilisePredecessors = []
} }
pingResPayload = PingResponsePayload { pingResPayload = PingResponsePayload {
@ -213,8 +213,8 @@ spec = do
encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg encodeDecodeAndCheck msg = runParseASN1 parseMessage (encodeMessage msg) `shouldBe` pure msg
largeMessage = responseWith Join $ JoinResponsePayload { largeMessage = responseWith Join $ JoinResponsePayload {
joinSuccessors = flip setNid exampleNodeState . fromInteger <$> [-20..150] joinSuccessors = fromInteger <$> [-20..150]
, joinPredecessors = flip setNid exampleNodeState . fromInteger <$> [5..11] , joinPredecessors = fromInteger <$> [5..11]
, joinCache = [ RemoteCacheEntry (exampleNodeState {nid = node}) 290001 | node <- [50602,506011..60000]] , joinCache = [ RemoteCacheEntry (exampleNodeState {nid = node}) 290001 | node <- [50602,506011..60000]]
} }
@ -235,7 +235,7 @@ spec = do
it "messages too large for a single packet can (often) be split into multiple parts" $ do it "messages too large for a single packet can (often) be split into multiple parts" $ do
-- TODO: once splitting works more efficient, test for exact number or payload, see #18 -- TODO: once splitting works more efficient, test for exact number or payload, see #18
length (serialiseMessage 600 largeMessage) > 1 `shouldBe` True length (serialiseMessage 600 largeMessage) > 1 `shouldBe` True
length (serialiseMessage 60000 largeMessage) `shouldBe` 1 length (serialiseMessage 6000 largeMessage) `shouldBe` 1
it "message part numbering starts at the submitted part number" $ do it "message part numbering starts at the submitted part number" $ do
isJust (Map.lookup 1 (serialiseMessage 600 largeMessage)) `shouldBe` True isJust (Map.lookup 1 (serialiseMessage 600 largeMessage)) `shouldBe` True
let startAt5 = serialiseMessage 600 (largeMessage {part = 5}) let startAt5 = serialiseMessage 600 (largeMessage {part = 5})