diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 2d195e3..2cc2d84 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md common deps build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays - ghc-options: -Wall -Wpartial-fields + ghc-options: -Wall diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 45af727..bc5d5e3 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -516,19 +516,20 @@ requestJoin toJoinOn ownStateSTM = do ([], Set.empty, Set.empty) responses -- sort, slice and set the accumulated successors and predecessors - -- the contacted node itself is a successor as well and, with few - -- nodes, can be a predecessor as well - newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap + newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap writeTVar ownStateSTM newState pure (cacheInsertQ, newState) -- execute the cache insertions mapM_ (\f -> f joinedState) cacheInsertQ if responses == Set.empty then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) - else do - -- wait for migration data to be completely received - waitForMigrationFrom (nodeService prn) (getNid toJoinOn) - pure $ Right ownStateSTM + else if null (predecessors joinedState) && null (successors joinedState) + then pure $ Left "join error: no predecessors or successors" + -- successful join + else do + -- wait for migration data to be completely received + waitForMigrationFrom (nodeService prn) (getNid ownState) + pure $ Right ownStateSTM ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) @@ -658,7 +659,8 @@ requestStabilise ns neighbour = do ) ([],[]) respSet -- update successfully responded neighbour in cache - maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet) + now <- getPOSIXTime + maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet) pure $ if null responsePreds && null responseSuccs then Left "no neighbours returned" else Right (responsePreds, responseSuccs) @@ -825,18 +827,6 @@ queueDeleteEntry :: NodeID -> IO () queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete - --- | enqueue the timestamp update and verification marking of an entry in the --- global 'NodeCache'. -queueUpdateVerifieds :: Foldable c - => c NodeID - -> LocalNodeState s - -> IO () -queueUpdateVerifieds nIds ns = do - now <- getPOSIXTime - forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $ - markCacheEntryAsVerified (Just now) nid' - -- | retry an IO action at most *i* times until it delivers a result attempts :: Int -- ^ number of retries *i* -> IO (Maybe a) -- ^ action to retry diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 92f784a..fe013a0 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -10,10 +10,16 @@ module Hash2Pub.PostService where import Control.Concurrent import Control.Concurrent.Async +import Control.Concurrent.MVar import Control.Concurrent.STM +import Control.Concurrent.STM.TChan +import Control.Concurrent.STM.TChan +import Control.Concurrent.STM.TQueue +import Control.Concurrent.STM.TVar import Control.Exception (Exception (..), try) import Control.Monad (foldM, forM, forM_, forever, when, void) import Control.Monad.IO.Class (liftIO) +import Control.Monad.STM import Data.Bifunctor import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.UTF8 as BSU @@ -34,6 +40,7 @@ import Text.Read (readEither) import qualified Network.Wai.Handler.Warp as Warp import Servant import Servant.Client +import Servant.Server import Hash2Pub.FediChordTypes import Hash2Pub.RingMap @@ -348,7 +355,7 @@ clientDeliverSubscriptions :: PostService d -> (String, Int) -- ^ hostname and port of instance to deliver to -> IO (Either String ()) -- Either signals success or failure clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do - -- collect tag interval + -- collect tag intearval intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- extract subscribers and posts