Compare commits
	
		
			2 commits
		
	
	
		
			fc8aa3e330
			...
			ab9d593a1b
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| ab9d593a1b | |||
| f1b15d5a9e | 
					 3 changed files with 22 additions and 19 deletions
				
			
		|  | @ -47,7 +47,7 @@ extra-source-files:  CHANGELOG.md | ||||||
| 
 | 
 | ||||||
| common deps | common deps | ||||||
|   build-depends:       base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays |   build-depends:       base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays | ||||||
|   ghc-options:         -Wall |   ghc-options:         -Wall -Wpartial-fields | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -516,20 +516,19 @@ requestJoin toJoinOn ownStateSTM = do | ||||||
|                     ([], Set.empty, Set.empty) |                     ([], Set.empty, Set.empty) | ||||||
|                     responses |                     responses | ||||||
|                 -- sort, slice and set the accumulated successors and predecessors |                 -- sort, slice and set the accumulated successors and predecessors | ||||||
|                 newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap |                 -- the contacted node itself is a successor as well and, with few  | ||||||
|  |                 -- nodes, can be a predecessor as well | ||||||
|  |                 newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap | ||||||
|             writeTVar ownStateSTM newState |             writeTVar ownStateSTM newState | ||||||
|             pure (cacheInsertQ, newState) |             pure (cacheInsertQ, newState) | ||||||
|         -- execute the cache insertions |         -- execute the cache insertions | ||||||
|         mapM_ (\f -> f joinedState) cacheInsertQ |         mapM_ (\f -> f joinedState) cacheInsertQ | ||||||
|         if responses == Set.empty |         if responses == Set.empty | ||||||
|                   then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) |                   then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) | ||||||
|            else if null (predecessors joinedState) && null (successors joinedState) |            else do | ||||||
|                 then pure $ Left "join error: no predecessors or successors" |                 -- wait for migration data to be completely received | ||||||
|                 -- successful join |                 waitForMigrationFrom (nodeService prn) (getNid toJoinOn) | ||||||
|                 else do |                 pure $ Right ownStateSTM | ||||||
|                     -- wait for migration data to be completely received |  | ||||||
|                     waitForMigrationFrom (nodeService prn) (getNid ownState) |  | ||||||
|                     pure $ Right ownStateSTM |  | ||||||
|                                    ) |                                    ) | ||||||
|         `catch` (\e -> pure . Left $ displayException (e :: IOException)) |         `catch` (\e -> pure . Left $ displayException (e :: IOException)) | ||||||
| 
 | 
 | ||||||
|  | @ -659,8 +658,7 @@ requestStabilise ns neighbour = do | ||||||
|                                                       ) |                                                       ) | ||||||
|                                                       ([],[]) respSet |                                                       ([],[]) respSet | ||||||
|             -- update successfully responded neighbour in cache |             -- update successfully responded neighbour in cache | ||||||
|             now <- getPOSIXTime |             maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet) | ||||||
|             maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet) |  | ||||||
|             pure $ if null responsePreds && null responseSuccs |             pure $ if null responsePreds && null responseSuccs | ||||||
|                       then Left "no neighbours returned" |                       then Left "no neighbours returned" | ||||||
|                       else Right (responsePreds, responseSuccs) |                       else Right (responsePreds, responseSuccs) | ||||||
|  | @ -827,6 +825,18 @@ queueDeleteEntry :: NodeID | ||||||
|                  -> IO () |                  -> IO () | ||||||
| queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete | queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|  | -- | enqueue the timestamp update and verification marking of an entry in the | ||||||
|  | -- global 'NodeCache'. | ||||||
|  | queueUpdateVerifieds :: Foldable c | ||||||
|  |                      => c NodeID | ||||||
|  |                      -> LocalNodeState s | ||||||
|  |                      -> IO () | ||||||
|  | queueUpdateVerifieds nIds ns = do | ||||||
|  |     now <- getPOSIXTime | ||||||
|  |     forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $ | ||||||
|  |         markCacheEntryAsVerified (Just now) nid' | ||||||
|  | 
 | ||||||
| -- | retry an IO action at most *i* times until it delivers a result | -- | retry an IO action at most *i* times until it delivers a result | ||||||
| attempts :: Int             -- ^ number of retries *i* | attempts :: Int             -- ^ number of retries *i* | ||||||
|          -> IO (Maybe a)    -- ^ action to retry |          -> IO (Maybe a)    -- ^ action to retry | ||||||
|  |  | ||||||
|  | @ -10,16 +10,10 @@ module Hash2Pub.PostService where | ||||||
| 
 | 
 | ||||||
| import           Control.Concurrent | import           Control.Concurrent | ||||||
| import           Control.Concurrent.Async | import           Control.Concurrent.Async | ||||||
| import           Control.Concurrent.MVar |  | ||||||
| import           Control.Concurrent.STM | import           Control.Concurrent.STM | ||||||
| import           Control.Concurrent.STM.TChan |  | ||||||
| import           Control.Concurrent.STM.TChan |  | ||||||
| import           Control.Concurrent.STM.TQueue |  | ||||||
| import           Control.Concurrent.STM.TVar |  | ||||||
| import           Control.Exception             (Exception (..), try) | import           Control.Exception             (Exception (..), try) | ||||||
| import           Control.Monad                 (foldM, forM, forM_, forever, when, void) | import           Control.Monad                 (foldM, forM, forM_, forever, when, void) | ||||||
| import           Control.Monad.IO.Class        (liftIO) | import           Control.Monad.IO.Class        (liftIO) | ||||||
| import           Control.Monad.STM |  | ||||||
| import           Data.Bifunctor | import           Data.Bifunctor | ||||||
| import qualified Data.ByteString.Lazy.UTF8     as BSUL | import qualified Data.ByteString.Lazy.UTF8     as BSUL | ||||||
| import qualified Data.ByteString.UTF8          as BSU | import qualified Data.ByteString.UTF8          as BSU | ||||||
|  | @ -40,7 +34,6 @@ import           Text.Read                     (readEither) | ||||||
| import qualified Network.Wai.Handler.Warp      as Warp | import qualified Network.Wai.Handler.Warp      as Warp | ||||||
| import           Servant | import           Servant | ||||||
| import           Servant.Client | import           Servant.Client | ||||||
| import           Servant.Server |  | ||||||
| 
 | 
 | ||||||
| import           Hash2Pub.FediChordTypes | import           Hash2Pub.FediChordTypes | ||||||
| import           Hash2Pub.RingMap | import           Hash2Pub.RingMap | ||||||
|  | @ -355,7 +348,7 @@ clientDeliverSubscriptions :: PostService d | ||||||
|                            -> (String, Int)     -- ^ hostname and port of instance to deliver to |                            -> (String, Int)     -- ^ hostname and port of instance to deliver to | ||||||
|                            -> IO (Either String ())     -- Either signals success or failure |                            -> IO (Either String ())     -- Either signals success or failure | ||||||
| clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do | clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do | ||||||
|     -- collect tag intearval |     -- collect tag interval | ||||||
|     intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) |     intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) | ||||||
|     -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] |     -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] | ||||||
|     -- extract subscribers and posts |     -- extract subscribers and posts | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue