From c49c1a89c9c5774a810f31c963bb54221901682a Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 17 Aug 2020 00:22:37 +0200 Subject: [PATCH] wait for migration to complete on join also clean up migration entry after success --- src/Hash2Pub/DHTProtocol.hs | 13 ++++++++----- src/Hash2Pub/FediChord.hs | 18 ++++++++++-------- src/Hash2Pub/PostService.hs | 6 +++++- 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 52ea5ba..13dd434 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -482,7 +482,7 @@ respondJoin nsSTM msgSet = do -- ....... request sending ....... -- | send a join request and return the joined 'LocalNodeState' including neighbours -requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted +requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted -> LocalNodeStateSTM s -- ^ joining NodeState -> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information requestJoin toJoinOn ownStateSTM = do @@ -521,12 +521,15 @@ requestJoin toJoinOn ownStateSTM = do pure (cacheInsertQ, newState) -- execute the cache insertions mapM_ (\f -> f joinedState) cacheInsertQ - pure $ if responses == Set.empty - then Left $ "join error: got no response from " <> show (getNid toJoinOn) + if responses == Set.empty + then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) else if null (predecessors joinedState) && null (successors joinedState) - then Left "join error: no predecessors or successors" + then pure $ Left "join error: no predecessors or successors" -- successful join - else Right ownStateSTM + else do + -- wait for migration data to be completely received + waitForMigrationFrom (nodeService prn) (getNid ownState) + pure $ Right ownStateSTM ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 8d25186..f544061 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -152,9 +152,10 @@ nodeStateInit realNodeSTM = do -- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed -- for resolving the new node's position. -fediChordBootstrapJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState' - -> (String, PortNumber) -- ^ domain and port of a bootstrapping node - -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a +fediChordBootstrapJoin :: Service s (RealNodeSTM s) + => LocalNodeStateSTM s -- ^ the local 'NodeState' + -> (String, PortNumber) -- ^ domain and port of a bootstrapping node + -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordBootstrapJoin nsSTM bootstrapNode = do -- can be invoked multiple times with all known bootstrapping nodes until successfully joined @@ -170,7 +171,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do -- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters. -- Unjoined try joining instead. -convergenceSampleThread :: LocalNodeStateSTM s -> IO () +convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO () convergenceSampleThread nsSTM = forever $ do nsSnap <- readTVarIO nsSTM parentNode <- readTVarIO $ parentRealNode nsSnap @@ -201,7 +202,7 @@ convergenceSampleThread nsSTM = forever $ do -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. -tryBootstrapJoining :: LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s)) +tryBootstrapJoining :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s)) tryBootstrapJoining nsSTM = do bss <- atomically $ do nsSnap <- readTVar nsSTM @@ -249,8 +250,9 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do -- | join a node to the DHT using the global node cache -- node's position. -fediChordVserverJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState' - -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a +fediChordVserverJoin :: Service s (RealNodeSTM s) + => LocalNodeStateSTM s -- ^ the local 'NodeState' + -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordVserverJoin nsSTM = do ns <- readTVarIO nsSTM @@ -304,7 +306,7 @@ fediChordVserverLeave ns = do -- | Wait for new cache entries to appear and then try joining on them. -- Exits after successful joining. -joinOnNewEntriesThread :: LocalNodeStateSTM s -> IO () +joinOnNewEntriesThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO () joinOnNewEntriesThread nsSTM = loop where loop = do diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 548469e..c277327 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -231,7 +231,11 @@ subscriptionDelivery serv senderID subList = do -- TODO: potentially log this :: STM (Either String ())) -- TODO: should this always signal migration finished to avoid deadlocksP - liftIO $ putMVar syncMVar () + liftIO $ putMVar syncMVar () -- wakes up waiting thread + liftIO $ putMVar syncMVar () -- blocks until waiting thread has resumed + -- delete this migration from ongoing ones + liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $ + HMap.delete (fromInteger senderID) case res of Left err -> throwError err410 {errBody = BSUL.fromString err} Right _ -> pure ""