Compare commits
	
		
			3 commits
		
	
	
		
			e3bfa26ddb
			...
			702684b1a9
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 702684b1a9 | |||
| 43eb04dfea | |||
| ad1465c5fe | 
					 4 changed files with 38 additions and 24 deletions
				
			
		|  | @ -203,14 +203,14 @@ queryIdLookupLoop cacheSnapshot ns targetID = do | |||
|           now <- getPOSIXTime | ||||
|           newLCache <- foldM (\oldCache resp -> do | ||||
|             let entriesToInsert = case queryResult <$> payload resp of | ||||
|                              Just (FOUND result1) -> [addCacheEntryPure now (RemoteCacheEntry result1 now)] | ||||
|                              Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset | ||||
|                              Just (FOUND result1) -> [RemoteCacheEntry result1 now] | ||||
|                              Just (FORWARD resultset) -> Set.elems resultset | ||||
|                              _ -> [] | ||||
|             -- forward entries to global cache | ||||
|             forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) entry | ||||
|             queueAddEntries entriesToInsert ns | ||||
|             -- insert entries into local cache copy | ||||
|             pure $ foldl' ( | ||||
|                 \oldLCache insertFunc -> insertFunc oldLCache | ||||
|             pure $ foldr' ( | ||||
|                 addCacheEntryPure now | ||||
|                           ) oldCache entriesToInsert | ||||
|                             ) cacheSnapshot responses | ||||
| 
 | ||||
|  | @ -283,7 +283,7 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do | |||
| 
 | ||||
| 
 | ||||
| -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache | ||||
| queueAddEntries :: [RemoteCacheEntry] | ||||
| queueAddEntries :: Foldable c => c RemoteCacheEntry | ||||
|                 -> LocalNodeState | ||||
|                 -> IO () | ||||
| queueAddEntries entries ns = do | ||||
|  | @ -334,4 +334,5 @@ mkSendSocket dest destPort = do | |||
|     destAddr <- addrAddress <$> resolve (Just dest) (Just destPort) | ||||
|     sendSock <- socket AF_INET6 Datagram defaultProtocol | ||||
|     setSocketOption sendSock IPv6Only 1 | ||||
|     connect sendSock destAddr | ||||
|     pure sendSock | ||||
|  |  | |||
|  | @ -37,6 +37,8 @@ module Hash2Pub.FediChord ( | |||
|   , bsAsIpAddr | ||||
|   , FediChordConf(..) | ||||
|   , fediChordInit | ||||
|   , fediChordJoin | ||||
|   , fediChordBootstrapJoin | ||||
|   , nodeStateInit | ||||
|   , mkServerSocket | ||||
|   , mkSendSocket | ||||
|  | @ -109,11 +111,13 @@ nodeStateInit conf = do | |||
|                                            } | ||||
|     pure initialState | ||||
| 
 | ||||
| fediChordJoin :: LocalNodeState              -- ^ the local 'NodeState' | ||||
|               -> (String, PortNumber)   -- ^ domain and port of a bootstrapping node | ||||
|               -> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a | ||||
| -- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed | ||||
| -- for resolving the new node's position. | ||||
| fediChordBootstrapJoin :: LocalNodeState              -- ^ the local 'NodeState' | ||||
|                         -> (String, PortNumber)   -- ^ domain and port of a bootstrapping node | ||||
|                         -> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a | ||||
|                                             -- successful join, otherwise an error message | ||||
| fediChordJoin ns (joinHost, joinPort) = do | ||||
| fediChordBootstrapJoin ns (joinHost, joinPort) = do | ||||
|     -- can be invoked multiple times with all known bootstrapping nodes until successfully joined | ||||
|     sock <- mkSendSocket joinHost joinPort | ||||
|     -- 1. get routed to placement of own ID until FOUND: | ||||
|  | @ -132,18 +136,24 @@ fediChordJoin ns (joinHost, joinPort) = do | |||
|                        Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset | ||||
|                           ) | ||||
|                        initCache bootstrapResponse | ||||
|            -- get routed to the currently responsible node, based on the response | ||||
|            -- from the bootstrapping node | ||||
|            currentlyResponsible <- queryIdLookupLoop bootstrapCache ns $ getNid ns | ||||
|            -- do actual join | ||||
|            joinResult <- requestJoin currentlyResponsible ns | ||||
|            case joinResult of | ||||
|              Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible | ||||
|              Just joinedNS -> pure . Right $ joinedNS | ||||
| 
 | ||||
|            fediChordJoin bootstrapCache ns | ||||
| 
 | ||||
| -- | join a node to the DHT, using the provided cache snapshot for resolving the new | ||||
| -- node's position. | ||||
| fediChordJoin :: NodeCache                          -- ^ a snapshot of the NodeCache to | ||||
|                                                     -- use for ID lookup | ||||
|               -> LocalNodeState                     -- ^ the local 'NodeState' | ||||
|               -> IO (Either String LocalNodeState)  -- ^ the joined 'NodeState' after a | ||||
|                                                     -- successful join, otherwise an error message | ||||
| fediChordJoin cacheSnapshot ns = do | ||||
|     -- get routed to the currently responsible node, based on the response | ||||
|     -- from the bootstrapping node | ||||
|     currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns $ getNid ns | ||||
|     -- 2. then send a join to the currently responsible node | ||||
|     -- after successful join, finally add own node to the cache | ||||
|     joinResult <- requestJoin currentlyResponsible ns | ||||
|     case joinResult of | ||||
|       Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible | ||||
|       Just joinedNS -> pure . Right $ joinedNS | ||||
| 
 | ||||
| 
 | ||||
| -- | cache updater thread that waits for incoming NodeCache update instructions on | ||||
|  |  | |||
|  | @ -419,9 +419,10 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs | |||
| 
 | ||||
| -- | configuration values used for initialising the FediChord DHT | ||||
| data FediChordConf = FediChordConf | ||||
|     { confDomain  :: String | ||||
|     , confIP      :: HostAddress6 | ||||
|     , confDhtPort :: Int | ||||
|     { confDomain         :: String | ||||
|     , confIP             :: HostAddress6 | ||||
|     , confDhtPort        :: Int | ||||
|     , confBootstrapNodes :: [(String, PortNumber)] | ||||
|     } | ||||
|     deriving (Show, Eq) | ||||
| 
 | ||||
|  |  | |||
|  | @ -18,15 +18,17 @@ main = do | |||
|     -- currently no masking is necessary, as there is nothing to clean up | ||||
|     cacheWriterThread <- forkIO $ cacheWriter thisNode | ||||
|     -- idea: list of bootstrapping nodes, try joining within a timeout | ||||
|     joinedState <- fediChordBootstrapJoin thisNode $ head . confBootstrapNodes $ conf | ||||
|     -- stop main thread from terminating during development | ||||
|     getChar | ||||
|     pure () | ||||
| 
 | ||||
| readConfig :: IO FediChordConf | ||||
| readConfig = do | ||||
|     confDomainString : ipString : portString : _ <- getArgs | ||||
|     confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs | ||||
|     pure $ FediChordConf { | ||||
|         confDomain = confDomainString | ||||
|       , confIP = toHostAddress6 . read $ ipString | ||||
|       , confDhtPort = read portString | ||||
|       , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)] | ||||
|                            } | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue