diff --git a/app/Main.hs b/app/Main.hs
index eac223d..ed599f8 100644
--- a/app/Main.hs
+++ b/app/Main.hs
@@ -18,29 +18,10 @@ main = do
     -- ToDo: parse and pass config
     -- probably use `tomland` for that
     (fConf, sConf) <- readConfig
-    -- TODO: first initialise 'RealNode', then the vservers
     -- ToDo: load persisted caches, bootstrapping nodes …
-    (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-    -- currently no masking is necessary, as there is nothing to clean up
-    nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
-    -- try joining the DHT using one of the provided bootstrapping nodes
-    joinedState <- tryBootstrapJoining thisNode
-    either (\err -> do
-        -- handle unsuccessful join
-
-        putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
-        print =<< readTVarIO thisNode
-        -- launch thread attempting to join on new cache entries
-        _ <- forkIO $ joinOnNewEntriesThread thisNode
-        wait =<< async (fediMainThreads serverSock thisNode)
-        )
-        (\joinedNS -> do
-            -- launch main eventloop with successfully joined state
-            putStrLn "successful join"
-            wait =<< async (fediMainThreads serverSock thisNode)
-        )
-        joinedState
-    pure ()
+    (fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
+    -- wait for all DHT threads to terminate; this keeps the main thread running
+    wait fediThreads


 readConfig :: IO (FediChordConf, ServiceConf)
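With `fediChordInit` now returning the `Async ()` handle of the DHT threads, `main` shrinks to a single `wait`: it blocks until those threads terminate and rethrows any exception they die with. A minimal sketch of this hand-off pattern, where `initSubsystem` and `mainLoop` are illustrative stand-ins rather than project functions:

```haskell
-- Sketch only: `initSubsystem` stands in for fediChordInit,
-- `mainLoop` for fediMainThreads.
import Control.Concurrent.Async (Async, async, wait)

initSubsystem :: IO (Async ())
initSubsystem = do
    -- set up sockets, state, and helper threads here ...
    async mainLoop          -- ... then hand back the running main loop
  where
    mainLoop :: IO ()
    mainLoop = pure ()      -- placeholder for the real event loop

main :: IO ()
main = do
    threads <- initSubsystem
    wait threads            -- keeps main alive; rethrows on failure
```

Compared to the old `wait =<< async ...` inside `main`, moving the `async` into the initialiser keeps the join logic next to the state it needs, as the FediChord.hs changes below show.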
diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index 0dcba44..6f9caf6 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -98,38 +98,59 @@ import Debug.Trace (trace)
 fediChordInit :: (Service s (RealNodeSTM s))
               => FediChordConf
               -> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service
-              -> IO (Socket, RealNodeSTM s)
+              -> IO (Async (), RealNodeSTM s)
 fediChordInit initConf serviceRunner = do
     emptyLookupCache <- newTVarIO Map.empty
-    let realNode = RealNode {
-            vservers = HMap.empty
+    cacheSTM <- newTVarIO initCache
+    cacheQ <- atomically newTQueue
+    let realNode = RealNode
+            { vservers = HMap.empty
             , nodeConfig = initConf
             , bootstrapNodes = confBootstrapNodes initConf
             , lookupCacheSTM = emptyLookupCache
             , nodeService = undefined
-          }
+            , globalNodeCacheSTM = cacheSTM
+            , globalCacheWriteQueue = cacheQ
+            }
     realNodeSTM <- newTVarIO realNode
+    serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
     -- launch service and set the reference in the RealNode
     serv <- serviceRunner realNodeSTM
     atomically . modifyTVar' realNodeSTM $ \rn -> rn { nodeService = serv }
+    -- prepare for joining: start node cache writer thread
+    -- currently no masking is necessary, as there is nothing to clean up
+    nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM
     -- TODO: k-choices way of joining, so far just initialise a single vserver
-    firstVS <- nodeStateInit realNodeSTM
+    firstVS <- nodeStateInit realNodeSTM 0
    firstVSSTM <- newTVarIO firstVS
     -- add vserver to list at RealNode
     atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) }
-    serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
-    pure (serverSock, realNodeSTM)
+    -- try joining the DHT using one of the provided bootstrapping nodes
+    joinedState <- tryBootstrapJoining firstVSSTM
+    fediThreadsAsync <- either (\err -> do
+        -- handle unsuccessful join
+
+        putStrLn $ err <> " Error joining, start listening for incoming requests anyway"
+        -- launch thread attempting to join on new cache entries
+        _ <- forkIO $ joinOnNewEntriesThread firstVSSTM
+        async (fediMainThreads serverSock firstVSSTM)
+        )
+        (\joinedNS -> do
+            -- launch main event loop with successfully joined state
+            putStrLn "successful join"
+            async (fediMainThreads serverSock firstVSSTM)
+        )
+        joinedState
+    pure (fediThreadsAsync, realNodeSTM)

 -- | initialises the 'NodeState' for this local node.
 -- Separated from 'fediChordInit' to be usable in tests.
-nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (LocalNodeState s)
-nodeStateInit realNodeSTM = do
+nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> Integer -> IO (LocalNodeState s)
+nodeStateInit realNodeSTM vsID' = do
     realNode <- readTVarIO realNodeSTM
-    cacheSTM <- newTVarIO initCache
-    q <- atomically newTQueue
     let
         conf = nodeConfig realNode
-        vsID = 0
+        vsID = vsID'
         containedState = RemoteNodeState {
           domain = confDomain conf
         , ipAddr = confIP conf
@@ -140,8 +161,8 @@ nodeStateInit realNodeSTM = do
         }
         initialState = LocalNodeState {
           nodeState = containedState
-        , nodeCacheSTM = cacheSTM
-        , cacheWriteQueue = q
+        , nodeCacheSTM = globalNodeCacheSTM realNode
+        , cacheWriteQueue = globalCacheWriteQueue realNode
        , successors = []
        , predecessors = []
        , kNeighbours = 3
@@ -336,12 +357,12 @@ joinOnNewEntriesThread nsSTM = loop

 -- | cache updater thread that waits for incoming NodeCache update instructions on
 -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
-nodeCacheWriter :: LocalNodeStateSTM s -> IO ()
-nodeCacheWriter nsSTM =
+nodeCacheWriter :: RealNodeSTM s -> IO ()
+nodeCacheWriter nodeSTM = do
+    node <- readTVarIO nodeSTM
     forever $ atomically $ do
-        ns <- readTVar nsSTM
-        cacheModifier <- readTQueue $ cacheWriteQueue ns
-        modifyTVar' (nodeCacheSTM ns) cacheModifier
+        cacheModifier <- readTQueue $ globalCacheWriteQueue node
+        modifyTVar' (globalNodeCacheSTM node) cacheModifier


 -- | Periodically iterate through cache, clean up expired entries and verify unverified ones
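The reworked `nodeCacheWriter` makes one thread the single writer of the now-global cache: producers enqueue pure `NodeCache -> NodeCache` modifiers, and only this loop applies them to the shared `TVar`. A self-contained sketch of the same single-writer pattern, using a stand-in `Cache` type instead of the project's `NodeCache`:

```haskell
import Control.Concurrent.STM
import Control.Monad (forever)
import qualified Data.Map.Strict as Map

type Cache = Map.Map Int String   -- stand-in for NodeCache

-- Single-writer loop, same shape as nodeCacheWriter: one queued
-- modifier is applied per STM transaction.
cacheWriter :: TQueue (Cache -> Cache) -> TVar Cache -> IO ()
cacheWriter queue cacheVar =
    forever $ atomically $ do
        modifier <- readTQueue queue    -- retries (blocks) while empty
        modifyTVar' cacheVar modifier

-- Producers never touch the TVar directly; they only enqueue updates.
queueInsert :: TQueue (Cache -> Cache) -> Int -> String -> IO ()
queueInsert queue k v = atomically $ writeTQueue queue (Map.insert k v)
```

Because `readTQueue` retries on an empty queue, the writer blocks cheaply between updates instead of polling, and concurrent producers can never race on the cache contents.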
diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs
index fd9d0f9..a1c0937 100644
--- a/src/Hash2Pub/FediChordTypes.hs
+++ b/src/Hash2Pub/FediChordTypes.hs
@@ -161,6 +161,13 @@ data RealNode s = RealNode
     -- ^ nodes to be used as bootstrapping points, new ones learned during operation
   , lookupCacheSTM :: TVar LookupCache
     -- ^ a global cache of looked up keys and their associated nodes
+  , globalNodeCacheSTM :: TVar NodeCache
+    -- ^ EpiChord node cache with expiry times for nodes.
+    --   Shared between all vservers; each 'LocalNodeState' holds a reference
+    --   to the same 'TVar' to avoid unnecessary reads of the parent node.
+  , globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
+    -- ^ cache updates are not written directly to the 'globalNodeCacheSTM'
+    --   but queued and processed by a single thread
   , nodeService :: s (RealNodeSTM s)
   }

@@ -190,9 +197,9 @@ data LocalNodeState s = LocalNodeState
   { nodeState :: RemoteNodeState
     -- ^ represents common data present both in remote and local node representations
   , nodeCacheSTM :: TVar NodeCache
-    -- ^ EpiChord node cache with expiry times for nodes
+    -- ^ reference to the 'globalNodeCacheSTM'
   , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-    -- ^ cache updates are not written directly to the 'nodeCache' but queued and
+    -- ^ reference to the 'globalCacheWriteQueue'
   , successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
     -- ^ successor nodes in ascending order by distance
   , predecessors :: [RemoteNodeState]
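Both new `RealNode` fields are plain references, so `nodeStateInit` aliases them into every vserver's `LocalNodeState`: copying the field shares the `TVar` and `TQueue`, not the data. A sketch of that aliasing with illustrative stand-in records (`Global` and `Local` are hypothetical, not the real `RealNode` and `LocalNodeState`):

```haskell
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

type Cache = Map.Map Int String          -- stand-in for NodeCache

data Global = Global                     -- plays the role of RealNode
    { gCache :: TVar Cache
    , gQueue :: TQueue (Cache -> Cache)
    }

data Local = Local                       -- plays the role of LocalNodeState
    { lCache :: TVar Cache               -- alias of gCache
    , lQueue :: TQueue (Cache -> Cache)  -- alias of gQueue
    }

-- mirrors nodeStateInit: the vserver copies references, not contents
mkLocal :: Global -> Local
mkLocal g = Local { lCache = gCache g, lQueue = gQueue g }

demo :: IO ()
demo = do
    g <- Global <$> newTVarIO Map.empty <*> newTQueueIO
    let vs1 = mkLocal g
        vs2 = mkLocal g
    -- an update enqueued through vs1 ...
    atomically $ writeTQueue (lQueue vs1) (Map.insert 1 "entry")
    -- ... drained here in place of the cache writer thread ...
    atomically $ readTQueue (lQueue vs2) >>= modifyTVar' (lCache vs2)
    -- ... is visible through vs2, since both alias the same TVar
    print =<< readTVarIO (lCache vs2)    -- fromList [(1,"entry")]
```

This is why the `LocalNodeState` Haddocks above can describe `nodeCacheSTM` and `cacheWriteQueue` as mere references: no per-vserver cache state remains.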