bootstrapQueryID: try all possible node IDs of a bootstrap node
- closes #77 - when k-choices (#2) joining, try addressing each possible node ID of the bootstrap node until success - bugfix: include correct target ID of node that shall respond in QueryID requests
This commit is contained in:
parent
9a61c186e3
commit
21ecf9b041
|
@ -649,14 +649,14 @@ sendQueryIdMessages :: (Integral i)
|
||||||
-> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
|
-> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
|
||||||
-> [RemoteNodeState] -- ^ nodes to query
|
-> [RemoteNodeState] -- ^ nodes to query
|
||||||
-> IO QueryResponse -- ^ accumulated response
|
-> IO QueryResponse -- ^ accumulated response
|
||||||
sendQueryIdMessages targetID ns lParam targets = do
|
sendQueryIdMessages lookupID ns lParam targets = do
|
||||||
|
|
||||||
-- create connected sockets to all query targets and use them for request handling
|
-- create connected sockets to all query targets and use them for request handling
|
||||||
|
|
||||||
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
||||||
let srcAddr = confIP nodeConf
|
let srcAddr = confIP nodeConf
|
||||||
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
|
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
|
||||||
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
|
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage lookupID ns Nothing (getNid resultNode))
|
||||||
)) targets
|
)) targets
|
||||||
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||||
-- ToDo: exception handling, maybe log them
|
-- ToDo: exception handling, maybe log them
|
||||||
|
@ -689,13 +689,14 @@ sendQueryIdMessages targetID ns lParam targets = do
|
||||||
|
|
||||||
-- | Create a QueryID message to be supplied to 'sendRequestTo'
|
-- | Create a QueryID message to be supplied to 'sendRequestTo'
|
||||||
lookupMessage :: Integral i
|
lookupMessage :: Integral i
|
||||||
=> NodeID -- ^ target ID
|
=> NodeID -- ^ lookup ID to be looked up
|
||||||
-> LocalNodeState s -- ^ sender node state
|
-> LocalNodeState s -- ^ sender node state
|
||||||
-> Maybe i -- ^ optionally provide a different l parameter
|
-> Maybe i -- ^ optionally provide a different l parameter
|
||||||
|
-> NodeID -- ^ target ID of message destination
|
||||||
-> (Integer -> FediChordMessage)
|
-> (Integer -> FediChordMessage)
|
||||||
lookupMessage targetID ns lParam = mkRequest ns targetID QueryID (Just $ pl ns targetID)
|
lookupMessage lookupID ns lParam targetID = mkRequest ns targetID QueryID (Just $ pl ns lookupID)
|
||||||
where
|
where
|
||||||
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam }
|
pl ns' lookupID' = QueryIDRequestPayload { queryTargetID = lookupID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns') fromIntegral lParam }
|
||||||
|
|
||||||
|
|
||||||
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
|
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
|
||||||
|
|
|
@ -123,7 +123,7 @@ fediChordInit initConf serviceRunner = do
|
||||||
-- prepare for joining: start node cache writer thread
|
-- prepare for joining: start node cache writer thread
|
||||||
-- currently no masking is necessary, as there is nothing to clean up
|
-- currently no masking is necessary, as there is nothing to clean up
|
||||||
nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM
|
nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM
|
||||||
fediThreadsAsync <- do
|
fediThreadsAsync <-
|
||||||
either (\err -> do
|
either (\err -> do
|
||||||
-- handle unsuccessful join
|
-- handle unsuccessful join
|
||||||
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
||||||
|
@ -255,15 +255,14 @@ kChoicesVsJoin queryVsSTM bootstrapNode capacity activeVss nodeSTM remainingTarg
|
||||||
activeVsSet = HMap.keysSet activeVss
|
activeVsSet = HMap.keysSet activeVss
|
||||||
-- tuples of node IDs and vserver IDs, because vserver IDs are needed for
|
-- tuples of node IDs and vserver IDs, because vserver IDs are needed for
|
||||||
-- LocalNodeState creation
|
-- LocalNodeState creation
|
||||||
nonJoinedIDs = filter (not . flip HSet.member activeVsSet . fst) [ (genNodeID (confIP conf) (confDomain conf) (fromInteger v), v) | v <- [0..confKChoicesMaxVS conf]]
|
nonJoinedIDs = filter (not . flip HSet.member activeVsSet . fst) [ (genNodeID (confIP conf) (confDomain conf) (fromInteger v), v) | v <- [0..pred (confKChoicesMaxVS conf)]]
|
||||||
queryVs <- liftIO $ readTVarIO queryVsSTM
|
queryVs <- liftIO $ readTVarIO queryVsSTM
|
||||||
|
|
||||||
-- query load of all possible segments
|
-- query load of all possible segments
|
||||||
-- simplification: treat each load lookup failure as a general unavailability of that segment
|
-- simplification: treat each load lookup failure as a general unavailability of that segment
|
||||||
-- TODO: retries for transient failures
|
-- TODO: retries for transient failures
|
||||||
segmentLoads <- fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do
|
segmentLoads <- fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do
|
||||||
lookupResp <- liftIO $ bootstrapQueryId queryVsSTM bootstrapNode vsNid
|
currentlyResponsible <- bootstrapQueryId queryVsSTM bootstrapNode vsNid
|
||||||
currentlyResponsible <- liftEither lookupResp
|
|
||||||
segment <- requestQueryLoad queryVs vsNid currentlyResponsible
|
segment <- requestQueryLoad queryVs vsNid currentlyResponsible
|
||||||
pure $ Just (segment, vsId, currentlyResponsible)
|
pure $ Just (segment, vsId, currentlyResponsible)
|
||||||
-- store segment stats and vserver ID together, so it's clear
|
-- store segment stats and vserver ID together, so it's clear
|
||||||
|
@ -319,8 +318,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
|
||||||
ns <- readTVarIO nsSTM
|
ns <- readTVarIO nsSTM
|
||||||
runExceptT $ do
|
runExceptT $ do
|
||||||
-- 1. get routed to the currently responsible node
|
-- 1. get routed to the currently responsible node
|
||||||
lookupResp <- liftIO $ bootstrapQueryId nsSTM bootstrapNode $ getNid ns
|
currentlyResponsible <- bootstrapQueryId nsSTM bootstrapNode $ getNid ns
|
||||||
currentlyResponsible <- liftEither lookupResp
|
|
||||||
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
|
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
|
||||||
-- 2. then send a join to the currently responsible node
|
-- 2. then send a join to the currently responsible node
|
||||||
liftIO $ putStrLn "send a bootstrap Join"
|
liftIO $ putStrLn "send a bootstrap Join"
|
||||||
|
@ -342,8 +340,7 @@ convergenceSampleThread nodeSTM = forever $ do
|
||||||
let bss = bootstrapNodes parentNode
|
let bss = bootstrapNodes parentNode
|
||||||
randIndex <- liftIO $ randomRIO (0, length bss - 1)
|
randIndex <- liftIO $ randomRIO (0, length bss - 1)
|
||||||
chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
|
chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
|
||||||
lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
|
currentlyResponsible <- bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
|
||||||
currentlyResponsible <- liftEither lookupResult
|
|
||||||
if getNid currentlyResponsible /= getNid nsSnap
|
if getNid currentlyResponsible /= getNid nsSnap
|
||||||
-- if mismatch, stabilise on the result, else do nothing
|
-- if mismatch, stabilise on the result, else do nothing
|
||||||
then do
|
then do
|
||||||
|
@ -393,34 +390,60 @@ tryBootstrapJoining nodeSTM = do
|
||||||
|
|
||||||
|
|
||||||
-- | Look up a key just based on the responses of a single bootstrapping node.
|
-- | Look up a key just based on the responses of a single bootstrapping node.
|
||||||
bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
|
bootstrapQueryId :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
|
||||||
|
=> LocalNodeStateSTM s
|
||||||
|
-> (String, PortNumber)
|
||||||
|
-> NodeID
|
||||||
|
-> m RemoteNodeState
|
||||||
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
|
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
|
||||||
ns <- readTVarIO nsSTM
|
ns <- liftIO $ readTVarIO nsSTM
|
||||||
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns)
|
||||||
let srcAddr = confIP nodeConf
|
let srcAddr = confIP nodeConf
|
||||||
bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close (
|
-- IP address needed for ID generation, so look it up
|
||||||
-- Initialise an empty cache only with the responses from a bootstrapping node
|
bootstrapAddr <- addrAddress <$> liftIO (resolve (Just bootstrapHost) (Just bootstrapPort))
|
||||||
fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
|
bootstrapIP <- case bootstrapAddr of
|
||||||
)
|
SockAddrInet6 _ _ bootstrapIP _ -> pure bootstrapIP
|
||||||
`catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
|
_ -> throwError $ "Expected an IPv6 address, but got " <> show bootstrapAddr
|
||||||
|
let possibleJoinIDs =
|
||||||
case bootstrapResponse of
|
[ genNodeID bootstrapIP bootstrapHost (fromInteger v) | v <- [0..pred (
|
||||||
Left err -> pure $ Left err
|
if confEnableKChoices nodeConf then confKChoicesMaxVS nodeConf else 1)]]
|
||||||
Right resp
|
tryQuery ns srcAddr nodeConf possibleJoinIDs
|
||||||
| resp == Set.empty -> pure . Left $ "Bootstrapping node " <> show bootstrapHost <> " gave no response."
|
where
|
||||||
| otherwise -> do
|
-- | try bootstrapping a query through any possible ID of the
|
||||||
now <- getPOSIXTime
|
-- given bootstrap node
|
||||||
-- create new cache with all returned node responses
|
tryQuery :: (MonadError String m, MonadIO m)
|
||||||
let bootstrapCache =
|
=> LocalNodeState s
|
||||||
-- traverse response parts
|
-> HostAddress6
|
||||||
foldr' (\resp cacheAcc -> case queryResult <$> payload resp of
|
-> FediChordConf
|
||||||
Nothing -> cacheAcc
|
-> [NodeID]
|
||||||
Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
|
-> m RemoteNodeState
|
||||||
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
tryQuery _ _ _ [] = throwError $ "No ID of " <> show bootstrapHost <> " has responded."
|
||||||
)
|
tryQuery ns srcAddr nodeConf (bnid:bnids) = (do
|
||||||
initCache resp
|
bootstrapResponse <- liftIO $ bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close (
|
||||||
runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
|
-- Initialise an empty cache only with the responses from a bootstrapping node
|
||||||
|
fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing bnid)
|
||||||
|
)
|
||||||
|
`catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
|
||||||
|
|
||||||
|
case bootstrapResponse of
|
||||||
|
Left err -> throwError err
|
||||||
|
Right resp
|
||||||
|
| resp == Set.empty -> throwError $ "Bootstrapping node " <> show bootstrapHost <> " gave no response."
|
||||||
|
| otherwise -> do
|
||||||
|
now <- liftIO getPOSIXTime
|
||||||
|
-- create new cache with all returned node responses
|
||||||
|
let bootstrapCache =
|
||||||
|
-- traverse response parts
|
||||||
|
foldr' (\resp' cacheAcc -> case queryResult <$> payload resp' of
|
||||||
|
Nothing -> cacheAcc
|
||||||
|
Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
|
||||||
|
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
||||||
|
)
|
||||||
|
initCache resp
|
||||||
|
queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
|
||||||
|
) `catchError` (\_ ->
|
||||||
|
-- only throw an error if all IDs have been tried
|
||||||
|
tryQuery ns srcAddr nodeConf bnids)
|
||||||
|
|
||||||
-- | join a node to the DHT using the global node cache
|
-- | join a node to the DHT using the global node cache
|
||||||
-- node's position.
|
-- node's position.
|
||||||
|
|
Loading…
Reference in a new issue