Compare commits

..

No commits in common. "61ea6ed3ff90266fda46428518e4cf4de0809048" and "d293cc05d148ebe7b9019afffcfdab5ff44719d5" have entirely different histories.

3 changed files with 40 additions and 102 deletions

View file

@ -21,8 +21,17 @@ main = do
(serverSock, thisNode) <- fediChordInit conf (serverSock, thisNode) <- fediChordInit conf
-- currently no masking is necessary, as there is nothing to clean up -- currently no masking is necessary, as there is nothing to clean up
cacheWriterThread <- forkIO $ cacheWriter thisNode cacheWriterThread <- forkIO $ cacheWriter thisNode
thisNodeSnap <- readTVarIO thisNode
realNode <- readTVarIO $ parentRealNode thisNodeSnap
-- try joining the DHT using one of the provided bootstrapping nodes -- try joining the DHT using one of the provided bootstrapping nodes
joinedState <- tryBootstrapJoining thisNode let
tryJoining (bn:bns) = do
j <- fediChordBootstrapJoin thisNode bn
case j of
Left err -> putStrLn ("join error: " <> err) >> tryJoining bns
Right joined -> pure $ Right joined
tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining."
joinedState <- tryJoining $ bootstrapNodes realNode
either (\err -> do either (\err -> do
-- handle unsuccessful join -- handle unsuccessful join
@ -50,5 +59,4 @@ readConfig = do
, confDhtPort = read portString , confDhtPort = read portString
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)] , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
--, confStabiliseInterval = 60 --, confStabiliseInterval = 60
, confBootstrapSamplingInterval = 180
} }

View file

@ -40,7 +40,6 @@ module Hash2Pub.FediChord (
, fediChordInit , fediChordInit
, fediChordJoin , fediChordJoin
, fediChordBootstrapJoin , fediChordBootstrapJoin
, tryBootstrapJoining
, fediMainThreads , fediMainThreads
, RealNode (..) , RealNode (..)
, nodeStateInit , nodeStateInit
@ -82,7 +81,6 @@ import Network.Socket hiding (recv, recvFrom, send,
sendTo) sendTo)
import Network.Socket.ByteString import Network.Socket.ByteString
import Safe import Safe
import System.Random (randomRIO)
import Hash2Pub.DHTProtocol import Hash2Pub.DHTProtocol
import Hash2Pub.FediChordTypes import Hash2Pub.FediChordTypes
@ -143,82 +141,17 @@ fediChordBootstrapJoin :: LocalNodeStateSTM -- ^ the local 'NodeSta
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node -> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message -- successful join, otherwise an error message
fediChordBootstrapJoin nsSTM bootstrapNode = do fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined -- can be invoked multiple times with all known bootstrapping nodes until successfully joined
ns <- readTVarIO nsSTM bracket (mkSendSocket joinHost joinPort) close (\sock -> do
runExceptT $ do putStrLn "BootstrapJoin"
-- 1. get routed to the currently responsible node -- 1. get routed to placement of own ID until FOUND:
lookupResp <- liftIO $ bootstrapQueryId nsSTM bootstrapNode $ getNid ns
currentlyResponsible <- liftEither lookupResp
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
-- 2. then send a join to the currently responsible node
joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
liftEither joinResult
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
-- Unjoined try joining instead.
convergenceSampleThread :: LocalNodeStateSTM -> IO ()
convergenceSampleThread nsSTM = forever $ do
nsSnap <- readTVarIO nsSTM
parentNode <- readTVarIO $ parentRealNode nsSnap
if isJoined nsSnap
then
runExceptT (do
-- joined node: choose random node, do queryIDLoop, compare result with own responsibility
let bss = bootstrapNodes parentNode
randIndex <- liftIO $ randomRIO (0, length bss - 1)
chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
currentlyResponsible <- liftEither lookupResult
if getNid currentlyResponsible /= getNid nsSnap
-- if mismatch, stabilise on the result, else do nothing
then do
stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
(preds, succs) <- liftEither stabResult
-- TODO: verify neighbours before adding, see #55
liftIO . atomically $ do
ns <- readTVar nsSTM
writeTVar nsSTM $ addPredecessors preds ns
else pure ()
) >> pure ()
-- unjoined node: try joining through all bootstrapping nodes
else tryBootstrapJoining nsSTM >> pure ()
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
threadDelay $ delaySecs * 10^6
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
tryBootstrapJoining :: LocalNodeStateSTM -> IO (Either String LocalNodeStateSTM)
tryBootstrapJoining nsSTM = do
bss <- atomically $ do
nsSnap <- readTVar nsSTM
realNodeSnap <- readTVar $ parentRealNode nsSnap
pure $ bootstrapNodes realNodeSnap
tryJoining bss
where
tryJoining (bn:bns) = do
j <- fediChordBootstrapJoin nsSTM bn
case j of
Left err -> putStrLn ("join error: " <> err) >> tryJoining bns
Right joined -> pure $ Right joined
tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining."
-- | Look up a key just based on the responses of a single bootstrapping node.
bootstrapQueryId :: LocalNodeStateSTM -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
ns <- readTVarIO nsSTM
bootstrapResponse <- bracket (mkSendSocket bootstrapHost bootstrapPort) close (
-- Initialise an empty cache only with the responses from a bootstrapping node -- Initialise an empty cache only with the responses from a bootstrapping node
fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) ns <- readTVarIO nsSTM
) bootstrapResponse <- sendRequestTo 5000 3 (lookupMessage (getNid ns) ns Nothing) sock
`catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) if bootstrapResponse == Set.empty
then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response."
case bootstrapResponse of else do
Left err -> pure $ Left err
Right resp
| resp == Set.empty -> pure . Left $ "Bootstrapping node " <> show bootstrapHost <> " gave no response."
| otherwise -> do
now <- getPOSIXTime now <- getPOSIXTime
-- create new cache with all returned node responses -- create new cache with all returned node responses
let bootstrapCache = let bootstrapCache =
@ -228,20 +161,23 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
) )
initCache resp initCache bootstrapResponse
currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns fediChordJoin bootstrapCache nsSTM
pure $ Right currentlyResponsible )
`catch` (\e -> pure . Left $ "Error at bootstrap joining: " <> displayException (e :: IOException))
-- | join a node to the DHT, using the provided cache snapshot for resolving the new
-- | join a node to the DHT using the global node cache
-- node's position. -- node's position.
fediChordJoin :: LocalNodeStateSTM -- ^ the local 'NodeState' fediChordJoin :: NodeCache -- ^ a snapshot of the NodeCache to
-- use for ID lookup
-> LocalNodeStateSTM -- ^ the local 'NodeState'
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message -- successful join, otherwise an error message
fediChordJoin nsSTM = do fediChordJoin cacheSnapshot nsSTM = do
ns <- readTVarIO nsSTM ns <- readTVarIO nsSTM
-- 1. get routed to the currently responsible node -- get routed to the currently responsible node, based on the response
currentlyResponsible <- requestQueryID ns $ getNid ns -- from the bootstrapping node
currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns
putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
-- 2. then send a join to the currently responsible node -- 2. then send a join to the currently responsible node
joinResult <- requestJoin currentlyResponsible nsSTM joinResult <- requestJoin currentlyResponsible nsSTM
@ -270,7 +206,7 @@ joinOnNewEntriesThread nsSTM = loop
pure () pure ()
-- otherwise try joining -- otherwise try joining
FORWARD _ -> do FORWARD _ -> do
joinResult <- fediChordJoin nsSTM joinResult <- fediChordJoin cache nsSTM
either either
-- on join failure, sleep and retry -- on join failure, sleep and retry
-- TODO: make delay configurable -- TODO: make delay configurable
@ -541,7 +477,6 @@ fediMainThreads sock nsSTM = do
(fediMessageHandler sendQ recvQ nsSTM) $ (fediMessageHandler sendQ recvQ nsSTM) $
concurrently_ (stabiliseThread nsSTM) $ concurrently_ (stabiliseThread nsSTM) $
concurrently_ (cacheVerifyThread nsSTM) $ concurrently_ (cacheVerifyThread nsSTM) $
concurrently_ (convergenceSampleThread nsSTM) $
concurrently_ concurrently_
(sendThread sock sendQ) (sendThread sock sendQ)
(recvThread sock recvQ) (recvThread sock recvQ)

View file

@ -589,15 +589,10 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
-- | configuration values used for initialising the FediChord DHT -- | configuration values used for initialising the FediChord DHT
data FediChordConf = FediChordConf data FediChordConf = FediChordConf
{ confDomain :: String { confDomain :: String
-- ^ the domain/ hostname the node is reachable under
, confIP :: HostAddress6 , confIP :: HostAddress6
-- ^ IP address of outgoing packets
, confDhtPort :: Int , confDhtPort :: Int
-- ^ listening port for the FediChord DHT
, confBootstrapNodes :: [(String, PortNumber)] , confBootstrapNodes :: [(String, PortNumber)]
-- ^ list of potential bootstrapping nodes --, confStabiliseInterval :: Int
, confBootstrapSamplingInterval :: Int
-- ^ pause between sampling the own ID through bootstrap nodes, in seconds
} }
deriving (Show, Eq) deriving (Show, Eq)