diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal
index 2cc2d84..2d195e3 100644
--- a/Hash2Pub.cabal
+++ b/Hash2Pub.cabal
@@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md
 common deps
     build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays
-    ghc-options: -Wall
+    ghc-options: -Wall -Wpartial-fields

diff --git a/app/Experiment.hs b/app/Experiment.hs
index 51b8e88..deb4cae 100644
--- a/app/Experiment.hs
+++ b/app/Experiment.hs
@@ -1,16 +1,16 @@
-{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE OverloadedStrings #-}

 module Main where

-import System.Random
-import Control.Concurrent
-import Control.Monad (forM_)
-import Control.Monad.State.Class
-import Control.Monad.State.Strict (evalStateT)
-import Control.Monad.IO.Class
-import qualified Network.HTTP.Client as HTTP
+import Control.Concurrent
+import Control.Monad (forM_)
+import Control.Monad.IO.Class
+import Control.Monad.State.Class
+import Control.Monad.State.Strict (evalStateT)
+import qualified Network.HTTP.Client as HTTP
+import System.Random

-import Hash2Pub.PostService (clientPublishPost, Hashtag)
+import Hash2Pub.PostService (Hashtag, clientPublishPost)

 -- placeholder post data definition

diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs
index e22834a..fa5a54a 100644
--- a/src/Hash2Pub/DHTProtocol.hs
+++ b/src/Hash2Pub/DHTProtocol.hs
@@ -49,9 +49,11 @@
 import Control.Concurrent.STM.TQueue
 import Control.Concurrent.STM.TVar
 import Control.Exception
 import Control.Monad (foldM, forM, forM_, void, when)
+import Control.Monad.Except (MonadError (..), runExceptT)
+import Control.Monad.IO.Class (MonadIO (..))
 import qualified Data.ByteString as BS
 import Data.Either (rights)
-import Data.Foldable (foldl', foldr')
+import Data.Foldable (foldl', foldr', foldrM)
 import Data.Functor.Identity
 import Data.IP (IPv6, fromHostAddress6, toHostAddress6)
@@ -514,28 +516,28 @@ requestJoin toJoinOn ownStateSTM = do
                     ([], Set.empty, Set.empty)
                     responses
                 -- sort, slice and set the accumulated successors and predecessors
-                newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap
+                -- the contacted node itself is a successor as well and, with few
+                -- nodes, can be a predecessor as well
+                newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
             writeTVar ownStateSTM newState
             pure (cacheInsertQ, newState)
         -- execute the cache insertions
         mapM_ (\f -> f joinedState) cacheInsertQ
         if responses == Set.empty
            then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
-           else if null (predecessors joinedState) && null (successors joinedState)
-               then pure $ Left "join error: no predecessors or successors"
-               -- successful join
-               else do
-                   -- wait for migration data to be completely received
-                   waitForMigrationFrom (nodeService prn) (getNid ownState)
-                   pure $ Right ownStateSTM
+           else do
+               -- wait for migration data to be completely received
+               waitForMigrationFrom (nodeService prn) (getNid toJoinOn)
+               pure $ Right ownStateSTM
        ) `catch` (\e -> pure . Left $ displayException (e :: IOException))

 -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
-requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node
+requestQueryID :: (MonadIO m, MonadError String m)
+    => LocalNodeState s -- ^ NodeState of the querying node
     -> NodeID -- ^ target key ID to look up
-    -> IO RemoteNodeState -- ^ the node responsible for handling that key
+    -> m RemoteNodeState -- ^ the node responsible for handling that key
 -- 1. do a local lookup for the l closest nodes
 -- 2. create l sockets
 -- 3. send a message async concurrently to all l nodes
 -- 4. collect the results, insert them into cache
@@ -543,23 +545,23 @@ requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node
 -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
 -- TODO: deal with lookup failures
 requestQueryID ns targetID = do
-    firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns
+    firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns
     -- TODO: make maxAttempts configurable
     queryIdLookupLoop firstCacheSnapshot ns 50 targetID

 -- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
-queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState
+queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState
 -- return node itself as default fallback value against infinite recursion.
 -- TODO: consider using an Either instead of a default value
-queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns
+queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts"
 queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
     let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID
     -- FOUND can only be returned if targetID is owned by local node
     case localResult of
         FOUND thisNode -> pure thisNode
         FORWARD nodeSet -> do
-            responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
-            now <- getPOSIXTime
+            responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
+            now <- liftIO getPOSIXTime
             -- check for a FOUND and return it
             case responseEntries of
                 FOUND foundNode -> pure foundNode
@@ -593,8 +595,10 @@ sendQueryIdMessages targetID ns lParam targets = do
     -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
     now <- getPOSIXTime
     -- collect cache entries from all responses
-    foldM (\acc resp -> do
-            let entrySet = case queryResult <$> payload resp of
+    foldrM (\resp acc -> do
+            let
+                responseResult = queryResult <$> payload resp
+                entrySet = case responseResult of
                     Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
                     Just (FORWARD resultset) -> resultset
                     _ -> Set.empty
@@ -604,10 +608,15 @@ sendQueryIdMessages targetID ns lParam targets = do
             -- return accumulated QueryResult
             pure $ case acc of
                 -- once a FOUND as been encountered, return this as a result
-                isFound@FOUND{} -> isFound
-                FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet
+                FOUND{} -> acc
+                FORWARD accSet
+                    | maybe False isFound responseResult -> fromJust responseResult
+                    | otherwise -> FORWARD $ entrySet `Set.union` accSet
         ) (FORWARD Set.empty) responses
+    where
+        isFound FOUND{} = True
+        isFound _ = False

 -- | Create a QueryID message to be supplied to 'sendRequestTo'
 lookupMessage :: Integral i
@@ -649,8 +658,7 @@ requestStabilise ns neighbour = do
            )
            ([],[]) respSet
        -- update successfully responded neighbour in cache
-       now <- getPOSIXTime
-       maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
+       maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
        pure $ if null responsePreds && null responseSuccs
            then Left "no neighbours returned"
            else Right (responsePreds, responseSuccs)
@@ -766,14 +774,14 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
        -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
        -> IO ()
     sendAndAck responseQueue sock' remainingSends = do
-        sendMany sock $ Map.elems remainingSends
+        sendMany sock' $ Map.elems remainingSends
         -- if all requests have been acked/ responded to, return prematurely
         recvLoop sock' responseQueue remainingSends Set.empty Nothing
     recvLoop :: Socket
        -> TBQueue FediChordMessage -- ^ the queue for putting in the received responses
        -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
        -> Set.Set Integer -- ^ already received response part numbers
-       -> Maybe Integer -- ^ total number of response parts if already known
+       -> Maybe Integer -- ^ total number of response parts if already known
        -> IO ()
     recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do
         -- 65535 is maximum length of UDP packets, as long as
@@ -786,10 +794,11 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
                     newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
                     newRemaining = Map.delete (part msg) remainingSends'
                     newReceivedParts = Set.insert (part msg) receivedPartNums
-                if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
+                if Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts
                     then pure ()
-                    else recvLoop sock' responseQueue newRemaining receivedPartNums newTotalParts
+                    else recvLoop sock' responseQueue newRemaining newReceivedParts newTotalParts
             -- drop errors and invalid messages
+            Right Request{} -> pure () -- expecting a response, not a request
             Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts
@@ -816,6 +825,18 @@ queueDeleteEntry :: NodeID
     -> IO ()
 queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete

+
+-- | enqueue the timestamp update and verification marking of an entry in the
+-- global 'NodeCache'.
+queueUpdateVerifieds :: Foldable c
+    => c NodeID
+    -> LocalNodeState s
+    -> IO ()
+queueUpdateVerifieds nIds ns = do
+    now <- getPOSIXTime
+    forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
+        markCacheEntryAsVerified (Just now) nid'
+
 -- | retry an IO action at most *i* times until it delivers a result
 attempts :: Int -- ^ number of retries *i*
     -> IO (Maybe a) -- ^ action to retry

diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index 54c5e9a..45d0bf9 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -166,6 +166,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
         currentlyResponsible <- liftEither lookupResp
         liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
         -- 2. then send a join to the currently responsible node
+        liftIO $ putStrLn "send a bootstrap Join"
         joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
         liftEither joinResult
@@ -244,26 +245,24 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
                     Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
                 )
                 initCache resp
-            currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
-            pure $ Right currentlyResponsible
+            currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
+            pure currentlyResponsible


 -- | join a node to the DHT using the global node cache
 --   node's position.
-fediChordVserverJoin :: Service s (RealNodeSTM s)
+fediChordVserverJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
     => LocalNodeStateSTM s -- ^ the local 'NodeState'
-    -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
+    -> m (LocalNodeStateSTM s) -- ^ the joined 'NodeState' after a
                                                 -- successful join, otherwise an error message
 fediChordVserverJoin nsSTM = do
-    ns <- readTVarIO nsSTM
+    ns <- liftIO $ readTVarIO nsSTM
     -- 1. get routed to the currently responsible node
     currentlyResponsible <- requestQueryID ns $ getNid ns
-    putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
+    liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
     -- 2. then send a join to the currently responsible node
-    joinResult <- requestJoin currentlyResponsible nsSTM
-    case joinResult of
-        Left err -> pure . Left $ "Error joining on " <> err
-        Right joinedNS -> pure . Right $ joinedNS
+    joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
+    liftEither joinResult

 fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m ()
 fediChordVserverLeave ns = do
@@ -323,7 +322,7 @@ joinOnNewEntriesThread nsSTM = loop
                 pure ()
             -- otherwise try joining
             FORWARD _ -> do
-                joinResult <- fediChordVserverJoin nsSTM
+                joinResult <- runExceptT $ fediChordVserverJoin nsSTM
                 either
                     -- on join failure, sleep and retry
                     -- TODO: make delay configurable
@@ -504,18 +503,26 @@ stabiliseThread nsSTM = forever $ do
     -- try looking up additional neighbours if list too short
     forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
         ns' <- readTVarIO nsSTM
-        nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
-        atomically $ do
-            latestNs <- readTVar nsSTM
-            writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
+        nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
+        either
+            (const $ pure ())
+            (\entry -> atomically $ do
+                latestNs <- readTVar nsSTM
+                writeTVar nsSTM $ addPredecessors [entry] latestNs
+            )
+            nextEntry
         )

     forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
         ns' <- readTVarIO nsSTM
-        nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
-        atomically $ do
-            latestNs <- readTVar nsSTM
-            writeTVar nsSTM $ addSuccessors [nextEntry] latestNs
+        nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
+        either
+            (const $ pure ())
+            (\entry -> atomically $ do
+                latestNs <- readTVar nsSTM
+                writeTVar nsSTM $ addSuccessors [entry] latestNs
+            )
+            nextEntry
         )

     newNs <- readTVarIO nsSTM
@@ -638,7 +645,7 @@ requestMapPurge :: MVar RequestMap -> IO ()
 requestMapPurge mapVar = forever $ do
     rMapState <- takeMVar mapVar
     now <- getPOSIXTime
-    putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) ->
+    putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) ->
             now - ts < responsePurgeAge
         ) rMapState
     threadDelay $ round responsePurgeAge * 2 * 10^6
@@ -757,7 +764,7 @@ getKeyResponsibility nodeSTM lookupKey = do
 -- new entry.
 -- If no vserver is active in the DHT, 'Nothing' is returned.
 updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
-updateLookupCache nodeSTM lookupKey = do
+updateLookupCache nodeSTM keyToLookup = do
     (node, lookupSource) <- atomically $ do
         node <- readTVar nodeSTM
         let firstVs = headMay (vservers node)
@@ -767,18 +774,25 @@
         pure (node, lookupSource)
     maybe (do
            -- if no local node available, delete cache entry and return Nothing
-           atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey
+           atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete keyToLookup
            pure Nothing
        )
        (\n -> do
            -- start a lookup from the node, update the cache with the lookup result and return it
-           newResponsible <- requestQueryID n lookupKey
-           let newEntry = (getDomain newResponsible, getServicePort newResponsible)
-           now <- getPOSIXTime
-           -- atomic update against lost updates
-           atomically $ modifyTVar' (lookupCacheSTM node) $
-               Map.insert lookupKey (CacheEntry False newEntry now)
-           pure $ Just newEntry
+           -- TODO: better retry management, because having no vserver joined yet should
+           -- be treated differently than other reasons for not getting a result.
+           newResponsible <- runExceptT $ requestQueryID n keyToLookup
+           either
+               (const $ pure Nothing)
+               (\result -> do
+                   let newEntry = (getDomain result, getServicePort result)
+                   now <- getPOSIXTime
+                   -- atomic update against lost updates
+                   atomically $ modifyTVar' (lookupCacheSTM node) $
+                       Map.insert keyToLookup (CacheEntry False newEntry now)
+                   pure $ Just newEntry
+               )
+               newResponsible
        ) lookupSource

diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs
index fe013a0..81cf552 100644
--- a/src/Hash2Pub/PostService.hs
+++ b/src/Hash2Pub/PostService.hs
@@ -10,37 +10,30 @@ module Hash2Pub.PostService where

 import Control.Concurrent
 import Control.Concurrent.Async
-import Control.Concurrent.MVar
 import Control.Concurrent.STM
-import Control.Concurrent.STM.TChan
-import Control.Concurrent.STM.TChan
-import Control.Concurrent.STM.TQueue
-import Control.Concurrent.STM.TVar
-import Control.Exception (Exception (..), try)
-import Control.Monad (foldM, forM, forM_, forever, when, void)
-import Control.Monad.IO.Class (liftIO)
-import Control.Monad.STM
+import Control.Exception (Exception (..), try)
+import Control.Monad (foldM, forM, forM_, forever, void,
+                      when)
+import Control.Monad.IO.Class (liftIO)
 import Data.Bifunctor
-import qualified Data.ByteString.Lazy.UTF8 as BSUL
-import qualified Data.ByteString.UTF8 as BSU
-import qualified Data.HashMap.Strict as HMap
-import qualified Data.HashSet as HSet
-import Data.Maybe (fromMaybe, isJust)
-import Data.String (fromString)
-import qualified Data.Text.Lazy as Txt
-import Data.Text.Normalize (NormalizationMode (NFC),
-                            normalize)
+import qualified Data.ByteString.Lazy.UTF8 as BSUL
+import qualified Data.ByteString.UTF8 as BSU
+import qualified Data.HashMap.Strict as HMap
+import qualified Data.HashSet as HSet
+import Data.Maybe (fromMaybe, isJust)
+import Data.String (fromString)
+import qualified Data.Text.Lazy as Txt
+import Data.Text.Normalize (NormalizationMode (NFC), normalize)
 import Data.Time.Clock.POSIX
-import Data.Typeable (Typeable)
-import qualified Network.HTTP.Client as HTTP
-import qualified Network.HTTP.Types as HTTPT
+import Data.Typeable (Typeable)
+import qualified Network.HTTP.Client as HTTP
+import qualified Network.HTTP.Types as HTTPT
 import System.Random
-import Text.Read (readEither)
+import Text.Read (readEither)

-import qualified Network.Wai.Handler.Warp as Warp
+import qualified Network.Wai.Handler.Warp as Warp
 import Servant
 import Servant.Client
-import Servant.Server

 import Hash2Pub.FediChordTypes
 import Hash2Pub.RingMap
@@ -355,7 +348,7 @@ clientDeliverSubscriptions :: PostService d
     -> (String, Int) -- ^ hostname and port of instance to deliver to
     -> IO (Either String ()) -- Either signals success or failure
 clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
-    -- collect tag intearval
+    -- collect tag interval
     intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
     -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
     -- extract subscribers and posts