diff --git a/app/Main.hs b/app/Main.hs
index ed599f8..eac223d 100644
--- a/app/Main.hs
+++ b/app/Main.hs
@@ -18,10 +18,29 @@ main = do
     -- ToDo: parse and pass config
     -- probably use `tomland` for that
     (fConf, sConf) <- readConfig
+    -- TODO: first initialise 'RealNode', then the vservers
     -- ToDo: load persisted caches, bootstrapping nodes …
-    (fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-    -- wait for all DHT threads to terminate, this keeps the main thread running
-    wait fediThreads
+    (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
+    -- currently no masking is necessary, as there is nothing to clean up
+    nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
+    -- try joining the DHT using one of the provided bootstrapping nodes
+    joinedState <- tryBootstrapJoining thisNode
+    either (\err -> do
+        -- handle unsuccessful join
+
+        putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
+        print =<< readTVarIO thisNode
+        -- launch thread attempting to join on new cache entries
+        _ <- forkIO $ joinOnNewEntriesThread thisNode
+        wait =<< async (fediMainThreads serverSock thisNode)
+        )
+        (\joinedNS -> do
+            -- launch main eventloop with successfully joined state
+            putStrLn "successful join"
+            wait =<< async (fediMainThreads serverSock thisNode)
+        )
+        joinedState
+    pure ()
 
 
 readConfig :: IO (FediChordConf, ServiceConf)
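The reworked Main.hs above boils down to: try to join the DHT, and fall back to listening for incoming requests if the join fails. A minimal, self-contained sketch of that control flow — `Node`, `tryJoin` and `mainLoop` are hypothetical stand-ins, not the Hash2Pub API:

```haskell
import Control.Concurrent.Async (async, wait)

data Node = Node

-- placeholder for tryBootstrapJoining: Left carries the error message
tryJoin :: Node -> IO (Either String Node)
tryJoin _ = pure (Left "no bootstrap node reachable")

-- placeholder for fediMainThreads
mainLoop :: Node -> IO ()
mainLoop _ = pure ()

run :: Node -> IO ()
run node = do
    joined <- tryJoin node
    either
        (\err -> do
            -- join failed: log, but start serving anyway
            putStrLn (err <> " - listening anyway")
            wait =<< async (mainLoop node))
        (\_joinedNode -> do
            -- join succeeded: enter the main event loop directly
            putStrLn "successful join"
            wait =<< async (mainLoop node))
        joined
```

Either way the main thread blocks in `wait`, which is what keeps the process alive.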
diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs
index f462a26..38c0dcb 100644
--- a/src/Hash2Pub/DHTProtocol.hs
+++ b/src/Hash2Pub/DHTProtocol.hs
@@ -241,16 +241,16 @@ sendMessageSize = 1200
 
 -- ====== message send and receive operations ======
 
 -- encode the response to a request that just signals successful receipt
-ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString
-ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response {
+ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
+ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
       requestID = requestID req
-    , senderID = receiverID req
+    , senderID = ownID
     , part = part req
     , isFinalPart = False
     , action = action req
     , payload = Nothing
     }
-ackRequest _ = Map.empty
+ackRequest _ _ = Map.empty
 
 
 -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
@@ -269,7 +269,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
         Nothing -> pure ()
         Just aPart -> do
             let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
-            queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns)
+            queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
             -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
             maybe (pure ()) (
                 mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
@@ -542,7 +542,7 @@ requestJoin toJoinOn ownStateSTM = do
                 writeTVar ownStateSTM newState
                 pure (cacheInsertQ, newState)
             -- execute the cache insertions
-            mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ
+            mapM_ (\f -> f joinedState) cacheInsertQ
             if responses == Set.empty
                 then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
                 else do
@@ -624,7 +624,7 @@ sendQueryIdMessages targetID ns lParam targets = do
                 _ -> Set.empty
             -- forward entries to global cache
-            queueAddEntries entrySet (cacheWriteQueue ns)
+            queueAddEntries entrySet ns
             -- return accumulated QueryResult
             pure $ case acc of
                 -- once a FOUND as been encountered, return this as a result
@@ -670,7 +670,7 @@ requestStabilise ns neighbour = do
                 )
                 ([],[]) respSet
         -- update successfully responded neighbour in cache
-        maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet)
+        maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
         pure $ if null responsePreds && null responseSuccs
             then Left "no neighbours returned"
             else Right (responsePreds, responseSuccs)
@@ -832,24 +832,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
 
 -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
 queueAddEntries :: Foldable c => c RemoteCacheEntry
-                -> TQueue (NodeCache -> NodeCache)
+                -> LocalNodeState s
                 -> IO ()
-queueAddEntries entries cacheQ = do
+queueAddEntries entries ns = do
     now <- getPOSIXTime
-    forM_ entries $ \entry -> atomically $ writeTQueue cacheQ $ addCacheEntryPure now entry
+    forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry
 
 
 -- | enque a list of node IDs to be deleted from the global NodeCache
 queueDeleteEntries :: Foldable c
                    => c NodeID
-                   -> TQueue (NodeCache -> NodeCache)
+                   -> LocalNodeState s
                    -> IO ()
-queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry
+queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
 
 
 -- | enque a single node ID to be deleted from the global NodeCache
 queueDeleteEntry :: NodeID
-                 -> TQueue (NodeCache -> NodeCache)
+                 -> LocalNodeState s
                  -> IO ()
 queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
 
@@ -858,11 +858,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
 -- global 'NodeCache'.
 queueUpdateVerifieds :: Foldable c
                      => c NodeID
-                     -> TQueue (NodeCache -> NodeCache)
+                     -> LocalNodeState s
                      -> IO ()
-queueUpdateVerifieds nIds cacheQ = do
+queueUpdateVerifieds nIds ns = do
     now <- getPOSIXTime
-    forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $
+    forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
         markCacheEntryAsVerified (Just now) nid'
 
 -- | retry an IO action at most *i* times until it delivers a result
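All four queue helpers now take the whole `LocalNodeState` and project out `cacheWriteQueue` themselves, so callers no longer need to know where the queue lives. A simplified sketch of the changed shape, with stand-in types (`Cache`, `NodeState`) instead of the real ones:

```haskell
import Control.Concurrent.STM (TQueue, atomically, writeTQueue)
import qualified Data.Map.Strict as Map

type Cache = Map.Map Int String

newtype NodeState = NodeState { writeQueue :: TQueue (Cache -> Cache) }

-- before: the caller had to pass the TQueue explicitly
-- after: the queue stays an implementation detail of the node state
queueDelete :: NodeState -> Int -> IO ()
queueDelete ns key = atomically $ writeTQueue (writeQueue ns) (Map.delete key)
```

Note that the enqueued values are pure `Cache -> Cache` functions; nothing here touches the cache TVar directly.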
diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index 9baf160..0dcba44 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -69,12 +69,12 @@ import qualified Data.ByteString.UTF8 as BSU
 import Data.Either (rights)
 import Data.Foldable (foldr')
 import Data.Functor.Identity
-import Data.HashMap.Strict (HashMap)
-import qualified Data.HashMap.Strict as HMap
 import Data.IP (IPv6, fromHostAddress6, toHostAddress6)
 import Data.List ((\\))
 import qualified Data.Map.Strict as Map
+import qualified Data.HashMap.Strict as HMap
+import Data.HashMap.Strict (HashMap)
 import Data.Maybe (catMaybes, fromJust, fromMaybe, isJust,
                    isNothing, mapMaybe)
 import qualified Data.Set as Set
@@ -98,59 +98,38 @@
 fediChordInit :: (Service s (RealNodeSTM s))
               => FediChordConf
               -> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service
-              -> IO (Async (), RealNodeSTM s)
+              -> IO (Socket, RealNodeSTM s)
 fediChordInit initConf serviceRunner = do
     emptyLookupCache <- newTVarIO Map.empty
-    cacheSTM <- newTVarIO initCache
-    cacheQ <- atomically newTQueue
-    let realNode = RealNode
-            { vservers = HMap.empty
+    let realNode = RealNode {
+          vservers = HMap.empty
         , nodeConfig = initConf
        , bootstrapNodes = confBootstrapNodes initConf
        , lookupCacheSTM = emptyLookupCache
        , nodeService = undefined
-            , globalNodeCacheSTM = cacheSTM
-            , globalCacheWriteQueue = cacheQ
-            }
+        }
     realNodeSTM <- newTVarIO realNode
-    serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
     -- launch service and set the reference in the RealNode
     serv <- serviceRunner realNodeSTM
     atomically . modifyTVar' realNodeSTM $ \rn -> rn { nodeService = serv }
-    -- prepare for joining: start node cache writer thread
-    -- currently no masking is necessary, as there is nothing to clean up
-    nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM
     -- TODO: k-choices way of joining, so far just initialise a single vserver
-    firstVS <- nodeStateInit realNodeSTM 0
+    firstVS <- nodeStateInit realNodeSTM
     firstVSSTM <- newTVarIO firstVS
     -- add vserver to list at RealNode
     atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) }
-    -- try joining the DHT using one of the provided bootstrapping nodes
-    joinedState <- tryBootstrapJoining firstVSSTM
-    fediThreadsAsync <- either (\err -> do
-        -- handle unsuccessful join
-
-        putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
-        -- launch thread attempting to join on new cache entries
-        _ <- forkIO $ joinOnNewEntriesThread firstVSSTM
-        async (fediMainThreads serverSock realNodeSTM)
-        )
-        (\joinedNS -> do
-            -- launch main eventloop with successfully joined state
-            putStrLn "successful join"
-            async (fediMainThreads serverSock realNodeSTM)
-        )
-        joinedState
-    pure (fediThreadsAsync, realNodeSTM)
+    serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
+    pure (serverSock, realNodeSTM)
 
 -- | initialises the 'NodeState' for this local node.
 -- Separated from 'fediChordInit' to be usable in tests.
-nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> Integer -> IO (LocalNodeState s)
-nodeStateInit realNodeSTM vsID' = do
+nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (LocalNodeState s)
+nodeStateInit realNodeSTM = do
     realNode <- readTVarIO realNodeSTM
+    cacheSTM <- newTVarIO initCache
+    q <- atomically newTQueue
     let
         conf = nodeConfig realNode
-        vsID = vsID'
+        vsID = 0
         containedState = RemoteNodeState {
             domain = confDomain conf
           , ipAddr = confIP conf
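`fediChordInit` now returns the bound server socket instead of launching the main threads itself, leaving thread startup to the caller in Main.hs. As a rough idea of what a `mkServerSocket`-style helper does — this is an assumption for illustration, the real implementation lives elsewhere in Hash2Pub and may differ:

```haskell
import Network.Socket

-- hypothetical helper: create a UDP socket bound to the given IPv6 address and port
mkUdpServerSocket :: HostAddress6 -> PortNumber -> IO Socket
mkUdpServerSocket ip port = do
    sock <- socket AF_INET6 Datagram defaultProtocol
    setSocketOption sock ReuseAddr 1
    bind sock (SockAddrInet6 port 0 ip 0)
    pure sock
```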
@@ -161,8 +140,8 @@
             }
         initialState = LocalNodeState {
             nodeState = containedState
-          , nodeCacheSTM = globalNodeCacheSTM realNode
-          , cacheWriteQueue = globalCacheWriteQueue realNode
+          , nodeCacheSTM = cacheSTM
+          , cacheWriteQueue = q
           , successors = []
           , predecessors = []
           , kNeighbours = 3
@@ -195,36 +174,33 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
 
 -- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
 -- Unjoined try joining instead.
-convergenceSampleThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
-convergenceSampleThread nodeSTM = forever $ do
-    node <- readTVarIO nodeSTM
-    forM_ (vservers node) $ \nsSTM -> do
-        nsSnap <- readTVarIO nsSTM
-        parentNode <- readTVarIO $ parentRealNode nsSnap
-        if isJoined nsSnap
-            then
-                runExceptT (do
-                    -- joined node: choose random node, do queryIDLoop, compare result with own responsibility
-                    let bss = bootstrapNodes parentNode
-                    randIndex <- liftIO $ randomRIO (0, length bss - 1)
-                    chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
-                    lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
-                    currentlyResponsible <- liftEither lookupResult
-                    if getNid currentlyResponsible /= getNid nsSnap
-                        -- if mismatch, stabilise on the result, else do nothing
-                        then do
-                            stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
-                            (preds, succs) <- liftEither stabResult
-                            -- TODO: verify neighbours before adding, see #55
-                            liftIO . atomically $ do
-                                ns <- readTVar nsSTM
-                                writeTVar nsSTM $ addPredecessors preds ns
-                        else pure ()
-                    ) >> pure ()
-            -- unjoined node: try joining through all bootstrapping nodes
-            else tryBootstrapJoining nsSTM >> pure ()
-
-    let delaySecs = confBootstrapSamplingInterval . nodeConfig $ node
+convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
+convergenceSampleThread nsSTM = forever $ do
+    nsSnap <- readTVarIO nsSTM
+    parentNode <- readTVarIO $ parentRealNode nsSnap
+    if isJoined nsSnap
+        then
+            runExceptT (do
+                -- joined node: choose random node, do queryIDLoop, compare result with own responsibility
+                let bss = bootstrapNodes parentNode
+                randIndex <- liftIO $ randomRIO (0, length bss - 1)
+                chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
+                lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
+                currentlyResponsible <- liftEither lookupResult
+                if getNid currentlyResponsible /= getNid nsSnap
+                    -- if mismatch, stabilise on the result, else do nothing
+                    then do
+                        stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
+                        (preds, succs) <- liftEither stabResult
+                        -- TODO: verify neighbours before adding, see #55
+                        liftIO . atomically $ do
+                            ns <- readTVar nsSTM
+                            writeTVar nsSTM $ addPredecessors preds ns
+                    else pure ()
+                ) >> pure ()
+        -- unjoined node: try joining through all bootstrapping nodes
+        else tryBootstrapJoining nsSTM >> pure ()
+    let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
     threadDelay delaySecs
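`convergenceSampleThread` now operates on a single vserver's `LocalNodeStateSTM` instead of iterating over all vservers of the `RealNode`. The random bootstrap pick it performs uses a safe indexing pattern; a self-contained sketch (`pickRandom` is an illustrative name):

```haskell
import Safe (atMay)
import System.Random (randomRIO)

-- pick a uniformly random element, avoiding a partial (!!)
pickRandom :: [a] -> IO (Maybe a)
pickRandom [] = pure Nothing
pickRandom xs = do
    i <- randomRIO (0, length xs - 1)
    pure (atMay xs i)
```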
@@ -360,81 +336,68 @@ joinOnNewEntriesThread nsSTM = loop
 
 -- | cache updater thread that waits for incoming NodeCache update instructions on
 -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
-nodeCacheWriter :: RealNodeSTM s -> IO ()
-nodeCacheWriter nodeSTM = do
-    node <- readTVarIO nodeSTM
+nodeCacheWriter :: LocalNodeStateSTM s -> IO ()
+nodeCacheWriter nsSTM =
     forever $ atomically $ do
-        cacheModifier <- readTQueue $ globalCacheWriteQueue node
-        modifyTVar' (globalNodeCacheSTM node) cacheModifier
+        ns <- readTVar nsSTM
+        cacheModifier <- readTQueue $ cacheWriteQueue ns
+        modifyTVar' (nodeCacheSTM ns) cacheModifier
 
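The single-writer discipline of `nodeCacheWriter`: every mutation arrives as a pure `NodeCache -> NodeCache` function on the queue, and exactly one thread applies them to the TVar, so concurrent writers never contend. A minimal stand-alone version with simplified types:

```haskell
import Control.Concurrent.STM
import Control.Monad (forever)
import qualified Data.Map.Strict as Map

type Cache = Map.Map Int String

cacheWriter :: TQueue (Cache -> Cache) -> TVar Cache -> IO ()
cacheWriter q cacheVar = forever $ atomically $ do
    modifier <- readTQueue q      -- blocks (retries) until a modifier arrives
    modifyTVar' cacheVar modifier -- strict apply, single writer
```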
 -- | Periodically iterate through cache, clean up expired entries and verify unverified ones
-nodeCacheVerifyThread :: RealNodeSTM s -> IO ()
-nodeCacheVerifyThread nodeSTM = forever $ do
-    (node, firstVSSTM) <- atomically $ do
-        node <- readTVar nodeSTM
-        case headMay (HMap.elems $ vservers node) of
-            -- wait until first VS is joined
-            Nothing -> retry
-            Just vs' -> pure (node, vs')
-    let
-        maxEntryAge = confMaxNodeCacheAge $ nodeConfig node
-        cacheQ = globalCacheWriteQueue node
-    cache <- readTVarIO $ globalNodeCacheSTM node
-    -- always use the first active VS as a sender for operations like Ping
-    firstVS <- readTVarIO firstVSSTM
+nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
+nodeCacheVerifyThread nsSTM = forever $ do
+    -- get cache
+    (ns, cache, maxEntryAge) <- atomically $ do
+        ns <- readTVar nsSTM
+        cache <- readTVar $ nodeCacheSTM ns
+        maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns)
+        pure (ns, cache, maxEntryAge)
     -- iterate entries:
     -- for avoiding too many time syscalls, get current time before iterating.
     now <- getPOSIXTime
-    forM_ (nodeCacheEntries cache) (\(CacheEntry validated cacheNode ts) ->
+    forM_ (nodeCacheEntries cache) (\(CacheEntry validated node ts) ->
         -- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
         if (now - ts) > maxEntryAge
             then
-                queueDeleteEntry (getNid cacheNode) cacheQ
-            -- case unverified: try verifying, otherwise delete
+                queueDeleteEntry (getNid node) ns
+        -- case unverified: try verifying, otherwise delete
             else if not validated
                 then do
                     -- marking as verified is done by 'requestPing' as well
-                    pong <- requestPing firstVS cacheNode
+                    pong <- requestPing ns node
                     either (\_->
-                        queueDeleteEntry (getNid cacheNode) cacheQ
+                        queueDeleteEntry (getNid node) ns
                         )
                        (\vss ->
-                            if cacheNode `notElem` vss
-                                then queueDeleteEntry (getNid cacheNode) cacheQ
+                            if node `notElem` vss
+                                then queueDeleteEntry (getNid node) ns
                                 -- after verifying a node, check whether it can be a closer neighbour
-                                -- do this for each node
-                                -- TODO: optimisation: place all LocalNodeStates on the cache ring and check whether any of them is the predecessor/ successor
-                                else forM_ (vservers node) (\nsSTM -> do
-                                    ns <- readTVarIO nsSTM
-                                    if cacheNode `isPossiblePredecessor` ns
+                                else do
+                                    if node `isPossiblePredecessor` ns
                                         then atomically $ do
                                             ns' <- readTVar nsSTM
-                                            writeTVar nsSTM $ addPredecessors [cacheNode] ns'
+                                            writeTVar nsSTM $ addPredecessors [node] ns'
                                         else pure ()
-                                    if cacheNode `isPossibleSuccessor` ns
+                                    if node `isPossibleSuccessor` ns
                                         then atomically $ do
                                             ns' <- readTVar nsSTM
-                                            writeTVar nsSTM $ addSuccessors [cacheNode] ns'
+                                            writeTVar nsSTM $ addSuccessors [node] ns'
                                         else pure ()
-                                    )
                             ) pong
                 else pure ()
         )
 
     -- check the cache invariant per slice and, if necessary, do a single lookup to the
     -- middle of each slice not verifying the invariant
-    latestNode <- readTVarIO nodeSTM
-    forM_ (vservers latestNode) (\nsSTM -> do
-        latestNs <- readTVarIO nsSTM
-        latestCache <- readTVarIO $ nodeCacheSTM latestNs
-        let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
-                FOUND node -> [node]
-                FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
-        forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
-            forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
-            )
-        )
+    latestNs <- readTVarIO nsSTM
+    latestCache <- readTVarIO $ nodeCacheSTM latestNs
+    let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
+            FOUND node -> [node]
+            FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
+    forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
+        forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
+        )
 
     threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to milliseconds
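The sweep above reads the clock once per iteration and compares each entry's timestamp against `maxEntryAge`. The same expiry test in isolation, with a stand-in `Entry` type instead of the real `CacheEntry`:

```haskell
import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime)

data Entry = Entry { entryKey :: Int, insertedAt :: POSIXTime }

expiredKeys :: POSIXTime -> [Entry] -> IO [Int]
expiredKeys maxAge entries = do
    now <- getPOSIXTime  -- one syscall for the whole sweep
    pure [entryKey e | e <- entries, now - insertedAt e > maxAge]
```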
@@ -498,93 +461,90 @@ checkCacheSliceInvariants ns
 
 -- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
 -- one responds, and get their neighbours for maintaining the own neighbour lists.
 -- If necessary, request new neighbours.
-stabiliseThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
-stabiliseThread nodeSTM = forever $ do
-    node <- readTVarIO nodeSTM
-    forM_ (vservers node) (\nsSTM -> do
-        oldNs <- readTVarIO nsSTM
+stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
+stabiliseThread nsSTM = forever $ do
+    oldNs <- readTVarIO nsSTM
 
-        -- iterate through the same snapshot, collect potential new neighbours
-        -- and nodes to be deleted, and modify these changes only at the end of
-        -- each stabilise run.
-        -- This decision makes iterating through a potentially changing list easier.
+    -- iterate through the same snapshot, collect potential new neighbours
+    -- and nodes to be deleted, and modify these changes only at the end of
+    -- each stabilise run.
+    -- This decision makes iterating through a potentially changing list easier.
 
-        -- don't contact all neighbours unless the previous one failed/ Left ed
+    -- don't contact all neighbours unless the previous one failed/ Left ed
 
-        predStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
-        succStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
+    predStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
+    succStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
 
+    let
+        (predDeletes, predNeighbours) = either (const ([], [])) id predStabilise
+        (succDeletes, succNeighbours) = either (const ([], [])) id succStabilise
+        allDeletes = predDeletes <> succDeletes
+        allNeighbours = predNeighbours <> succNeighbours
+
+    -- now actually modify the node state's neighbours
+    updatedNs <- atomically $ do
+        newerNsSnap <- readTVar nsSTM
         let
-            (predDeletes, predNeighbours) = either (const ([], [])) id predStabilise
-            (succDeletes, succNeighbours) = either (const ([], [])) id succStabilise
-            allDeletes = predDeletes <> succDeletes
-            allNeighbours = predNeighbours <> succNeighbours
+            -- sorting and taking only k neighbours is taken care of by the
+            -- setSuccessors/ setPredecessors functions
+            newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours
+            newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours
+            newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
+        writeTVar nsSTM newNs
+        pure newNs
+    -- delete unresponding nodes from cache as well
+    mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes
 
-        -- now actually modify the node state's neighbours
-        updatedNs <- atomically $ do
-            newerNsSnap <- readTVar nsSTM
-            let
-                -- sorting and taking only k neighbours is taken care of by the
-                -- setSuccessors/ setPredecessors functions
-                newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours
-                newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours
-                newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
-            writeTVar nsSTM newNs
-            pure newNs
-        -- delete unresponding nodes from cache as well
-        mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes
 
+    -- try looking up additional neighbours if list too short
+    forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
+        ns' <- readTVarIO nsSTM
+        nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
+        either
+            (const $ pure ())
+            (\entry -> atomically $ do
+                latestNs <- readTVar nsSTM
+                writeTVar nsSTM $ addPredecessors [entry] latestNs
+            )
+            nextEntry
+        )
 
-        -- try looking up additional neighbours if list too short
-        forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
-            ns' <- readTVarIO nsSTM
-            nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
-            either
-                (const $ pure ())
-                (\entry -> atomically $ do
-                    latestNs <- readTVar nsSTM
-                    writeTVar nsSTM $ addPredecessors [entry] latestNs
-                )
-                nextEntry
-            )
+    forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
+        ns' <- readTVarIO nsSTM
+        nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
+        either
+            (const $ pure ())
+            (\entry -> atomically $ do
+                latestNs <- readTVar nsSTM
+                writeTVar nsSTM $ addSuccessors [entry] latestNs
+            )
+            nextEntry
+        )
 
-        forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
-            ns' <- readTVarIO nsSTM
-            nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
-            either
-                (const $ pure ())
-                (\entry -> atomically $ do
-                    latestNs <- readTVar nsSTM
-                    writeTVar nsSTM $ addSuccessors [entry] latestNs
-                )
-                nextEntry
-            )
+    newNs <- readTVarIO nsSTM
 
-        newNs <- readTVarIO nsSTM
+    let
+        oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs
+        newPredecessor = headMay $ predecessors newNs
+    -- manage need for service data migration:
+    maybe (pure ()) (\newPredecessor' ->
+        when (
+            isJust newPredecessor
+            && oldPredecessor /= newPredecessor'
+            -- case: predecessor has changed in some way => own responsibility has changed in some way
+            -- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data
+            -- If this is due to a node leaving without transferring its data, try getting it from a redundant copy
+            -- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it
+            && isInOwnResponsibilitySlice newPredecessor' oldNs) $ do
+                ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs)
+                migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor')
+                -- TODO: deal with migration failure, e.g. retry
+                pure ()
+        )
+        newPredecessor
 
-        let
-            oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs
-            newPredecessor = headMay $ predecessors newNs
-        -- manage need for service data migration:
-        maybe (pure ()) (\newPredecessor' ->
-            when (
-                isJust newPredecessor
-                && oldPredecessor /= newPredecessor'
-                -- case: predecessor has changed in some way => own responsibility has changed in some way
-                -- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data
-                -- If this is due to a node leaving without transfering its data, try getting it from a redundant copy
-                -- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it
-                && isInOwnResponsibilitySlice newPredecessor' oldNs) $ do
-                    ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs)
-                    migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor')
-                    -- TODO: deal with migration failure, e.g retry
-                    pure ()
-            )
-            newPredecessor
-
-        )
-
-    threadDelay . confStabiliseInterval . nodeConfig $ node
+    stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs)
+    threadDelay stabiliseDelay
     where
         -- | send a stabilise request to the n-th neighbour
         -- (specified by the provided getter function) and on failure retry
@@ -645,23 +605,20 @@ sendThread sock sendQ = forever $ do
     sendAllTo sock packet addr
 
 -- | Sets up and manages the main server threads of FediChord
-fediMainThreads :: Service s (RealNodeSTM s) => Socket -> RealNodeSTM s -> IO ()
-fediMainThreads sock nodeSTM = do
-    node <- readTVarIO nodeSTM
+fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO ()
+fediMainThreads sock nsSTM = do
+    ns <- readTVarIO nsSTM
     putStrLn "launching threads"
     sendQ <- newTQueueIO
     recvQ <- newTQueueIO
     -- concurrently launch all handler threads, if one of them throws an exception
     -- all get cancelled
     concurrently_
-        (fediMessageHandler sendQ recvQ nodeSTM) $
-        -- decision whether to [1] launch 1 thread per VS or [2] let a single
-        -- thread process all VSes sequentially:
-        -- choose option 2 for the sake of limiting concurrency in simulation scenario
-        concurrently_ (stabiliseThread nodeSTM) $
-        concurrently_ (nodeCacheVerifyThread nodeSTM) $
-        concurrently_ (convergenceSampleThread nodeSTM) $
-        concurrently_ (lookupCacheCleanup nodeSTM) $
+        (fediMessageHandler sendQ recvQ nsSTM) $
+        concurrently_ (stabiliseThread nsSTM) $
+        concurrently_ (nodeCacheVerifyThread nsSTM) $
+        concurrently_ (convergenceSampleThread nsSTM) $
+        concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
        concurrently_
            (sendThread sock sendQ)
            (recvThread sock recvQ)
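The nested `concurrently_` chain in `fediMainThreads` links all thread lifetimes: as soon as any worker throws, the whole tree is cancelled. A minimal demonstration with placeholder workers:

```haskell
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (concurrently_)
import Control.Monad (forever)

worker :: String -> IO ()
worker name = forever $ do
    putStrLn (name <> " tick")
    threadDelay 1000000

runAll :: IO ()
runAll =
    concurrently_ (worker "stabilise") $
    concurrently_ (worker "cacheVerify")
                  (worker "recv") -- any worker failing cancels all the others
```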
@@ -690,17 +647,20 @@ requestMapPurge purgeAge mapVar = forever $ do
 fediMessageHandler :: Service s (RealNodeSTM s)
                    => TQueue (BS.ByteString, SockAddr) -- ^ send queue
                    -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-                   -> RealNodeSTM s                    -- ^ node
+                   -> LocalNodeStateSTM s              -- ^ acting NodeState
                    -> IO ()
-fediMessageHandler sendQ recvQ nodeSTM = do
-    nodeConf <- nodeConfig <$> readTVarIO nodeSTM
+fediMessageHandler sendQ recvQ nsSTM = do
+    -- Read node state just once, assuming that all relevant data for this function does
+    -- not change.
+    -- Other functions are passed the nsSTM reference and thus can get the latest state.
+    nsSnap <- readTVarIO nsSTM
+    nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap)
     -- handling multipart messages:
     -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
     requestMap <- newMVar (Map.empty :: RequestMap)
     -- run receive loop and requestMapPurge concurrently, so that an exception makes
     -- both of them fail
     concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do
-        node <- readTVarIO nodeSTM
         -- wait for incoming messages
         (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
         let aMsg = deserialiseMessage rawMsg
@@ -710,14 +670,12 @@
             )
             (\validMsg ->
                 case validMsg of
-                    aRequest@Request{} -> case dispatchVS node aRequest of
-                        -- if no match to an active vserver ID, just ignore
-                        Nothing -> pure ()
+                    aRequest@Request{}
                         -- if not a multipart message, handle immediately. Response is at the same time an ACK
-                        Just nsSTM | part aRequest == 1 && isFinalPart aRequest ->
+                        | part aRequest == 1 && isFinalPart aRequest ->
                            forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
                         -- otherwise collect all message parts first before handling the whole request
-                        Just nsSTM | otherwise -> do
+                        | otherwise -> do
                            now <- getPOSIXTime
                            -- critical locking section of requestMap
                            rMapState <- takeMVar requestMap
@@ -735,7 +693,7 @@
                            -- put map back into MVar, end of critical section
                            putMVar requestMap newMapState
                            -- ACK the received part
-                            forM_ (ackRequest aRequest) $
+                            forM_ (ackRequest (getNid nsSnap) aRequest) $
                                \msg -> atomically $ writeTQueue sendQ (msg, sourceAddr)
                            -- if all parts received, then handle request.
                            let
@@ -751,8 +709,6 @@
                     aMsg
             pure ()
-  where
-    dispatchVS node req = HMap.lookup (receiverID req) (vservers node)
 
 
 -- ==== interface to service layer ====
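The multipart collection guards its request map with a plain `takeMVar`/`putMVar` critical section, as the comment in the hunk explains. The same pattern in isolation (simplified key type — the real key also includes the sender address to hinder spoofing):

```haskell
import Control.Concurrent.MVar (MVar, putMVar, takeMVar)
import qualified Data.Map.Strict as Map

type Parts = Map.Map Integer [String]

insertPart :: MVar Parts -> Integer -> String -> IO ()
insertPart mapVar reqID partMsg = do
    parts <- takeMVar mapVar -- begin critical section
    let parts' = Map.insertWith (<>) reqID [partMsg] parts
    putMVar mapVar parts'    -- end critical section
```

`modifyMVar_` would additionally restore the MVar if an exception strikes between take and put; the plain pair is shown here only to mirror the structure in the diff.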
diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs
index 04396d6..fd9d0f9 100644
--- a/src/Hash2Pub/FediChordTypes.hs
+++ b/src/Hash2Pub/FediChordTypes.hs
@@ -69,10 +69,10 @@ import Control.Exception
 import Data.Foldable (foldr')
 import Data.Function (on)
 import qualified Data.Hashable as Hashable
-import Data.HashMap.Strict (HashMap)
-import qualified Data.HashMap.Strict as HMap
 import Data.List (delete, nub, sortBy)
 import qualified Data.Map.Strict as Map
+import Data.HashMap.Strict (HashMap)
+import qualified Data.HashMap.Strict as HMap
 import Data.Maybe (fromJust, fromMaybe, isJust,
                    isNothing, mapMaybe)
 import qualified Data.Set as Set
@@ -153,19 +153,15 @@ a `localCompare` b
 -- Also contains shared data and config values.
 -- TODO: more data structures for k-choices bookkeeping
 data RealNode s = RealNode
-    { vservers              :: HashMap NodeID (LocalNodeStateSTM s)
+    { vservers        :: HashMap NodeID (LocalNodeStateSTM s)
     -- ^ map of all active VServer node IDs to their node state
-    , nodeConfig            :: FediChordConf
+    , nodeConfig      :: FediChordConf
     -- ^ holds the initial configuration read at program start
-    , bootstrapNodes        :: [(String, PortNumber)]
+    , bootstrapNodes  :: [(String, PortNumber)]
     -- ^ nodes to be used as bootstrapping points, new ones learned during operation
-    , lookupCacheSTM        :: TVar LookupCache
+    , lookupCacheSTM  :: TVar LookupCache
     -- ^ a global cache of looked up keys and their associated nodes
-    , globalNodeCacheSTM    :: TVar NodeCache
-    -- ^ EpiChord node cache with expiry times for nodes.
-    , globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-    -- ^ cache updates are not written directly to the 'globalNodeCacheSTM'
-    , nodeService           :: s (RealNodeSTM s)
+    , nodeService     :: s (RealNodeSTM s)
     }
 
 type RealNodeSTM s = TVar (RealNode s)
@@ -194,9 +190,9 @@ data LocalNodeState s = LocalNodeState
     { nodeState         :: RemoteNodeState
     -- ^ represents common data present both in remote and local node representations
     , nodeCacheSTM      :: TVar NodeCache
-    -- ^ reference to the 'globalNodeCacheSTM'
+    -- ^ EpiChord node cache with expiry times for nodes
     , cacheWriteQueue   :: TQueue (NodeCache -> NodeCache)
-    -- ^ reference to the 'globalCacheWriteQueue
+    -- ^ cache updates are not written directly to the 'nodeCache' but queued and
     , successors        :: [RemoteNodeState] -- could be a set instead as these are ordered as well
     -- ^ successor nodes in ascending order by distance
     , predecessors      :: [RemoteNodeState]
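The net effect of the type changes in this commit, in miniature: the node cache and its write queue move out of the shared `RealNode` record and back into each per-vserver state. Field and record names below are illustrative stand-ins only:

```haskell
import Control.Concurrent.STM (TQueue, TVar)
import qualified Data.Map.Strict as Map

type Cache = Map.Map Int String

-- shared per process (cf. RealNode)
newtype Shared = Shared { config :: String }

-- one per vserver (cf. LocalNodeState)
data PerVServer = PerVServer
    { cacheVar   :: TVar Cache              -- previously globalNodeCacheSTM in the shared record
    , writeQueue :: TQueue (Cache -> Cache) -- previously globalCacheWriteQueue in the shared record
    }
```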