Compare commits

..

No commits in common. "6982a0b245ebcae0ab4b9c4f8b7cad5b0061a254" and "c49c1a89c9c5774a810f31c963bb54221901682a" have entirely different histories.

6 changed files with 18 additions and 26 deletions

View file

@ -89,8 +89,8 @@ StabiliseResponsePayload ::= SEQUENCE {
LeaveRequestPayload ::= SEQUENCE { LeaveRequestPayload ::= SEQUENCE {
successors SEQUENCE OF NodeState, successors SEQUENCE OF NodeState,
predecessors SEQUENCE OF NodeState, predecessors SEQUENCE OF NodeState
doMigration BOOLEAN -- ToDo: transfer of own data to newly responsible node
} }
LeaveResponsePayload ::= NULL -- just a confirmation LeaveResponsePayload ::= NULL -- just a confirmation

View file

@ -38,7 +38,6 @@ splitPayload numParts pl@LeaveRequestPayload{} = [
LeaveRequestPayload { LeaveRequestPayload {
leaveSuccessors = atDef [] (listInto numParts $ leaveSuccessors pl) (thisPart-1) leaveSuccessors = atDef [] (listInto numParts $ leaveSuccessors pl) (thisPart-1)
, leavePredecessors = atDef [] (listInto numParts $ leavePredecessors pl) (thisPart-1) , leavePredecessors = atDef [] (listInto numParts $ leavePredecessors pl) (thisPart-1)
, leaveDoMigration = leaveDoMigration pl
} | thisPart <- [1..numParts] ] } | thisPart <- [1..numParts] ]
splitPayload numParts pl@StabiliseResponsePayload{} = [ splitPayload numParts pl@StabiliseResponsePayload{} = [
StabiliseResponsePayload { StabiliseResponsePayload {
@ -135,8 +134,9 @@ encodePayload payload'@LeaveRequestPayload{} =
<> [End Sequence <> [End Sequence
, Start Sequence] , Start Sequence]
<> concatMap encodeNodeState (leavePredecessors payload') <> concatMap encodeNodeState (leavePredecessors payload')
<> [End Sequence] <> [End Sequence
<> [Boolean (leaveDoMigration payload'), End Sequence] , End Sequence]
-- currently StabiliseResponsePayload and LeaveRequestPayload are equal
encodePayload payload'@StabiliseResponsePayload{} = encodePayload payload'@StabiliseResponsePayload{} =
Start Sequence Start Sequence
: Start Sequence : Start Sequence
@ -144,7 +144,8 @@ encodePayload payload'@StabiliseResponsePayload{} =
<> [End Sequence <> [End Sequence
, Start Sequence] , Start Sequence]
<> concatMap encodeNodeState (stabilisePredecessors payload') <> concatMap encodeNodeState (stabilisePredecessors payload')
<> [End Sequence, End Sequence] <> [End Sequence
, End Sequence]
encodePayload payload'@StabiliseRequestPayload = [Null] encodePayload payload'@StabiliseRequestPayload = [Null]
encodePayload payload'@QueryIDResponsePayload{} = encodePayload payload'@QueryIDResponsePayload{} =
let let
@ -414,11 +415,9 @@ parseLeaveRequest :: ParseASN1 ActionPayload
parseLeaveRequest = onNextContainer Sequence $ do parseLeaveRequest = onNextContainer Sequence $ do
succ' <- onNextContainer Sequence (getMany parseNodeState) succ' <- onNextContainer Sequence (getMany parseNodeState)
pred' <- onNextContainer Sequence (getMany parseNodeState) pred' <- onNextContainer Sequence (getMany parseNodeState)
doMigration <- parseBool
pure $ LeaveRequestPayload { pure $ LeaveRequestPayload {
leaveSuccessors = succ' leaveSuccessors = succ'
, leavePredecessors = pred' , leavePredecessors = pred'
, leaveDoMigration = doMigration
} }
parseLeaveResponse :: ParseASN1 ActionPayload parseLeaveResponse :: ParseASN1 ActionPayload

View file

@ -48,7 +48,7 @@ import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Exception import Control.Exception
import Control.Monad (foldM, forM, forM_, void, when) import Control.Monad (foldM, forM, forM_, when)
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import Data.Either (rights) import Data.Either (rights)
import Data.Foldable (foldl', foldr') import Data.Foldable (foldl', foldr')
@ -352,7 +352,8 @@ respondQueryID nsSTM msgSet = do
-- | Respond to a Leave request by removing the leaving node from local data structures -- | Respond to a Leave request by removing the leaving node from local data structures
-- and confirming with response. -- and confirming with response.
respondLeave :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) -- TODO: copy over key data from leaver and confirm
respondLeave :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondLeave nsSTM msgSet = do respondLeave nsSTM msgSet = do
-- combine payload of all parts -- combine payload of all parts
let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) -> let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) ->
@ -370,6 +371,7 @@ respondLeave nsSTM msgSet = do
-- add predecessors and successors of leaving node to own lists -- add predecessors and successors of leaving node to own lists
setPredecessors (filter ((/=) leaveSenderID . getNid) $ requestPreds <> predecessors nsSnap) setPredecessors (filter ((/=) leaveSenderID . getNid) $ requestPreds <> predecessors nsSnap)
. setSuccessors (filter ((/=) leaveSenderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap . setSuccessors (filter ((/=) leaveSenderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap
-- TODO: handle handover of key data
let leaveResponse = Response { let leaveResponse = Response {
requestID = requestID aRequestPart requestID = requestID aRequestPart
, senderID = getNid nsSnap , senderID = getNid nsSnap
@ -379,10 +381,6 @@ respondLeave nsSTM msgSet = do
, payload = Just LeaveResponsePayload , payload = Just LeaveResponsePayload
} }
pure leaveResponse pure leaveResponse
-- if awaiting an incoming service data migration, collect the lock without blocking this thread
when (maybe False leaveDoMigration (payload aRequestPart)) $ do
ownService <- atomically $ nodeService <$> ((readTVar nsSTM) >>= (readTVar . parentRealNode))
void (forkIO $ waitForMigrationFrom ownService leaveSenderID)
pure $ serialiseMessage sendMessageSize responseMsg pure $ serialiseMessage sendMessageSize responseMsg
-- | respond to stabilise requests by returning successor and predecessor list -- | respond to stabilise requests by returning successor and predecessor list
@ -665,15 +663,13 @@ requestStabilise ns neighbour = do
-- Service data transfer needs to be done separately, as not all neighbours -- Service data transfer needs to be done separately, as not all neighbours
-- that need to know about the leaving handle the new service data. -- that need to know about the leaving handle the new service data.
requestLeave :: LocalNodeState s requestLeave :: LocalNodeState s
-> Bool -- whether to migrate service data
-> RemoteNodeState -- target node -> RemoteNodeState -- target node
-> IO (Either String ()) -- error or success -> IO (Either String ()) -- error or success
requestLeave ns doMigration target = do requestLeave ns target = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
let leavePayload = LeaveRequestPayload { let leavePayload = LeaveRequestPayload {
leaveSuccessors = successors ns leaveSuccessors = successors ns
, leavePredecessors = predecessors ns , leavePredecessors = predecessors ns
, leaveDoMigration = doMigration
} }
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid -> responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid ->
Request { Request {

View file

@ -276,12 +276,12 @@ fediChordVserverLeave ns = do
-- former could be worked around -- former could be worked around
-- send a leave message to all neighbours -- send a leave message to all neighbours
forM_ (predecessors ns <> successors ns) $ liftIO . requestLeave ns False forM_ (predecessors ns <> successors ns) $ liftIO . requestLeave ns
where where
sendUntilSuccess i = maybe sendUntilSuccess i = maybe
(pure $ Left "Exhausted all successors") (pure $ Left "Exhausted all successors")
(\neighb -> do (\neighb -> do
leaveResponse <- requestLeave ns True neighb leaveResponse <- requestLeave ns neighb
case leaveResponse of case leaveResponse of
Left _ -> sendUntilSuccess (i+1) Left _ -> sendUntilSuccess (i+1)
-- return first successfully contacted neighbour, -- return first successfully contacted neighbour,

View file

@ -232,12 +232,10 @@ subscriptionDelivery serv senderID subList = do
:: STM (Either String ())) :: STM (Either String ()))
-- TODO: should this always signal migration finished to avoid deadlocksP -- TODO: should this always signal migration finished to avoid deadlocksP
liftIO $ putMVar syncMVar () -- wakes up waiting thread liftIO $ putMVar syncMVar () -- wakes up waiting thread
-- allow response to be completed independently from waiting thread liftIO $ putMVar syncMVar () -- blocks until waiting thread has resumed
_ <- liftIO . forkIO $ do -- delete this migration from ongoing ones
putMVar syncMVar () -- blocks until waiting thread has resumed liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
-- delete this migration from ongoing ones HMap.delete (fromInteger senderID)
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
HMap.delete (fromInteger senderID)
case res of case res of
Left err -> throwError err410 {errBody = BSUL.fromString err} Left err -> throwError err410 {errBody = BSUL.fromString err}
Right _ -> pure "" Right _ -> pure ""

View file

@ -55,7 +55,6 @@ data ActionPayload = QueryIDRequestPayload
| LeaveRequestPayload | LeaveRequestPayload
{ leaveSuccessors :: [RemoteNodeState] { leaveSuccessors :: [RemoteNodeState]
, leavePredecessors :: [RemoteNodeState] , leavePredecessors :: [RemoteNodeState]
, leaveDoMigration :: Bool
} }
| StabiliseRequestPayload | StabiliseRequestPayload
| PingRequestPayload | PingRequestPayload