Compare commits
No commits in common. "61ea6ed3ff90266fda46428518e4cf4de0809048" and "d293cc05d148ebe7b9019afffcfdab5ff44719d5" have entirely different histories.
61ea6ed3ff
...
d293cc05d1
12
app/Main.hs
12
app/Main.hs
|
@ -21,8 +21,17 @@ main = do
|
||||||
(serverSock, thisNode) <- fediChordInit conf
|
(serverSock, thisNode) <- fediChordInit conf
|
||||||
-- currently no masking is necessary, as there is nothing to clean up
|
-- currently no masking is necessary, as there is nothing to clean up
|
||||||
cacheWriterThread <- forkIO $ cacheWriter thisNode
|
cacheWriterThread <- forkIO $ cacheWriter thisNode
|
||||||
|
thisNodeSnap <- readTVarIO thisNode
|
||||||
|
realNode <- readTVarIO $ parentRealNode thisNodeSnap
|
||||||
-- try joining the DHT using one of the provided bootstrapping nodes
|
-- try joining the DHT using one of the provided bootstrapping nodes
|
||||||
joinedState <- tryBootstrapJoining thisNode
|
let
|
||||||
|
tryJoining (bn:bns) = do
|
||||||
|
j <- fediChordBootstrapJoin thisNode bn
|
||||||
|
case j of
|
||||||
|
Left err -> putStrLn ("join error: " <> err) >> tryJoining bns
|
||||||
|
Right joined -> pure $ Right joined
|
||||||
|
tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining."
|
||||||
|
joinedState <- tryJoining $ bootstrapNodes realNode
|
||||||
either (\err -> do
|
either (\err -> do
|
||||||
-- handle unsuccessful join
|
-- handle unsuccessful join
|
||||||
|
|
||||||
|
@ -50,5 +59,4 @@ readConfig = do
|
||||||
, confDhtPort = read portString
|
, confDhtPort = read portString
|
||||||
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
|
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
|
||||||
--, confStabiliseInterval = 60
|
--, confStabiliseInterval = 60
|
||||||
, confBootstrapSamplingInterval = 180
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,7 +40,6 @@ module Hash2Pub.FediChord (
|
||||||
, fediChordInit
|
, fediChordInit
|
||||||
, fediChordJoin
|
, fediChordJoin
|
||||||
, fediChordBootstrapJoin
|
, fediChordBootstrapJoin
|
||||||
, tryBootstrapJoining
|
|
||||||
, fediMainThreads
|
, fediMainThreads
|
||||||
, RealNode (..)
|
, RealNode (..)
|
||||||
, nodeStateInit
|
, nodeStateInit
|
||||||
|
@ -82,7 +81,6 @@ import Network.Socket hiding (recv, recvFrom, send,
|
||||||
sendTo)
|
sendTo)
|
||||||
import Network.Socket.ByteString
|
import Network.Socket.ByteString
|
||||||
import Safe
|
import Safe
|
||||||
import System.Random (randomRIO)
|
|
||||||
|
|
||||||
import Hash2Pub.DHTProtocol
|
import Hash2Pub.DHTProtocol
|
||||||
import Hash2Pub.FediChordTypes
|
import Hash2Pub.FediChordTypes
|
||||||
|
@ -143,82 +141,17 @@ fediChordBootstrapJoin :: LocalNodeStateSTM -- ^ the local 'NodeSta
|
||||||
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
|
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
|
||||||
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
|
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
|
||||||
-- successful join, otherwise an error message
|
-- successful join, otherwise an error message
|
||||||
fediChordBootstrapJoin nsSTM bootstrapNode = do
|
fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
|
||||||
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
|
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
|
||||||
ns <- readTVarIO nsSTM
|
bracket (mkSendSocket joinHost joinPort) close (\sock -> do
|
||||||
runExceptT $ do
|
putStrLn "BootstrapJoin"
|
||||||
-- 1. get routed to the currently responsible node
|
-- 1. get routed to placement of own ID until FOUND:
|
||||||
lookupResp <- liftIO $ bootstrapQueryId nsSTM bootstrapNode $ getNid ns
|
|
||||||
currentlyResponsible <- liftEither lookupResp
|
|
||||||
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
|
|
||||||
-- 2. then send a join to the currently responsible node
|
|
||||||
joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
|
|
||||||
liftEither joinResult
|
|
||||||
|
|
||||||
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
|
|
||||||
-- Unjoined try joining instead.
|
|
||||||
convergenceSampleThread :: LocalNodeStateSTM -> IO ()
|
|
||||||
convergenceSampleThread nsSTM = forever $ do
|
|
||||||
nsSnap <- readTVarIO nsSTM
|
|
||||||
parentNode <- readTVarIO $ parentRealNode nsSnap
|
|
||||||
if isJoined nsSnap
|
|
||||||
then
|
|
||||||
runExceptT (do
|
|
||||||
-- joined node: choose random node, do queryIDLoop, compare result with own responsibility
|
|
||||||
let bss = bootstrapNodes parentNode
|
|
||||||
randIndex <- liftIO $ randomRIO (0, length bss - 1)
|
|
||||||
chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
|
|
||||||
lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
|
|
||||||
currentlyResponsible <- liftEither lookupResult
|
|
||||||
if getNid currentlyResponsible /= getNid nsSnap
|
|
||||||
-- if mismatch, stabilise on the result, else do nothing
|
|
||||||
then do
|
|
||||||
stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
|
|
||||||
(preds, succs) <- liftEither stabResult
|
|
||||||
-- TODO: verify neighbours before adding, see #55
|
|
||||||
liftIO . atomically $ do
|
|
||||||
ns <- readTVar nsSTM
|
|
||||||
writeTVar nsSTM $ addPredecessors preds ns
|
|
||||||
else pure ()
|
|
||||||
) >> pure ()
|
|
||||||
-- unjoined node: try joining through all bootstrapping nodes
|
|
||||||
else tryBootstrapJoining nsSTM >> pure ()
|
|
||||||
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
|
|
||||||
threadDelay $ delaySecs * 10^6
|
|
||||||
|
|
||||||
|
|
||||||
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
|
|
||||||
tryBootstrapJoining :: LocalNodeStateSTM -> IO (Either String LocalNodeStateSTM)
|
|
||||||
tryBootstrapJoining nsSTM = do
|
|
||||||
bss <- atomically $ do
|
|
||||||
nsSnap <- readTVar nsSTM
|
|
||||||
realNodeSnap <- readTVar $ parentRealNode nsSnap
|
|
||||||
pure $ bootstrapNodes realNodeSnap
|
|
||||||
tryJoining bss
|
|
||||||
where
|
|
||||||
tryJoining (bn:bns) = do
|
|
||||||
j <- fediChordBootstrapJoin nsSTM bn
|
|
||||||
case j of
|
|
||||||
Left err -> putStrLn ("join error: " <> err) >> tryJoining bns
|
|
||||||
Right joined -> pure $ Right joined
|
|
||||||
tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining."
|
|
||||||
|
|
||||||
|
|
||||||
-- | Look up a key just based on the responses of a single bootstrapping node.
|
|
||||||
bootstrapQueryId :: LocalNodeStateSTM -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
|
|
||||||
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
|
|
||||||
ns <- readTVarIO nsSTM
|
|
||||||
bootstrapResponse <- bracket (mkSendSocket bootstrapHost bootstrapPort) close (
|
|
||||||
-- Initialise an empty cache only with the responses from a bootstrapping node
|
-- Initialise an empty cache only with the responses from a bootstrapping node
|
||||||
fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing)
|
ns <- readTVarIO nsSTM
|
||||||
)
|
bootstrapResponse <- sendRequestTo 5000 3 (lookupMessage (getNid ns) ns Nothing) sock
|
||||||
`catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
|
if bootstrapResponse == Set.empty
|
||||||
|
then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response."
|
||||||
case bootstrapResponse of
|
else do
|
||||||
Left err -> pure $ Left err
|
|
||||||
Right resp
|
|
||||||
| resp == Set.empty -> pure . Left $ "Bootstrapping node " <> show bootstrapHost <> " gave no response."
|
|
||||||
| otherwise -> do
|
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
-- create new cache with all returned node responses
|
-- create new cache with all returned node responses
|
||||||
let bootstrapCache =
|
let bootstrapCache =
|
||||||
|
@ -228,20 +161,23 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
|
||||||
Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
|
Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
|
||||||
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
||||||
)
|
)
|
||||||
initCache resp
|
initCache bootstrapResponse
|
||||||
currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
|
fediChordJoin bootstrapCache nsSTM
|
||||||
pure $ Right currentlyResponsible
|
)
|
||||||
|
`catch` (\e -> pure . Left $ "Error at bootstrap joining: " <> displayException (e :: IOException))
|
||||||
|
|
||||||
|
-- | join a node to the DHT, using the provided cache snapshot for resolving the new
|
||||||
-- | join a node to the DHT using the global node cache
|
|
||||||
-- node's position.
|
-- node's position.
|
||||||
fediChordJoin :: LocalNodeStateSTM -- ^ the local 'NodeState'
|
fediChordJoin :: NodeCache -- ^ a snapshot of the NodeCache to
|
||||||
|
-- use for ID lookup
|
||||||
|
-> LocalNodeStateSTM -- ^ the local 'NodeState'
|
||||||
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
|
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
|
||||||
-- successful join, otherwise an error message
|
-- successful join, otherwise an error message
|
||||||
fediChordJoin nsSTM = do
|
fediChordJoin cacheSnapshot nsSTM = do
|
||||||
ns <- readTVarIO nsSTM
|
ns <- readTVarIO nsSTM
|
||||||
-- 1. get routed to the currently responsible node
|
-- get routed to the currently responsible node, based on the response
|
||||||
currentlyResponsible <- requestQueryID ns $ getNid ns
|
-- from the bootstrapping node
|
||||||
|
currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns
|
||||||
putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
|
putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
|
||||||
-- 2. then send a join to the currently responsible node
|
-- 2. then send a join to the currently responsible node
|
||||||
joinResult <- requestJoin currentlyResponsible nsSTM
|
joinResult <- requestJoin currentlyResponsible nsSTM
|
||||||
|
@ -270,7 +206,7 @@ joinOnNewEntriesThread nsSTM = loop
|
||||||
pure ()
|
pure ()
|
||||||
-- otherwise try joining
|
-- otherwise try joining
|
||||||
FORWARD _ -> do
|
FORWARD _ -> do
|
||||||
joinResult <- fediChordJoin nsSTM
|
joinResult <- fediChordJoin cache nsSTM
|
||||||
either
|
either
|
||||||
-- on join failure, sleep and retry
|
-- on join failure, sleep and retry
|
||||||
-- TODO: make delay configurable
|
-- TODO: make delay configurable
|
||||||
|
@ -541,7 +477,6 @@ fediMainThreads sock nsSTM = do
|
||||||
(fediMessageHandler sendQ recvQ nsSTM) $
|
(fediMessageHandler sendQ recvQ nsSTM) $
|
||||||
concurrently_ (stabiliseThread nsSTM) $
|
concurrently_ (stabiliseThread nsSTM) $
|
||||||
concurrently_ (cacheVerifyThread nsSTM) $
|
concurrently_ (cacheVerifyThread nsSTM) $
|
||||||
concurrently_ (convergenceSampleThread nsSTM) $
|
|
||||||
concurrently_
|
concurrently_
|
||||||
(sendThread sock sendQ)
|
(sendThread sock sendQ)
|
||||||
(recvThread sock recvQ)
|
(recvThread sock recvQ)
|
||||||
|
|
|
@ -589,15 +589,10 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
|
||||||
-- | configuration values used for initialising the FediChord DHT
|
-- | configuration values used for initialising the FediChord DHT
|
||||||
data FediChordConf = FediChordConf
|
data FediChordConf = FediChordConf
|
||||||
{ confDomain :: String
|
{ confDomain :: String
|
||||||
-- ^ the domain/ hostname the node is reachable under
|
|
||||||
, confIP :: HostAddress6
|
, confIP :: HostAddress6
|
||||||
-- ^ IP address of outgoing packets
|
|
||||||
, confDhtPort :: Int
|
, confDhtPort :: Int
|
||||||
-- ^ listening port for the FediChord DHT
|
|
||||||
, confBootstrapNodes :: [(String, PortNumber)]
|
, confBootstrapNodes :: [(String, PortNumber)]
|
||||||
-- ^ list of potential bootstrapping nodes
|
--, confStabiliseInterval :: Int
|
||||||
, confBootstrapSamplingInterval :: Int
|
|
||||||
-- ^ pause between sampling the own ID through bootstrap nodes, in seconds
|
|
||||||
}
|
}
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue