Compare commits

...

2 commits

Author SHA1 Message Date
Trolli Schmittlauch ab9d593a1b bugfix: fix wrong partial Response sender access
- replaces improper record field access of `sender`, only existing in a
  Request, by `senderID` of a Response
- fixes the resulting exception-crash
- adds new function that enqueues a verification mark and timestamp bump
  of an existing cache entry
2020-08-27 00:27:36 +02:00
Trolli Schmittlauch f1b15d5a9e bugfix: fix join by adding join node and waiting for it
- additionally to adding neighbours of join node, add the join node
  itself as a neighbour as well
- wait for migrations from the node
2020-08-26 17:43:32 +02:00
3 changed files with 22 additions and 19 deletions

View file

@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md
common deps common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays
ghc-options: -Wall ghc-options: -Wall -Wpartial-fields

View file

@ -516,20 +516,19 @@ requestJoin toJoinOn ownStateSTM = do
([], Set.empty, Set.empty) ([], Set.empty, Set.empty)
responses responses
-- sort, slice and set the accumulated successors and predecessors -- sort, slice and set the accumulated successors and predecessors
newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap -- the contacted node itself is a successor as well and, with few
-- nodes, can be a predecessor as well
newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
writeTVar ownStateSTM newState writeTVar ownStateSTM newState
pure (cacheInsertQ, newState) pure (cacheInsertQ, newState)
-- execute the cache insertions -- execute the cache insertions
mapM_ (\f -> f joinedState) cacheInsertQ mapM_ (\f -> f joinedState) cacheInsertQ
if responses == Set.empty if responses == Set.empty
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
else if null (predecessors joinedState) && null (successors joinedState) else do
then pure $ Left "join error: no predecessors or successors" -- wait for migration data to be completely received
-- successful join waitForMigrationFrom (nodeService prn) (getNid toJoinOn)
else do pure $ Right ownStateSTM
-- wait for migration data to be completely received
waitForMigrationFrom (nodeService prn) (getNid ownState)
pure $ Right ownStateSTM
) )
`catch` (\e -> pure . Left $ displayException (e :: IOException)) `catch` (\e -> pure . Left $ displayException (e :: IOException))
@ -659,8 +658,7 @@ requestStabilise ns neighbour = do
) )
([],[]) respSet ([],[]) respSet
-- update successfully responded neighbour in cache -- update successfully responded neighbour in cache
now <- getPOSIXTime maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
pure $ if null responsePreds && null responseSuccs pure $ if null responsePreds && null responseSuccs
then Left "no neighbours returned" then Left "no neighbours returned"
else Right (responsePreds, responseSuccs) else Right (responsePreds, responseSuccs)
@ -827,6 +825,18 @@ queueDeleteEntry :: NodeID
-> IO () -> IO ()
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
-- | enqueue the timestamp update and verification marking of an entry in the
-- global 'NodeCache'.
queueUpdateVerifieds :: Foldable c
=> c NodeID
-> LocalNodeState s
-> IO ()
queueUpdateVerifieds nIds ns = do
now <- getPOSIXTime
forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
markCacheEntryAsVerified (Just now) nid'
-- | retry an IO action at most *i* times until it delivers a result -- | retry an IO action at most *i* times until it delivers a result
attempts :: Int -- ^ number of retries *i* attempts :: Int -- ^ number of retries *i*
-> IO (Maybe a) -- ^ action to retry -> IO (Maybe a) -- ^ action to retry

View file

@ -10,16 +10,10 @@ module Hash2Pub.PostService where
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception (Exception (..), try) import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever, when, void) import Control.Monad (foldM, forM, forM_, forever, when, void)
import Control.Monad.IO.Class (liftIO) import Control.Monad.IO.Class (liftIO)
import Control.Monad.STM
import Data.Bifunctor import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU import qualified Data.ByteString.UTF8 as BSU
@ -40,7 +34,6 @@ import Text.Read (readEither)
import qualified Network.Wai.Handler.Warp as Warp import qualified Network.Wai.Handler.Warp as Warp
import Servant import Servant
import Servant.Client import Servant.Client
import Servant.Server
import Hash2Pub.FediChordTypes import Hash2Pub.FediChordTypes
import Hash2Pub.RingMap import Hash2Pub.RingMap
@ -355,7 +348,7 @@ clientDeliverSubscriptions :: PostService d
-> (String, Int) -- ^ hostname and port of instance to deliver to -> (String, Int) -- ^ hostname and port of instance to deliver to
-> IO (Either String ()) -- Either signals success or failure -> IO (Either String ()) -- Either signals success or failure
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
-- collect tag intearval -- collect tag interval
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
-- extract subscribers and posts -- extract subscribers and posts