Compare commits
2 commits
fc8aa3e330
...
ab9d593a1b
Author | SHA1 | Date | |
---|---|---|---|
|
ab9d593a1b | ||
|
f1b15d5a9e |
|
@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md
|
||||||
|
|
||||||
common deps
|
common deps
|
||||||
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays
|
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays
|
||||||
ghc-options: -Wall
|
ghc-options: -Wall -Wpartial-fields
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -516,20 +516,19 @@ requestJoin toJoinOn ownStateSTM = do
|
||||||
([], Set.empty, Set.empty)
|
([], Set.empty, Set.empty)
|
||||||
responses
|
responses
|
||||||
-- sort, slice and set the accumulated successors and predecessors
|
-- sort, slice and set the accumulated successors and predecessors
|
||||||
newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap
|
-- the contacted node itself is a successor as well and, with few
|
||||||
|
-- nodes, can be a predecessor as well
|
||||||
|
newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
|
||||||
writeTVar ownStateSTM newState
|
writeTVar ownStateSTM newState
|
||||||
pure (cacheInsertQ, newState)
|
pure (cacheInsertQ, newState)
|
||||||
-- execute the cache insertions
|
-- execute the cache insertions
|
||||||
mapM_ (\f -> f joinedState) cacheInsertQ
|
mapM_ (\f -> f joinedState) cacheInsertQ
|
||||||
if responses == Set.empty
|
if responses == Set.empty
|
||||||
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
||||||
else if null (predecessors joinedState) && null (successors joinedState)
|
else do
|
||||||
then pure $ Left "join error: no predecessors or successors"
|
-- wait for migration data to be completely received
|
||||||
-- successful join
|
waitForMigrationFrom (nodeService prn) (getNid toJoinOn)
|
||||||
else do
|
pure $ Right ownStateSTM
|
||||||
-- wait for migration data to be completely received
|
|
||||||
waitForMigrationFrom (nodeService prn) (getNid ownState)
|
|
||||||
pure $ Right ownStateSTM
|
|
||||||
)
|
)
|
||||||
`catch` (\e -> pure . Left $ displayException (e :: IOException))
|
`catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||||
|
|
||||||
|
@ -659,8 +658,7 @@ requestStabilise ns neighbour = do
|
||||||
)
|
)
|
||||||
([],[]) respSet
|
([],[]) respSet
|
||||||
-- update successfully responded neighbour in cache
|
-- update successfully responded neighbour in cache
|
||||||
now <- getPOSIXTime
|
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
|
||||||
maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
|
|
||||||
pure $ if null responsePreds && null responseSuccs
|
pure $ if null responsePreds && null responseSuccs
|
||||||
then Left "no neighbours returned"
|
then Left "no neighbours returned"
|
||||||
else Right (responsePreds, responseSuccs)
|
else Right (responsePreds, responseSuccs)
|
||||||
|
@ -827,6 +825,18 @@ queueDeleteEntry :: NodeID
|
||||||
-> IO ()
|
-> IO ()
|
||||||
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
|
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
|
||||||
|
|
||||||
|
|
||||||
|
-- | enqueue the timestamp update and verification marking of an entry in the
|
||||||
|
-- global 'NodeCache'.
|
||||||
|
queueUpdateVerifieds :: Foldable c
|
||||||
|
=> c NodeID
|
||||||
|
-> LocalNodeState s
|
||||||
|
-> IO ()
|
||||||
|
queueUpdateVerifieds nIds ns = do
|
||||||
|
now <- getPOSIXTime
|
||||||
|
forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
|
||||||
|
markCacheEntryAsVerified (Just now) nid'
|
||||||
|
|
||||||
-- | retry an IO action at most *i* times until it delivers a result
|
-- | retry an IO action at most *i* times until it delivers a result
|
||||||
attempts :: Int -- ^ number of retries *i*
|
attempts :: Int -- ^ number of retries *i*
|
||||||
-> IO (Maybe a) -- ^ action to retry
|
-> IO (Maybe a) -- ^ action to retry
|
||||||
|
|
|
@ -10,16 +10,10 @@ module Hash2Pub.PostService where
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import Control.Concurrent.MVar
|
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import Control.Concurrent.STM.TChan
|
|
||||||
import Control.Concurrent.STM.TChan
|
|
||||||
import Control.Concurrent.STM.TQueue
|
|
||||||
import Control.Concurrent.STM.TVar
|
|
||||||
import Control.Exception (Exception (..), try)
|
import Control.Exception (Exception (..), try)
|
||||||
import Control.Monad (foldM, forM, forM_, forever, when, void)
|
import Control.Monad (foldM, forM, forM_, forever, when, void)
|
||||||
import Control.Monad.IO.Class (liftIO)
|
import Control.Monad.IO.Class (liftIO)
|
||||||
import Control.Monad.STM
|
|
||||||
import Data.Bifunctor
|
import Data.Bifunctor
|
||||||
import qualified Data.ByteString.Lazy.UTF8 as BSUL
|
import qualified Data.ByteString.Lazy.UTF8 as BSUL
|
||||||
import qualified Data.ByteString.UTF8 as BSU
|
import qualified Data.ByteString.UTF8 as BSU
|
||||||
|
@ -40,7 +34,6 @@ import Text.Read (readEither)
|
||||||
import qualified Network.Wai.Handler.Warp as Warp
|
import qualified Network.Wai.Handler.Warp as Warp
|
||||||
import Servant
|
import Servant
|
||||||
import Servant.Client
|
import Servant.Client
|
||||||
import Servant.Server
|
|
||||||
|
|
||||||
import Hash2Pub.FediChordTypes
|
import Hash2Pub.FediChordTypes
|
||||||
import Hash2Pub.RingMap
|
import Hash2Pub.RingMap
|
||||||
|
@ -355,7 +348,7 @@ clientDeliverSubscriptions :: PostService d
|
||||||
-> (String, Int) -- ^ hostname and port of instance to deliver to
|
-> (String, Int) -- ^ hostname and port of instance to deliver to
|
||||||
-> IO (Either String ()) -- Either signals success or failure
|
-> IO (Either String ()) -- Either signals success or failure
|
||||||
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
|
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
|
||||||
-- collect tag intearval
|
-- collect tag interval
|
||||||
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
|
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
|
||||||
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
|
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
|
||||||
-- extract subscribers and posts
|
-- extract subscribers and posts
|
||||||
|
|
Loading…
Reference in a new issue