From 33ae904d17055c37225b165440058d3e816aff42 Mon Sep 17 00:00:00 2001
From: Trolli Schmittlauch
Date: Mon, 21 Sep 2020 02:11:43 +0200
Subject: [PATCH] re-structure cacheVerifyThread to work on a RealNode and
 iterate over all joined vservers

contributes to #34
---
 src/Hash2Pub/DHTProtocol.hs |  26 ++---
 src/Hash2Pub/FediChord.hs   | 225 +++++++++++++++++++-----------
 2 files changed, 135 insertions(+), 116 deletions(-)

diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs
index eca145a..f462a26 100644
--- a/src/Hash2Pub/DHTProtocol.hs
+++ b/src/Hash2Pub/DHTProtocol.hs
@@ -269,7 +269,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
         Nothing -> pure ()
         Just aPart -> do
             let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
-            queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
+            queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns)
             -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
             maybe (pure ()) (
                 mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
@@ -542,7 +542,7 @@ requestJoin toJoinOn ownStateSTM = do
                 writeTVar ownStateSTM newState
                 pure (cacheInsertQ, newState)
             -- execute the cache insertions
-            mapM_ (\f -> f joinedState) cacheInsertQ
+            mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ
             if responses == Set.empty
                 then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
                 else do
@@ -624,7 +624,7 @@ sendQueryIdMessages targetID ns lParam targets = do
                        _ -> Set.empty

                 -- forward entries to global cache
-                queueAddEntries entrySet ns
+                queueAddEntries entrySet (cacheWriteQueue ns)
                 -- return accumulated QueryResult
                 pure $ case acc of
                     -- once a FOUND has been encountered, return this as a result
@@ -670,7 +670,7 @@ requestStabilise ns neighbour = do
                            )
                            ([],[]) respSet
             -- update successfully responded neighbour in cache
-            maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
+            maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet)
             pure $ if null responsePreds && null responseSuccs
                    then Left "no neighbours returned"
                    else Right (responsePreds, responseSuccs)
@@ -832,24 +832,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do

 -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
 queueAddEntries :: Foldable c
                 => c RemoteCacheEntry
-                -> LocalNodeState s
+                -> TQueue (NodeCache -> NodeCache)
                 -> IO ()
-queueAddEntries entries ns = do
+queueAddEntries entries cacheQ = do
     now <- getPOSIXTime
-    forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry
+    forM_ entries $ \entry -> atomically $ writeTQueue cacheQ $ addCacheEntryPure now entry

 -- | enqueue a list of node IDs to be deleted from the global NodeCache
 queueDeleteEntries :: Foldable c
                    => c NodeID
-                   -> LocalNodeState s
+                   -> TQueue (NodeCache -> NodeCache)
                    -> IO ()
-queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
+queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry

 -- | enqueue a single node ID to be deleted from the global NodeCache
 queueDeleteEntry :: NodeID
-                 -> LocalNodeState s
+                 -> TQueue (NodeCache -> NodeCache)
                  -> IO ()
 queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
@@ -858,11 +858,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
 -- global 'NodeCache'.
 queueUpdateVerifieds :: Foldable c
                      => c NodeID
-                     -> LocalNodeState s
+                     -> TQueue (NodeCache -> NodeCache)
                      -> IO ()
-queueUpdateVerifieds nIds ns = do
+queueUpdateVerifieds nIds cacheQ = do
     now <- getPOSIXTime
-    forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
+    forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $
         markCacheEntryAsVerified (Just now) nid'

 -- | retry an IO action at most *i* times until it delivers a result
diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index ee5e9b6..9565bae 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -366,59 +366,72 @@ nodeCacheWriter nodeSTM = do


 -- | Periodically iterate through cache, clean up expired entries and verify unverified ones
-nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
-nodeCacheVerifyThread nsSTM = forever $ do
-    -- get cache
-    (ns, cache, maxEntryAge) <- atomically $ do
-        ns <- readTVar nsSTM
-        cache <- readTVar $ nodeCacheSTM ns
-        maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns)
-        pure (ns, cache, maxEntryAge)
+nodeCacheVerifyThread :: RealNodeSTM s -> IO ()
+nodeCacheVerifyThread nodeSTM = forever $ do
+    (node, firstVSSTM) <- atomically $ do
+        node <- readTVar nodeSTM
+        case headMay (HMap.elems $ vservers node) of
+            -- wait until first VS is joined
+            Nothing  -> retry
+            Just vs' -> pure (node, vs')
+    let
+        maxEntryAge = confMaxNodeCacheAge $ nodeConfig node
+        cacheQ = globalCacheWriteQueue node
+    cache <- readTVarIO $ globalNodeCacheSTM node
+    -- always use the first active VS as a sender for operations like Ping
+    firstVS <- readTVarIO firstVSSTM
     -- iterate entries:
     -- for avoiding too many time syscalls, get current time before iterating.
     now <- getPOSIXTime
-    forM_ (nodeCacheEntries cache) (\(CacheEntry validated node ts) ->
+    forM_ (nodeCacheEntries cache) (\(CacheEntry validated cacheNode ts) ->
         -- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
         if (now - ts) > maxEntryAge
             then
-                queueDeleteEntry (getNid node) ns
-            -- case unverified: try verifying, otherwise delete
+                queueDeleteEntry (getNid cacheNode) cacheQ
+        -- case unverified: try verifying, otherwise delete
             else if not validated
                 then do
                     -- marking as verified is done by 'requestPing' as well
-                    pong <- requestPing ns node
+                    pong <- requestPing firstVS cacheNode
                     either (\_->
-                        queueDeleteEntry (getNid node) ns
+                        queueDeleteEntry (getNid cacheNode) cacheQ
                         )
                         (\vss ->
-                            if node `notElem` vss
-                                then queueDeleteEntry (getNid node) ns
+                            if cacheNode `notElem` vss
+                                then queueDeleteEntry (getNid cacheNode) cacheQ
                                 -- after verifying a node, check whether it can be a closer neighbour
-                                else do
-                                    if node `isPossiblePredecessor` ns
+                                -- do this for each node
+                                -- TODO: optimisation: place all LocalNodeStates on the cache ring and check whether any of them is the predecessor/ successor
+                                else forM_ (vservers node) (\nsSTM -> do
+                                    ns <- readTVarIO nsSTM
+                                    if cacheNode `isPossiblePredecessor` ns
                                         then atomically $ do
                                             ns' <- readTVar nsSTM
-                                            writeTVar nsSTM $ addPredecessors [node] ns'
+                                            writeTVar nsSTM $ addPredecessors [cacheNode] ns'
                                         else pure ()
-                                    if node `isPossibleSuccessor` ns
+                                    if cacheNode `isPossibleSuccessor` ns
                                         then atomically $ do
                                             ns' <- readTVar nsSTM
-                                            writeTVar nsSTM $ addSuccessors [node] ns'
+                                            writeTVar nsSTM $ addSuccessors [cacheNode] ns'
                                         else pure ()
+                                    )
                             ) pong
                 else pure ()
         )
     -- check the cache invariant per slice and, if necessary, do a single lookup to the
     -- middle of each slice not verifying the invariant
-    latestNs <- readTVarIO nsSTM
-    latestCache <- readTVarIO $ nodeCacheSTM latestNs
-    let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
-            FOUND node      -> [node]
-            FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
-    forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
-        forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
-        )
+    latestNode <- readTVarIO nodeSTM
+    forM_ (vservers latestNode) (\nsSTM -> do
+        latestNs <- readTVarIO nsSTM
+        latestCache <- readTVarIO $ nodeCacheSTM latestNs
+        let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
+                FOUND node      -> [node]
+                FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
+        forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
+            forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
+            )
+        )

     threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from picoseconds to microseconds
@@ -482,90 +495,93 @@ checkCacheSliceInvariants ns
 -- | Periodically send 'StabiliseRequest's to the closest neighbour nodes, until
 -- one responds, and get their neighbours for maintaining the own neighbour lists.
 -- If necessary, request new neighbours.
-stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
-stabiliseThread nsSTM = forever $ do
-    oldNs <- readTVarIO nsSTM
+stabiliseThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
+stabiliseThread nodeSTM = forever $ do
+    node <- readTVarIO nodeSTM
+    forM_ (vservers node) (\nsSTM -> do
+        oldNs <- readTVarIO nsSTM

-    -- iterate through the same snapshot, collect potential new neighbours
-    -- and nodes to be deleted, and modify these changes only at the end of
-    -- each stabilise run.
-    -- This decision makes iterating through a potentially changing list easier.
+        -- iterate through the same snapshot, collect potential new neighbours
+        -- and nodes to be deleted, and modify these changes only at the end of
+        -- each stabilise run.
+        -- This decision makes iterating through a potentially changing list easier.

-    -- don't contact all neighbours unless the previous one failed/ Left ed
+        -- don't contact all neighbours unless the previous one failed/ Left ed

-    predStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
-    succStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
+        predStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
+        succStabilise <- stabiliseClosestResponder oldNs successors 1 []

-    let
-        (predDeletes, predNeighbours) = either (const ([], [])) id predStabilise
-        (succDeletes, succNeighbours) = either (const ([], [])) id succStabilise
-        allDeletes = predDeletes <> succDeletes
-        allNeighbours = predNeighbours <> succNeighbours
-
-    -- now actually modify the node state's neighbours
-    updatedNs <- atomically $ do
-        newerNsSnap <- readTVar nsSTM
         let
-            -- sorting and taking only k neighbours is taken care of by the
-            -- setSuccessors/ setPredecessors functions
-            newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours
-            newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours
-            newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
-        writeTVar nsSTM newNs
-        pure newNs
-    -- delete unresponding nodes from cache as well
-    mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes
+            (predDeletes, predNeighbours) = either (const ([], [])) id predStabilise
+            (succDeletes, succNeighbours) = either (const ([], [])) id succStabilise
+            allDeletes = predDeletes <> succDeletes
+            allNeighbours = predNeighbours <> succNeighbours

-    -- try looking up additional neighbours if list too short
-    forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
-        ns' <- readTVarIO nsSTM
-        nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
-        either
-            (const $ pure ())
-            (\entry -> atomically $ do
-                latestNs <- readTVar nsSTM
-                writeTVar nsSTM $ addPredecessors [entry] latestNs
-            )
-            nextEntry
-        )
+        -- now actually modify the node state's neighbours
+        updatedNs <- atomically $ do
+            newerNsSnap <- readTVar nsSTM
+            let
+                -- sorting and taking only k neighbours is taken care of by the
+                -- setSuccessors/ setPredecessors functions
+                newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours
+                newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours
+                newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
+            writeTVar nsSTM newNs
+            pure newNs
+        -- delete unresponding nodes from cache as well
+        mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes

-    forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
-        ns' <- readTVarIO nsSTM
-        nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
-        either
-            (const $ pure ())
-            (\entry -> atomically $ do
-                latestNs <- readTVar nsSTM
-                writeTVar nsSTM $ addSuccessors [entry] latestNs
-            )
-            nextEntry
-        )
+        -- try looking up additional neighbours if list too short
+        forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
+            ns' <- readTVarIO nsSTM
+            nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
+            either
+                (const $ pure ())
+                (\entry -> atomically $ do
+                    latestNs <- readTVar nsSTM
+                    writeTVar nsSTM $ addPredecessors [entry] latestNs
+                )
+                nextEntry
+            )

-    newNs <- readTVarIO nsSTM
+        forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
+            ns' <- readTVarIO nsSTM
+            nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
+            either
+                (const $ pure ())
+                (\entry -> atomically $ do
+                    latestNs <- readTVar nsSTM
+                    writeTVar nsSTM $ addSuccessors [entry] latestNs
+                )
+                nextEntry
+            )

-    let
-        oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs
-        newPredecessor = headMay $ predecessors newNs
-    -- manage need for service data migration:
-    maybe (pure ()) (\newPredecessor' ->
-        when (
-            isJust newPredecessor
-            && oldPredecessor /= newPredecessor'
-            -- case: predecessor has changed in some way => own responsibility has changed in some way
-            -- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data
-            -- If this is due to a node leaving without transfering its data, try getting it from a redundant copy
-            -- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it
-            && isInOwnResponsibilitySlice newPredecessor' oldNs) $ do
-            ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs)
-            migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor')
-            -- TODO: deal with migration failure, e.g retry
-            pure ()
-        )
-        newPredecessor
+        newNs <- readTVarIO nsSTM

-    stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs)
-    threadDelay stabiliseDelay
+        let
+            oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs
+            newPredecessor = headMay $ predecessors newNs
+        -- manage need for service data migration:
+        maybe (pure ()) (\newPredecessor' ->
+            when (
+                isJust newPredecessor
+                && oldPredecessor /= newPredecessor'
+                -- case: predecessor has changed in some way => own responsibility has changed in some way
+                -- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data
+                -- If this is due to a node leaving without transferring its data, try getting it from a redundant copy
+                -- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it
+                && isInOwnResponsibilitySlice newPredecessor' oldNs) $ do
+                ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs)
+                migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor')
+                -- TODO: deal with migration failure, e.g. retry
+                pure ()
+            )
+            newPredecessor
+
+        )
+
+    threadDelay . confStabiliseInterval . nodeConfig $ node
   where
     -- | send a stabilise request to the n-th neighbour
     -- (specified by the provided getter function) and on failure retry
@@ -636,8 +652,11 @@ fediMainThreads sock nodeSTM = do
         -- all get cancelled
         concurrently_
             (fediMessageHandler sendQ recvQ nodeSTM) $
-            concurrently_ (stabiliseThread nsSTM) $
-                concurrently_ (nodeCacheVerifyThread nsSTM) $
+            -- decision whether to [1] launch 1 thread per VS or [2] let a single
+            -- thread process all VSes sequentially:
+            -- choose option 2 for the sake of limiting concurrency in simulation scenario
+            concurrently_ (stabiliseThread nodeSTM) $
+                concurrently_ (nodeCacheVerifyThread nodeSTM) $
                     concurrently_ (convergenceSampleThread nsSTM) $
                         concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
                             concurrently_
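Note on the DHTProtocol.hs hunks: the queueing helpers now take the TQueue (NodeCache -> NodeCache) directly instead of a whole LocalNodeState, so any thread holding a reference to the queue can enqueue pure cache modifications. A minimal, self-contained sketch of that producer/consumer pattern, with stand-in types (cacheWriter, queueInsert and the String payload are illustrative, not the Hash2Pub API):

    import           Control.Concurrent.STM
    import           Control.Monad          (forever)
    import qualified Data.Map.Strict        as Map

    type NodeID    = Integer
    type NodeCache = Map.Map NodeID String

    -- consumer in the style of nodeCacheWriter: drain the queue and fold
    -- each pure modification function into the shared cache TVar
    cacheWriter :: TQueue (NodeCache -> NodeCache) -> TVar NodeCache -> IO ()
    cacheWriter cacheQ cacheSTM = forever . atomically $ do
        modifier <- readTQueue cacheQ
        modifyTVar' cacheSTM modifier

    -- producer: only the queue is needed, not a LocalNodeState
    queueInsert :: NodeID -> String -> TQueue (NodeCache -> NodeCache) -> IO ()
    queueInsert nid payload cacheQ =
        atomically . writeTQueue cacheQ $ Map.insert nid payload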
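The restructured nodeCacheVerifyThread blocks until the first vserver has joined by retrying its STM transaction while the vservers map is still empty. A sketch of that wait in isolation, assuming only a TVar holding the vserver map (HMap is Data.HashMap.Strict and headMay comes from the safe package, as in the patch; waitForFirstVS is a made-up name):

    import           Control.Concurrent.STM
    import qualified Data.HashMap.Strict    as HMap
    import           Safe                   (headMay)

    -- block until a writer inserts the first vserver, then return it
    waitForFirstVS :: TVar (HMap.HashMap k v) -> IO v
    waitForFirstVS vsMapSTM = atomically $ do
        vsMap <- readTVar vsMapSTM
        case headMay (HMap.elems vsMap) of
            Nothing  -> retry      -- transaction re-runs once vsMapSTM changes
            Just vs' -> pure vs'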
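On the fediMainThreads hunk: the nested concurrently_ calls (from the async package) tie the lifetimes of all maintenance threads together, so when any one of them returns or throws, the others are cancelled. A runnable toy version of that wiring (the loop names are made up for illustration):

    import Control.Concurrent       (threadDelay)
    import Control.Concurrent.Async (concurrently_)
    import Control.Monad            (forever)

    main :: IO ()
    main =
        concurrently_ (loop "stabilise") $
            concurrently_ (loop "cacheVerify")
                          (loop "convergenceSample")
      where
        loop name = forever $ do
            putStrLn (name <> " tick")
            threadDelay 1000000   -- one second between iterations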