From f1b15d5a9e82c944092b65f5330e4835027539bf Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 26 Aug 2020 17:43:32 +0200 Subject: [PATCH] bugfix: fix join by adding join node and waiting for it - additionally to adding neighbours of join node, add the join node itself as a neighbour as well - wait for migrations from the node --- src/Hash2Pub/DHTProtocol.hs | 15 +++++++-------- src/Hash2Pub/PostService.hs | 9 +-------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index bc5d5e3..61fe7cd 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -516,20 +516,19 @@ requestJoin toJoinOn ownStateSTM = do ([], Set.empty, Set.empty) responses -- sort, slice and set the accumulated successors and predecessors - newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap + -- the contacted node itself is a successor as well and, with few + -- nodes, can be a predecessor as well + newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap writeTVar ownStateSTM newState pure (cacheInsertQ, newState) -- execute the cache insertions mapM_ (\f -> f joinedState) cacheInsertQ if responses == Set.empty then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) - else if null (predecessors joinedState) && null (successors joinedState) - then pure $ Left "join error: no predecessors or successors" - -- successful join - else do - -- wait for migration data to be completely received - waitForMigrationFrom (nodeService prn) (getNid ownState) - pure $ Right ownStateSTM + else do + -- wait for migration data to be completely received + waitForMigrationFrom (nodeService prn) (getNid toJoinOn) + pure $ Right ownStateSTM ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index fe013a0..92f784a 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -10,16 +10,10 @@ module Hash2Pub.PostService where import Control.Concurrent import Control.Concurrent.Async -import Control.Concurrent.MVar import Control.Concurrent.STM -import Control.Concurrent.STM.TChan -import Control.Concurrent.STM.TChan -import Control.Concurrent.STM.TQueue -import Control.Concurrent.STM.TVar import Control.Exception (Exception (..), try) import Control.Monad (foldM, forM, forM_, forever, when, void) import Control.Monad.IO.Class (liftIO) -import Control.Monad.STM import Data.Bifunctor import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.UTF8 as BSU @@ -40,7 +34,6 @@ import Text.Read (readEither) import qualified Network.Wai.Handler.Warp as Warp import Servant import Servant.Client -import Servant.Server import Hash2Pub.FediChordTypes import Hash2Pub.RingMap @@ -355,7 +348,7 @@ clientDeliverSubscriptions :: PostService d -> (String, Int) -- ^ hostname and port of instance to deliver to -> IO (Either String ()) -- Either signals success or failure clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do - -- collect tag intearval + -- collect tag interval intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- extract subscribers and posts