Compare commits
2 commits
bdb92411c6
...
e91f317a8e
Author | SHA1 | Date | |
---|---|---|---|
|
e91f317a8e | ||
|
7c17e3a44d |
|
@ -18,8 +18,6 @@ main = do
|
||||||
conf <- readConfig
|
conf <- readConfig
|
||||||
-- ToDo: load persisted caches, bootstrapping nodes …
|
-- ToDo: load persisted caches, bootstrapping nodes …
|
||||||
(serverSock, thisNode) <- fediChordInit conf
|
(serverSock, thisNode) <- fediChordInit conf
|
||||||
print =<< readTVarIO thisNode
|
|
||||||
print serverSock
|
|
||||||
-- currently no masking is necessary, as there is nothing to clean up
|
-- currently no masking is necessary, as there is nothing to clean up
|
||||||
cacheWriterThread <- forkIO $ cacheWriter thisNode
|
cacheWriterThread <- forkIO $ cacheWriter thisNode
|
||||||
-- try joining the DHT using one of the provided bootstrapping nodes
|
-- try joining the DHT using one of the provided bootstrapping nodes
|
||||||
|
@ -36,8 +34,9 @@ main = do
|
||||||
|
|
||||||
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
||||||
print =<< readTVarIO thisNode
|
print =<< readTVarIO thisNode
|
||||||
|
-- launch thread attempting to join on new cache entries
|
||||||
|
_ <- forkIO $ joinOnNewEntriesThread thisNode
|
||||||
wait =<< async (fediMainThreads serverSock thisNode)
|
wait =<< async (fediMainThreads serverSock thisNode)
|
||||||
-- TODO: periodic retry
|
|
||||||
)
|
)
|
||||||
(\joinedNS -> do
|
(\joinedNS -> do
|
||||||
-- launch main eventloop with successfully joined state
|
-- launch main eventloop with successfully joined state
|
||||||
|
|
|
@ -46,6 +46,7 @@ module Hash2Pub.FediChord (
|
||||||
, mkSendSocket
|
, mkSendSocket
|
||||||
, resolve
|
, resolve
|
||||||
, cacheWriter
|
, cacheWriter
|
||||||
|
, joinOnNewEntriesThread
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Control.Applicative ((<|>))
|
import Control.Applicative ((<|>))
|
||||||
|
@ -166,6 +167,7 @@ fediChordJoin cacheSnapshot nsSTM = do
|
||||||
-- get routed to the currently responsible node, based on the response
|
-- get routed to the currently responsible node, based on the response
|
||||||
-- from the bootstrapping node
|
-- from the bootstrapping node
|
||||||
currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns
|
currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns 50 $ getNid ns
|
||||||
|
putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
|
||||||
-- 2. then send a join to the currently responsible node
|
-- 2. then send a join to the currently responsible node
|
||||||
joinResult <- requestJoin currentlyResponsible nsSTM
|
joinResult <- requestJoin currentlyResponsible nsSTM
|
||||||
case joinResult of
|
case joinResult of
|
||||||
|
@ -173,6 +175,36 @@ fediChordJoin cacheSnapshot nsSTM = do
|
||||||
Right joinedNS -> pure . Right $ joinedNS
|
Right joinedNS -> pure . Right $ joinedNS
|
||||||
|
|
||||||
|
|
||||||
|
-- | Wait for new cache entries to appear and then try joining on them.
|
||||||
|
-- Exits after successful joining.
|
||||||
|
joinOnNewEntriesThread :: LocalNodeStateSTM -> IO ()
|
||||||
|
joinOnNewEntriesThread nsSTM = loop
|
||||||
|
where
|
||||||
|
loop = do
|
||||||
|
nsSnap <- readTVarIO nsSTM
|
||||||
|
(lookupResult, cache) <- atomically $ do
|
||||||
|
cache <- readTVar $ nodeCacheSTM nsSnap
|
||||||
|
case queryLocalCache nsSnap cache 1 (getNid nsSnap) of
|
||||||
|
-- empty cache, block until cache changes and then retry
|
||||||
|
(FORWARD s) | Set.null s -> retry
|
||||||
|
result -> pure (result, cache)
|
||||||
|
case lookupResult of
|
||||||
|
-- already joined
|
||||||
|
FOUND _ -> do
|
||||||
|
print =<< readTVarIO nsSTM
|
||||||
|
pure ()
|
||||||
|
-- otherwise try joining
|
||||||
|
FORWARD _ -> do
|
||||||
|
joinResult <- fediChordJoin cache nsSTM
|
||||||
|
either
|
||||||
|
-- on join failure, sleep and retry
|
||||||
|
-- TODO: make delay configurable
|
||||||
|
(const $ threadDelay (30 * 10^6) >> loop)
|
||||||
|
(const $ pure ())
|
||||||
|
joinResult
|
||||||
|
emptyset = Set.empty -- because pattern matches don't accept qualified names
|
||||||
|
|
||||||
|
|
||||||
-- | cache updater thread that waits for incoming NodeCache update instructions on
|
-- | cache updater thread that waits for incoming NodeCache update instructions on
|
||||||
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
|
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
|
||||||
cacheWriter :: LocalNodeStateSTM -> IO ()
|
cacheWriter :: LocalNodeStateSTM -> IO ()
|
||||||
|
@ -239,7 +271,6 @@ cacheVerifyThread nsSTM = forever $ do
|
||||||
let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
|
let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
|
||||||
FOUND node -> [node]
|
FOUND node -> [node]
|
||||||
FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
|
FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
|
||||||
print $ checkCacheSliceInvariants latestNs latestCache
|
|
||||||
forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
|
forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
|
||||||
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
|
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
|
||||||
)
|
)
|
||||||
|
@ -312,6 +343,7 @@ stabiliseThread nsSTM = forever $ do
|
||||||
ns <- readTVarIO nsSTM
|
ns <- readTVarIO nsSTM
|
||||||
|
|
||||||
putStrLn "stabilise run: begin"
|
putStrLn "stabilise run: begin"
|
||||||
|
print ns
|
||||||
|
|
||||||
-- iterate through the same snapshot, collect potential new neighbours
|
-- iterate through the same snapshot, collect potential new neighbours
|
||||||
-- and nodes to be deleted, and modify these changes only at the end of
|
-- and nodes to be deleted, and modify these changes only at the end of
|
||||||
|
|
Loading…
Reference in a new issue