Compare commits
	
		
			2 commits
		
	
	
		
			d293cc05d1
			...
			61ea6ed3ff
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 61ea6ed3ff | |||
| 56ca2b53cc | 
					 3 changed files with 102 additions and 40 deletions
				
			
		
							
								
								
									
										12
									
								
								app/Main.hs
									
										
									
									
									
								
							
							
						
						
									
										12
									
								
								app/Main.hs
									
										
									
									
									
								
							| 
						 | 
					@ -21,17 +21,8 @@ main = do
 | 
				
			||||||
    (serverSock, thisNode) <- fediChordInit conf
 | 
					    (serverSock, thisNode) <- fediChordInit conf
 | 
				
			||||||
    -- currently no masking is necessary, as there is nothing to clean up
 | 
					    -- currently no masking is necessary, as there is nothing to clean up
 | 
				
			||||||
    cacheWriterThread <- forkIO $ cacheWriter thisNode
 | 
					    cacheWriterThread <- forkIO $ cacheWriter thisNode
 | 
				
			||||||
    thisNodeSnap <- readTVarIO thisNode
 | 
					 | 
				
			||||||
    realNode <- readTVarIO $ parentRealNode thisNodeSnap
 | 
					 | 
				
			||||||
    -- try joining the DHT using one of the provided bootstrapping nodes
 | 
					    -- try joining the DHT using one of the provided bootstrapping nodes
 | 
				
			||||||
    let
 | 
					    joinedState <- tryBootstrapJoining thisNode
 | 
				
			||||||
        tryJoining (bn:bns) = do
 | 
					 | 
				
			||||||
            j <- fediChordBootstrapJoin thisNode bn
 | 
					 | 
				
			||||||
            case j of
 | 
					 | 
				
			||||||
              Left err -> putStrLn ("join error: " <> err) >> tryJoining bns
 | 
					 | 
				
			||||||
              Right joined -> pure $ Right joined
 | 
					 | 
				
			||||||
        tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining."
 | 
					 | 
				
			||||||
    joinedState <- tryJoining $ bootstrapNodes realNode
 | 
					 | 
				
			||||||
    either (\err -> do
 | 
					    either (\err -> do
 | 
				
			||||||
        -- handle unsuccessful join
 | 
					        -- handle unsuccessful join
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -59,4 +50,5 @@ readConfig = do
 | 
				
			||||||
      , confDhtPort = read portString
 | 
					      , confDhtPort = read portString
 | 
				
			||||||
      , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
 | 
					      , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
 | 
				
			||||||
      --, confStabiliseInterval = 60
 | 
					      --, confStabiliseInterval = 60
 | 
				
			||||||
 | 
					      , confBootstrapSamplingInterval = 180
 | 
				
			||||||
                           }
 | 
					                           }
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -40,6 +40,7 @@ module Hash2Pub.FediChord (
 | 
				
			||||||
  , fediChordInit
 | 
					  , fediChordInit
 | 
				
			||||||
  , fediChordJoin
 | 
					  , fediChordJoin
 | 
				
			||||||
  , fediChordBootstrapJoin
 | 
					  , fediChordBootstrapJoin
 | 
				
			||||||
 | 
					  , tryBootstrapJoining
 | 
				
			||||||
  , fediMainThreads
 | 
					  , fediMainThreads
 | 
				
			||||||
  , RealNode (..)
 | 
					  , RealNode (..)
 | 
				
			||||||
  , nodeStateInit
 | 
					  , nodeStateInit
 | 
				
			||||||
| 
						 | 
					@ -81,6 +82,7 @@ import           Network.Socket                hiding (recv, recvFrom, send,
 | 
				
			||||||
                                                sendTo)
 | 
					                                                sendTo)
 | 
				
			||||||
import           Network.Socket.ByteString
 | 
					import           Network.Socket.ByteString
 | 
				
			||||||
import           Safe
 | 
					import           Safe
 | 
				
			||||||
 | 
					import           System.Random                 (randomRIO)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import           Hash2Pub.DHTProtocol
 | 
					import           Hash2Pub.DHTProtocol
 | 
				
			||||||
import           Hash2Pub.FediChordTypes
 | 
					import           Hash2Pub.FediChordTypes
 | 
				
			||||||
| 
						 | 
					@ -141,17 +143,82 @@ fediChordBootstrapJoin :: LocalNodeStateSTM              -- ^ the local 'NodeSta
 | 
				
			||||||
                        -> (String, PortNumber)   -- ^ domain and port of a bootstrapping node
 | 
					                        -> (String, PortNumber)   -- ^ domain and port of a bootstrapping node
 | 
				
			||||||
                        -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
 | 
					                        -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
 | 
				
			||||||
                                            -- successful join, otherwise an error message
 | 
					                                            -- successful join, otherwise an error message
 | 
				
			||||||
fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
 | 
					fediChordBootstrapJoin nsSTM bootstrapNode = do
 | 
				
			||||||
    -- can be invoked multiple times with all known bootstrapping nodes until successfully joined
 | 
					    -- can be invoked multiple times with all known bootstrapping nodes until successfully joined
 | 
				
			||||||
    bracket (mkSendSocket joinHost joinPort) close (\sock -> do
 | 
					 | 
				
			||||||
        putStrLn "BootstrapJoin"
 | 
					 | 
				
			||||||
        -- 1. get routed to placement of own ID until FOUND:
 | 
					 | 
				
			||||||
        -- Initialise an empty cache only with the responses from a bootstrapping node
 | 
					 | 
				
			||||||
    ns <- readTVarIO nsSTM
 | 
					    ns <- readTVarIO nsSTM
 | 
				
			||||||
        bootstrapResponse <- sendRequestTo 5000 3 (lookupMessage (getNid ns) ns Nothing) sock
 | 
					    runExceptT $ do
 | 
				
			||||||
        if bootstrapResponse == Set.empty
 | 
					        -- 1. get routed to the currently responsible node
 | 
				
			||||||
           then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response."
 | 
					        lookupResp <- liftIO $ bootstrapQueryId nsSTM bootstrapNode $ getNid ns
 | 
				
			||||||
           else do
 | 
					        currentlyResponsible <- liftEither lookupResp
 | 
				
			||||||
 | 
					        liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
 | 
				
			||||||
 | 
					        -- 2. then send a join to the currently responsible node
 | 
				
			||||||
 | 
					        joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
 | 
				
			||||||
 | 
					        liftEither joinResult
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
 | 
				
			||||||
 | 
					-- Unjoined try joining instead.
 | 
				
			||||||
 | 
					convergenceSampleThread :: LocalNodeStateSTM -> IO ()
 | 
				
			||||||
 | 
					convergenceSampleThread nsSTM = forever $ do
 | 
				
			||||||
 | 
					    nsSnap <- readTVarIO nsSTM
 | 
				
			||||||
 | 
					    parentNode <- readTVarIO $ parentRealNode nsSnap
 | 
				
			||||||
 | 
					    if isJoined nsSnap
 | 
				
			||||||
 | 
					       then
 | 
				
			||||||
 | 
					        runExceptT (do
 | 
				
			||||||
 | 
					            -- joined node: choose random node, do queryIDLoop, compare result with own responsibility
 | 
				
			||||||
 | 
					            let bss = bootstrapNodes parentNode
 | 
				
			||||||
 | 
					            randIndex <- liftIO $ randomRIO (0, length bss - 1)
 | 
				
			||||||
 | 
					            chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
 | 
				
			||||||
 | 
					            lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
 | 
				
			||||||
 | 
					            currentlyResponsible <- liftEither lookupResult
 | 
				
			||||||
 | 
					            if getNid currentlyResponsible /= getNid nsSnap
 | 
				
			||||||
 | 
					               -- if mismatch, stabilise on the result, else do nothing
 | 
				
			||||||
 | 
					               then do
 | 
				
			||||||
 | 
					                   stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
 | 
				
			||||||
 | 
					                   (preds, succs) <- liftEither stabResult
 | 
				
			||||||
 | 
					                   -- TODO: verify neighbours before adding, see #55
 | 
				
			||||||
 | 
					                   liftIO . atomically $ do
 | 
				
			||||||
 | 
					                       ns <- readTVar nsSTM
 | 
				
			||||||
 | 
					                       writeTVar nsSTM $ addPredecessors preds ns
 | 
				
			||||||
 | 
					               else pure ()
 | 
				
			||||||
 | 
					                   ) >> pure ()
 | 
				
			||||||
 | 
					    -- unjoined node: try joining through all bootstrapping nodes
 | 
				
			||||||
 | 
					    else tryBootstrapJoining nsSTM >> pure ()
 | 
				
			||||||
 | 
					    let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
 | 
				
			||||||
 | 
					    threadDelay $ delaySecs * 10^6
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
 | 
				
			||||||
 | 
					tryBootstrapJoining :: LocalNodeStateSTM -> IO (Either String LocalNodeStateSTM)
 | 
				
			||||||
 | 
					tryBootstrapJoining nsSTM = do
 | 
				
			||||||
 | 
					    bss <- atomically $ do
 | 
				
			||||||
 | 
					        nsSnap <- readTVar nsSTM
 | 
				
			||||||
 | 
					        realNodeSnap <- readTVar $ parentRealNode nsSnap
 | 
				
			||||||
 | 
					        pure $ bootstrapNodes realNodeSnap
 | 
				
			||||||
 | 
					    tryJoining bss
 | 
				
			||||||
 | 
					  where
 | 
				
			||||||
 | 
					    tryJoining (bn:bns) = do
 | 
				
			||||||
 | 
					        j <- fediChordBootstrapJoin nsSTM bn
 | 
				
			||||||
 | 
					        case j of
 | 
				
			||||||
 | 
					          Left err     -> putStrLn ("join error: " <> err) >> tryJoining bns
 | 
				
			||||||
 | 
					          Right joined -> pure $ Right joined
 | 
				
			||||||
 | 
					    tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining."
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					-- | Look up a key just based on the responses of a single bootstrapping node.
 | 
				
			||||||
 | 
					bootstrapQueryId :: LocalNodeStateSTM -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
 | 
				
			||||||
 | 
					bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
 | 
				
			||||||
 | 
					    ns <- readTVarIO nsSTM
 | 
				
			||||||
 | 
					    bootstrapResponse <- bracket (mkSendSocket bootstrapHost bootstrapPort) close (
 | 
				
			||||||
 | 
					        -- Initialise an empty cache only with the responses from a bootstrapping node
 | 
				
			||||||
 | 
					        fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing)
 | 
				
			||||||
 | 
					                                                                                  )
 | 
				
			||||||
 | 
					       `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    case bootstrapResponse of
 | 
				
			||||||
 | 
					      Left err -> pure $ Left err
 | 
				
			||||||
 | 
					      Right resp
 | 
				
			||||||
 | 
					        | resp == Set.empty -> pure . Left $ "Bootstrapping node " <> show bootstrapHost <> " gave no response."
 | 
				
			||||||
 | 
					        | otherwise -> do
 | 
				
			||||||
               now <- getPOSIXTime
 | 
					               now <- getPOSIXTime
 | 
				
			||||||
               -- create new cache with all returned node responses
 | 
					               -- create new cache with all returned node responses
 | 
				
			||||||
               let bootstrapCache =
 | 
					               let bootstrapCache =
 | 
				
			||||||
| 
						 | 
					@ -161,23 +228,20 @@ fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
 | 
				
			||||||
                           Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
 | 
					                           Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
 | 
				
			||||||
                           Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
 | 
					                           Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
 | 
				
			||||||
                              )
 | 
					                              )
 | 
				
			||||||
                           initCache bootstrapResponse
 | 
					                           initCache resp
 | 
				
			||||||
               fediChordJoin bootstrapCache nsSTM
 | 
					               currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
 | 
				
			||||||
                                                   )
 | 
					               pure $ Right currentlyResponsible
 | 
				
			||||||
       `catch` (\e -> pure . Left $ "Error at bootstrap joining: " <> displayException (e :: IOException))
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | join a node to the DHT, using the provided cache snapshot for resolving the new
 | 
					
 | 
				
			||||||
 | 
					-- | join a node to the DHT using the global node cache
 | 
				
			||||||
-- node's position.
 | 
					-- node's position.
 | 
				
			||||||
fediChordJoin :: NodeCache                          -- ^ a snapshot of the NodeCache to
 | 
					fediChordJoin :: LocalNodeStateSTM                    -- ^ the local 'NodeState'
 | 
				
			||||||
                                                    -- use for ID lookup
 | 
					 | 
				
			||||||
              -> LocalNodeStateSTM                    -- ^ the local 'NodeState'
 | 
					 | 
				
			||||||
              -> IO (Either String LocalNodeStateSTM)  -- ^ the joined 'NodeState' after a
 | 
					              -> IO (Either String LocalNodeStateSTM)  -- ^ the joined 'NodeState' after a
 | 
				
			||||||
                                                    -- successful join, otherwise an error message
 | 
					                                                    -- successful join, otherwise an error message
 | 
				
			||||||
fediChordJoin cacheSnapshot nsSTM = do
 | 
					fediChordJoin nsSTM = do
 | 
				
			||||||
    ns <- readTVarIO nsSTM
 | 
					    ns <- readTVarIO nsSTM
 | 
				
			||||||
    -- get routed to the currently responsible node, based on the response
 | 
					    -- 1. get routed to the currently responsible node
 | 
				
			||||||
    -- from the bootstrapping node
 | 
					    currentlyResponsible <- requestQueryID ns $ getNid ns
 | 
				
			||||||
    currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns
 | 
					 | 
				
			||||||
    putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
 | 
					    putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
 | 
				
			||||||
    -- 2. then send a join to the currently responsible node
 | 
					    -- 2. then send a join to the currently responsible node
 | 
				
			||||||
    joinResult <- requestJoin currentlyResponsible nsSTM
 | 
					    joinResult <- requestJoin currentlyResponsible nsSTM
 | 
				
			||||||
| 
						 | 
					@ -206,7 +270,7 @@ joinOnNewEntriesThread nsSTM = loop
 | 
				
			||||||
              pure ()
 | 
					              pure ()
 | 
				
			||||||
          -- otherwise try joining
 | 
					          -- otherwise try joining
 | 
				
			||||||
          FORWARD _ -> do
 | 
					          FORWARD _ -> do
 | 
				
			||||||
              joinResult <- fediChordJoin cache  nsSTM
 | 
					              joinResult <- fediChordJoin nsSTM
 | 
				
			||||||
              either
 | 
					              either
 | 
				
			||||||
                -- on join failure, sleep and retry
 | 
					                -- on join failure, sleep and retry
 | 
				
			||||||
                -- TODO: make delay configurable
 | 
					                -- TODO: make delay configurable
 | 
				
			||||||
| 
						 | 
					@ -477,6 +541,7 @@ fediMainThreads sock nsSTM = do
 | 
				
			||||||
        (fediMessageHandler sendQ recvQ nsSTM) $
 | 
					        (fediMessageHandler sendQ recvQ nsSTM) $
 | 
				
			||||||
        concurrently_ (stabiliseThread nsSTM) $
 | 
					        concurrently_ (stabiliseThread nsSTM) $
 | 
				
			||||||
            concurrently_ (cacheVerifyThread nsSTM) $
 | 
					            concurrently_ (cacheVerifyThread nsSTM) $
 | 
				
			||||||
 | 
					                concurrently_ (convergenceSampleThread nsSTM) $
 | 
				
			||||||
                    concurrently_
 | 
					                    concurrently_
 | 
				
			||||||
                        (sendThread sock sendQ)
 | 
					                        (sendThread sock sendQ)
 | 
				
			||||||
                        (recvThread sock recvQ)
 | 
					                        (recvThread sock recvQ)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -589,10 +589,15 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
 | 
				
			||||||
-- | configuration values used for initialising the FediChord DHT
 | 
					-- | configuration values used for initialising the FediChord DHT
 | 
				
			||||||
data FediChordConf = FediChordConf
 | 
					data FediChordConf = FediChordConf
 | 
				
			||||||
    { confDomain                    :: String
 | 
					    { confDomain                    :: String
 | 
				
			||||||
 | 
					    -- ^ the domain/ hostname the node is reachable under
 | 
				
			||||||
    , confIP                        :: HostAddress6
 | 
					    , confIP                        :: HostAddress6
 | 
				
			||||||
 | 
					    -- ^ IP address of outgoing packets
 | 
				
			||||||
    , confDhtPort                   :: Int
 | 
					    , confDhtPort                   :: Int
 | 
				
			||||||
 | 
					    -- ^ listening port for the FediChord DHT
 | 
				
			||||||
    , confBootstrapNodes            :: [(String, PortNumber)]
 | 
					    , confBootstrapNodes            :: [(String, PortNumber)]
 | 
				
			||||||
    --, confStabiliseInterval :: Int
 | 
					    -- ^ list of potential bootstrapping nodes
 | 
				
			||||||
 | 
					    , confBootstrapSamplingInterval :: Int
 | 
				
			||||||
 | 
					    -- ^ pause between sampling the own ID through bootstrap nodes, in seconds
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    deriving (Show, Eq)
 | 
					    deriving (Show, Eq)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue