Compare commits
	
		
			2 commits
		
	
	
		
			bdb92411c6
			...
			e91f317a8e
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| e91f317a8e | |||
| 7c17e3a44d | 
					 2 changed files with 35 additions and 4 deletions
				
			
		|  | @ -18,8 +18,6 @@ main = do | ||||||
|     conf <- readConfig |     conf <- readConfig | ||||||
|     -- ToDo: load persisted caches, bootstrapping nodes … |     -- ToDo: load persisted caches, bootstrapping nodes … | ||||||
|     (serverSock, thisNode) <- fediChordInit conf |     (serverSock, thisNode) <- fediChordInit conf | ||||||
|     print =<< readTVarIO thisNode |  | ||||||
|     print serverSock |  | ||||||
|     -- currently no masking is necessary, as there is nothing to clean up |     -- currently no masking is necessary, as there is nothing to clean up | ||||||
|     cacheWriterThread <- forkIO $ cacheWriter thisNode |     cacheWriterThread <- forkIO $ cacheWriter thisNode | ||||||
|     -- try joining the DHT using one of the provided bootstrapping nodes |     -- try joining the DHT using one of the provided bootstrapping nodes | ||||||
|  | @ -36,8 +34,9 @@ main = do | ||||||
| 
 | 
 | ||||||
|         putStrLn $ err <> " Error joining, start listening for incoming requests anyways" |         putStrLn $ err <> " Error joining, start listening for incoming requests anyways" | ||||||
|         print =<< readTVarIO thisNode |         print =<< readTVarIO thisNode | ||||||
|  |         -- launch thread attempting to join on new cache entries | ||||||
|  |         _ <- forkIO $ joinOnNewEntriesThread thisNode | ||||||
|         wait =<< async (fediMainThreads serverSock thisNode) |         wait =<< async (fediMainThreads serverSock thisNode) | ||||||
|         -- TODO: periodic retry |  | ||||||
|            ) |            ) | ||||||
|            (\joinedNS -> do |            (\joinedNS -> do | ||||||
|         -- launch main eventloop with successfully joined state |         -- launch main eventloop with successfully joined state | ||||||
|  |  | ||||||
|  | @ -46,6 +46,7 @@ module Hash2Pub.FediChord ( | ||||||
|   , mkSendSocket |   , mkSendSocket | ||||||
|   , resolve |   , resolve | ||||||
|   , cacheWriter |   , cacheWriter | ||||||
|  |   , joinOnNewEntriesThread | ||||||
|                            ) where |                            ) where | ||||||
| 
 | 
 | ||||||
| import           Control.Applicative           ((<|>)) | import           Control.Applicative           ((<|>)) | ||||||
|  | @ -166,6 +167,7 @@ fediChordJoin cacheSnapshot nsSTM = do | ||||||
|     -- get routed to the currently responsible node, based on the response |     -- get routed to the currently responsible node, based on the response | ||||||
|     -- from the bootstrapping node |     -- from the bootstrapping node | ||||||
|     currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns |     currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns | ||||||
|  |     putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) | ||||||
|     -- 2. then send a join to the currently responsible node |     -- 2. then send a join to the currently responsible node | ||||||
|     joinResult <- requestJoin currentlyResponsible nsSTM |     joinResult <- requestJoin currentlyResponsible nsSTM | ||||||
|     case joinResult of |     case joinResult of | ||||||
|  | @ -173,6 +175,36 @@ fediChordJoin cacheSnapshot nsSTM = do | ||||||
|       Right joinedNS -> pure . Right $ joinedNS |       Right joinedNS -> pure . Right $ joinedNS | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | -- | Wait for new cache entries to appear and then try joining on them. | ||||||
|  | -- Exits after successful joining. | ||||||
|  | joinOnNewEntriesThread :: LocalNodeStateSTM -> IO () | ||||||
|  | joinOnNewEntriesThread nsSTM = loop | ||||||
|  |   where | ||||||
|  |     loop = do | ||||||
|  |         nsSnap <- readTVarIO nsSTM | ||||||
|  |         (lookupResult, cache) <- atomically $ do | ||||||
|  |             cache <- readTVar $ nodeCacheSTM nsSnap | ||||||
|  |             case queryLocalCache nsSnap cache 1 (getNid nsSnap) of | ||||||
|  |               -- empty cache, block until cache changes and then retry | ||||||
|  |               (FORWARD s) | Set.null s -> retry | ||||||
|  |               result -> pure (result, cache) | ||||||
|  |         case lookupResult of | ||||||
|  |           -- already joined | ||||||
|  |           FOUND _ -> do | ||||||
|  |               print =<< readTVarIO nsSTM | ||||||
|  |               pure () | ||||||
|  |           -- otherwise try joining | ||||||
|  |           FORWARD _ -> do | ||||||
|  |               joinResult <- fediChordJoin cache  nsSTM | ||||||
|  |               either | ||||||
|  |                 -- on join failure, sleep and retry | ||||||
|  |                 -- TODO: make delay configurable | ||||||
|  |                 (const $ threadDelay (30 * 10^6) >> loop) | ||||||
|  |                 (const $ pure ())  | ||||||
|  |                 joinResult | ||||||
|  |     emptyset = Set.empty    -- because pattern matches don't accept qualified names | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| -- | cache updater thread that waits for incoming NodeCache update instructions on | -- | cache updater thread that waits for incoming NodeCache update instructions on | ||||||
| -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. | -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. | ||||||
| cacheWriter :: LocalNodeStateSTM -> IO () | cacheWriter :: LocalNodeStateSTM -> IO () | ||||||
|  | @ -239,7 +271,6 @@ cacheVerifyThread nsSTM = forever $ do | ||||||
|     let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of |     let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of | ||||||
|                                   FOUND node -> [node] |                                   FOUND node -> [node] | ||||||
|                                   FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet |                                   FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet | ||||||
|     print $ checkCacheSliceInvariants latestNs latestCache |  | ||||||
|     forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID -> |     forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID -> | ||||||
|         forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle |         forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle | ||||||
|                                                              ) |                                                              ) | ||||||
|  | @ -312,6 +343,7 @@ stabiliseThread nsSTM = forever $ do | ||||||
|     ns <- readTVarIO nsSTM |     ns <- readTVarIO nsSTM | ||||||
| 
 | 
 | ||||||
|     putStrLn "stabilise run: begin" |     putStrLn "stabilise run: begin" | ||||||
|  |     print ns | ||||||
| 
 | 
 | ||||||
|     -- iterate through the same snapshot, collect potential new neighbours |     -- iterate through the same snapshot, collect potential new neighbours | ||||||
|     -- and nodes to be deleted, and modify these changes only at the end of |     -- and nodes to be deleted, and modify these changes only at the end of | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue