Periodically contact bootstrap nodes for convergence sampling or joining
closes #56
This commit is contained in:
parent
56ca2b53cc
commit
61ea6ed3ff
|
@ -50,4 +50,5 @@ readConfig = do
|
|||
, confDhtPort = read portString
|
||||
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
|
||||
--, confStabiliseInterval = 60
|
||||
, confBootstrapSamplingInterval = 180
|
||||
}
|
||||
|
|
|
@ -82,6 +82,7 @@ import Network.Socket hiding (recv, recvFrom, send,
|
|||
sendTo)
|
||||
import Network.Socket.ByteString
|
||||
import Safe
|
||||
import System.Random (randomRIO)
|
||||
|
||||
import Hash2Pub.DHTProtocol
|
||||
import Hash2Pub.FediChordTypes
|
||||
|
@ -142,30 +143,48 @@ fediChordBootstrapJoin :: LocalNodeStateSTM -- ^ the local 'NodeSta
|
|||
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
|
||||
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
|
||||
-- successful join, otherwise an error message
|
||||
fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
|
||||
fediChordBootstrapJoin nsSTM bootstrapNode = do
|
||||
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
|
||||
bracket (mkSendSocket joinHost joinPort) close (\sock -> do
|
||||
putStrLn "BootstrapJoin"
|
||||
-- 1. get routed to placement of own ID until FOUND:
|
||||
-- Initialise an empty cache only with the responses from a bootstrapping node
|
||||
ns <- readTVarIO nsSTM
|
||||
bootstrapResponse <- sendRequestTo 5000 3 (lookupMessage (getNid ns) ns Nothing) sock
|
||||
if bootstrapResponse == Set.empty
|
||||
then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response."
|
||||
else do
|
||||
now <- getPOSIXTime
|
||||
-- create new cache with all returned node responses
|
||||
let bootstrapCache =
|
||||
-- traverse response parts
|
||||
foldr' (\resp cacheAcc -> case queryResult <$> payload resp of
|
||||
Nothing -> cacheAcc
|
||||
Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
|
||||
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
||||
)
|
||||
initCache bootstrapResponse
|
||||
fediChordJoin bootstrapCache nsSTM
|
||||
)
|
||||
`catch` (\e -> pure . Left $ "Error at bootstrap joining: " <> displayException (e :: IOException))
|
||||
ns <- readTVarIO nsSTM
|
||||
runExceptT $ do
|
||||
-- 1. get routed to the currently responsible node
|
||||
lookupResp <- liftIO $ bootstrapQueryId nsSTM bootstrapNode $ getNid ns
|
||||
currentlyResponsible <- liftEither lookupResp
|
||||
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
|
||||
-- 2. then send a join to the currently responsible node
|
||||
joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
|
||||
liftEither joinResult
|
||||
|
||||
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
|
||||
-- Unjoined try joining instead.
|
||||
convergenceSampleThread :: LocalNodeStateSTM -> IO ()
|
||||
convergenceSampleThread nsSTM = forever $ do
|
||||
nsSnap <- readTVarIO nsSTM
|
||||
parentNode <- readTVarIO $ parentRealNode nsSnap
|
||||
if isJoined nsSnap
|
||||
then
|
||||
runExceptT (do
|
||||
-- joined node: choose random node, do queryIDLoop, compare result with own responsibility
|
||||
let bss = bootstrapNodes parentNode
|
||||
randIndex <- liftIO $ randomRIO (0, length bss - 1)
|
||||
chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
|
||||
lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
|
||||
currentlyResponsible <- liftEither lookupResult
|
||||
if getNid currentlyResponsible /= getNid nsSnap
|
||||
-- if mismatch, stabilise on the result, else do nothing
|
||||
then do
|
||||
stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
|
||||
(preds, succs) <- liftEither stabResult
|
||||
-- TODO: verify neighbours before adding, see #55
|
||||
liftIO . atomically $ do
|
||||
ns <- readTVar nsSTM
|
||||
writeTVar nsSTM $ addPredecessors preds ns
|
||||
else pure ()
|
||||
) >> pure ()
|
||||
-- unjoined node: try joining through all bootstrapping nodes
|
||||
else tryBootstrapJoining nsSTM >> pure ()
|
||||
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
|
||||
threadDelay $ delaySecs * 10^6
|
||||
|
||||
|
||||
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
|
||||
|
@ -185,19 +204,44 @@ tryBootstrapJoining nsSTM = do
|
|||
tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining."
|
||||
|
||||
|
||||
-- | Look up a key just based on the responses of a single bootstrapping node.
|
||||
bootstrapQueryId :: LocalNodeStateSTM -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
|
||||
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
|
||||
ns <- readTVarIO nsSTM
|
||||
bootstrapResponse <- bracket (mkSendSocket bootstrapHost bootstrapPort) close (
|
||||
-- Initialise an empty cache only with the responses from a bootstrapping node
|
||||
fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing)
|
||||
)
|
||||
`catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
|
||||
|
||||
-- | join a node to the DHT, using the provided cache snapshot for resolving the new
|
||||
case bootstrapResponse of
|
||||
Left err -> pure $ Left err
|
||||
Right resp
|
||||
| resp == Set.empty -> pure . Left $ "Bootstrapping node " <> show bootstrapHost <> " gave no response."
|
||||
| otherwise -> do
|
||||
now <- getPOSIXTime
|
||||
-- create new cache with all returned node responses
|
||||
let bootstrapCache =
|
||||
-- traverse response parts
|
||||
foldr' (\resp cacheAcc -> case queryResult <$> payload resp of
|
||||
Nothing -> cacheAcc
|
||||
Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
|
||||
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
||||
)
|
||||
initCache resp
|
||||
currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
|
||||
pure $ Right currentlyResponsible
|
||||
|
||||
|
||||
-- | join a node to the DHT using the global node cache
|
||||
-- node's position.
|
||||
fediChordJoin :: NodeCache -- ^ a snapshot of the NodeCache to
|
||||
-- use for ID lookup
|
||||
-> LocalNodeStateSTM -- ^ the local 'NodeState'
|
||||
fediChordJoin :: LocalNodeStateSTM -- ^ the local 'NodeState'
|
||||
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
|
||||
-- successful join, otherwise an error message
|
||||
fediChordJoin cacheSnapshot nsSTM = do
|
||||
fediChordJoin nsSTM = do
|
||||
ns <- readTVarIO nsSTM
|
||||
-- get routed to the currently responsible node, based on the response
|
||||
-- from the bootstrapping node
|
||||
currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns
|
||||
-- 1. get routed to the currently responsible node
|
||||
currentlyResponsible <- requestQueryID ns $ getNid ns
|
||||
putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
|
||||
-- 2. then send a join to the currently responsible node
|
||||
joinResult <- requestJoin currentlyResponsible nsSTM
|
||||
|
@ -226,7 +270,7 @@ joinOnNewEntriesThread nsSTM = loop
|
|||
pure ()
|
||||
-- otherwise try joining
|
||||
FORWARD _ -> do
|
||||
joinResult <- fediChordJoin cache nsSTM
|
||||
joinResult <- fediChordJoin nsSTM
|
||||
either
|
||||
-- on join failure, sleep and retry
|
||||
-- TODO: make delay configurable
|
||||
|
@ -497,9 +541,10 @@ fediMainThreads sock nsSTM = do
|
|||
(fediMessageHandler sendQ recvQ nsSTM) $
|
||||
concurrently_ (stabiliseThread nsSTM) $
|
||||
concurrently_ (cacheVerifyThread nsSTM) $
|
||||
concurrently_
|
||||
(sendThread sock sendQ)
|
||||
(recvThread sock recvQ)
|
||||
concurrently_ (convergenceSampleThread nsSTM) $
|
||||
concurrently_
|
||||
(sendThread sock sendQ)
|
||||
(recvThread sock recvQ)
|
||||
|
||||
|
||||
-- defining this here as, for now, the RequestMap is only used by fediMessageHandler.
|
||||
|
|
|
@ -588,11 +588,16 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
|
|||
|
||||
-- | configuration values used for initialising the FediChord DHT
|
||||
data FediChordConf = FediChordConf
|
||||
{ confDomain :: String
|
||||
, confIP :: HostAddress6
|
||||
, confDhtPort :: Int
|
||||
, confBootstrapNodes :: [(String, PortNumber)]
|
||||
--, confStabiliseInterval :: Int
|
||||
{ confDomain :: String
|
||||
-- ^ the domain/ hostname the node is reachable under
|
||||
, confIP :: HostAddress6
|
||||
-- ^ IP address of outgoing packets
|
||||
, confDhtPort :: Int
|
||||
-- ^ listening port for the FediChord DHT
|
||||
, confBootstrapNodes :: [(String, PortNumber)]
|
||||
-- ^ list of potential bootstrapping nodes
|
||||
, confBootstrapSamplingInterval :: Int
|
||||
-- ^ pause between sampling the own ID through bootstrap nodes, in seconds
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
|
|
Loading…
Reference in a new issue