diff --git a/app/Main.hs b/app/Main.hs index eb54359..36e79c5 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -18,6 +18,8 @@ main = do conf <- readConfig -- ToDo: load persisted caches, bootstrapping nodes … (serverSock, thisNode) <- fediChordInit conf + print =<< readTVarIO thisNode + print serverSock -- currently no masking is necessary, as there is nothing to clean up cacheWriterThread <- forkIO $ cacheWriter thisNode -- try joining the DHT using one of the provided bootstrapping nodes @@ -34,9 +36,8 @@ main = do putStrLn $ err <> " Error joining, start listening for incoming requests anyways" print =<< readTVarIO thisNode - -- launch thread attempting to join on new cache entries - _ <- forkIO $ joinOnNewEntriesThread thisNode wait =<< async (fediMainThreads serverSock thisNode) + -- TODO: periodic retry ) (\joinedNS -> do -- launch main eventloop with successfully joined state diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 5470731..8337b06 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -46,7 +46,6 @@ module Hash2Pub.FediChord ( , mkSendSocket , resolve , cacheWriter - , joinOnNewEntriesThread ) where import Control.Applicative ((<|>)) @@ -167,7 +166,6 @@ fediChordJoin cacheSnapshot nsSTM = do -- get routed to the currently responsible node, based on the response -- from the bootstrapping node currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns - putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node joinResult <- requestJoin currentlyResponsible nsSTM case joinResult of @@ -175,36 +173,6 @@ fediChordJoin cacheSnapshot nsSTM = do Right joinedNS -> pure . Right $ joinedNS --- | Wait for new cache entries to appear and then try joining on them. --- Exits after successful joining. -joinOnNewEntriesThread :: LocalNodeStateSTM -> IO () -joinOnNewEntriesThread nsSTM = loop - where - loop = do - nsSnap <- readTVarIO nsSTM - (lookupResult, cache) <- atomically $ do - cache <- readTVar $ nodeCacheSTM nsSnap - case queryLocalCache nsSnap cache 1 (getNid nsSnap) of - -- empty cache, block until cache changes and then retry - (FORWARD s) | Set.null s -> retry - result -> pure (result, cache) - case lookupResult of - -- already joined - FOUND _ -> do - print =<< readTVarIO nsSTM - pure () - -- otherwise try joining - FORWARD _ -> do - joinResult <- fediChordJoin cache nsSTM - either - -- on join failure, sleep and retry - -- TODO: make delay configurable - (const $ threadDelay (30 * 10^6) >> loop) - (const $ pure ()) - joinResult - emptyset = Set.empty -- because pattern matches don't accept qualified names - - -- | cache updater thread that waits for incoming NodeCache update instructions on -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. cacheWriter :: LocalNodeStateSTM -> IO () @@ -271,6 +239,7 @@ cacheVerifyThread nsSTM = forever $ do let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of FOUND node -> [node] FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet + print $ checkCacheSliceInvariants latestNs latestCache forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID -> forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle ) @@ -343,7 +312,6 @@ stabiliseThread nsSTM = forever $ do ns <- readTVarIO nsSTM putStrLn "stabilise run: begin" - print ns -- iterate through the same snapshot, collect potential new neighbours -- and nodes to be deleted, and modify these changes only at the end of