Compare commits
No commits in common. "499c90e63af74499ca895e26e697cdf81b433011" and "12dfc56a7321ffeb204e3978b6010992ee4df8dc" have entirely different histories.
499c90e63a
...
12dfc56a73
4 changed files with 216 additions and 245 deletions
25
app/Main.hs
25
app/Main.hs
|
@ -18,10 +18,29 @@ main = do
|
||||||
-- ToDo: parse and pass config
|
-- ToDo: parse and pass config
|
||||||
-- probably use `tomland` for that
|
-- probably use `tomland` for that
|
||||||
(fConf, sConf) <- readConfig
|
(fConf, sConf) <- readConfig
|
||||||
|
-- TODO: first initialise 'RealNode', then the vservers
|
||||||
-- ToDo: load persisted caches, bootstrapping nodes …
|
-- ToDo: load persisted caches, bootstrapping nodes …
|
||||||
(fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
|
(serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
|
||||||
-- wait for all DHT threads to terminate, this keeps the main thread running
|
-- currently no masking is necessary, as there is nothing to clean up
|
||||||
wait fediThreads
|
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
|
||||||
|
-- try joining the DHT using one of the provided bootstrapping nodes
|
||||||
|
joinedState <- tryBootstrapJoining thisNode
|
||||||
|
either (\err -> do
|
||||||
|
-- handle unsuccessful join
|
||||||
|
|
||||||
|
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
||||||
|
print =<< readTVarIO thisNode
|
||||||
|
-- launch thread attempting to join on new cache entries
|
||||||
|
_ <- forkIO $ joinOnNewEntriesThread thisNode
|
||||||
|
wait =<< async (fediMainThreads serverSock thisNode)
|
||||||
|
)
|
||||||
|
(\joinedNS -> do
|
||||||
|
-- launch main eventloop with successfully joined state
|
||||||
|
putStrLn "successful join"
|
||||||
|
wait =<< async (fediMainThreads serverSock thisNode)
|
||||||
|
)
|
||||||
|
joinedState
|
||||||
|
pure ()
|
||||||
|
|
||||||
|
|
||||||
readConfig :: IO (FediChordConf, ServiceConf)
|
readConfig :: IO (FediChordConf, ServiceConf)
|
||||||
|
|
|
@ -241,16 +241,16 @@ sendMessageSize = 1200
|
||||||
-- ====== message send and receive operations ======
|
-- ====== message send and receive operations ======
|
||||||
|
|
||||||
-- encode the response to a request that just signals successful receipt
|
-- encode the response to a request that just signals successful receipt
|
||||||
ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString
|
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
|
||||||
ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response {
|
ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
|
||||||
requestID = requestID req
|
requestID = requestID req
|
||||||
, senderID = receiverID req
|
, senderID = ownID
|
||||||
, part = part req
|
, part = part req
|
||||||
, isFinalPart = False
|
, isFinalPart = False
|
||||||
, action = action req
|
, action = action req
|
||||||
, payload = Nothing
|
, payload = Nothing
|
||||||
}
|
}
|
||||||
ackRequest _ = Map.empty
|
ackRequest _ _ = Map.empty
|
||||||
|
|
||||||
|
|
||||||
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
|
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
|
||||||
|
@ -269,7 +269,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
|
||||||
Nothing -> pure ()
|
Nothing -> pure ()
|
||||||
Just aPart -> do
|
Just aPart -> do
|
||||||
let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
|
let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
|
||||||
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns)
|
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
|
||||||
-- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
|
-- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
|
||||||
maybe (pure ()) (
|
maybe (pure ()) (
|
||||||
mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
|
mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
|
||||||
|
@ -542,7 +542,7 @@ requestJoin toJoinOn ownStateSTM = do
|
||||||
writeTVar ownStateSTM newState
|
writeTVar ownStateSTM newState
|
||||||
pure (cacheInsertQ, newState)
|
pure (cacheInsertQ, newState)
|
||||||
-- execute the cache insertions
|
-- execute the cache insertions
|
||||||
mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ
|
mapM_ (\f -> f joinedState) cacheInsertQ
|
||||||
if responses == Set.empty
|
if responses == Set.empty
|
||||||
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
||||||
else do
|
else do
|
||||||
|
@ -624,7 +624,7 @@ sendQueryIdMessages targetID ns lParam targets = do
|
||||||
_ -> Set.empty
|
_ -> Set.empty
|
||||||
|
|
||||||
-- forward entries to global cache
|
-- forward entries to global cache
|
||||||
queueAddEntries entrySet (cacheWriteQueue ns)
|
queueAddEntries entrySet ns
|
||||||
-- return accumulated QueryResult
|
-- return accumulated QueryResult
|
||||||
pure $ case acc of
|
pure $ case acc of
|
||||||
-- once a FOUND as been encountered, return this as a result
|
-- once a FOUND as been encountered, return this as a result
|
||||||
|
@ -670,7 +670,7 @@ requestStabilise ns neighbour = do
|
||||||
)
|
)
|
||||||
([],[]) respSet
|
([],[]) respSet
|
||||||
-- update successfully responded neighbour in cache
|
-- update successfully responded neighbour in cache
|
||||||
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet)
|
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
|
||||||
pure $ if null responsePreds && null responseSuccs
|
pure $ if null responsePreds && null responseSuccs
|
||||||
then Left "no neighbours returned"
|
then Left "no neighbours returned"
|
||||||
else Right (responsePreds, responseSuccs)
|
else Right (responsePreds, responseSuccs)
|
||||||
|
@ -832,24 +832,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
||||||
|
|
||||||
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
||||||
queueAddEntries :: Foldable c => c RemoteCacheEntry
|
queueAddEntries :: Foldable c => c RemoteCacheEntry
|
||||||
-> TQueue (NodeCache -> NodeCache)
|
-> LocalNodeState s
|
||||||
-> IO ()
|
-> IO ()
|
||||||
queueAddEntries entries cacheQ = do
|
queueAddEntries entries ns = do
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
forM_ entries $ \entry -> atomically $ writeTQueue cacheQ $ addCacheEntryPure now entry
|
forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry
|
||||||
|
|
||||||
|
|
||||||
-- | enque a list of node IDs to be deleted from the global NodeCache
|
-- | enque a list of node IDs to be deleted from the global NodeCache
|
||||||
queueDeleteEntries :: Foldable c
|
queueDeleteEntries :: Foldable c
|
||||||
=> c NodeID
|
=> c NodeID
|
||||||
-> TQueue (NodeCache -> NodeCache)
|
-> LocalNodeState s
|
||||||
-> IO ()
|
-> IO ()
|
||||||
queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry
|
queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
|
||||||
|
|
||||||
|
|
||||||
-- | enque a single node ID to be deleted from the global NodeCache
|
-- | enque a single node ID to be deleted from the global NodeCache
|
||||||
queueDeleteEntry :: NodeID
|
queueDeleteEntry :: NodeID
|
||||||
-> TQueue (NodeCache -> NodeCache)
|
-> LocalNodeState s
|
||||||
-> IO ()
|
-> IO ()
|
||||||
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
|
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
|
||||||
|
|
||||||
|
@ -858,11 +858,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
|
||||||
-- global 'NodeCache'.
|
-- global 'NodeCache'.
|
||||||
queueUpdateVerifieds :: Foldable c
|
queueUpdateVerifieds :: Foldable c
|
||||||
=> c NodeID
|
=> c NodeID
|
||||||
-> TQueue (NodeCache -> NodeCache)
|
-> LocalNodeState s
|
||||||
-> IO ()
|
-> IO ()
|
||||||
queueUpdateVerifieds nIds cacheQ = do
|
queueUpdateVerifieds nIds ns = do
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $
|
forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
|
||||||
markCacheEntryAsVerified (Just now) nid'
|
markCacheEntryAsVerified (Just now) nid'
|
||||||
|
|
||||||
-- | retry an IO action at most *i* times until it delivers a result
|
-- | retry an IO action at most *i* times until it delivers a result
|
||||||
|
|
|
@ -69,12 +69,12 @@ import qualified Data.ByteString.UTF8 as BSU
|
||||||
import Data.Either (rights)
|
import Data.Either (rights)
|
||||||
import Data.Foldable (foldr')
|
import Data.Foldable (foldr')
|
||||||
import Data.Functor.Identity
|
import Data.Functor.Identity
|
||||||
import Data.HashMap.Strict (HashMap)
|
|
||||||
import qualified Data.HashMap.Strict as HMap
|
|
||||||
import Data.IP (IPv6, fromHostAddress6,
|
import Data.IP (IPv6, fromHostAddress6,
|
||||||
toHostAddress6)
|
toHostAddress6)
|
||||||
import Data.List ((\\))
|
import Data.List ((\\))
|
||||||
import qualified Data.Map.Strict as Map
|
import qualified Data.Map.Strict as Map
|
||||||
|
import qualified Data.HashMap.Strict as HMap
|
||||||
|
import Data.HashMap.Strict (HashMap)
|
||||||
import Data.Maybe (catMaybes, fromJust, fromMaybe,
|
import Data.Maybe (catMaybes, fromJust, fromMaybe,
|
||||||
isJust, isNothing, mapMaybe)
|
isJust, isNothing, mapMaybe)
|
||||||
import qualified Data.Set as Set
|
import qualified Data.Set as Set
|
||||||
|
@ -98,59 +98,38 @@ import Debug.Trace (trace)
|
||||||
fediChordInit :: (Service s (RealNodeSTM s))
|
fediChordInit :: (Service s (RealNodeSTM s))
|
||||||
=> FediChordConf
|
=> FediChordConf
|
||||||
-> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service
|
-> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service
|
||||||
-> IO (Async (), RealNodeSTM s)
|
-> IO (Socket, RealNodeSTM s)
|
||||||
fediChordInit initConf serviceRunner = do
|
fediChordInit initConf serviceRunner = do
|
||||||
emptyLookupCache <- newTVarIO Map.empty
|
emptyLookupCache <- newTVarIO Map.empty
|
||||||
cacheSTM <- newTVarIO initCache
|
let realNode = RealNode {
|
||||||
cacheQ <- atomically newTQueue
|
vservers = HMap.empty
|
||||||
let realNode = RealNode
|
|
||||||
{ vservers = HMap.empty
|
|
||||||
, nodeConfig = initConf
|
, nodeConfig = initConf
|
||||||
, bootstrapNodes = confBootstrapNodes initConf
|
, bootstrapNodes = confBootstrapNodes initConf
|
||||||
, lookupCacheSTM = emptyLookupCache
|
, lookupCacheSTM = emptyLookupCache
|
||||||
, nodeService = undefined
|
, nodeService = undefined
|
||||||
, globalNodeCacheSTM = cacheSTM
|
}
|
||||||
, globalCacheWriteQueue = cacheQ
|
|
||||||
}
|
|
||||||
realNodeSTM <- newTVarIO realNode
|
realNodeSTM <- newTVarIO realNode
|
||||||
serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
|
|
||||||
-- launch service and set the reference in the RealNode
|
-- launch service and set the reference in the RealNode
|
||||||
serv <- serviceRunner realNodeSTM
|
serv <- serviceRunner realNodeSTM
|
||||||
atomically . modifyTVar' realNodeSTM $ \rn -> rn { nodeService = serv }
|
atomically . modifyTVar' realNodeSTM $ \rn -> rn { nodeService = serv }
|
||||||
-- prepare for joining: start node cache writer thread
|
|
||||||
-- currently no masking is necessary, as there is nothing to clean up
|
|
||||||
nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM
|
|
||||||
-- TODO: k-choices way of joining, so far just initialise a single vserver
|
-- TODO: k-choices way of joining, so far just initialise a single vserver
|
||||||
firstVS <- nodeStateInit realNodeSTM 0
|
firstVS <- nodeStateInit realNodeSTM
|
||||||
firstVSSTM <- newTVarIO firstVS
|
firstVSSTM <- newTVarIO firstVS
|
||||||
-- add vserver to list at RealNode
|
-- add vserver to list at RealNode
|
||||||
atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) }
|
atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) }
|
||||||
-- try joining the DHT using one of the provided bootstrapping nodes
|
serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
|
||||||
joinedState <- tryBootstrapJoining firstVSSTM
|
pure (serverSock, realNodeSTM)
|
||||||
fediThreadsAsync <- either (\err -> do
|
|
||||||
-- handle unsuccessful join
|
|
||||||
|
|
||||||
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
|
||||||
-- launch thread attempting to join on new cache entries
|
|
||||||
_ <- forkIO $ joinOnNewEntriesThread firstVSSTM
|
|
||||||
async (fediMainThreads serverSock realNodeSTM)
|
|
||||||
)
|
|
||||||
(\joinedNS -> do
|
|
||||||
-- launch main eventloop with successfully joined state
|
|
||||||
putStrLn "successful join"
|
|
||||||
async (fediMainThreads serverSock realNodeSTM)
|
|
||||||
)
|
|
||||||
joinedState
|
|
||||||
pure (fediThreadsAsync, realNodeSTM)
|
|
||||||
|
|
||||||
-- | initialises the 'NodeState' for this local node.
|
-- | initialises the 'NodeState' for this local node.
|
||||||
-- Separated from 'fediChordInit' to be usable in tests.
|
-- Separated from 'fediChordInit' to be usable in tests.
|
||||||
nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> Integer -> IO (LocalNodeState s)
|
nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (LocalNodeState s)
|
||||||
nodeStateInit realNodeSTM vsID' = do
|
nodeStateInit realNodeSTM = do
|
||||||
realNode <- readTVarIO realNodeSTM
|
realNode <- readTVarIO realNodeSTM
|
||||||
|
cacheSTM <- newTVarIO initCache
|
||||||
|
q <- atomically newTQueue
|
||||||
let
|
let
|
||||||
conf = nodeConfig realNode
|
conf = nodeConfig realNode
|
||||||
vsID = vsID'
|
vsID = 0
|
||||||
containedState = RemoteNodeState {
|
containedState = RemoteNodeState {
|
||||||
domain = confDomain conf
|
domain = confDomain conf
|
||||||
, ipAddr = confIP conf
|
, ipAddr = confIP conf
|
||||||
|
@ -161,8 +140,8 @@ nodeStateInit realNodeSTM vsID' = do
|
||||||
}
|
}
|
||||||
initialState = LocalNodeState {
|
initialState = LocalNodeState {
|
||||||
nodeState = containedState
|
nodeState = containedState
|
||||||
, nodeCacheSTM = globalNodeCacheSTM realNode
|
, nodeCacheSTM = cacheSTM
|
||||||
, cacheWriteQueue = globalCacheWriteQueue realNode
|
, cacheWriteQueue = q
|
||||||
, successors = []
|
, successors = []
|
||||||
, predecessors = []
|
, predecessors = []
|
||||||
, kNeighbours = 3
|
, kNeighbours = 3
|
||||||
|
@ -195,36 +174,33 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
|
||||||
|
|
||||||
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
|
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
|
||||||
-- Unjoined try joining instead.
|
-- Unjoined try joining instead.
|
||||||
convergenceSampleThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
|
convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
|
||||||
convergenceSampleThread nodeSTM = forever $ do
|
convergenceSampleThread nsSTM = forever $ do
|
||||||
node <- readTVarIO nodeSTM
|
nsSnap <- readTVarIO nsSTM
|
||||||
forM_ (vservers node) $ \nsSTM -> do
|
parentNode <- readTVarIO $ parentRealNode nsSnap
|
||||||
nsSnap <- readTVarIO nsSTM
|
if isJoined nsSnap
|
||||||
parentNode <- readTVarIO $ parentRealNode nsSnap
|
then
|
||||||
if isJoined nsSnap
|
runExceptT (do
|
||||||
then
|
-- joined node: choose random node, do queryIDLoop, compare result with own responsibility
|
||||||
runExceptT (do
|
let bss = bootstrapNodes parentNode
|
||||||
-- joined node: choose random node, do queryIDLoop, compare result with own responsibility
|
randIndex <- liftIO $ randomRIO (0, length bss - 1)
|
||||||
let bss = bootstrapNodes parentNode
|
chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
|
||||||
randIndex <- liftIO $ randomRIO (0, length bss - 1)
|
lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
|
||||||
chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
|
currentlyResponsible <- liftEither lookupResult
|
||||||
lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
|
if getNid currentlyResponsible /= getNid nsSnap
|
||||||
currentlyResponsible <- liftEither lookupResult
|
-- if mismatch, stabilise on the result, else do nothing
|
||||||
if getNid currentlyResponsible /= getNid nsSnap
|
then do
|
||||||
-- if mismatch, stabilise on the result, else do nothing
|
stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
|
||||||
then do
|
(preds, succs) <- liftEither stabResult
|
||||||
stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
|
-- TODO: verify neighbours before adding, see #55
|
||||||
(preds, succs) <- liftEither stabResult
|
liftIO . atomically $ do
|
||||||
-- TODO: verify neighbours before adding, see #55
|
ns <- readTVar nsSTM
|
||||||
liftIO . atomically $ do
|
writeTVar nsSTM $ addPredecessors preds ns
|
||||||
ns <- readTVar nsSTM
|
else pure ()
|
||||||
writeTVar nsSTM $ addPredecessors preds ns
|
) >> pure ()
|
||||||
else pure ()
|
-- unjoined node: try joining through all bootstrapping nodes
|
||||||
) >> pure ()
|
else tryBootstrapJoining nsSTM >> pure ()
|
||||||
-- unjoined node: try joining through all bootstrapping nodes
|
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
|
||||||
else tryBootstrapJoining nsSTM >> pure ()
|
|
||||||
|
|
||||||
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ node
|
|
||||||
threadDelay delaySecs
|
threadDelay delaySecs
|
||||||
|
|
||||||
|
|
||||||
|
@ -360,81 +336,68 @@ joinOnNewEntriesThread nsSTM = loop
|
||||||
|
|
||||||
-- | cache updater thread that waits for incoming NodeCache update instructions on
|
-- | cache updater thread that waits for incoming NodeCache update instructions on
|
||||||
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
|
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
|
||||||
nodeCacheWriter :: RealNodeSTM s -> IO ()
|
nodeCacheWriter :: LocalNodeStateSTM s -> IO ()
|
||||||
nodeCacheWriter nodeSTM = do
|
nodeCacheWriter nsSTM =
|
||||||
node <- readTVarIO nodeSTM
|
|
||||||
forever $ atomically $ do
|
forever $ atomically $ do
|
||||||
cacheModifier <- readTQueue $ globalCacheWriteQueue node
|
ns <- readTVar nsSTM
|
||||||
modifyTVar' (globalNodeCacheSTM node) cacheModifier
|
cacheModifier <- readTQueue $ cacheWriteQueue ns
|
||||||
|
modifyTVar' (nodeCacheSTM ns) cacheModifier
|
||||||
|
|
||||||
|
|
||||||
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
|
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
|
||||||
nodeCacheVerifyThread :: RealNodeSTM s -> IO ()
|
nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
|
||||||
nodeCacheVerifyThread nodeSTM = forever $ do
|
nodeCacheVerifyThread nsSTM = forever $ do
|
||||||
(node, firstVSSTM) <- atomically $ do
|
-- get cache
|
||||||
node <- readTVar nodeSTM
|
(ns, cache, maxEntryAge) <- atomically $ do
|
||||||
case headMay (HMap.elems $ vservers node) of
|
ns <- readTVar nsSTM
|
||||||
-- wait until first VS is joined
|
cache <- readTVar $ nodeCacheSTM ns
|
||||||
Nothing -> retry
|
maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns)
|
||||||
Just vs' -> pure (node, vs')
|
pure (ns, cache, maxEntryAge)
|
||||||
let
|
|
||||||
maxEntryAge = confMaxNodeCacheAge $ nodeConfig node
|
|
||||||
cacheQ = globalCacheWriteQueue node
|
|
||||||
cache <- readTVarIO $ globalNodeCacheSTM node
|
|
||||||
-- always use the first active VS as a sender for operations like Ping
|
|
||||||
firstVS <- readTVarIO firstVSSTM
|
|
||||||
-- iterate entries:
|
-- iterate entries:
|
||||||
-- for avoiding too many time syscalls, get current time before iterating.
|
-- for avoiding too many time syscalls, get current time before iterating.
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
forM_ (nodeCacheEntries cache) (\(CacheEntry validated cacheNode ts) ->
|
forM_ (nodeCacheEntries cache) (\(CacheEntry validated node ts) ->
|
||||||
-- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
|
-- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
|
||||||
if (now - ts) > maxEntryAge
|
if (now - ts) > maxEntryAge
|
||||||
then
|
then
|
||||||
queueDeleteEntry (getNid cacheNode) cacheQ
|
queueDeleteEntry (getNid node) ns
|
||||||
-- case unverified: try verifying, otherwise delete
|
-- case unverified: try verifying, otherwise delete
|
||||||
else if not validated
|
else if not validated
|
||||||
then do
|
then do
|
||||||
-- marking as verified is done by 'requestPing' as well
|
-- marking as verified is done by 'requestPing' as well
|
||||||
pong <- requestPing firstVS cacheNode
|
pong <- requestPing ns node
|
||||||
either (\_->
|
either (\_->
|
||||||
queueDeleteEntry (getNid cacheNode) cacheQ
|
queueDeleteEntry (getNid node) ns
|
||||||
)
|
)
|
||||||
(\vss ->
|
(\vss ->
|
||||||
if cacheNode `notElem` vss
|
if node `notElem` vss
|
||||||
then queueDeleteEntry (getNid cacheNode) cacheQ
|
then queueDeleteEntry (getNid node) ns
|
||||||
-- after verifying a node, check whether it can be a closer neighbour
|
-- after verifying a node, check whether it can be a closer neighbour
|
||||||
-- do this for each node
|
else do
|
||||||
-- TODO: optimisation: place all LocalNodeStates on the cache ring and check whether any of them is the predecessor/ successor
|
if node `isPossiblePredecessor` ns
|
||||||
else forM_ (vservers node) (\nsSTM -> do
|
|
||||||
ns <- readTVarIO nsSTM
|
|
||||||
if cacheNode `isPossiblePredecessor` ns
|
|
||||||
then atomically $ do
|
then atomically $ do
|
||||||
ns' <- readTVar nsSTM
|
ns' <- readTVar nsSTM
|
||||||
writeTVar nsSTM $ addPredecessors [cacheNode] ns'
|
writeTVar nsSTM $ addPredecessors [node] ns'
|
||||||
else pure ()
|
else pure ()
|
||||||
if cacheNode `isPossibleSuccessor` ns
|
if node `isPossibleSuccessor` ns
|
||||||
then atomically $ do
|
then atomically $ do
|
||||||
ns' <- readTVar nsSTM
|
ns' <- readTVar nsSTM
|
||||||
writeTVar nsSTM $ addSuccessors [cacheNode] ns'
|
writeTVar nsSTM $ addSuccessors [node] ns'
|
||||||
else pure ()
|
else pure ()
|
||||||
)
|
|
||||||
) pong
|
) pong
|
||||||
else pure ()
|
else pure ()
|
||||||
)
|
)
|
||||||
|
|
||||||
-- check the cache invariant per slice and, if necessary, do a single lookup to the
|
-- check the cache invariant per slice and, if necessary, do a single lookup to the
|
||||||
-- middle of each slice not verifying the invariant
|
-- middle of each slice not verifying the invariant
|
||||||
latestNode <- readTVarIO nodeSTM
|
latestNs <- readTVarIO nsSTM
|
||||||
forM_ (vservers latestNode) (\nsSTM -> do
|
latestCache <- readTVarIO $ nodeCacheSTM latestNs
|
||||||
latestNs <- readTVarIO nsSTM
|
let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
|
||||||
latestCache <- readTVarIO $ nodeCacheSTM latestNs
|
FOUND node -> [node]
|
||||||
let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
|
FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
|
||||||
FOUND node -> [node]
|
forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
|
||||||
FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
|
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
|
||||||
forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
|
)
|
||||||
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to milliseconds
|
threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to milliseconds
|
||||||
|
|
||||||
|
@ -498,93 +461,90 @@ checkCacheSliceInvariants ns
|
||||||
-- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
|
-- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
|
||||||
-- one responds, and get their neighbours for maintaining the own neighbour lists.
|
-- one responds, and get their neighbours for maintaining the own neighbour lists.
|
||||||
-- If necessary, request new neighbours.
|
-- If necessary, request new neighbours.
|
||||||
stabiliseThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
|
stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
|
||||||
stabiliseThread nodeSTM = forever $ do
|
stabiliseThread nsSTM = forever $ do
|
||||||
node <- readTVarIO nodeSTM
|
oldNs <- readTVarIO nsSTM
|
||||||
forM_ (vservers node) (\nsSTM -> do
|
|
||||||
oldNs <- readTVarIO nsSTM
|
|
||||||
|
|
||||||
|
|
||||||
-- iterate through the same snapshot, collect potential new neighbours
|
-- iterate through the same snapshot, collect potential new neighbours
|
||||||
-- and nodes to be deleted, and modify these changes only at the end of
|
-- and nodes to be deleted, and modify these changes only at the end of
|
||||||
-- each stabilise run.
|
-- each stabilise run.
|
||||||
-- This decision makes iterating through a potentially changing list easier.
|
-- This decision makes iterating through a potentially changing list easier.
|
||||||
|
|
||||||
-- don't contact all neighbours unless the previous one failed/ Left ed
|
-- don't contact all neighbours unless the previous one failed/ Left ed
|
||||||
|
|
||||||
predStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
|
predStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
|
||||||
succStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
|
succStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
|
||||||
|
|
||||||
|
let
|
||||||
|
(predDeletes, predNeighbours) = either (const ([], [])) id predStabilise
|
||||||
|
(succDeletes, succNeighbours) = either (const ([], [])) id succStabilise
|
||||||
|
allDeletes = predDeletes <> succDeletes
|
||||||
|
allNeighbours = predNeighbours <> succNeighbours
|
||||||
|
|
||||||
|
-- now actually modify the node state's neighbours
|
||||||
|
updatedNs <- atomically $ do
|
||||||
|
newerNsSnap <- readTVar nsSTM
|
||||||
let
|
let
|
||||||
(predDeletes, predNeighbours) = either (const ([], [])) id predStabilise
|
-- sorting and taking only k neighbours is taken care of by the
|
||||||
(succDeletes, succNeighbours) = either (const ([], [])) id succStabilise
|
-- setSuccessors/ setPredecessors functions
|
||||||
allDeletes = predDeletes <> succDeletes
|
newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours
|
||||||
allNeighbours = predNeighbours <> succNeighbours
|
newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours
|
||||||
|
newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
|
||||||
|
writeTVar nsSTM newNs
|
||||||
|
pure newNs
|
||||||
|
-- delete unresponding nodes from cache as well
|
||||||
|
mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes
|
||||||
|
|
||||||
-- now actually modify the node state's neighbours
|
-- try looking up additional neighbours if list too short
|
||||||
updatedNs <- atomically $ do
|
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
||||||
newerNsSnap <- readTVar nsSTM
|
ns' <- readTVarIO nsSTM
|
||||||
let
|
nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
|
||||||
-- sorting and taking only k neighbours is taken care of by the
|
either
|
||||||
-- setSuccessors/ setPredecessors functions
|
(const $ pure ())
|
||||||
newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours
|
(\entry -> atomically $ do
|
||||||
newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours
|
latestNs <- readTVar nsSTM
|
||||||
newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
|
writeTVar nsSTM $ addPredecessors [entry] latestNs
|
||||||
writeTVar nsSTM newNs
|
)
|
||||||
pure newNs
|
nextEntry
|
||||||
-- delete unresponding nodes from cache as well
|
)
|
||||||
mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes
|
|
||||||
|
|
||||||
-- try looking up additional neighbours if list too short
|
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
||||||
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
ns' <- readTVarIO nsSTM
|
||||||
ns' <- readTVarIO nsSTM
|
nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
|
||||||
nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
|
either
|
||||||
either
|
(const $ pure ())
|
||||||
(const $ pure ())
|
(\entry -> atomically $ do
|
||||||
(\entry -> atomically $ do
|
latestNs <- readTVar nsSTM
|
||||||
latestNs <- readTVar nsSTM
|
writeTVar nsSTM $ addSuccessors [entry] latestNs
|
||||||
writeTVar nsSTM $ addPredecessors [entry] latestNs
|
)
|
||||||
)
|
nextEntry
|
||||||
nextEntry
|
)
|
||||||
)
|
|
||||||
|
|
||||||
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
newNs <- readTVarIO nsSTM
|
||||||
ns' <- readTVarIO nsSTM
|
|
||||||
nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
|
|
||||||
either
|
|
||||||
(const $ pure ())
|
|
||||||
(\entry -> atomically $ do
|
|
||||||
latestNs <- readTVar nsSTM
|
|
||||||
writeTVar nsSTM $ addSuccessors [entry] latestNs
|
|
||||||
)
|
|
||||||
nextEntry
|
|
||||||
)
|
|
||||||
|
|
||||||
newNs <- readTVarIO nsSTM
|
let
|
||||||
|
oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs
|
||||||
|
newPredecessor = headMay $ predecessors newNs
|
||||||
|
-- manage need for service data migration:
|
||||||
|
maybe (pure ()) (\newPredecessor' ->
|
||||||
|
when (
|
||||||
|
isJust newPredecessor
|
||||||
|
&& oldPredecessor /= newPredecessor'
|
||||||
|
-- case: predecessor has changed in some way => own responsibility has changed in some way
|
||||||
|
-- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data
|
||||||
|
-- If this is due to a node leaving without transfering its data, try getting it from a redundant copy
|
||||||
|
-- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it
|
||||||
|
&& isInOwnResponsibilitySlice newPredecessor' oldNs) $ do
|
||||||
|
ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs)
|
||||||
|
migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor')
|
||||||
|
-- TODO: deal with migration failure, e.g retry
|
||||||
|
pure ()
|
||||||
|
)
|
||||||
|
newPredecessor
|
||||||
|
|
||||||
let
|
stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs)
|
||||||
oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs
|
threadDelay stabiliseDelay
|
||||||
newPredecessor = headMay $ predecessors newNs
|
|
||||||
-- manage need for service data migration:
|
|
||||||
maybe (pure ()) (\newPredecessor' ->
|
|
||||||
when (
|
|
||||||
isJust newPredecessor
|
|
||||||
&& oldPredecessor /= newPredecessor'
|
|
||||||
-- case: predecessor has changed in some way => own responsibility has changed in some way
|
|
||||||
-- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data
|
|
||||||
-- If this is due to a node leaving without transfering its data, try getting it from a redundant copy
|
|
||||||
-- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it
|
|
||||||
&& isInOwnResponsibilitySlice newPredecessor' oldNs) $ do
|
|
||||||
ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs)
|
|
||||||
migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor')
|
|
||||||
-- TODO: deal with migration failure, e.g retry
|
|
||||||
pure ()
|
|
||||||
)
|
|
||||||
newPredecessor
|
|
||||||
|
|
||||||
)
|
|
||||||
|
|
||||||
threadDelay . confStabiliseInterval . nodeConfig $ node
|
|
||||||
where
|
where
|
||||||
-- | send a stabilise request to the n-th neighbour
|
-- | send a stabilise request to the n-th neighbour
|
||||||
-- (specified by the provided getter function) and on failure retry
|
-- (specified by the provided getter function) and on failure retry
|
||||||
|
@ -645,23 +605,20 @@ sendThread sock sendQ = forever $ do
|
||||||
sendAllTo sock packet addr
|
sendAllTo sock packet addr
|
||||||
|
|
||||||
-- | Sets up and manages the main server threads of FediChord
|
-- | Sets up and manages the main server threads of FediChord
|
||||||
fediMainThreads :: Service s (RealNodeSTM s) => Socket -> RealNodeSTM s -> IO ()
|
fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO ()
|
||||||
fediMainThreads sock nodeSTM = do
|
fediMainThreads sock nsSTM = do
|
||||||
node <- readTVarIO nodeSTM
|
ns <- readTVarIO nsSTM
|
||||||
putStrLn "launching threads"
|
putStrLn "launching threads"
|
||||||
sendQ <- newTQueueIO
|
sendQ <- newTQueueIO
|
||||||
recvQ <- newTQueueIO
|
recvQ <- newTQueueIO
|
||||||
-- concurrently launch all handler threads, if one of them throws an exception
|
-- concurrently launch all handler threads, if one of them throws an exception
|
||||||
-- all get cancelled
|
-- all get cancelled
|
||||||
concurrently_
|
concurrently_
|
||||||
(fediMessageHandler sendQ recvQ nodeSTM) $
|
(fediMessageHandler sendQ recvQ nsSTM) $
|
||||||
-- decision whether to [1] launch 1 thread per VS or [2] let a single
|
concurrently_ (stabiliseThread nsSTM) $
|
||||||
-- thread process all VSes sequentially:
|
concurrently_ (nodeCacheVerifyThread nsSTM) $
|
||||||
-- choose option 2 for the sake of limiting concurrency in simulation scenario
|
concurrently_ (convergenceSampleThread nsSTM) $
|
||||||
concurrently_ (stabiliseThread nodeSTM) $
|
concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
|
||||||
concurrently_ (nodeCacheVerifyThread nodeSTM) $
|
|
||||||
concurrently_ (convergenceSampleThread nodeSTM) $
|
|
||||||
concurrently_ (lookupCacheCleanup nodeSTM) $
|
|
||||||
concurrently_
|
concurrently_
|
||||||
(sendThread sock sendQ)
|
(sendThread sock sendQ)
|
||||||
(recvThread sock recvQ)
|
(recvThread sock recvQ)
|
||||||
|
@ -690,17 +647,20 @@ requestMapPurge purgeAge mapVar = forever $ do
|
||||||
fediMessageHandler :: Service s (RealNodeSTM s)
|
fediMessageHandler :: Service s (RealNodeSTM s)
|
||||||
=> TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
=> TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
||||||
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
|
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
|
||||||
-> RealNodeSTM s -- ^ node
|
-> LocalNodeStateSTM s -- ^ acting NodeState
|
||||||
-> IO ()
|
-> IO ()
|
||||||
fediMessageHandler sendQ recvQ nodeSTM = do
|
fediMessageHandler sendQ recvQ nsSTM = do
|
||||||
nodeConf <- nodeConfig <$> readTVarIO nodeSTM
|
-- Read node state just once, assuming that all relevant data for this function does
|
||||||
|
-- not change.
|
||||||
|
-- Other functions are passed the nsSTM reference and thus can get the latest state.
|
||||||
|
nsSnap <- readTVarIO nsSTM
|
||||||
|
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap)
|
||||||
-- handling multipart messages:
|
-- handling multipart messages:
|
||||||
-- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
|
-- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
|
||||||
requestMap <- newMVar (Map.empty :: RequestMap)
|
requestMap <- newMVar (Map.empty :: RequestMap)
|
||||||
-- run receive loop and requestMapPurge concurrently, so that an exception makes
|
-- run receive loop and requestMapPurge concurrently, so that an exception makes
|
||||||
-- both of them fail
|
-- both of them fail
|
||||||
concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do
|
concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do
|
||||||
node <- readTVarIO nodeSTM
|
|
||||||
-- wait for incoming messages
|
-- wait for incoming messages
|
||||||
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
|
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
|
||||||
let aMsg = deserialiseMessage rawMsg
|
let aMsg = deserialiseMessage rawMsg
|
||||||
|
@ -710,14 +670,12 @@ fediMessageHandler sendQ recvQ nodeSTM = do
|
||||||
)
|
)
|
||||||
(\validMsg ->
|
(\validMsg ->
|
||||||
case validMsg of
|
case validMsg of
|
||||||
aRequest@Request{} -> case dispatchVS node aRequest of
|
aRequest@Request{}
|
||||||
-- if no match to an active vserver ID, just ignore
|
|
||||||
Nothing -> pure ()
|
|
||||||
-- if not a multipart message, handle immediately. Response is at the same time an ACK
|
-- if not a multipart message, handle immediately. Response is at the same time an ACK
|
||||||
Just nsSTM | part aRequest == 1 && isFinalPart aRequest ->
|
| part aRequest == 1 && isFinalPart aRequest ->
|
||||||
forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
|
forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
|
||||||
-- otherwise collect all message parts first before handling the whole request
|
-- otherwise collect all message parts first before handling the whole request
|
||||||
Just nsSTM | otherwise -> do
|
| otherwise -> do
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
-- critical locking section of requestMap
|
-- critical locking section of requestMap
|
||||||
rMapState <- takeMVar requestMap
|
rMapState <- takeMVar requestMap
|
||||||
|
@ -735,7 +693,7 @@ fediMessageHandler sendQ recvQ nodeSTM = do
|
||||||
-- put map back into MVar, end of critical section
|
-- put map back into MVar, end of critical section
|
||||||
putMVar requestMap newMapState
|
putMVar requestMap newMapState
|
||||||
-- ACK the received part
|
-- ACK the received part
|
||||||
forM_ (ackRequest aRequest) $
|
forM_ (ackRequest (getNid nsSnap) aRequest) $
|
||||||
\msg -> atomically $ writeTQueue sendQ (msg, sourceAddr)
|
\msg -> atomically $ writeTQueue sendQ (msg, sourceAddr)
|
||||||
-- if all parts received, then handle request.
|
-- if all parts received, then handle request.
|
||||||
let
|
let
|
||||||
|
@ -751,8 +709,6 @@ fediMessageHandler sendQ recvQ nodeSTM = do
|
||||||
aMsg
|
aMsg
|
||||||
|
|
||||||
pure ()
|
pure ()
|
||||||
where
|
|
||||||
dispatchVS node req = HMap.lookup (receiverID req) (vservers node)
|
|
||||||
|
|
||||||
|
|
||||||
-- ==== interface to service layer ====
|
-- ==== interface to service layer ====
|
||||||
|
|
|
@ -69,10 +69,10 @@ import Control.Exception
|
||||||
import Data.Foldable (foldr')
|
import Data.Foldable (foldr')
|
||||||
import Data.Function (on)
|
import Data.Function (on)
|
||||||
import qualified Data.Hashable as Hashable
|
import qualified Data.Hashable as Hashable
|
||||||
import Data.HashMap.Strict (HashMap)
|
|
||||||
import qualified Data.HashMap.Strict as HMap
|
|
||||||
import Data.List (delete, nub, sortBy)
|
import Data.List (delete, nub, sortBy)
|
||||||
import qualified Data.Map.Strict as Map
|
import qualified Data.Map.Strict as Map
|
||||||
|
import Data.HashMap.Strict (HashMap)
|
||||||
|
import qualified Data.HashMap.Strict as HMap
|
||||||
import Data.Maybe (fromJust, fromMaybe, isJust,
|
import Data.Maybe (fromJust, fromMaybe, isJust,
|
||||||
isNothing, mapMaybe)
|
isNothing, mapMaybe)
|
||||||
import qualified Data.Set as Set
|
import qualified Data.Set as Set
|
||||||
|
@ -153,19 +153,15 @@ a `localCompare` b
|
||||||
-- Also contains shared data and config values.
|
-- Also contains shared data and config values.
|
||||||
-- TODO: more data structures for k-choices bookkeeping
|
-- TODO: more data structures for k-choices bookkeeping
|
||||||
data RealNode s = RealNode
|
data RealNode s = RealNode
|
||||||
{ vservers :: HashMap NodeID (LocalNodeStateSTM s)
|
{ vservers :: HashMap NodeID (LocalNodeStateSTM s)
|
||||||
-- ^ map of all active VServer node IDs to their node state
|
-- ^ map of all active VServer node IDs to their node state
|
||||||
, nodeConfig :: FediChordConf
|
, nodeConfig :: FediChordConf
|
||||||
-- ^ holds the initial configuration read at program start
|
-- ^ holds the initial configuration read at program start
|
||||||
, bootstrapNodes :: [(String, PortNumber)]
|
, bootstrapNodes :: [(String, PortNumber)]
|
||||||
-- ^ nodes to be used as bootstrapping points, new ones learned during operation
|
-- ^ nodes to be used as bootstrapping points, new ones learned during operation
|
||||||
, lookupCacheSTM :: TVar LookupCache
|
, lookupCacheSTM :: TVar LookupCache
|
||||||
-- ^ a global cache of looked up keys and their associated nodes
|
-- ^ a global cache of looked up keys and their associated nodes
|
||||||
, globalNodeCacheSTM :: TVar NodeCache
|
, nodeService :: s (RealNodeSTM s)
|
||||||
-- ^ EpiChord node cache with expiry times for nodes.
|
|
||||||
, globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
|
||||||
-- ^ cache updates are not written directly to the 'globalNodeCacheSTM'
|
|
||||||
, nodeService :: s (RealNodeSTM s)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type RealNodeSTM s = TVar (RealNode s)
|
type RealNodeSTM s = TVar (RealNode s)
|
||||||
|
@ -194,9 +190,9 @@ data LocalNodeState s = LocalNodeState
|
||||||
{ nodeState :: RemoteNodeState
|
{ nodeState :: RemoteNodeState
|
||||||
-- ^ represents common data present both in remote and local node representations
|
-- ^ represents common data present both in remote and local node representations
|
||||||
, nodeCacheSTM :: TVar NodeCache
|
, nodeCacheSTM :: TVar NodeCache
|
||||||
-- ^ reference to the 'globalNodeCacheSTM'
|
-- ^ EpiChord node cache with expiry times for nodes
|
||||||
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
||||||
-- ^ reference to the 'globalCacheWriteQueue
|
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
|
||||||
, successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
|
, successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
|
||||||
-- ^ successor nodes in ascending order by distance
|
-- ^ successor nodes in ascending order by distance
|
||||||
, predecessors :: [RemoteNodeState]
|
, predecessors :: [RemoteNodeState]
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue