diff --git a/app/Main.hs b/app/Main.hs index cdfc2b3..03f72f1 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -21,8 +21,17 @@ main = do (serverSock, thisNode) <- fediChordInit conf -- currently no masking is necessary, as there is nothing to clean up cacheWriterThread <- forkIO $ cacheWriter thisNode + thisNodeSnap <- readTVarIO thisNode + realNode <- readTVarIO $ parentRealNode thisNodeSnap -- try joining the DHT using one of the provided bootstrapping nodes - joinedState <- tryBootstrapJoining thisNode + let + tryJoining (bn:bns) = do + j <- fediChordBootstrapJoin thisNode bn + case j of + Left err -> putStrLn ("join error: " <> err) >> tryJoining bns + Right joined -> pure $ Right joined + tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining." + joinedState <- tryJoining $ bootstrapNodes realNode either (\err -> do -- handle unsuccessful join @@ -50,5 +59,4 @@ readConfig = do , confDhtPort = read portString , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)] --, confStabiliseInterval = 60 - , confBootstrapSamplingInterval = 180 } diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 2b9a2ef..ba7edb4 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -40,7 +40,6 @@ module Hash2Pub.FediChord ( , fediChordInit , fediChordJoin , fediChordBootstrapJoin - , tryBootstrapJoining , fediMainThreads , RealNode (..) , nodeStateInit @@ -82,7 +81,6 @@ import Network.Socket hiding (recv, recvFrom, send, sendTo) import Network.Socket.ByteString import Safe -import System.Random (randomRIO) import Hash2Pub.DHTProtocol import Hash2Pub.FediChordTypes @@ -143,82 +141,17 @@ fediChordBootstrapJoin :: LocalNodeStateSTM -- ^ the local 'NodeSta -> (String, PortNumber) -- ^ domain and port of a bootstrapping node -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message -fediChordBootstrapJoin nsSTM bootstrapNode = do +fediChordBootstrapJoin nsSTM (joinHost, joinPort) = -- can be invoked multiple times with all known bootstrapping nodes until successfully joined - ns <- readTVarIO nsSTM - runExceptT $ do - -- 1. get routed to the currently responsible node - lookupResp <- liftIO $ bootstrapQueryId nsSTM bootstrapNode $ getNid ns - currentlyResponsible <- liftEither lookupResp - liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) - -- 2. then send a join to the currently responsible node - joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM - liftEither joinResult - --- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters. --- Unjoined try joining instead. -convergenceSampleThread :: LocalNodeStateSTM -> IO () -convergenceSampleThread nsSTM = forever $ do - nsSnap <- readTVarIO nsSTM - parentNode <- readTVarIO $ parentRealNode nsSnap - if isJoined nsSnap - then - runExceptT (do - -- joined node: choose random node, do queryIDLoop, compare result with own responsibility - let bss = bootstrapNodes parentNode - randIndex <- liftIO $ randomRIO (0, length bss - 1) - chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex - lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap) - currentlyResponsible <- liftEither lookupResult - if getNid currentlyResponsible /= getNid nsSnap - -- if mismatch, stabilise on the result, else do nothing - then do - stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible - (preds, succs) <- liftEither stabResult - -- TODO: verify neighbours before adding, see #55 - liftIO . atomically $ do - ns <- readTVar nsSTM - writeTVar nsSTM $ addPredecessors preds ns - else pure () - ) >> pure () - -- unjoined node: try joining through all bootstrapping nodes - else tryBootstrapJoining nsSTM >> pure () - let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode - threadDelay $ delaySecs * 10^6 - - --- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. -tryBootstrapJoining :: LocalNodeStateSTM -> IO (Either String LocalNodeStateSTM) -tryBootstrapJoining nsSTM = do - bss <- atomically $ do - nsSnap <- readTVar nsSTM - realNodeSnap <- readTVar $ parentRealNode nsSnap - pure $ bootstrapNodes realNodeSnap - tryJoining bss - where - tryJoining (bn:bns) = do - j <- fediChordBootstrapJoin nsSTM bn - case j of - Left err -> putStrLn ("join error: " <> err) >> tryJoining bns - Right joined -> pure $ Right joined - tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining." - - --- | Look up a key just based on the responses of a single bootstrapping node. -bootstrapQueryId :: LocalNodeStateSTM -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState) -bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do - ns <- readTVarIO nsSTM - bootstrapResponse <- bracket (mkSendSocket bootstrapHost bootstrapPort) close ( + bracket (mkSendSocket joinHost joinPort) close (\sock -> do + putStrLn "BootstrapJoin" + -- 1. get routed to placement of own ID until FOUND: -- Initialise an empty cache only with the responses from a bootstrapping node - fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) - ) - `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) - - case bootstrapResponse of - Left err -> pure $ Left err - Right resp - | resp == Set.empty -> pure . Left $ "Bootstrapping node " <> show bootstrapHost <> " gave no response." - | otherwise -> do + ns <- readTVarIO nsSTM + bootstrapResponse <- sendRequestTo 5000 3 (lookupMessage (getNid ns) ns Nothing) sock + if bootstrapResponse == Set.empty + then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response." + else do now <- getPOSIXTime -- create new cache with all returned node responses let bootstrapCache = @@ -228,20 +161,23 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset ) - initCache resp - currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns - pure $ Right currentlyResponsible + initCache bootstrapResponse + fediChordJoin bootstrapCache nsSTM + ) + `catch` (\e -> pure . Left $ "Error at bootstrap joining: " <> displayException (e :: IOException)) - --- | join a node to the DHT using the global node cache +-- | join a node to the DHT, using the provided cache snapshot for resolving the new -- node's position. -fediChordJoin :: LocalNodeStateSTM -- ^ the local 'NodeState' +fediChordJoin :: NodeCache -- ^ a snapshot of the NodeCache to + -- use for ID lookup + -> LocalNodeStateSTM -- ^ the local 'NodeState' -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message -fediChordJoin nsSTM = do +fediChordJoin cacheSnapshot nsSTM = do ns <- readTVarIO nsSTM - -- 1. get routed to the currently responsible node - currentlyResponsible <- requestQueryID ns $ getNid ns + -- get routed to the currently responsible node, based on the response + -- from the bootstrapping node + currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node joinResult <- requestJoin currentlyResponsible nsSTM @@ -270,7 +206,7 @@ joinOnNewEntriesThread nsSTM = loop pure () -- otherwise try joining FORWARD _ -> do - joinResult <- fediChordJoin nsSTM + joinResult <- fediChordJoin cache nsSTM either -- on join failure, sleep and retry -- TODO: make delay configurable @@ -541,10 +477,9 @@ fediMainThreads sock nsSTM = do (fediMessageHandler sendQ recvQ nsSTM) $ concurrently_ (stabiliseThread nsSTM) $ concurrently_ (cacheVerifyThread nsSTM) $ - concurrently_ (convergenceSampleThread nsSTM) $ - concurrently_ - (sendThread sock sendQ) - (recvThread sock recvQ) + concurrently_ + (sendThread sock sendQ) + (recvThread sock recvQ) -- defining this here as, for now, the RequestMap is only used by fediMessageHandler. diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 296ebfa..8351eba 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -588,16 +588,11 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs -- | configuration values used for initialising the FediChord DHT data FediChordConf = FediChordConf - { confDomain :: String - -- ^ the domain/ hostname the node is reachable under - , confIP :: HostAddress6 - -- ^ IP address of outgoing packets - , confDhtPort :: Int - -- ^ listening port for the FediChord DHT - , confBootstrapNodes :: [(String, PortNumber)] - -- ^ list of potential bootstrapping nodes - , confBootstrapSamplingInterval :: Int - -- ^ pause between sampling the own ID through bootstrap nodes, in seconds + { confDomain :: String + , confIP :: HostAddress6 + , confDhtPort :: Int + , confBootstrapNodes :: [(String, PortNumber)] + --, confStabiliseInterval :: Int } deriving (Show, Eq)