indicate in LeaveRequest whether to expect a migration
this information is used to decide whether to await an incoming migration in `respondLeave`
This commit is contained in:
parent
b8cedada48
commit
6982a0b245
|
@ -89,8 +89,8 @@ StabiliseResponsePayload ::= SEQUENCE {
|
|||
|
||||
LeaveRequestPayload ::= SEQUENCE {
|
||||
successors SEQUENCE OF NodeState,
|
||||
predecessors SEQUENCE OF NodeState
|
||||
-- ToDo: transfer of own data to newly responsible node
|
||||
predecessors SEQUENCE OF NodeState,
|
||||
doMigration BOOLEAN
|
||||
}
|
||||
|
||||
LeaveResponsePayload ::= NULL -- just a confirmation
|
||||
|
|
|
@ -38,6 +38,7 @@ splitPayload numParts pl@LeaveRequestPayload{} = [
|
|||
LeaveRequestPayload {
|
||||
leaveSuccessors = atDef [] (listInto numParts $ leaveSuccessors pl) (thisPart-1)
|
||||
, leavePredecessors = atDef [] (listInto numParts $ leavePredecessors pl) (thisPart-1)
|
||||
, leaveDoMigration = leaveDoMigration pl
|
||||
} | thisPart <- [1..numParts] ]
|
||||
splitPayload numParts pl@StabiliseResponsePayload{} = [
|
||||
StabiliseResponsePayload {
|
||||
|
@ -134,9 +135,8 @@ encodePayload payload'@LeaveRequestPayload{} =
|
|||
<> [End Sequence
|
||||
, Start Sequence]
|
||||
<> concatMap encodeNodeState (leavePredecessors payload')
|
||||
<> [End Sequence
|
||||
, End Sequence]
|
||||
-- currently StabiliseResponsePayload and LeaveRequestPayload are equal
|
||||
<> [End Sequence]
|
||||
<> [Boolean (leaveDoMigration payload'), End Sequence]
|
||||
encodePayload payload'@StabiliseResponsePayload{} =
|
||||
Start Sequence
|
||||
: Start Sequence
|
||||
|
@ -144,8 +144,7 @@ encodePayload payload'@StabiliseResponsePayload{} =
|
|||
<> [End Sequence
|
||||
, Start Sequence]
|
||||
<> concatMap encodeNodeState (stabilisePredecessors payload')
|
||||
<> [End Sequence
|
||||
, End Sequence]
|
||||
<> [End Sequence, End Sequence]
|
||||
encodePayload payload'@StabiliseRequestPayload = [Null]
|
||||
encodePayload payload'@QueryIDResponsePayload{} =
|
||||
let
|
||||
|
@ -415,9 +414,11 @@ parseLeaveRequest :: ParseASN1 ActionPayload
|
|||
parseLeaveRequest = onNextContainer Sequence $ do
|
||||
succ' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
pred' <- onNextContainer Sequence (getMany parseNodeState)
|
||||
doMigration <- parseBool
|
||||
pure $ LeaveRequestPayload {
|
||||
leaveSuccessors = succ'
|
||||
, leavePredecessors = pred'
|
||||
, leaveDoMigration = doMigration
|
||||
}
|
||||
|
||||
parseLeaveResponse :: ParseASN1 ActionPayload
|
||||
|
|
|
@ -48,7 +48,7 @@ import Control.Concurrent.STM.TBQueue
|
|||
import Control.Concurrent.STM.TQueue
|
||||
import Control.Concurrent.STM.TVar
|
||||
import Control.Exception
|
||||
import Control.Monad (foldM, forM, forM_, when)
|
||||
import Control.Monad (foldM, forM, forM_, void, when)
|
||||
import qualified Data.ByteString as BS
|
||||
import Data.Either (rights)
|
||||
import Data.Foldable (foldl', foldr')
|
||||
|
@ -352,8 +352,7 @@ respondQueryID nsSTM msgSet = do
|
|||
|
||||
-- | Respond to a Leave request by removing the leaving node from local data structures
|
||||
-- and confirming with response.
|
||||
-- TODO: copy over key data from leaver and confirm
|
||||
respondLeave :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
||||
respondLeave :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
||||
respondLeave nsSTM msgSet = do
|
||||
-- combine payload of all parts
|
||||
let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) ->
|
||||
|
@ -371,7 +370,6 @@ respondLeave nsSTM msgSet = do
|
|||
-- add predecessors and successors of leaving node to own lists
|
||||
setPredecessors (filter ((/=) leaveSenderID . getNid) $ requestPreds <> predecessors nsSnap)
|
||||
. setSuccessors (filter ((/=) leaveSenderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap
|
||||
-- TODO: handle handover of key data
|
||||
let leaveResponse = Response {
|
||||
requestID = requestID aRequestPart
|
||||
, senderID = getNid nsSnap
|
||||
|
@ -381,6 +379,10 @@ respondLeave nsSTM msgSet = do
|
|||
, payload = Just LeaveResponsePayload
|
||||
}
|
||||
pure leaveResponse
|
||||
-- if awaiting an incoming service data migration, collect the lock without blocking this thread
|
||||
when (maybe False leaveDoMigration (payload aRequestPart)) $ do
|
||||
ownService <- atomically $ nodeService <$> ((readTVar nsSTM) >>= (readTVar . parentRealNode))
|
||||
void (forkIO $ waitForMigrationFrom ownService leaveSenderID)
|
||||
pure $ serialiseMessage sendMessageSize responseMsg
|
||||
|
||||
-- | respond to stabilise requests by returning successor and predecessor list
|
||||
|
@ -663,13 +665,15 @@ requestStabilise ns neighbour = do
|
|||
-- Service data transfer needs to be done separately, as not all neighbours
|
||||
-- that need to know about the leaving handle the new service data.
|
||||
requestLeave :: LocalNodeState s
|
||||
-> Bool -- whether to migrate service data
|
||||
-> RemoteNodeState -- target node
|
||||
-> IO (Either String ()) -- error or success
|
||||
requestLeave ns target = do
|
||||
requestLeave ns doMigration target = do
|
||||
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
|
||||
let leavePayload = LeaveRequestPayload {
|
||||
leaveSuccessors = successors ns
|
||||
, leavePredecessors = predecessors ns
|
||||
, leaveDoMigration = doMigration
|
||||
}
|
||||
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid ->
|
||||
Request {
|
||||
|
|
|
@ -276,12 +276,12 @@ fediChordVserverLeave ns = do
|
|||
-- former could be worked around
|
||||
|
||||
-- send a leave message to all neighbours
|
||||
forM_ (predecessors ns <> successors ns) $ liftIO . requestLeave ns
|
||||
forM_ (predecessors ns <> successors ns) $ liftIO . requestLeave ns False
|
||||
where
|
||||
sendUntilSuccess i = maybe
|
||||
(pure $ Left "Exhausted all successors")
|
||||
(\neighb -> do
|
||||
leaveResponse <- requestLeave ns neighb
|
||||
leaveResponse <- requestLeave ns True neighb
|
||||
case leaveResponse of
|
||||
Left _ -> sendUntilSuccess (i+1)
|
||||
-- return first successfully contacted neighbour,
|
||||
|
|
|
@ -55,6 +55,7 @@ data ActionPayload = QueryIDRequestPayload
|
|||
| LeaveRequestPayload
|
||||
{ leaveSuccessors :: [RemoteNodeState]
|
||||
, leavePredecessors :: [RemoteNodeState]
|
||||
, leaveDoMigration :: Bool
|
||||
}
|
||||
| StabiliseRequestPayload
|
||||
| PingRequestPayload
|
||||
|
|
Loading…
Reference in a new issue