bugfix: fix join by adding join node and waiting for it
- additionally to adding neighbours of join node, add the join node itself as a neighbour as well - wait for migrations from the node
This commit is contained in:
parent
fc8aa3e330
commit
f1b15d5a9e
|
@ -516,19 +516,18 @@ requestJoin toJoinOn ownStateSTM = do
|
||||||
([], Set.empty, Set.empty)
|
([], Set.empty, Set.empty)
|
||||||
responses
|
responses
|
||||||
-- sort, slice and set the accumulated successors and predecessors
|
-- sort, slice and set the accumulated successors and predecessors
|
||||||
newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap
|
-- the contacted node itself is a successor as well and, with few
|
||||||
|
-- nodes, can be a predecessor as well
|
||||||
|
newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
|
||||||
writeTVar ownStateSTM newState
|
writeTVar ownStateSTM newState
|
||||||
pure (cacheInsertQ, newState)
|
pure (cacheInsertQ, newState)
|
||||||
-- execute the cache insertions
|
-- execute the cache insertions
|
||||||
mapM_ (\f -> f joinedState) cacheInsertQ
|
mapM_ (\f -> f joinedState) cacheInsertQ
|
||||||
if responses == Set.empty
|
if responses == Set.empty
|
||||||
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
||||||
else if null (predecessors joinedState) && null (successors joinedState)
|
|
||||||
then pure $ Left "join error: no predecessors or successors"
|
|
||||||
-- successful join
|
|
||||||
else do
|
else do
|
||||||
-- wait for migration data to be completely received
|
-- wait for migration data to be completely received
|
||||||
waitForMigrationFrom (nodeService prn) (getNid ownState)
|
waitForMigrationFrom (nodeService prn) (getNid toJoinOn)
|
||||||
pure $ Right ownStateSTM
|
pure $ Right ownStateSTM
|
||||||
)
|
)
|
||||||
`catch` (\e -> pure . Left $ displayException (e :: IOException))
|
`catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||||
|
|
|
@ -10,16 +10,10 @@ module Hash2Pub.PostService where
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import Control.Concurrent.MVar
|
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import Control.Concurrent.STM.TChan
|
|
||||||
import Control.Concurrent.STM.TChan
|
|
||||||
import Control.Concurrent.STM.TQueue
|
|
||||||
import Control.Concurrent.STM.TVar
|
|
||||||
import Control.Exception (Exception (..), try)
|
import Control.Exception (Exception (..), try)
|
||||||
import Control.Monad (foldM, forM, forM_, forever, when, void)
|
import Control.Monad (foldM, forM, forM_, forever, when, void)
|
||||||
import Control.Monad.IO.Class (liftIO)
|
import Control.Monad.IO.Class (liftIO)
|
||||||
import Control.Monad.STM
|
|
||||||
import Data.Bifunctor
|
import Data.Bifunctor
|
||||||
import qualified Data.ByteString.Lazy.UTF8 as BSUL
|
import qualified Data.ByteString.Lazy.UTF8 as BSUL
|
||||||
import qualified Data.ByteString.UTF8 as BSU
|
import qualified Data.ByteString.UTF8 as BSU
|
||||||
|
@ -40,7 +34,6 @@ import Text.Read (readEither)
|
||||||
import qualified Network.Wai.Handler.Warp as Warp
|
import qualified Network.Wai.Handler.Warp as Warp
|
||||||
import Servant
|
import Servant
|
||||||
import Servant.Client
|
import Servant.Client
|
||||||
import Servant.Server
|
|
||||||
|
|
||||||
import Hash2Pub.FediChordTypes
|
import Hash2Pub.FediChordTypes
|
||||||
import Hash2Pub.RingMap
|
import Hash2Pub.RingMap
|
||||||
|
@ -355,7 +348,7 @@ clientDeliverSubscriptions :: PostService d
|
||||||
-> (String, Int) -- ^ hostname and port of instance to deliver to
|
-> (String, Int) -- ^ hostname and port of instance to deliver to
|
||||||
-> IO (Either String ()) -- Either signals success or failure
|
-> IO (Either String ()) -- Either signals success or failure
|
||||||
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
|
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
|
||||||
-- collect tag intearval
|
-- collect tag interval
|
||||||
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
|
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
|
||||||
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
|
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
|
||||||
-- extract subscribers and posts
|
-- extract subscribers and posts
|
||||||
|
|
Loading…
Reference in a new issue