From b23201a49c1915ec573a443040cbd30e977f3cda Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Tue, 25 Aug 2020 12:51:33 +0200 Subject: [PATCH] Make key lookups fail after request exhaustion instead of providing default Returning the own node as a default does not make sense in all contexts: Especially for bootstrap joining this can be harmful, so signalling instead that the lookup failed makes distinguishing on a case by case basis possible. Also contributes to #57 --- src/Hash2Pub/DHTProtocol.hs | 17 +++++---- src/Hash2Pub/FediChord.hs | 72 ++++++++++++++++++++++--------------- 2 files changed, 53 insertions(+), 36 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index fac5a3f..1cce94d 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -49,6 +49,8 @@ import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Exception import Control.Monad (foldM, forM, forM_, void, when) +import Control.Monad.IO.Class (MonadIO(..)) +import Control.Monad.Except (MonadError(..), runExceptT) import qualified Data.ByteString as BS import Data.Either (rights) import Data.Foldable (foldl', foldr') @@ -533,9 +535,10 @@ requestJoin toJoinOn ownStateSTM = do -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. -requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node +requestQueryID :: (MonadIO m, MonadError String m) + => LocalNodeState s -- ^ NodeState of the querying node -> NodeID -- ^ target key ID to look up - -> IO RemoteNodeState -- ^ the node responsible for handling that key + -> m RemoteNodeState -- ^ the node responsible for handling that key -- 1. do a local lookup for the l closest nodes -- 2. create l sockets -- 3. send a message async concurrently to all l nodes @@ -543,23 +546,23 @@ requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) -- TODO: deal with lookup failures requestQueryID ns targetID = do - firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns + firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns -- TODO: make maxAttempts configurable queryIdLookupLoop firstCacheSnapshot ns 50 targetID -- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining -queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState +queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState -- return node itself as default fallback value against infinite recursion. -- TODO: consider using an Either instead of a default value -queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns +queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts" queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID -- FOUND can only be returned if targetID is owned by local node case localResult of FOUND thisNode -> pure thisNode FORWARD nodeSet -> do - responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) - now <- getPOSIXTime + responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) + now <- liftIO getPOSIXTime -- check for a FOUND and return it case responseEntries of FOUND foundNode -> pure foundNode diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 54c5e9a..15cee10 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -166,6 +166,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do currentlyResponsible <- liftEither lookupResp liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node + liftIO $ putStrLn "send a bootstrap Join" joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM liftEither joinResult @@ -244,26 +245,24 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset ) initCache resp - currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns - pure $ Right currentlyResponsible + currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns + pure currentlyResponsible -- | join a node to the DHT using the global node cache -- node's position. -fediChordVserverJoin :: Service s (RealNodeSTM s) +fediChordVserverJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeStateSTM s -- ^ the local 'NodeState' - -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a + -> m (LocalNodeStateSTM s) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordVserverJoin nsSTM = do - ns <- readTVarIO nsSTM + ns <- liftIO $ readTVarIO nsSTM -- 1. get routed to the currently responsible node currentlyResponsible <- requestQueryID ns $ getNid ns - putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) + liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node - joinResult <- requestJoin currentlyResponsible nsSTM - case joinResult of - Left err -> pure . Left $ "Error joining on " <> err - Right joinedNS -> pure . Right $ joinedNS + joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM + liftEither joinResult fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m () fediChordVserverLeave ns = do @@ -323,7 +322,7 @@ joinOnNewEntriesThread nsSTM = loop pure () -- otherwise try joining FORWARD _ -> do - joinResult <- fediChordVserverJoin nsSTM + joinResult <- runExceptT $ fediChordVserverJoin nsSTM either -- on join failure, sleep and retry -- TODO: make delay configurable @@ -504,18 +503,26 @@ stabiliseThread nsSTM = forever $ do -- try looking up additional neighbours if list too short forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do ns' <- readTVarIO nsSTM - nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') - atomically $ do - latestNs <- readTVar nsSTM - writeTVar nsSTM $ addPredecessors [nextEntry] latestNs + nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') + either + (const $ pure ()) + (\entry -> atomically $ do + latestNs <- readTVar nsSTM + writeTVar nsSTM $ addPredecessors [entry] latestNs + ) + nextEntry ) forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do ns' <- readTVarIO nsSTM - nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') - atomically $ do - latestNs <- readTVar nsSTM - writeTVar nsSTM $ addSuccessors [nextEntry] latestNs + nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') + either + (const $ pure ()) + (\entry -> atomically $ do + latestNs <- readTVar nsSTM + writeTVar nsSTM $ addSuccessors [entry] latestNs + ) + nextEntry ) newNs <- readTVarIO nsSTM @@ -638,7 +645,7 @@ requestMapPurge :: MVar RequestMap -> IO () requestMapPurge mapVar = forever $ do rMapState <- takeMVar mapVar now <- getPOSIXTime - putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) -> + putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) -> now - ts < responsePurgeAge ) rMapState threadDelay $ round responsePurgeAge * 2 * 10^6 @@ -757,7 +764,7 @@ getKeyResponsibility nodeSTM lookupKey = do -- new entry. -- If no vserver is active in the DHT, 'Nothing' is returned. updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber)) -updateLookupCache nodeSTM lookupKey = do +updateLookupCache nodeSTM keyToLookup = do (node, lookupSource) <- atomically $ do node <- readTVar nodeSTM let firstVs = headMay (vservers node) @@ -767,18 +774,25 @@ updateLookupCache nodeSTM lookupKey = do pure (node, lookupSource) maybe (do -- if no local node available, delete cache entry and return Nothing - atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey + atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete keyToLookup pure Nothing ) (\n -> do -- start a lookup from the node, update the cache with the lookup result and return it - newResponsible <- requestQueryID n lookupKey - let newEntry = (getDomain newResponsible, getServicePort newResponsible) - now <- getPOSIXTime - -- atomic update against lost updates - atomically $ modifyTVar' (lookupCacheSTM node) $ - Map.insert lookupKey (CacheEntry False newEntry now) - pure $ Just newEntry + -- TODO: better retry management, because having no vserver joined yet should + -- be treated differently than other reasons for not getting a result. + newResponsible <- runExceptT $ requestQueryID n keyToLookup + either + (const $ pure Nothing) + (\result -> do + let newEntry = (getDomain result, getServicePort result) + now <- getPOSIXTime + -- atomic update against lost updates + atomically $ modifyTVar' (lookupCacheSTM node) $ + Map.insert keyToLookup (CacheEntry False newEntry now) + pure $ Just newEntry + ) + newResponsible ) lookupSource