Compare commits

...

2 commits

Author SHA1 Message Date
Trolli Schmittlauch e91f317a8e decrease logging verbosity 2020-07-05 16:52:09 +02:00
Trolli Schmittlauch 7c17e3a44d implement join-retry on new cache entries
closes #42
2020-07-05 16:48:57 +02:00
2 changed files with 35 additions and 4 deletions

View file

@ -18,8 +18,6 @@ main = do
conf <- readConfig
-- ToDo: load persisted caches, bootstrapping nodes …
(serverSock, thisNode) <- fediChordInit conf
print =<< readTVarIO thisNode
print serverSock
-- currently no masking is necessary, as there is nothing to clean up
cacheWriterThread <- forkIO $ cacheWriter thisNode
-- try joining the DHT using one of the provided bootstrapping nodes
@ -36,8 +34,9 @@ main = do
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
print =<< readTVarIO thisNode
-- launch thread attempting to join on new cache entries
_ <- forkIO $ joinOnNewEntriesThread thisNode
wait =<< async (fediMainThreads serverSock thisNode)
-- TODO: periodic retry
)
(\joinedNS -> do
-- launch main eventloop with successfully joined state

View file

@ -46,6 +46,7 @@ module Hash2Pub.FediChord (
, mkSendSocket
, resolve
, cacheWriter
, joinOnNewEntriesThread
) where
import Control.Applicative ((<|>))
@ -166,6 +167,7 @@ fediChordJoin cacheSnapshot nsSTM = do
-- get routed to the currently responsible node, based on the response
-- from the bootstrapping node
currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns
putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
-- 2. then send a join to the currently responsible node
joinResult <- requestJoin currentlyResponsible nsSTM
case joinResult of
@ -173,6 +175,36 @@ fediChordJoin cacheSnapshot nsSTM = do
Right joinedNS -> pure . Right $ joinedNS
-- | Wait for new cache entries to appear and then try joining on them.
-- Exits after successful joining.
joinOnNewEntriesThread :: LocalNodeStateSTM -> IO ()
joinOnNewEntriesThread nsSTM = loop
where
loop = do
nsSnap <- readTVarIO nsSTM
(lookupResult, cache) <- atomically $ do
cache <- readTVar $ nodeCacheSTM nsSnap
case queryLocalCache nsSnap cache 1 (getNid nsSnap) of
-- empty cache, block until cache changes and then retry
(FORWARD s) | Set.null s -> retry
result -> pure (result, cache)
case lookupResult of
-- already joined
FOUND _ -> do
print =<< readTVarIO nsSTM
pure ()
-- otherwise try joining
FORWARD _ -> do
joinResult <- fediChordJoin cache nsSTM
either
-- on join failure, sleep and retry
-- TODO: make delay configurable
(const $ threadDelay (30 * 10^6) >> loop)
(const $ pure ())
joinResult
emptyset = Set.empty -- because pattern matches don't accept qualified names
-- | cache updater thread that waits for incoming NodeCache update instructions on
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
cacheWriter :: LocalNodeStateSTM -> IO ()
@ -239,7 +271,6 @@ cacheVerifyThread nsSTM = forever $ do
let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
FOUND node -> [node]
FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
print $ checkCacheSliceInvariants latestNs latestCache
forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
)
@ -312,6 +343,7 @@ stabiliseThread nsSTM = forever $ do
ns <- readTVarIO nsSTM
putStrLn "stabilise run: begin"
print ns
-- iterate through the same snapshot, collect potential new neighbours
-- and nodes to be deleted, and modify these changes only at the end of