parent
bdb92411c6
commit
7c17e3a44d
|
@ -36,8 +36,9 @@ main = do
|
|||
|
||||
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
||||
print =<< readTVarIO thisNode
|
||||
-- launch thread attempting to join on new cache entries
|
||||
_ <- forkIO $ joinOnNewEntriesThread thisNode
|
||||
wait =<< async (fediMainThreads serverSock thisNode)
|
||||
-- TODO: periodic retry
|
||||
)
|
||||
(\joinedNS -> do
|
||||
-- launch main eventloop with successfully joined state
|
||||
|
|
|
@ -46,6 +46,7 @@ module Hash2Pub.FediChord (
|
|||
, mkSendSocket
|
||||
, resolve
|
||||
, cacheWriter
|
||||
, joinOnNewEntriesThread
|
||||
) where
|
||||
|
||||
import Control.Applicative ((<|>))
|
||||
|
@ -166,6 +167,7 @@ fediChordJoin cacheSnapshot nsSTM = do
|
|||
-- get routed to the currently responsible node, based on the response
|
||||
-- from the bootstrapping node
|
||||
currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns
|
||||
putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
|
||||
-- 2. then send a join to the currently responsible node
|
||||
joinResult <- requestJoin currentlyResponsible nsSTM
|
||||
case joinResult of
|
||||
|
@ -173,6 +175,36 @@ fediChordJoin cacheSnapshot nsSTM = do
|
|||
Right joinedNS -> pure . Right $ joinedNS
|
||||
|
||||
|
||||
-- | Wait for new cache entries to appear and then try joining on them.
|
||||
-- Exits after successful joining.
|
||||
joinOnNewEntriesThread :: LocalNodeStateSTM -> IO ()
|
||||
joinOnNewEntriesThread nsSTM = loop
|
||||
where
|
||||
loop = do
|
||||
nsSnap <- readTVarIO nsSTM
|
||||
(lookupResult, cache) <- atomically $ do
|
||||
cache <- readTVar $ nodeCacheSTM nsSnap
|
||||
case queryLocalCache nsSnap cache 1 (getNid nsSnap) of
|
||||
-- empty cache, block until cache changes and then retry
|
||||
(FORWARD s) | Set.null s -> retry
|
||||
result -> pure (result, cache)
|
||||
case lookupResult of
|
||||
-- already joined
|
||||
FOUND _ -> do
|
||||
print =<< readTVarIO nsSTM
|
||||
pure ()
|
||||
-- otherwise try joining
|
||||
FORWARD _ -> do
|
||||
joinResult <- fediChordJoin cache nsSTM
|
||||
either
|
||||
-- on join failure, sleep and retry
|
||||
-- TODO: make delay configurable
|
||||
(const $ threadDelay (30 * 10^6) >> loop)
|
||||
(const $ pure ())
|
||||
joinResult
|
||||
emptyset = Set.empty -- because pattern matches don't accept qualified names
|
||||
|
||||
|
||||
-- | cache updater thread that waits for incoming NodeCache update instructions on
|
||||
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
|
||||
cacheWriter :: LocalNodeStateSTM -> IO ()
|
||||
|
|
Loading…
Reference in a new issue