diff --git a/app/Main.hs b/app/Main.hs index 36e79c5..d06ae26 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -36,8 +36,9 @@ main = do putStrLn $ err <> " Error joining, start listening for incoming requests anyways" print =<< readTVarIO thisNode + -- launch thread attempting to join on new cache entries + _ <- forkIO $ joinOnNewEntriesThread thisNode wait =<< async (fediMainThreads serverSock thisNode) - -- TODO: periodic retry ) (\joinedNS -> do -- launch main eventloop with successfully joined state diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 8337b06..100ae5f 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -46,6 +46,7 @@ module Hash2Pub.FediChord ( , mkSendSocket , resolve , cacheWriter + , joinOnNewEntriesThread ) where import Control.Applicative ((<|>)) @@ -166,6 +167,7 @@ fediChordJoin cacheSnapshot nsSTM = do -- get routed to the currently responsible node, based on the response -- from the bootstrapping node currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns + putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node joinResult <- requestJoin currentlyResponsible nsSTM case joinResult of @@ -173,6 +175,36 @@ fediChordJoin cacheSnapshot nsSTM = do Right joinedNS -> pure . Right $ joinedNS +-- | Wait for new cache entries to appear and then try joining on them. +-- Exits after successful joining. +joinOnNewEntriesThread :: LocalNodeStateSTM -> IO () +joinOnNewEntriesThread nsSTM = loop + where + loop = do + nsSnap <- readTVarIO nsSTM + (lookupResult, cache) <- atomically $ do + cache <- readTVar $ nodeCacheSTM nsSnap + case queryLocalCache nsSnap cache 1 (getNid nsSnap) of + -- empty cache, block until cache changes and then retry + (FORWARD s) | Set.null s -> retry + result -> pure (result, cache) + case lookupResult of + -- already joined + FOUND _ -> do + print =<< readTVarIO nsSTM + pure () + -- otherwise try joining + FORWARD _ -> do + joinResult <- fediChordJoin cache nsSTM + either + -- on join failure, sleep and retry + -- TODO: make delay configurable + (const $ threadDelay (30 * 10^6) >> loop) + (const $ pure ()) + joinResult + emptyset = Set.empty -- because pattern matches don't accept qualified names + + -- | cache updater thread that waits for incoming NodeCache update instructions on -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. cacheWriter :: LocalNodeStateSTM -> IO ()