From fce5ff9153ba0efc7494ca915626d93e69e83c33 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Tue, 18 Aug 2020 00:17:13 +0200 Subject: [PATCH] implement service data migration for stabilise --- src/Hash2Pub/DHTProtocol.hs | 5 ++--- src/Hash2Pub/FediChord.hs | 30 ++++++++++++++++++++++++++---- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 972059f..bd7953f 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -381,7 +381,7 @@ respondLeave nsSTM msgSet = do pure leaveResponse -- if awaiting an incoming service data migration, collect the lock without blocking this thread when (maybe False leaveDoMigration (payload aRequestPart)) $ do - ownService <- atomically $ nodeService <$> ((readTVar nsSTM) >>= (readTVar . parentRealNode)) + ownService <- atomically $ nodeService <$> (readTVar nsSTM >>= (readTVar . parentRealNode)) void (forkIO $ waitForMigrationFrom ownService leaveSenderID) pure $ serialiseMessage sendMessageSize responseMsg @@ -425,8 +425,7 @@ respondPing nsSTM msgSet = do } pure $ serialiseMessage sendMessageSize pingResponse --- this modifies node state, so locking and IO seems to be necessary. --- Still try to keep as much code as possible pure + respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondJoin nsSTM msgSet = do -- atomically read and modify the node state according to the parsed request diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 399ddfd..15563de 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -465,9 +465,9 @@ checkCacheSliceInvariants ns -- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until -- one responds, and get their neighbours for maintaining the own neighbour lists. -- If necessary, request new neighbours. -stabiliseThread :: LocalNodeStateSTM s -> IO () +stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO () stabiliseThread nsSTM = forever $ do - ns <- readTVarIO nsSTM + oldNs <- readTVarIO nsSTM putStrLn "stabilise run: begin" @@ -478,8 +478,8 @@ stabiliseThread nsSTM = forever $ do -- don't contact all neighbours unless the previous one failed/ Left ed - predStabilise <- stabiliseClosestResponder ns predecessors 1 [] - succStabilise <- stabiliseClosestResponder ns predecessors 1 [] + predStabilise <- stabiliseClosestResponder oldNs predecessors 1 [] + succStabilise <- stabiliseClosestResponder oldNs predecessors 1 [] let (predDeletes, predNeighbours) = either (const ([], [])) id predStabilise @@ -518,6 +518,28 @@ stabiliseThread nsSTM = forever $ do writeTVar nsSTM $ addSuccessors [nextEntry] latestNs ) + newNs <- readTVarIO nsSTM + + let + oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs + newPredecessor = headMay $ predecessors newNs + -- manage need for service data migration: + maybe (pure ()) (\newPredecessor' -> + when ( + isJust newPredecessor + && oldPredecessor /= newPredecessor' + -- case: predecessor has changed in some way => own responsibility has changed in some way + -- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data + -- If this is due to a node leaving without transfering its data, try getting it from a redundant copy + -- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it + && isInOwnResponsibilitySlice newPredecessor' oldNs) $ do + ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs) + migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor') + -- TODO: deal with migration failure, e.g retry + pure () + ) + newPredecessor + putStrLn "stabilise run: end" -- TODO: make delay configurable threadDelay (60 * 10^6)