Compare commits
No commits in common. "6982a0b245ebcae0ab4b9c4f8b7cad5b0061a254" and "c49c1a89c9c5774a810f31c963bb54221901682a" have entirely different histories.
6982a0b245
...
c49c1a89c9
|
@ -89,8 +89,8 @@ StabiliseResponsePayload ::= SEQUENCE {
|
||||||
|
|
||||||
LeaveRequestPayload ::= SEQUENCE {
|
LeaveRequestPayload ::= SEQUENCE {
|
||||||
successors SEQUENCE OF NodeState,
|
successors SEQUENCE OF NodeState,
|
||||||
predecessors SEQUENCE OF NodeState,
|
predecessors SEQUENCE OF NodeState
|
||||||
doMigration BOOLEAN
|
-- ToDo: transfer of own data to newly responsible node
|
||||||
}
|
}
|
||||||
|
|
||||||
LeaveResponsePayload ::= NULL -- just a confirmation
|
LeaveResponsePayload ::= NULL -- just a confirmation
|
||||||
|
|
|
@ -38,7 +38,6 @@ splitPayload numParts pl@LeaveRequestPayload{} = [
|
||||||
LeaveRequestPayload {
|
LeaveRequestPayload {
|
||||||
leaveSuccessors = atDef [] (listInto numParts $ leaveSuccessors pl) (thisPart-1)
|
leaveSuccessors = atDef [] (listInto numParts $ leaveSuccessors pl) (thisPart-1)
|
||||||
, leavePredecessors = atDef [] (listInto numParts $ leavePredecessors pl) (thisPart-1)
|
, leavePredecessors = atDef [] (listInto numParts $ leavePredecessors pl) (thisPart-1)
|
||||||
, leaveDoMigration = leaveDoMigration pl
|
|
||||||
} | thisPart <- [1..numParts] ]
|
} | thisPart <- [1..numParts] ]
|
||||||
splitPayload numParts pl@StabiliseResponsePayload{} = [
|
splitPayload numParts pl@StabiliseResponsePayload{} = [
|
||||||
StabiliseResponsePayload {
|
StabiliseResponsePayload {
|
||||||
|
@ -135,8 +134,9 @@ encodePayload payload'@LeaveRequestPayload{} =
|
||||||
<> [End Sequence
|
<> [End Sequence
|
||||||
, Start Sequence]
|
, Start Sequence]
|
||||||
<> concatMap encodeNodeState (leavePredecessors payload')
|
<> concatMap encodeNodeState (leavePredecessors payload')
|
||||||
<> [End Sequence]
|
<> [End Sequence
|
||||||
<> [Boolean (leaveDoMigration payload'), End Sequence]
|
, End Sequence]
|
||||||
|
-- currently StabiliseResponsePayload and LeaveRequestPayload are equal
|
||||||
encodePayload payload'@StabiliseResponsePayload{} =
|
encodePayload payload'@StabiliseResponsePayload{} =
|
||||||
Start Sequence
|
Start Sequence
|
||||||
: Start Sequence
|
: Start Sequence
|
||||||
|
@ -144,7 +144,8 @@ encodePayload payload'@StabiliseResponsePayload{} =
|
||||||
<> [End Sequence
|
<> [End Sequence
|
||||||
, Start Sequence]
|
, Start Sequence]
|
||||||
<> concatMap encodeNodeState (stabilisePredecessors payload')
|
<> concatMap encodeNodeState (stabilisePredecessors payload')
|
||||||
<> [End Sequence, End Sequence]
|
<> [End Sequence
|
||||||
|
, End Sequence]
|
||||||
encodePayload payload'@StabiliseRequestPayload = [Null]
|
encodePayload payload'@StabiliseRequestPayload = [Null]
|
||||||
encodePayload payload'@QueryIDResponsePayload{} =
|
encodePayload payload'@QueryIDResponsePayload{} =
|
||||||
let
|
let
|
||||||
|
@ -414,11 +415,9 @@ parseLeaveRequest :: ParseASN1 ActionPayload
|
||||||
parseLeaveRequest = onNextContainer Sequence $ do
|
parseLeaveRequest = onNextContainer Sequence $ do
|
||||||
succ' <- onNextContainer Sequence (getMany parseNodeState)
|
succ' <- onNextContainer Sequence (getMany parseNodeState)
|
||||||
pred' <- onNextContainer Sequence (getMany parseNodeState)
|
pred' <- onNextContainer Sequence (getMany parseNodeState)
|
||||||
doMigration <- parseBool
|
|
||||||
pure $ LeaveRequestPayload {
|
pure $ LeaveRequestPayload {
|
||||||
leaveSuccessors = succ'
|
leaveSuccessors = succ'
|
||||||
, leavePredecessors = pred'
|
, leavePredecessors = pred'
|
||||||
, leaveDoMigration = doMigration
|
|
||||||
}
|
}
|
||||||
|
|
||||||
parseLeaveResponse :: ParseASN1 ActionPayload
|
parseLeaveResponse :: ParseASN1 ActionPayload
|
||||||
|
|
|
@ -48,7 +48,7 @@ import Control.Concurrent.STM.TBQueue
|
||||||
import Control.Concurrent.STM.TQueue
|
import Control.Concurrent.STM.TQueue
|
||||||
import Control.Concurrent.STM.TVar
|
import Control.Concurrent.STM.TVar
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
import Control.Monad (foldM, forM, forM_, void, when)
|
import Control.Monad (foldM, forM, forM_, when)
|
||||||
import qualified Data.ByteString as BS
|
import qualified Data.ByteString as BS
|
||||||
import Data.Either (rights)
|
import Data.Either (rights)
|
||||||
import Data.Foldable (foldl', foldr')
|
import Data.Foldable (foldl', foldr')
|
||||||
|
@ -352,7 +352,8 @@ respondQueryID nsSTM msgSet = do
|
||||||
|
|
||||||
-- | Respond to a Leave request by removing the leaving node from local data structures
|
-- | Respond to a Leave request by removing the leaving node from local data structures
|
||||||
-- and confirming with response.
|
-- and confirming with response.
|
||||||
respondLeave :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
-- TODO: copy over key data from leaver and confirm
|
||||||
|
respondLeave :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
||||||
respondLeave nsSTM msgSet = do
|
respondLeave nsSTM msgSet = do
|
||||||
-- combine payload of all parts
|
-- combine payload of all parts
|
||||||
let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) ->
|
let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) ->
|
||||||
|
@ -370,6 +371,7 @@ respondLeave nsSTM msgSet = do
|
||||||
-- add predecessors and successors of leaving node to own lists
|
-- add predecessors and successors of leaving node to own lists
|
||||||
setPredecessors (filter ((/=) leaveSenderID . getNid) $ requestPreds <> predecessors nsSnap)
|
setPredecessors (filter ((/=) leaveSenderID . getNid) $ requestPreds <> predecessors nsSnap)
|
||||||
. setSuccessors (filter ((/=) leaveSenderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap
|
. setSuccessors (filter ((/=) leaveSenderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap
|
||||||
|
-- TODO: handle handover of key data
|
||||||
let leaveResponse = Response {
|
let leaveResponse = Response {
|
||||||
requestID = requestID aRequestPart
|
requestID = requestID aRequestPart
|
||||||
, senderID = getNid nsSnap
|
, senderID = getNid nsSnap
|
||||||
|
@ -379,10 +381,6 @@ respondLeave nsSTM msgSet = do
|
||||||
, payload = Just LeaveResponsePayload
|
, payload = Just LeaveResponsePayload
|
||||||
}
|
}
|
||||||
pure leaveResponse
|
pure leaveResponse
|
||||||
-- if awaiting an incoming service data migration, collect the lock without blocking this thread
|
|
||||||
when (maybe False leaveDoMigration (payload aRequestPart)) $ do
|
|
||||||
ownService <- atomically $ nodeService <$> ((readTVar nsSTM) >>= (readTVar . parentRealNode))
|
|
||||||
void (forkIO $ waitForMigrationFrom ownService leaveSenderID)
|
|
||||||
pure $ serialiseMessage sendMessageSize responseMsg
|
pure $ serialiseMessage sendMessageSize responseMsg
|
||||||
|
|
||||||
-- | respond to stabilise requests by returning successor and predecessor list
|
-- | respond to stabilise requests by returning successor and predecessor list
|
||||||
|
@ -665,15 +663,13 @@ requestStabilise ns neighbour = do
|
||||||
-- Service data transfer needs to be done separately, as not all neighbours
|
-- Service data transfer needs to be done separately, as not all neighbours
|
||||||
-- that need to know about the leaving handle the new service data.
|
-- that need to know about the leaving handle the new service data.
|
||||||
requestLeave :: LocalNodeState s
|
requestLeave :: LocalNodeState s
|
||||||
-> Bool -- whether to migrate service data
|
|
||||||
-> RemoteNodeState -- target node
|
-> RemoteNodeState -- target node
|
||||||
-> IO (Either String ()) -- error or success
|
-> IO (Either String ()) -- error or success
|
||||||
requestLeave ns doMigration target = do
|
requestLeave ns target = do
|
||||||
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
|
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
|
||||||
let leavePayload = LeaveRequestPayload {
|
let leavePayload = LeaveRequestPayload {
|
||||||
leaveSuccessors = successors ns
|
leaveSuccessors = successors ns
|
||||||
, leavePredecessors = predecessors ns
|
, leavePredecessors = predecessors ns
|
||||||
, leaveDoMigration = doMigration
|
|
||||||
}
|
}
|
||||||
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid ->
|
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid ->
|
||||||
Request {
|
Request {
|
||||||
|
|
|
@ -276,12 +276,12 @@ fediChordVserverLeave ns = do
|
||||||
-- former could be worked around
|
-- former could be worked around
|
||||||
|
|
||||||
-- send a leave message to all neighbours
|
-- send a leave message to all neighbours
|
||||||
forM_ (predecessors ns <> successors ns) $ liftIO . requestLeave ns False
|
forM_ (predecessors ns <> successors ns) $ liftIO . requestLeave ns
|
||||||
where
|
where
|
||||||
sendUntilSuccess i = maybe
|
sendUntilSuccess i = maybe
|
||||||
(pure $ Left "Exhausted all successors")
|
(pure $ Left "Exhausted all successors")
|
||||||
(\neighb -> do
|
(\neighb -> do
|
||||||
leaveResponse <- requestLeave ns True neighb
|
leaveResponse <- requestLeave ns neighb
|
||||||
case leaveResponse of
|
case leaveResponse of
|
||||||
Left _ -> sendUntilSuccess (i+1)
|
Left _ -> sendUntilSuccess (i+1)
|
||||||
-- return first successfully contacted neighbour,
|
-- return first successfully contacted neighbour,
|
||||||
|
|
|
@ -232,12 +232,10 @@ subscriptionDelivery serv senderID subList = do
|
||||||
:: STM (Either String ()))
|
:: STM (Either String ()))
|
||||||
-- TODO: should this always signal migration finished to avoid deadlocksP
|
-- TODO: should this always signal migration finished to avoid deadlocksP
|
||||||
liftIO $ putMVar syncMVar () -- wakes up waiting thread
|
liftIO $ putMVar syncMVar () -- wakes up waiting thread
|
||||||
-- allow response to be completed independently from waiting thread
|
liftIO $ putMVar syncMVar () -- blocks until waiting thread has resumed
|
||||||
_ <- liftIO . forkIO $ do
|
-- delete this migration from ongoing ones
|
||||||
putMVar syncMVar () -- blocks until waiting thread has resumed
|
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
|
||||||
-- delete this migration from ongoing ones
|
HMap.delete (fromInteger senderID)
|
||||||
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
|
|
||||||
HMap.delete (fromInteger senderID)
|
|
||||||
case res of
|
case res of
|
||||||
Left err -> throwError err410 {errBody = BSUL.fromString err}
|
Left err -> throwError err410 {errBody = BSUL.fromString err}
|
||||||
Right _ -> pure ""
|
Right _ -> pure ""
|
||||||
|
|
|
@ -55,7 +55,6 @@ data ActionPayload = QueryIDRequestPayload
|
||||||
| LeaveRequestPayload
|
| LeaveRequestPayload
|
||||||
{ leaveSuccessors :: [RemoteNodeState]
|
{ leaveSuccessors :: [RemoteNodeState]
|
||||||
, leavePredecessors :: [RemoteNodeState]
|
, leavePredecessors :: [RemoteNodeState]
|
||||||
, leaveDoMigration :: Bool
|
|
||||||
}
|
}
|
||||||
| StabiliseRequestPayload
|
| StabiliseRequestPayload
|
||||||
| PingRequestPayload
|
| PingRequestPayload
|
||||||
|
|
Loading…
Reference in a new issue