Make key lookups fail after request exhaustion instead of providing default
Returning the querying node itself as a default does not make sense in all contexts: especially for bootstrap joining it can be harmful. Signalling instead that the lookup failed makes it possible to distinguish failure from success on a case-by-case basis. Also contributes to #57
commit b23201a49c (parent 6c5e40f8ad)
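The gist of the change as a standalone sketch (the `Node` type and both lookup functions are hypothetical stand-ins, not the project's actual `RemoteNodeState` or `queryIdLookupLoop`): with a default value, attempt exhaustion is indistinguishable from success; with `throwError`, the caller gets an `Either` to branch on.

    {-# LANGUAGE FlexibleContexts #-}
    import Control.Monad.Except (MonadError (..), runExceptT)

    -- hypothetical stand-in for RemoteNodeState
    newtype Node = Node String deriving Show

    -- old style: when all attempts are used up, silently fall back to the
    -- own node; callers cannot tell a real result from the fallback
    lookupWithDefault :: Monad m => Node -> Int -> m Node
    lookupWithDefault self 0 = pure self
    lookupWithDefault self n = lookupWithDefault self (n - 1)

    -- new style: exhaustion is an error the caller has to handle explicitly
    lookupOrFail :: MonadError String m => Node -> Int -> m Node
    lookupOrFail _    0 = throwError "exhausted maximum lookup attempts"
    lookupOrFail self n = lookupOrFail self (n - 1)

    main :: IO ()
    main = do
        -- both lookups simulate never finding the target
        runExceptT (lookupOrFail (Node "self") 3) >>= either putStrLn print
        lookupWithDefault (Node "self") 3 >>= print  -- still "succeeds"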
@@ -49,6 +49,8 @@ import Control.Concurrent.STM.TQueue
 import Control.Concurrent.STM.TVar
 import Control.Exception
 import Control.Monad (foldM, forM, forM_, void, when)
+import Control.Monad.IO.Class (MonadIO(..))
+import Control.Monad.Except (MonadError(..), runExceptT)
 import qualified Data.ByteString as BS
 import Data.Either (rights)
 import Data.Foldable (foldl', foldr')
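The two added imports carry the whole pattern of this commit: functions previously fixed to `IO` now run in any `m` with `MonadIO` and `MonadError String` instances, so IO actions are wrapped in `liftIO` and failures are raised with `throwError`. A minimal sketch under those assumptions (`timedLookup` is a made-up example, not project code):

    {-# LANGUAGE FlexibleContexts #-}
    import Control.Monad.Except (MonadError (..), runExceptT)
    import Control.Monad.IO.Class (MonadIO (..))
    import Data.Time.Clock.POSIX (getPOSIXTime)

    -- in the generalised monad, IO actions must be lifted with liftIO and
    -- failure is signalled with throwError instead of a fallback value
    timedLookup :: (MonadIO m, MonadError String m) => Bool -> m String
    timedLookup found = do
        now <- liftIO getPOSIXTime   -- same lifting as in the patched hunks
        if found
            then pure ("found at " <> show now)
            else throwError "lookup failed"

    main :: IO ()
    main = runExceptT (timedLookup False) >>= either putStrLn putStrLn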
@@ -533,9 +535,10 @@ requestJoin toJoinOn ownStateSTM = do


 -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
-requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node
+requestQueryID :: (MonadIO m, MonadError String m)
+               => LocalNodeState s -- ^ NodeState of the querying node
                -> NodeID -- ^ target key ID to look up
-               -> IO RemoteNodeState -- ^ the node responsible for handling that key
+               -> m RemoteNodeState -- ^ the node responsible for handling that key
 -- 1. do a local lookup for the l closest nodes
 -- 2. create l sockets
 -- 3. send a message async concurrently to all l nodes
@@ -543,23 +546,23 @@ requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node
 -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
 -- TODO: deal with lookup failures
 requestQueryID ns targetID = do
-    firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns
+    firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns
     -- TODO: make maxAttempts configurable
     queryIdLookupLoop firstCacheSnapshot ns 50 targetID

 -- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
-queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState
--- return node itself as default fallback value against infinite recursion.
--- TODO: consider using an Either instead of a default value
-queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns
+queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState
+queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts"
 queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
     let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID
     -- FOUND can only be returned if targetID is owned by local node
     case localResult of
         FOUND thisNode -> pure thisNode
         FORWARD nodeSet -> do
-            responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
-            now <- getPOSIXTime
+            responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
+            now <- liftIO getPOSIXTime
             -- check for a FOUND and return it
             case responseEntries of
                 FOUND foundNode -> pure foundNode
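Because `queryIdLookupLoop` now throws instead of returning a default, any caller that itself runs in `MonadError String m` (such as the adapted `fediChordVserverJoin` below) can call `requestQueryID` directly and let a thrown error short-circuit the rest of its do-block. A hedged sketch of that propagation, with stand-in functions in place of the project's:

    {-# LANGUAGE FlexibleContexts #-}
    import Control.Monad.Except (MonadError (..), runExceptT)
    import Control.Monad.IO.Class (MonadIO (..))

    -- stand-in for the generalised requestQueryID (the real one takes a
    -- LocalNodeState and a NodeID)
    queryStandIn :: (MonadIO m, MonadError String m) => Int -> m Int
    queryStandIn 0 = throwError "exhausted maximum lookup attempts"
    queryStandIn n = pure n

    -- a MonadError caller uses it directly; a thrown error short-circuits
    -- the rest of the do-block, so no manual Either plumbing is needed
    joinStandIn :: (MonadIO m, MonadError String m) => Int -> m String
    joinStandIn n = do
        responsible <- queryStandIn n
        liftIO . putStrLn $ "Trying to join on " <> show responsible
        pure "joined"

    main :: IO ()
    main = do
        runExceptT (joinStandIn 0) >>= either putStrLn putStrLn  -- error path
        runExceptT (joinStandIn 7) >>= either putStrLn putStrLn  -- success path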
@@ -166,6 +166,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
         currentlyResponsible <- liftEither lookupResp
+        liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
         -- 2. then send a join to the currently responsible node
         liftIO $ putStrLn "send a bootstrap Join"
         joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
         liftEither joinResult
@@ -244,26 +245,24 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
                 Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
             )
             initCache resp
-    currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
-    pure $ Right currentlyResponsible
+    currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
+    pure currentlyResponsible


 -- | join a node to the DHT using the global node cache
 --   node's position.
-fediChordVserverJoin :: Service s (RealNodeSTM s)
+fediChordVserverJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
                      => LocalNodeStateSTM s -- ^ the local 'NodeState'
-                     -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
+                     -> m (LocalNodeStateSTM s) -- ^ the joined 'NodeState' after a
                      --   successful join, otherwise an error message
 fediChordVserverJoin nsSTM = do
-    ns <- readTVarIO nsSTM
+    ns <- liftIO $ readTVarIO nsSTM
     -- 1. get routed to the currently responsible node
     currentlyResponsible <- requestQueryID ns $ getNid ns
-    putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
+    liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
     -- 2. then send a join to the currently responsible node
-    joinResult <- requestJoin currentlyResponsible nsSTM
-    case joinResult of
-        Left err -> pure . Left $ "Error joining on " <> err
-        Right joinedNS -> pure . Right $ joinedNS
+    joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
+    liftEither joinResult

 fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m ()
 fediChordVserverLeave ns = do
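`liftEither` is what lets the manual `case joinResult of ...` disappear: it re-raises a plain `Either String a` inside the surrounding `MonadError String` computation. A small sketch with a hypothetical stand-in for `requestJoin`:

    {-# LANGUAGE FlexibleContexts #-}
    import Control.Monad.Except (MonadError (..), liftEither, runExceptT)
    import Control.Monad.IO.Class (MonadIO (..))

    -- stand-in for an IO action reporting failure as an Either, like requestJoin
    requestJoinStandIn :: Bool -> IO (Either String String)
    requestJoinStandIn ok = pure $ if ok then Right "joined" else Left "join refused"

    performJoin :: (MonadIO m, MonadError String m) => Bool -> m String
    performJoin ok = do
        joinResult <- liftIO (requestJoinStandIn ok)
        -- liftEither turns Left into throwError and Right into pure,
        -- replacing the removed case expression
        liftEither joinResult

    main :: IO ()
    main = runExceptT (performJoin False) >>= either putStrLn putStrLn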
@@ -323,7 +322,7 @@ joinOnNewEntriesThread nsSTM = loop
                     pure ()
                 -- otherwise try joining
                 FORWARD _ -> do
-                    joinResult <- fediChordVserverJoin nsSTM
+                    joinResult <- runExceptT $ fediChordVserverJoin nsSTM
                     either
                         -- on join failure, sleep and retry
                         -- TODO: make delay configurable
@@ -504,18 +503,26 @@ stabiliseThread nsSTM = forever $ do
     -- try looking up additional neighbours if list too short
     forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
         ns' <- readTVarIO nsSTM
-        nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
-        atomically $ do
+        nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
+        either
+            (const $ pure ())
+            (\entry -> atomically $ do
                 latestNs <- readTVar nsSTM
-                writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
+                writeTVar nsSTM $ addPredecessors [entry] latestNs
+            )
+            nextEntry
         )

     forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
         ns' <- readTVarIO nsSTM
-        nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
-        atomically $ do
+        nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
+        either
+            (const $ pure ())
+            (\entry -> atomically $ do
                 latestNs <- readTVar nsSTM
-                writeTVar nsSTM $ addSuccessors [nextEntry] latestNs
+                writeTVar nsSTM $ addSuccessors [entry] latestNs
+            )
+            nextEntry
         )

     newNs <- readTVarIO nsSTM
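In the stabilise loop a single failed lookup must not abort the whole maintenance pass, so the `Either` is collapsed with `either (const $ pure ()) ...`: errors are silently dropped, successes update the shared node state atomically. The same pattern, reduced to a `TVar [Int]` in place of the real neighbour lists:

    import Control.Concurrent.STM (atomically, modifyTVar', newTVarIO, readTVarIO)
    import Control.Monad (forM_)
    import Control.Monad.Except (ExceptT, runExceptT, throwError)

    -- stand-in lookup: even keys succeed, odd keys fail
    lookupStandIn :: Int -> ExceptT String IO Int
    lookupStandIn k
        | even k    = pure (k * 10)
        | otherwise = throwError ("no result for " <> show k)

    main :: IO ()
    main = do
        neighbours <- newTVarIO ([] :: [Int])
        forM_ [1 .. 4] $ \k -> do
            result <- runExceptT (lookupStandIn k)
            -- on failure do nothing, on success add the entry atomically
            either
                (const $ pure ())
                (\entry -> atomically $ modifyTVar' neighbours (entry :))
                result
        readTVarIO neighbours >>= print   -- [40,20]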
@@ -638,7 +645,7 @@ requestMapPurge :: MVar RequestMap -> IO ()
 requestMapPurge mapVar = forever $ do
     rMapState <- takeMVar mapVar
     now <- getPOSIXTime
-    putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) ->
+    putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) ->
             now - ts < responsePurgeAge
         ) rMapState
     threadDelay $ round responsePurgeAge * 2 * 10^6
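This hunk only drops the unused `entry@` as-pattern binding. For context: `threadDelay` takes microseconds, so `round responsePurgeAge * 2 * 10^6` sleeps for twice the purge age. A self-contained version of such a purge loop (the `responsePurgeAge` value here is a made-up placeholder for the project's constant):

    import Control.Concurrent (forkIO, threadDelay)
    import Control.Concurrent.MVar (MVar, newMVar, putMVar, takeMVar)
    import Control.Monad (forever)
    import qualified Data.Map.Strict as Map
    import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime)

    -- hypothetical purge age in seconds, standing in for the real constant
    responsePurgeAge :: POSIXTime
    responsePurgeAge = 60

    -- drop entries older than responsePurgeAge, then sleep twice that long
    purgeLoop :: MVar (Map.Map Int POSIXTime) -> IO ()
    purgeLoop mapVar = forever $ do
        rMapState <- takeMVar mapVar
        now <- getPOSIXTime
        putMVar mapVar $ Map.filter (\ts -> now - ts < responsePurgeAge) rMapState
        threadDelay $ round responsePurgeAge * 2 * 10 ^ 6   -- microseconds

    main :: IO ()
    main = do
        m <- newMVar Map.empty
        _ <- forkIO (purgeLoop m)
        threadDelay 1000000   -- let one purge pass run, then exit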
@@ -757,7 +764,7 @@ getKeyResponsibility nodeSTM lookupKey = do
 -- new entry.
 -- If no vserver is active in the DHT, 'Nothing' is returned.
 updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
-updateLookupCache nodeSTM lookupKey = do
+updateLookupCache nodeSTM keyToLookup = do
     (node, lookupSource) <- atomically $ do
         node <- readTVar nodeSTM
         let firstVs = headMay (vservers node)
@@ -767,18 +774,25 @@ updateLookupCache nodeSTM lookupKey = do
         pure (node, lookupSource)
     maybe (do
         -- if no local node available, delete cache entry and return Nothing
-        atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey
+        atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete keyToLookup
         pure Nothing
         )
         (\n -> do
             -- start a lookup from the node, update the cache with the lookup result and return it
-            newResponsible <- requestQueryID n lookupKey
-            let newEntry = (getDomain newResponsible, getServicePort newResponsible)
+            -- TODO: better retry management, because having no vserver joined yet should
+            -- be treated differently than other reasons for not getting a result.
+            newResponsible <- runExceptT $ requestQueryID n keyToLookup
+            either
+                (const $ pure Nothing)
+                (\result -> do
+                    let newEntry = (getDomain result, getServicePort result)
                     now <- getPOSIXTime
                     -- atomic update against lost updates
                     atomically $ modifyTVar' (lookupCacheSTM node) $
-                        Map.insert lookupKey (CacheEntry False newEntry now)
+                        Map.insert keyToLookup (CacheEntry False newEntry now)
                     pure $ Just newEntry
+                )
+                newResponsible
         ) lookupSource
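`updateLookupCache` keeps its `IO (Maybe (String, PortNumber))` interface, so the `Either` produced by `runExceptT` is folded into a `Maybe`: lookup failures become `Nothing`, just like the missing-vserver case. The collapse the new code spells out with `either (const $ pure Nothing) ...` can be sketched compactly (stand-in lookup, hypothetical values):

    import Control.Monad.Except (ExceptT, runExceptT, throwError)

    lookupStandIn :: Bool -> ExceptT String IO (String, Int)
    lookupStandIn ok
        | ok        = pure ("node.example", 3456)
        | otherwise = throwError "exhausted maximum lookup attempts"

    -- collapse Either String a into Maybe a, discarding the error message
    tryLookup :: Bool -> IO (Maybe (String, Int))
    tryLookup ok = either (const Nothing) Just <$> runExceptT (lookupStandIn ok)

    main :: IO ()
    main = do
        tryLookup True >>= print    -- Just ("node.example",3456)
        tryLookup False >>= print   -- Nothing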