re-strucuture fediChordInit flow to also do the bootstrapping

This commit is contained in:
Trolli Schmittlauch 2020-09-20 19:30:35 +02:00
parent 12dfc56a73
commit 0ab6ee9c8f
3 changed files with 52 additions and 43 deletions

View file

@ -18,29 +18,10 @@ main = do
-- ToDo: parse and pass config
-- probably use `tomland` for that
(fConf, sConf) <- readConfig
-- TODO: first initialise 'RealNode', then the vservers
-- ToDo: load persisted caches, bootstrapping nodes …
(serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-- currently no masking is necessary, as there is nothing to clean up
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
-- try joining the DHT using one of the provided bootstrapping nodes
joinedState <- tryBootstrapJoining thisNode
either (\err -> do
-- handle unsuccessful join
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
print =<< readTVarIO thisNode
-- launch thread attempting to join on new cache entries
_ <- forkIO $ joinOnNewEntriesThread thisNode
wait =<< async (fediMainThreads serverSock thisNode)
)
(\joinedNS -> do
-- launch main eventloop with successfully joined state
putStrLn "successful join"
wait =<< async (fediMainThreads serverSock thisNode)
)
joinedState
pure ()
(fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-- wait for all DHT threads to terminate, this keeps the main thread running
wait fediThreads
readConfig :: IO (FediChordConf, ServiceConf)

View file

@ -98,38 +98,59 @@ import Debug.Trace (trace)
fediChordInit :: (Service s (RealNodeSTM s))
=> FediChordConf
-> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service
-> IO (Socket, RealNodeSTM s)
-> IO (Async (), RealNodeSTM s)
fediChordInit initConf serviceRunner = do
emptyLookupCache <- newTVarIO Map.empty
let realNode = RealNode {
vservers = HMap.empty
cacheSTM <- newTVarIO initCache
cacheQ <- atomically newTQueue
let realNode = RealNode
{ vservers = HMap.empty
, nodeConfig = initConf
, bootstrapNodes = confBootstrapNodes initConf
, lookupCacheSTM = emptyLookupCache
, nodeService = undefined
, globalNodeCacheSTM = cacheSTM
, globalCacheWriteQueue = cacheQ
}
realNodeSTM <- newTVarIO realNode
serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
-- launch service and set the reference in the RealNode
serv <- serviceRunner realNodeSTM
atomically . modifyTVar' realNodeSTM $ \rn -> rn { nodeService = serv }
-- prepare for joining: start node cache writer thread
-- currently no masking is necessary, as there is nothing to clean up
nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM
-- TODO: k-choices way of joining, so far just initialise a single vserver
firstVS <- nodeStateInit realNodeSTM
firstVS <- nodeStateInit realNodeSTM 0
firstVSSTM <- newTVarIO firstVS
-- add vserver to list at RealNode
atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) }
serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
pure (serverSock, realNodeSTM)
-- try joining the DHT using one of the provided bootstrapping nodes
joinedState <- tryBootstrapJoining firstVSSTM
fediThreadsAsync <- either (\err -> do
-- handle unsuccessful join
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
-- launch thread attempting to join on new cache entries
_ <- forkIO $ joinOnNewEntriesThread firstVSSTM
async (fediMainThreads serverSock firstVSSTM)
)
(\joinedNS -> do
-- launch main eventloop with successfully joined state
putStrLn "successful join"
async (fediMainThreads serverSock firstVSSTM)
)
joinedState
pure (fediThreadsAsync, realNodeSTM)
-- | initialises the 'NodeState' for this local node.
-- Separated from 'fediChordInit' to be usable in tests.
nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (LocalNodeState s)
nodeStateInit realNodeSTM = do
nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> Integer -> IO (LocalNodeState s)
nodeStateInit realNodeSTM vsID' = do
realNode <- readTVarIO realNodeSTM
cacheSTM <- newTVarIO initCache
q <- atomically newTQueue
let
conf = nodeConfig realNode
vsID = 0
vsID = vsID'
containedState = RemoteNodeState {
domain = confDomain conf
, ipAddr = confIP conf
@ -140,8 +161,8 @@ nodeStateInit realNodeSTM = do
}
initialState = LocalNodeState {
nodeState = containedState
, nodeCacheSTM = cacheSTM
, cacheWriteQueue = q
, nodeCacheSTM = globalNodeCacheSTM realNode
, cacheWriteQueue = globalCacheWriteQueue realNode
, successors = []
, predecessors = []
, kNeighbours = 3
@ -336,12 +357,12 @@ joinOnNewEntriesThread nsSTM = loop
-- | cache updater thread that waits for incoming NodeCache update instructions on
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
nodeCacheWriter :: LocalNodeStateSTM s -> IO ()
nodeCacheWriter nsSTM =
nodeCacheWriter :: RealNodeSTM s -> IO ()
nodeCacheWriter nodeSTM = do
node <- readTVarIO nodeSTM
forever $ atomically $ do
ns <- readTVar nsSTM
cacheModifier <- readTQueue $ cacheWriteQueue ns
modifyTVar' (nodeCacheSTM ns) cacheModifier
cacheModifier <- readTQueue $ globalCacheWriteQueue node
modifyTVar' (globalNodeCacheSTM node) cacheModifier
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones

View file

@ -161,6 +161,13 @@ data RealNode s = RealNode
-- ^ nodes to be used as bootstrapping points, new ones learned during operation
, lookupCacheSTM :: TVar LookupCache
-- ^ a global cache of looked up keys and their associated nodes
, globalNodeCacheSTM :: TVar NodeCache
-- ^ EpiChord node cache with expiry times for nodes.
-- Shared between all vservers, each 'LocalNodeState' holds a reference to
-- the same TVar for avoiding unnecessary reads of parent node
, globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ cache updates are not written directly to the 'globalNodeCacheSTM'
-- but queued and processed by a single thread
, nodeService :: s (RealNodeSTM s)
}
@ -190,9 +197,9 @@ data LocalNodeState s = LocalNodeState
{ nodeState :: RemoteNodeState
-- ^ represents common data present both in remote and local node representations
, nodeCacheSTM :: TVar NodeCache
-- ^ EpiChord node cache with expiry times for nodes
-- ^ reference to the 'globalNodeCacheSTM'
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
-- ^ reference to the 'globalCacheWriteQueue
, successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
-- ^ successor nodes in ascending order by distance
, predecessors :: [RemoteNodeState]