From 21ecf9b0417878b52191f0f2dea2994f97f23c30 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sun, 27 Sep 2020 02:06:45 +0200 Subject: [PATCH] bootstrapQueryID: try all possible node IDs of a bootstrap node - closes #77 - when k-choices (#2) joining, try addressing each possible node ID of the bootstrap node until success - bugfix: include correct target ID of node that shall respond in QueryID requests --- src/Hash2Pub/DHTProtocol.hs | 11 ++--- src/Hash2Pub/FediChord.hs | 89 +++++++++++++++++++++++-------------- 2 files changed, 62 insertions(+), 38 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 1682e16..249ebef 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -649,14 +649,14 @@ sendQueryIdMessages :: (Integral i) -> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned -> [RemoteNodeState] -- ^ nodes to query -> IO QueryResponse -- ^ accumulated response -sendQueryIdMessages targetID ns lParam targets = do +sendQueryIdMessages lookupID ns lParam targets = do -- create connected sockets to all query targets and use them for request handling nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) let srcAddr = confIP nodeConf queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( - sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) + sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage lookupID ns Nothing (getNid resultNode)) )) targets -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 -- ToDo: exception handling, maybe log them @@ -689,13 +689,14 @@ sendQueryIdMessages targetID ns lParam targets = do -- | Create a QueryID message to be supplied to 'sendRequestTo' lookupMessage :: Integral i - => NodeID -- ^ target ID + => NodeID -- ^ lookup ID to be looked up -> LocalNodeState s -- ^ sender node state -> Maybe i -- ^ optionally provide a different l parameter + -> NodeID -- ^ target ID of message destination -> (Integer -> FediChordMessage) -lookupMessage targetID ns lParam = mkRequest ns targetID QueryID (Just $ pl ns targetID) +lookupMessage lookupID ns lParam targetID = mkRequest ns targetID QueryID (Just $ pl ns lookupID) where - pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns) fromIntegral lParam } + pl ns' lookupID' = QueryIDRequestPayload { queryTargetID = lookupID', queryLBestNodes = maybe (fromIntegral $ lNumBestNodes ns') fromIntegral lParam } -- | Send a stabilise request to provided 'RemoteNode' and, if successful, diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 685caea..6c90b5d 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -123,7 +123,7 @@ fediChordInit initConf serviceRunner = do -- prepare for joining: start node cache writer thread -- currently no masking is necessary, as there is nothing to clean up nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM - fediThreadsAsync <- do + fediThreadsAsync <- either (\err -> do -- handle unsuccessful join putStrLn $ err <> " Error joining, start listening for incoming requests anyways" @@ -255,15 +255,14 @@ kChoicesVsJoin queryVsSTM bootstrapNode capacity activeVss nodeSTM remainingTarg activeVsSet = HMap.keysSet activeVss -- tuples of node IDs and vserver IDs, because vserver IDs are needed for -- LocalNodeState creation - nonJoinedIDs = filter (not . flip HSet.member activeVsSet . fst) [ (genNodeID (confIP conf) (confDomain conf) (fromInteger v), v) | v <- [0..confKChoicesMaxVS conf]] + nonJoinedIDs = filter (not . flip HSet.member activeVsSet . fst) [ (genNodeID (confIP conf) (confDomain conf) (fromInteger v), v) | v <- [0..pred (confKChoicesMaxVS conf)]] queryVs <- liftIO $ readTVarIO queryVsSTM -- query load of all possible segments -- simplification: treat each load lookup failure as a general unavailability of that segment -- TODO: retries for transient failures segmentLoads <- fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do - lookupResp <- liftIO $ bootstrapQueryId queryVsSTM bootstrapNode vsNid - currentlyResponsible <- liftEither lookupResp + currentlyResponsible <- bootstrapQueryId queryVsSTM bootstrapNode vsNid segment <- requestQueryLoad queryVs vsNid currentlyResponsible pure $ Just (segment, vsId, currentlyResponsible) -- store segment stats and vserver ID together, so it's clear @@ -319,8 +318,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do ns <- readTVarIO nsSTM runExceptT $ do -- 1. get routed to the currently responsible node - lookupResp <- liftIO $ bootstrapQueryId nsSTM bootstrapNode $ getNid ns - currentlyResponsible <- liftEither lookupResp + currentlyResponsible <- bootstrapQueryId nsSTM bootstrapNode $ getNid ns liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node liftIO $ putStrLn "send a bootstrap Join" @@ -342,8 +340,7 @@ convergenceSampleThread nodeSTM = forever $ do let bss = bootstrapNodes parentNode randIndex <- liftIO $ randomRIO (0, length bss - 1) chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex - lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap) - currentlyResponsible <- liftEither lookupResult + currentlyResponsible <- bootstrapQueryId nsSTM chosenNode (getNid nsSnap) if getNid currentlyResponsible /= getNid nsSnap -- if mismatch, stabilise on the result, else do nothing then do @@ -393,34 +390,60 @@ tryBootstrapJoining nodeSTM = do -- | Look up a key just based on the responses of a single bootstrapping node. -bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState) +bootstrapQueryId :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) + => LocalNodeStateSTM s + -> (String, PortNumber) + -> NodeID + -> m RemoteNodeState bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do - ns <- readTVarIO nsSTM - nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) + ns <- liftIO $ readTVarIO nsSTM + nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns) let srcAddr = confIP nodeConf - bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close ( - -- Initialise an empty cache only with the responses from a bootstrapping node - fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) - ) - `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) - - case bootstrapResponse of - Left err -> pure $ Left err - Right resp - | resp == Set.empty -> pure . Left $ "Bootstrapping node " <> show bootstrapHost <> " gave no response." - | otherwise -> do - now <- getPOSIXTime - -- create new cache with all returned node responses - let bootstrapCache = - -- traverse response parts - foldr' (\resp cacheAcc -> case queryResult <$> payload resp of - Nothing -> cacheAcc - Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc - Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset - ) - initCache resp - runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns + -- IP address needed for ID generation, so look it up + bootstrapAddr <- addrAddress <$> liftIO (resolve (Just bootstrapHost) (Just bootstrapPort)) + bootstrapIP <- case bootstrapAddr of + SockAddrInet6 _ _ bootstrapIP _ -> pure bootstrapIP + _ -> throwError $ "Expected an IPv6 address, but got " <> show bootstrapAddr + let possibleJoinIDs = + [ genNodeID bootstrapIP bootstrapHost (fromInteger v) | v <- [0..pred ( + if confEnableKChoices nodeConf then confKChoicesMaxVS nodeConf else 1)]] + tryQuery ns srcAddr nodeConf possibleJoinIDs + where + -- | try bootstrapping a query through any possible ID of the + -- given bootstrap node + tryQuery :: (MonadError String m, MonadIO m) + => LocalNodeState s + -> HostAddress6 + -> FediChordConf + -> [NodeID] + -> m RemoteNodeState + tryQuery _ _ _ [] = throwError $ "No ID of " <> show bootstrapHost <> " has responded." + tryQuery ns srcAddr nodeConf (bnid:bnids) = (do + bootstrapResponse <- liftIO $ bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close ( + -- Initialise an empty cache only with the responses from a bootstrapping node + fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing bnid) + ) + `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) + case bootstrapResponse of + Left err -> throwError err + Right resp + | resp == Set.empty -> throwError $ "Bootstrapping node " <> show bootstrapHost <> " gave no response." + | otherwise -> do + now <- liftIO getPOSIXTime + -- create new cache with all returned node responses + let bootstrapCache = + -- traverse response parts + foldr' (\resp' cacheAcc -> case queryResult <$> payload resp' of + Nothing -> cacheAcc + Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc + Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset + ) + initCache resp + queryIdLookupLoop bootstrapCache ns 50 $ getNid ns + ) `catchError` (\_ -> + -- only throw an error if all IDs have been tried + tryQuery ns srcAddr nodeConf bnids) -- | join a node to the DHT using the global node cache -- node's position.