Compare commits
No commits in common. "e06c53ff7cd334f94c8ee097ff20f4996f6b0900" and "f6481996d777a33c95d2a43dcdc7e54a4e9486f0" have entirely different histories.
e06c53ff7c
...
f6481996d7
|
@ -35,7 +35,6 @@ main = do
|
||||||
-- handle unsuccessful join
|
-- handle unsuccessful join
|
||||||
|
|
||||||
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
||||||
print =<< readTVarIO thisNode
|
|
||||||
wait =<< async (fediMainThreads serverSock thisNode)
|
wait =<< async (fediMainThreads serverSock thisNode)
|
||||||
-- TODO: periodic retry
|
-- TODO: periodic retry
|
||||||
)
|
)
|
||||||
|
|
|
@ -246,7 +246,6 @@ handleIncomingRequest :: LocalNodeStateSTM -- ^ the handling
|
||||||
-> SockAddr -- ^ source address of the request
|
-> SockAddr -- ^ source address of the request
|
||||||
-> IO ()
|
-> IO ()
|
||||||
handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
|
handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
|
||||||
putStrLn $ "handling incoming request: " <> show msgSet
|
|
||||||
ns <- readTVarIO nsSTM
|
ns <- readTVarIO nsSTM
|
||||||
-- add nodestate to cache
|
-- add nodestate to cache
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
|
|
|
@ -150,6 +150,7 @@ fediChordBootstrapJoin nsSTM (joinHost, joinPort) =
|
||||||
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
||||||
)
|
)
|
||||||
initCache bootstrapResponse
|
initCache bootstrapResponse
|
||||||
|
putStrLn "initialised bootstrap cache"
|
||||||
fediChordJoin bootstrapCache nsSTM
|
fediChordJoin bootstrapCache nsSTM
|
||||||
)
|
)
|
||||||
`catch` (\e -> pure . Left $ "Error at bootstrap joining: " <> displayException (e :: IOException))
|
`catch` (\e -> pure . Left $ "Error at bootstrap joining: " <> displayException (e :: IOException))
|
||||||
|
@ -191,7 +192,6 @@ maxEntryAge = 600
|
||||||
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
|
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
|
||||||
cacheVerifyThread :: LocalNodeStateSTM -> IO ()
|
cacheVerifyThread :: LocalNodeStateSTM -> IO ()
|
||||||
cacheVerifyThread nsSTM = forever $ do
|
cacheVerifyThread nsSTM = forever $ do
|
||||||
putStrLn "cache verify run: begin"
|
|
||||||
-- get cache
|
-- get cache
|
||||||
(ns, cache) <- atomically $ do
|
(ns, cache) <- atomically $ do
|
||||||
ns <- readTVar nsSTM
|
ns <- readTVar nsSTM
|
||||||
|
@ -231,7 +231,7 @@ cacheVerifyThread nsSTM = forever $ do
|
||||||
) pong
|
) pong
|
||||||
else pure ()
|
else pure ()
|
||||||
)
|
)
|
||||||
|
|
||||||
-- check the cache invariant per slice and, if necessary, do a single lookup to the
|
-- check the cache invariant per slice and, if necessary, do a single lookup to the
|
||||||
-- middle of each slice not verifying the invariant
|
-- middle of each slice not verifying the invariant
|
||||||
latestNs <- readTVarIO nsSTM
|
latestNs <- readTVarIO nsSTM
|
||||||
|
@ -239,12 +239,10 @@ cacheVerifyThread nsSTM = forever $ do
|
||||||
let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
|
let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
|
||||||
FOUND node -> [node]
|
FOUND node -> [node]
|
||||||
FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
|
FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
|
||||||
print $ checkCacheSliceInvariants latestNs latestCache
|
|
||||||
forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
|
forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
|
||||||
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
|
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
|
||||||
)
|
)
|
||||||
|
|
||||||
putStrLn "cache verify run: end"
|
|
||||||
threadDelay $ 10^6 * round maxEntryAge `div` 20
|
threadDelay $ 10^6 * round maxEntryAge `div` 20
|
||||||
|
|
||||||
|
|
||||||
|
@ -255,7 +253,7 @@ checkCacheSliceInvariants :: LocalNodeState
|
||||||
-> NodeCache
|
-> NodeCache
|
||||||
-> [NodeID] -- ^ list of middle IDs of slices not
|
-> [NodeID] -- ^ list of middle IDs of slices not
|
||||||
-- ^ fulfilling the invariant
|
-- ^ fulfilling the invariant
|
||||||
checkCacheSliceInvariants ns
|
checkCacheSliceInvariants ns
|
||||||
| isJoined ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc
|
| isJoined ns = checkPredecessorSlice jEntries (getNid ns) startBound lastPred <> checkSuccessorSlice jEntries (getNid ns) startBound lastSucc
|
||||||
| otherwise = const []
|
| otherwise = const []
|
||||||
where
|
where
|
||||||
|
@ -311,8 +309,6 @@ stabiliseThread :: LocalNodeStateSTM -> IO ()
|
||||||
stabiliseThread nsSTM = forever $ do
|
stabiliseThread nsSTM = forever $ do
|
||||||
ns <- readTVarIO nsSTM
|
ns <- readTVarIO nsSTM
|
||||||
|
|
||||||
putStrLn "stabilise run: begin"
|
|
||||||
|
|
||||||
-- iterate through the same snapshot, collect potential new neighbours
|
-- iterate through the same snapshot, collect potential new neighbours
|
||||||
-- and nodes to be deleted, and modify these changes only at the end of
|
-- and nodes to be deleted, and modify these changes only at the end of
|
||||||
-- each stabilise run.
|
-- each stabilise run.
|
||||||
|
@ -360,7 +356,6 @@ stabiliseThread nsSTM = forever $ do
|
||||||
writeTVar nsSTM $ addSuccessors [nextEntry] latestNs
|
writeTVar nsSTM $ addSuccessors [nextEntry] latestNs
|
||||||
)
|
)
|
||||||
|
|
||||||
putStrLn "stabilise run: end"
|
|
||||||
-- TODO: make delay configurable
|
-- TODO: make delay configurable
|
||||||
threadDelay (60 * 10^6)
|
threadDelay (60 * 10^6)
|
||||||
where
|
where
|
||||||
|
@ -425,7 +420,6 @@ sendThread sock sendQ = forever $ do
|
||||||
-- | Sets up and manages the main server threads of FediChord
|
-- | Sets up and manages the main server threads of FediChord
|
||||||
fediMainThreads :: Socket -> LocalNodeStateSTM -> IO ()
|
fediMainThreads :: Socket -> LocalNodeStateSTM -> IO ()
|
||||||
fediMainThreads sock nsSTM = do
|
fediMainThreads sock nsSTM = do
|
||||||
(\x -> putStrLn $ "launching threads, ns: " <> show x) =<< readTVarIO nsSTM
|
|
||||||
sendQ <- newTQueueIO
|
sendQ <- newTQueueIO
|
||||||
recvQ <- newTQueueIO
|
recvQ <- newTQueueIO
|
||||||
-- concurrently launch all handler threads, if one of them throws an exception
|
-- concurrently launch all handler threads, if one of them throws an exception
|
||||||
|
|
|
@ -80,8 +80,6 @@ import qualified Network.ByteOrder as NetworkBytes
|
||||||
|
|
||||||
import Hash2Pub.Utils
|
import Hash2Pub.Utils
|
||||||
|
|
||||||
import Debug.Trace (trace)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- define protocol constants
|
-- define protocol constants
|
||||||
|
|
|
@ -276,7 +276,7 @@ spec = do
|
||||||
ownNode <- setNid ownId <$> exampleLocalNode
|
ownNode <- setNid ownId <$> exampleLocalNode
|
||||||
let (FORWARD qResult) = queryLocalCache ownNode bootstrapCache 2 ownId
|
let (FORWARD qResult) = queryLocalCache ownNode bootstrapCache 2 ownId
|
||||||
remoteNode (head $ Set.elems qResult) `shouldBe` bootstrapNode
|
remoteNode (head $ Set.elems qResult) `shouldBe` bootstrapNode
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- some example data
|
-- some example data
|
||||||
|
|
Loading…
Reference in a new issue