Compare commits
	
		
			No commits in common. "61ea6ed3ff90266fda46428518e4cf4de0809048" and "d293cc05d148ebe7b9019afffcfdab5ff44719d5" have entirely different histories.
		
	
	
		
			61ea6ed3ff
			...
			d293cc05d1
		
	
		
					 3 changed files with 40 additions and 102 deletions
				
			
		
							
								
								
									
										12
									
								
								app/Main.hs
									
										
									
									
									
								
							
							
						
						
									
										12
									
								
								app/Main.hs
									
										
									
									
									
								
							| 
						 | 
				
			
			@ -21,8 +21,17 @@ main = do
 | 
			
		|||
    (serverSock, thisNode) <- fediChordInit conf
 | 
			
		||||
    -- currently no masking is necessary, as there is nothing to clean up
 | 
			
		||||
    cacheWriterThread <- forkIO $ cacheWriter thisNode
 | 
			
		||||
    thisNodeSnap <- readTVarIO thisNode
 | 
			
		||||
    realNode <- readTVarIO $ parentRealNode thisNodeSnap
 | 
			
		||||
    -- try joining the DHT using one of the provided bootstrapping nodes
 | 
			
		||||
    joinedState <- tryBootstrapJoining thisNode
 | 
			
		||||
    let
 | 
			
		||||
        tryJoining (bn:bns) = do
 | 
			
		||||
            j <- fediChordBootstrapJoin thisNode bn
 | 
			
		||||
            case j of
 | 
			
		||||
              Left err -> putStrLn ("join error: " <> err) >> tryJoining bns
 | 
			
		||||
              Right joined -> pure $ Right joined
 | 
			
		||||
        tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining."
 | 
			
		||||
    joinedState <- tryJoining $ bootstrapNodes realNode
 | 
			
		||||
    either (\err -> do
 | 
			
		||||
        -- handle unsuccessful join
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -50,5 +59,4 @@ readConfig = do
 | 
			
		|||
      , confDhtPort = read portString
 | 
			
		||||
      , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
 | 
			
		||||
      --, confStabiliseInterval = 60
 | 
			
		||||
      , confBootstrapSamplingInterval = 180
 | 
			
		||||
                           }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -40,7 +40,6 @@ module Hash2Pub.FediChord (
 | 
			
		|||
  , fediChordInit
 | 
			
		||||
  , fediChordJoin
 | 
			
		||||
  , fediChordBootstrapJoin
 | 
			
		||||
  , tryBootstrapJoining
 | 
			
		||||
  , fediMainThreads
 | 
			
		||||
  , RealNode (..)
 | 
			
		||||
  , nodeStateInit
 | 
			
		||||
| 
						 | 
				
			
			@ -82,7 +81,6 @@ import           Network.Socket                hiding (recv, recvFrom, send,
 | 
			
		|||
                                                sendTo)
 | 
			
		||||
import           Network.Socket.ByteString
 | 
			
		||||
import           Safe
 | 
			
		||||
import           System.Random                 (randomRIO)
 | 
			
		||||
 | 
			
		||||
import           Hash2Pub.DHTProtocol
 | 
			
		||||
import           Hash2Pub.FediChordTypes
 | 
			
		||||
| 
						 | 
				
			
			@ -143,82 +141,17 @@ fediChordBootstrapJoin :: LocalNodeStateSTM              -- ^ the local 'NodeSta
 | 
			
		|||
                        -> (String, PortNumber)   -- ^ domain and port of a bootstrapping node
 | 
			
		||||
                        -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a
 | 
			
		||||
                                            -- successful join, otherwise an error message
 | 
			
		||||
fediChordBootstrapJoin nsSTM bootstrapNode = do
 | 
			
		||||
fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
 | 
			
		||||
    -- can be invoked multiple times with all known bootstrapping nodes until successfully joined
 | 
			
		||||
    ns <- readTVarIO nsSTM
 | 
			
		||||
    runExceptT $ do
 | 
			
		||||
        -- 1. get routed to the currently responsible node
 | 
			
		||||
        lookupResp <- liftIO $ bootstrapQueryId nsSTM bootstrapNode $ getNid ns
 | 
			
		||||
        currentlyResponsible <- liftEither lookupResp
 | 
			
		||||
        liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
 | 
			
		||||
        -- 2. then send a join to the currently responsible node
 | 
			
		||||
        joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
 | 
			
		||||
        liftEither joinResult
 | 
			
		||||
 | 
			
		||||
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
 | 
			
		||||
-- Unjoined try joining instead.
 | 
			
		||||
convergenceSampleThread :: LocalNodeStateSTM -> IO ()
 | 
			
		||||
convergenceSampleThread nsSTM = forever $ do
 | 
			
		||||
    nsSnap <- readTVarIO nsSTM
 | 
			
		||||
    parentNode <- readTVarIO $ parentRealNode nsSnap
 | 
			
		||||
    if isJoined nsSnap
 | 
			
		||||
       then
 | 
			
		||||
        runExceptT (do
 | 
			
		||||
            -- joined node: choose random node, do queryIDLoop, compare result with own responsibility
 | 
			
		||||
            let bss = bootstrapNodes parentNode
 | 
			
		||||
            randIndex <- liftIO $ randomRIO (0, length bss - 1)
 | 
			
		||||
            chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
 | 
			
		||||
            lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
 | 
			
		||||
            currentlyResponsible <- liftEither lookupResult
 | 
			
		||||
            if getNid currentlyResponsible /= getNid nsSnap
 | 
			
		||||
               -- if mismatch, stabilise on the result, else do nothing
 | 
			
		||||
               then do
 | 
			
		||||
                   stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
 | 
			
		||||
                   (preds, succs) <- liftEither stabResult
 | 
			
		||||
                   -- TODO: verify neighbours before adding, see #55
 | 
			
		||||
                   liftIO . atomically $ do
 | 
			
		||||
                       ns <- readTVar nsSTM
 | 
			
		||||
                       writeTVar nsSTM $ addPredecessors preds ns
 | 
			
		||||
               else pure ()
 | 
			
		||||
                   ) >> pure ()
 | 
			
		||||
    -- unjoined node: try joining through all bootstrapping nodes
 | 
			
		||||
    else tryBootstrapJoining nsSTM >> pure ()
 | 
			
		||||
    let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
 | 
			
		||||
    threadDelay $ delaySecs * 10^6
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
 | 
			
		||||
tryBootstrapJoining :: LocalNodeStateSTM -> IO (Either String LocalNodeStateSTM)
 | 
			
		||||
tryBootstrapJoining nsSTM = do
 | 
			
		||||
    bss <- atomically $ do
 | 
			
		||||
        nsSnap <- readTVar nsSTM
 | 
			
		||||
        realNodeSnap <- readTVar $ parentRealNode nsSnap
 | 
			
		||||
        pure $ bootstrapNodes realNodeSnap
 | 
			
		||||
    tryJoining bss
 | 
			
		||||
  where
 | 
			
		||||
    tryJoining (bn:bns) = do
 | 
			
		||||
        j <- fediChordBootstrapJoin nsSTM bn
 | 
			
		||||
        case j of
 | 
			
		||||
          Left err     -> putStrLn ("join error: " <> err) >> tryJoining bns
 | 
			
		||||
          Right joined -> pure $ Right joined
 | 
			
		||||
    tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining."
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
-- | Look up a key just based on the responses of a single bootstrapping node.
 | 
			
		||||
bootstrapQueryId :: LocalNodeStateSTM -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
 | 
			
		||||
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
 | 
			
		||||
    ns <- readTVarIO nsSTM
 | 
			
		||||
    bootstrapResponse <- bracket (mkSendSocket bootstrapHost bootstrapPort) close (
 | 
			
		||||
    bracket (mkSendSocket joinHost joinPort) close (\sock -> do
 | 
			
		||||
        putStrLn "BootstrapJoin"
 | 
			
		||||
        -- 1. get routed to placement of own ID until FOUND:
 | 
			
		||||
        -- Initialise an empty cache only with the responses from a bootstrapping node
 | 
			
		||||
        fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing)
 | 
			
		||||
                                                                                  )
 | 
			
		||||
       `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
 | 
			
		||||
 | 
			
		||||
    case bootstrapResponse of
 | 
			
		||||
      Left err -> pure $ Left err
 | 
			
		||||
      Right resp
 | 
			
		||||
        | resp == Set.empty -> pure . Left $ "Bootstrapping node " <> show bootstrapHost <> " gave no response."
 | 
			
		||||
        | otherwise -> do
 | 
			
		||||
        ns <- readTVarIO nsSTM
 | 
			
		||||
        bootstrapResponse <- sendRequestTo 5000 3 (lookupMessage (getNid ns) ns Nothing) sock
 | 
			
		||||
        if bootstrapResponse == Set.empty
 | 
			
		||||
           then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response."
 | 
			
		||||
           else do
 | 
			
		||||
               now <- getPOSIXTime
 | 
			
		||||
               -- create new cache with all returned node responses
 | 
			
		||||
               let bootstrapCache =
 | 
			
		||||
| 
						 | 
				
			
			@ -228,20 +161,23 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
 | 
			
		|||
                           Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
 | 
			
		||||
                           Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
 | 
			
		||||
                              )
 | 
			
		||||
                           initCache resp
 | 
			
		||||
               currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
 | 
			
		||||
               pure $ Right currentlyResponsible
 | 
			
		||||
                           initCache bootstrapResponse
 | 
			
		||||
               fediChordJoin bootstrapCache nsSTM
 | 
			
		||||
                                                   )
 | 
			
		||||
       `catch` (\e -> pure . Left $ "Error at bootstrap joining: " <> displayException (e :: IOException))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
-- | join a node to the DHT using the global node cache
 | 
			
		||||
-- | join a node to the DHT, using the provided cache snapshot for resolving the new
 | 
			
		||||
-- node's position.
 | 
			
		||||
fediChordJoin :: LocalNodeStateSTM                    -- ^ the local 'NodeState'
 | 
			
		||||
fediChordJoin :: NodeCache                          -- ^ a snapshot of the NodeCache to
 | 
			
		||||
                                                    -- use for ID lookup
 | 
			
		||||
              -> LocalNodeStateSTM                    -- ^ the local 'NodeState'
 | 
			
		||||
              -> IO (Either String LocalNodeStateSTM)  -- ^ the joined 'NodeState' after a
 | 
			
		||||
                                                    -- successful join, otherwise an error message
 | 
			
		||||
fediChordJoin nsSTM = do
 | 
			
		||||
fediChordJoin cacheSnapshot nsSTM = do
 | 
			
		||||
    ns <- readTVarIO nsSTM
 | 
			
		||||
    -- 1. get routed to the currently responsible node
 | 
			
		||||
    currentlyResponsible <- requestQueryID ns $ getNid ns
 | 
			
		||||
    -- get routed to the currently responsible node, based on the response
 | 
			
		||||
    -- from the bootstrapping node
 | 
			
		||||
    currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns
 | 
			
		||||
    putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
 | 
			
		||||
    -- 2. then send a join to the currently responsible node
 | 
			
		||||
    joinResult <- requestJoin currentlyResponsible nsSTM
 | 
			
		||||
| 
						 | 
				
			
			@ -270,7 +206,7 @@ joinOnNewEntriesThread nsSTM = loop
 | 
			
		|||
              pure ()
 | 
			
		||||
          -- otherwise try joining
 | 
			
		||||
          FORWARD _ -> do
 | 
			
		||||
              joinResult <- fediChordJoin nsSTM
 | 
			
		||||
              joinResult <- fediChordJoin cache  nsSTM
 | 
			
		||||
              either
 | 
			
		||||
                -- on join failure, sleep and retry
 | 
			
		||||
                -- TODO: make delay configurable
 | 
			
		||||
| 
						 | 
				
			
			@ -541,7 +477,6 @@ fediMainThreads sock nsSTM = do
 | 
			
		|||
        (fediMessageHandler sendQ recvQ nsSTM) $
 | 
			
		||||
        concurrently_ (stabiliseThread nsSTM) $
 | 
			
		||||
            concurrently_ (cacheVerifyThread nsSTM) $
 | 
			
		||||
                concurrently_ (convergenceSampleThread nsSTM) $
 | 
			
		||||
                concurrently_
 | 
			
		||||
                    (sendThread sock sendQ)
 | 
			
		||||
                    (recvThread sock recvQ)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -589,15 +589,10 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
 | 
			
		|||
-- | configuration values used for initialising the FediChord DHT
 | 
			
		||||
data FediChordConf = FediChordConf
 | 
			
		||||
    { confDomain         :: String
 | 
			
		||||
    -- ^ the domain/ hostname the node is reachable under
 | 
			
		||||
    , confIP             :: HostAddress6
 | 
			
		||||
    -- ^ IP address of outgoing packets
 | 
			
		||||
    , confDhtPort        :: Int
 | 
			
		||||
    -- ^ listening port for the FediChord DHT
 | 
			
		||||
    , confBootstrapNodes :: [(String, PortNumber)]
 | 
			
		||||
    -- ^ list of potential bootstrapping nodes
 | 
			
		||||
    , confBootstrapSamplingInterval :: Int
 | 
			
		||||
    -- ^ pause between sampling the own ID through bootstrap nodes, in seconds
 | 
			
		||||
    --, confStabiliseInterval :: Int
 | 
			
		||||
    }
 | 
			
		||||
    deriving (Show, Eq)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue