From 61ea6ed3ff90266fda46428518e4cf4de0809048 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 8 Jul 2020 01:18:44 +0200 Subject: [PATCH] Periodically contact bootstrap nodes for convergence sampling or joining closes #56 --- app/Main.hs | 1 + src/Hash2Pub/FediChord.hs | 115 +++++++++++++++++++++++---------- src/Hash2Pub/FediChordTypes.hs | 15 +++-- 3 files changed, 91 insertions(+), 40 deletions(-) diff --git a/app/Main.hs b/app/Main.hs index cc93c26..cdfc2b3 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -50,4 +50,5 @@ readConfig = do , confDhtPort = read portString , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)] --, confStabiliseInterval = 60 + , confBootstrapSamplingInterval = 180 } diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index de9a462..2b9a2ef 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -82,6 +82,7 @@ import Network.Socket hiding (recv, recvFrom, send, sendTo) import Network.Socket.ByteString import Safe +import System.Random (randomRIO) import Hash2Pub.DHTProtocol import Hash2Pub.FediChordTypes @@ -142,30 +143,48 @@ fediChordBootstrapJoin :: LocalNodeStateSTM -- ^ the local 'NodeSta -> (String, PortNumber) -- ^ domain and port of a bootstrapping node -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message -fediChordBootstrapJoin nsSTM (joinHost, joinPort) = +fediChordBootstrapJoin nsSTM bootstrapNode = do -- can be invoked multiple times with all known bootstrapping nodes until successfully joined - bracket (mkSendSocket joinHost joinPort) close (\sock -> do - putStrLn "BootstrapJoin" - -- 1. get routed to placement of own ID until FOUND: - -- Initialise an empty cache only with the responses from a bootstrapping node - ns <- readTVarIO nsSTM - bootstrapResponse <- sendRequestTo 5000 3 (lookupMessage (getNid ns) ns Nothing) sock - if bootstrapResponse == Set.empty - then pure . 
Left $ "Bootstrapping node " <> show joinHost <> " gave no response." - else do - now <- getPOSIXTime - -- create new cache with all returned node responses - let bootstrapCache = - -- traverse response parts - foldr' (\resp cacheAcc -> case queryResult <$> payload resp of - Nothing -> cacheAcc - Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc - Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset - ) - initCache bootstrapResponse - fediChordJoin bootstrapCache nsSTM - ) - `catch` (\e -> pure . Left $ "Error at bootstrap joining: " <> displayException (e :: IOException)) + ns <- readTVarIO nsSTM + runExceptT $ do + -- 1. get routed to the currently responsible node + lookupResp <- liftIO $ bootstrapQueryId nsSTM bootstrapNode $ getNid ns + currentlyResponsible <- liftEither lookupResp + liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) + -- 2. then send a join to the currently responsible node + joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM + liftEither joinResult + +-- | Periodically look up the own ID through a random bootstrapping node to discover and merge separated DHT clusters. +-- Unjoined nodes try joining instead. 
+convergenceSampleThread :: LocalNodeStateSTM -> IO () +convergenceSampleThread nsSTM = forever $ do + nsSnap <- readTVarIO nsSTM + parentNode <- readTVarIO $ parentRealNode nsSnap + if isJoined nsSnap + then + runExceptT (do + -- joined node: choose random node, do queryIDLoop, compare result with own responsibility + let bss = bootstrapNodes parentNode + randIndex <- liftIO $ randomRIO (0, length bss - 1) + chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex + lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap) + currentlyResponsible <- liftEither lookupResult + if getNid currentlyResponsible /= getNid nsSnap + -- if mismatch, stabilise on the result, else do nothing + then do + stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible + (preds, succs) <- liftEither stabResult + -- TODO: verify neighbours before adding, see #55 + liftIO . atomically $ do + ns <- readTVar nsSTM + writeTVar nsSTM $ addPredecessors preds ns + else pure () + ) >>= either (putStrLn . ("convergence sampling: " <>)) pure + -- unjoined node: try joining through all bootstrapping nodes + else tryBootstrapJoining nsSTM >>= either (putStrLn . ("bootstrap joining: " <>)) (const $ pure ()) + let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode + threadDelay $ delaySecs * 10^6 -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. @@ -185,19 +204,44 @@ tryBootstrapJoining nsSTM = do tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining." +-- | Look up a key just based on the responses of a single bootstrapping node. +bootstrapQueryId :: LocalNodeStateSTM -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState) +bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do + ns <- readTVarIO nsSTM + bootstrapResponse <- bracket (mkSendSocket bootstrapHost bootstrapPort) close ( + -- Initialise an empty cache only with the responses from a bootstrapping node + fmap Right . 
sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) + ) + `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) --- | join a node to the DHT, using the provided cache snapshot for resolving the new + case bootstrapResponse of + Left err -> pure $ Left err + Right resp + | resp == Set.empty -> pure . Left $ "Bootstrapping node " <> show bootstrapHost <> " gave no response." + | otherwise -> do + now <- getPOSIXTime + -- create new cache with all returned node responses + let bootstrapCache = + -- traverse response parts + foldr' (\respPart cacheAcc -> case queryResult <$> payload respPart of + Nothing -> cacheAcc + Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc + Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset + ) + initCache resp + currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 targetID + pure $ Right currentlyResponsible + + +-- | join a node to the DHT, using the global node cache for resolving the new -- node's position. -fediChordJoin :: NodeCache -- ^ a snapshot of the NodeCache to - -- use for ID lookup - -> LocalNodeStateSTM -- ^ the local 'NodeState' +fediChordJoin :: LocalNodeStateSTM -- ^ the local 'NodeState' -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message -fediChordJoin cacheSnapshot nsSTM = do +fediChordJoin nsSTM = do ns <- readTVarIO nsSTM - -- get routed to the currently responsible node, based on the response - -- from the bootstrapping node - currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns + -- 1. get routed to the currently responsible node + currentlyResponsible <- requestQueryID ns $ getNid ns putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. 
then send a join to the currently responsible node joinResult <- requestJoin currentlyResponsible nsSTM @@ -226,7 +270,7 @@ joinOnNewEntriesThread nsSTM = loop pure () -- otherwise try joining FORWARD _ -> do - joinResult <- fediChordJoin cache nsSTM + joinResult <- fediChordJoin nsSTM either -- on join failure, sleep and retry -- TODO: make delay configurable @@ -497,9 +541,10 @@ fediMainThreads sock nsSTM = do (fediMessageHandler sendQ recvQ nsSTM) $ concurrently_ (stabiliseThread nsSTM) $ concurrently_ (cacheVerifyThread nsSTM) $ - concurrently_ - (sendThread sock sendQ) - (recvThread sock recvQ) + concurrently_ (convergenceSampleThread nsSTM) $ + concurrently_ + (sendThread sock sendQ) + (recvThread sock recvQ) -- defining this here as, for now, the RequestMap is only used by fediMessageHandler. diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 8351eba..296ebfa 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -588,11 +588,16 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs -- | configuration values used for initialising the FediChord DHT data FediChordConf = FediChordConf - { confDomain :: String - , confIP :: HostAddress6 - , confDhtPort :: Int - , confBootstrapNodes :: [(String, PortNumber)] - --, confStabiliseInterval :: Int + { confDomain :: String + -- ^ the domain/ hostname the node is reachable under + , confIP :: HostAddress6 + -- ^ IP address of outgoing packets + , confDhtPort :: Int + -- ^ listening port for the FediChord DHT + , confBootstrapNodes :: [(String, PortNumber)] + -- ^ list of potential bootstrapping nodes + , confBootstrapSamplingInterval :: Int + -- ^ pause between sampling the own ID through bootstrap nodes, in seconds } deriving (Show, Eq)