From 7c17e3a44dff3a73314ea290c2b48a494a7c6b4d Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sat, 4 Jul 2020 15:03:28 +0200 Subject: [PATCH 1/2] implement join-retry on new cache entries closes #42 --- app/Main.hs | 3 ++- src/Hash2Pub/FediChord.hs | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/app/Main.hs b/app/Main.hs index 36e79c5..d06ae26 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -36,8 +36,9 @@ main = do putStrLn $ err <> " Error joining, start listening for incoming requests anyways" print =<< readTVarIO thisNode + -- launch thread attempting to join on new cache entries + _ <- forkIO $ joinOnNewEntriesThread thisNode wait =<< async (fediMainThreads serverSock thisNode) - -- TODO: periodic retry ) (\joinedNS -> do -- launch main eventloop with successfully joined state diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 8337b06..100ae5f 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -46,6 +46,7 @@ module Hash2Pub.FediChord ( , mkSendSocket , resolve , cacheWriter + , joinOnNewEntriesThread ) where import Control.Applicative ((<|>)) @@ -166,6 +167,7 @@ fediChordJoin cacheSnapshot nsSTM = do -- get routed to the currently responsible node, based on the response -- from the bootstrapping node currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns + putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node joinResult <- requestJoin currentlyResponsible nsSTM case joinResult of @@ -173,6 +175,36 @@ fediChordJoin cacheSnapshot nsSTM = do Right joinedNS -> pure . Right $ joinedNS +-- | Wait for new cache entries to appear and then try joining on them. +-- Exits after successful joining. +joinOnNewEntriesThread :: LocalNodeStateSTM -> IO () +joinOnNewEntriesThread nsSTM = loop + where + loop = do + nsSnap <- readTVarIO nsSTM + (lookupResult, cache) <- atomically $ do + cache <- readTVar $ nodeCacheSTM nsSnap + case queryLocalCache nsSnap cache 1 (getNid nsSnap) of + -- empty cache, block until cache changes and then retry + (FORWARD s) | Set.null s -> retry + result -> pure (result, cache) + case lookupResult of + -- already joined + FOUND _ -> do + print =<< readTVarIO nsSTM + pure () + -- otherwise try joining + FORWARD _ -> do + joinResult <- fediChordJoin cache nsSTM + either + -- on join failure, sleep and retry + -- TODO: make delay configurable + (const $ threadDelay (30 * 10^6) >> loop) + (const $ pure ()) + joinResult + emptyset = Set.empty -- because pattern matches don't accept qualified names + + -- | cache updater thread that waits for incoming NodeCache update instructions on -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. cacheWriter :: LocalNodeStateSTM -> IO () From e91f317a8e3346b381f85ad8380e14b4b374ddce Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sun, 5 Jul 2020 16:52:09 +0200 Subject: [PATCH 2/2] decrease logging verbosity --- app/Main.hs | 2 -- src/Hash2Pub/FediChord.hs | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/app/Main.hs b/app/Main.hs index d06ae26..eb54359 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -18,8 +18,6 @@ main = do conf <- readConfig -- ToDo: load persisted caches, bootstrapping nodes … (serverSock, thisNode) <- fediChordInit conf - print =<< readTVarIO thisNode - print serverSock -- currently no masking is necessary, as there is nothing to clean up cacheWriterThread <- forkIO $ cacheWriter thisNode -- try joining the DHT using one of the provided bootstrapping nodes diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 100ae5f..5470731 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -271,7 +271,6 @@ cacheVerifyThread nsSTM = forever $ do let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of FOUND node -> [node] FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet - print $ checkCacheSliceInvariants latestNs latestCache forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID -> forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle ) @@ -344,6 +343,7 @@ stabiliseThread nsSTM = forever $ do ns <- readTVarIO nsSTM putStrLn "stabilise run: begin" + print ns -- iterate through the same snapshot, collect potential new neighbours -- and nodes to be deleted, and modify these changes only at the end of