wait for migration to complete on join

also clean up migration entry after success
This commit is contained in:
Trolli Schmittlauch 2020-08-17 00:22:37 +02:00
parent 414564705a
commit c49c1a89c9
3 changed files with 23 additions and 14 deletions

View file

@ -482,7 +482,7 @@ respondJoin nsSTM msgSet = do
-- ....... request sending .......
-- | send a join request and return the joined 'LocalNodeState' including neighbours
requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
-> LocalNodeStateSTM s -- ^ joining NodeState
-> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information
requestJoin toJoinOn ownStateSTM = do
@ -521,12 +521,15 @@ requestJoin toJoinOn ownStateSTM = do
pure (cacheInsertQ, newState)
-- execute the cache insertions
mapM_ (\f -> f joinedState) cacheInsertQ
pure $ if responses == Set.empty
then Left $ "join error: got no response from " <> show (getNid toJoinOn)
if responses == Set.empty
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
else if null (predecessors joinedState) && null (successors joinedState)
then Left "join error: no predecessors or successors"
then pure $ Left "join error: no predecessors or successors"
-- successful join
else Right ownStateSTM
else do
-- wait for migration data to be completely received
waitForMigrationFrom (nodeService prn) (getNid ownState)
pure $ Right ownStateSTM
)
`catch` (\e -> pure . Left $ displayException (e :: IOException))

View file

@ -152,7 +152,8 @@ nodeStateInit realNodeSTM = do
-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
-- for resolving the new node's position.
fediChordBootstrapJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState'
fediChordBootstrapJoin :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the local 'NodeState'
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message
@ -170,7 +171,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
-- Unjoined try joining instead.
convergenceSampleThread :: LocalNodeStateSTM s -> IO ()
convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
convergenceSampleThread nsSTM = forever $ do
nsSnap <- readTVarIO nsSTM
parentNode <- readTVarIO $ parentRealNode nsSnap
@ -201,7 +202,7 @@ convergenceSampleThread nsSTM = forever $ do
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
tryBootstrapJoining :: LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s))
tryBootstrapJoining :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s))
tryBootstrapJoining nsSTM = do
bss <- atomically $ do
nsSnap <- readTVar nsSTM
@ -249,7 +250,8 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
-- | join a node to the DHT using the global node cache
-- node's position.
fediChordVserverJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState'
fediChordVserverJoin :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the local 'NodeState'
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message
fediChordVserverJoin nsSTM = do
@ -304,7 +306,7 @@ fediChordVserverLeave ns = do
-- | Wait for new cache entries to appear and then try joining on them.
-- Exits after successful joining.
joinOnNewEntriesThread :: LocalNodeStateSTM s -> IO ()
joinOnNewEntriesThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
joinOnNewEntriesThread nsSTM = loop
where
loop = do

View file

@ -231,7 +231,11 @@ subscriptionDelivery serv senderID subList = do
-- TODO: potentially log this
:: STM (Either String ()))
-- TODO: should this always signal migration finished to avoid deadlocksP
liftIO $ putMVar syncMVar ()
liftIO $ putMVar syncMVar () -- wakes up waiting thread
liftIO $ putMVar syncMVar () -- blocks until waiting thread has resumed
-- delete this migration from ongoing ones
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
HMap.delete (fromInteger senderID)
case res of
Left err -> throwError err410 {errBody = BSUL.fromString err}
Right _ -> pure ""