Compare commits
2 commits
fc8aa3e330
...
ab9d593a1b
Author | SHA1 | Date | |
---|---|---|---|
|
ab9d593a1b | ||
|
f1b15d5a9e |
|
@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md
|
|||
|
||||
common deps
|
||||
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays
|
||||
ghc-options: -Wall
|
||||
ghc-options: -Wall -Wpartial-fields
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -516,19 +516,18 @@ requestJoin toJoinOn ownStateSTM = do
|
|||
([], Set.empty, Set.empty)
|
||||
responses
|
||||
-- sort, slice and set the accumulated successors and predecessors
|
||||
newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap
|
||||
-- the contacted node itself is a successor as well and, with few
|
||||
-- nodes, can be a predecessor as well
|
||||
newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
|
||||
writeTVar ownStateSTM newState
|
||||
pure (cacheInsertQ, newState)
|
||||
-- execute the cache insertions
|
||||
mapM_ (\f -> f joinedState) cacheInsertQ
|
||||
if responses == Set.empty
|
||||
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
||||
else if null (predecessors joinedState) && null (successors joinedState)
|
||||
then pure $ Left "join error: no predecessors or successors"
|
||||
-- successful join
|
||||
else do
|
||||
-- wait for migration data to be completely received
|
||||
waitForMigrationFrom (nodeService prn) (getNid ownState)
|
||||
waitForMigrationFrom (nodeService prn) (getNid toJoinOn)
|
||||
pure $ Right ownStateSTM
|
||||
)
|
||||
`catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
|
@ -659,8 +658,7 @@ requestStabilise ns neighbour = do
|
|||
)
|
||||
([],[]) respSet
|
||||
-- update successfully responded neighbour in cache
|
||||
now <- getPOSIXTime
|
||||
maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
|
||||
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
|
||||
pure $ if null responsePreds && null responseSuccs
|
||||
then Left "no neighbours returned"
|
||||
else Right (responsePreds, responseSuccs)
|
||||
|
@ -827,6 +825,18 @@ queueDeleteEntry :: NodeID
|
|||
-> IO ()
|
||||
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
|
||||
|
||||
|
||||
-- | enqueue the timestamp update and verification marking of an entry in the
|
||||
-- global 'NodeCache'.
|
||||
queueUpdateVerifieds :: Foldable c
|
||||
=> c NodeID
|
||||
-> LocalNodeState s
|
||||
-> IO ()
|
||||
queueUpdateVerifieds nIds ns = do
|
||||
now <- getPOSIXTime
|
||||
forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
|
||||
markCacheEntryAsVerified (Just now) nid'
|
||||
|
||||
-- | retry an IO action at most *i* times until it delivers a result
|
||||
attempts :: Int -- ^ number of retries *i*
|
||||
-> IO (Maybe a) -- ^ action to retry
|
||||
|
|
|
@ -10,16 +10,10 @@ module Hash2Pub.PostService where
|
|||
|
||||
import Control.Concurrent
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.MVar
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.STM.TChan
|
||||
import Control.Concurrent.STM.TChan
|
||||
import Control.Concurrent.STM.TQueue
|
||||
import Control.Concurrent.STM.TVar
|
||||
import Control.Exception (Exception (..), try)
|
||||
import Control.Monad (foldM, forM, forM_, forever, when, void)
|
||||
import Control.Monad.IO.Class (liftIO)
|
||||
import Control.Monad.STM
|
||||
import Data.Bifunctor
|
||||
import qualified Data.ByteString.Lazy.UTF8 as BSUL
|
||||
import qualified Data.ByteString.UTF8 as BSU
|
||||
|
@ -40,7 +34,6 @@ import Text.Read (readEither)
|
|||
import qualified Network.Wai.Handler.Warp as Warp
|
||||
import Servant
|
||||
import Servant.Client
|
||||
import Servant.Server
|
||||
|
||||
import Hash2Pub.FediChordTypes
|
||||
import Hash2Pub.RingMap
|
||||
|
@ -355,7 +348,7 @@ clientDeliverSubscriptions :: PostService d
|
|||
-> (String, Int) -- ^ hostname and port of instance to deliver to
|
||||
-> IO (Either String ()) -- Either signals success or failure
|
||||
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
|
||||
-- collect tag intearval
|
||||
-- collect tag interval
|
||||
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
|
||||
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
|
||||
-- extract subscribers and posts
|
||||
|
|
Loading…
Reference in a new issue