From f1b15d5a9e82c944092b65f5330e4835027539bf Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 26 Aug 2020 17:43:32 +0200 Subject: [PATCH 1/2] bugfix: fix join by adding join node and waiting for it - additionally to adding neighbours of join node, add the join node itself as a neighbour as well - wait for migrations from the node --- src/Hash2Pub/DHTProtocol.hs | 15 +++++++-------- src/Hash2Pub/PostService.hs | 9 +-------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index bc5d5e3..61fe7cd 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -516,20 +516,19 @@ requestJoin toJoinOn ownStateSTM = do ([], Set.empty, Set.empty) responses -- sort, slice and set the accumulated successors and predecessors - newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap + -- the contacted node itself is a successor as well and, with few + -- nodes, can be a predecessor as well + newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap writeTVar ownStateSTM newState pure (cacheInsertQ, newState) -- execute the cache insertions mapM_ (\f -> f joinedState) cacheInsertQ if responses == Set.empty then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) - else if null (predecessors joinedState) && null (successors joinedState) - then pure $ Left "join error: no predecessors or successors" - -- successful join - else do - -- wait for migration data to be completely received - waitForMigrationFrom (nodeService prn) (getNid ownState) - pure $ Right ownStateSTM + else do + -- wait for migration data to be completely received + waitForMigrationFrom (nodeService prn) (getNid toJoinOn) + pure $ Right ownStateSTM ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index fe013a0..92f784a 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -10,16 +10,10 @@ module Hash2Pub.PostService where import Control.Concurrent import Control.Concurrent.Async -import Control.Concurrent.MVar import Control.Concurrent.STM -import Control.Concurrent.STM.TChan -import Control.Concurrent.STM.TChan -import Control.Concurrent.STM.TQueue -import Control.Concurrent.STM.TVar import Control.Exception (Exception (..), try) import Control.Monad (foldM, forM, forM_, forever, when, void) import Control.Monad.IO.Class (liftIO) -import Control.Monad.STM import Data.Bifunctor import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.UTF8 as BSU @@ -40,7 +34,6 @@ import Text.Read (readEither) import qualified Network.Wai.Handler.Warp as Warp import Servant import Servant.Client -import Servant.Server import Hash2Pub.FediChordTypes import Hash2Pub.RingMap @@ -355,7 +348,7 @@ clientDeliverSubscriptions :: PostService d -> (String, Int) -- ^ hostname and port of instance to deliver to -> IO (Either String ()) -- Either signals success or failure clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do - -- collect tag intearval + -- collect tag interval intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- extract subscribers and posts From ab9d593a1bcf91a1d6626c18ab938bcef80bb986 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Thu, 27 Aug 2020 00:25:02 +0200 Subject: [PATCH 2/2] bugfix: fix wrong partial Response sender access - replaces improper record field access of `sender`, only existing in a Request, by `senderID` of a Response - fixes the resulting exception-crash - adds new function that enqueues a verification mark and timestamp bump of an existing cache entry --- Hash2Pub.cabal | 2 +- src/Hash2Pub/DHTProtocol.hs | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 2cc2d84..2d195e3 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md common deps build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays - ghc-options: -Wall + ghc-options: -Wall -Wpartial-fields diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 61fe7cd..45af727 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -658,8 +658,7 @@ requestStabilise ns neighbour = do ) ([],[]) respSet -- update successfully responded neighbour in cache - now <- getPOSIXTime - maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet) + maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet) pure $ if null responsePreds && null responseSuccs then Left "no neighbours returned" else Right (responsePreds, responseSuccs) @@ -826,6 +825,18 @@ queueDeleteEntry :: NodeID -> IO () queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete + +-- | enqueue the timestamp update and verification marking of an entry in the +-- global 'NodeCache'. +queueUpdateVerifieds :: Foldable c + => c NodeID + -> LocalNodeState s + -> IO () +queueUpdateVerifieds nIds ns = do + now <- getPOSIXTime + forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $ + markCacheEntryAsVerified (Just now) nid' + -- | retry an IO action at most *i* times until it delivers a result attempts :: Int -- ^ number of retries *i* -> IO (Maybe a) -- ^ action to retry