From ebc0d54ddc37a7ef8d4b5adf3f1b8ae74f34c4cb Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Thu, 16 Jul 2020 00:24:44 +0200 Subject: [PATCH] periodically purge lookup cache of expired entries closes #24 --- app/Main.hs | 1 + src/Hash2Pub/FediChord.hs | 72 ++++++++++++++++++++++++++++++++-- src/Hash2Pub/FediChordTypes.hs | 2 + 3 files changed, 71 insertions(+), 4 deletions(-) diff --git a/app/Main.hs b/app/Main.hs index 85db925..8887ee8 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -51,4 +51,5 @@ readConfig = do , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)] --, confStabiliseInterval = 60 , confBootstrapSamplingInterval = 180 + , confMaxLookupCacheAge = 300 } diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index dc32c77..3ff4d8e 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -535,7 +535,8 @@ sendThread sock sendQ = forever $ do -- | Sets up and manages the main server threads of FediChord fediMainThreads :: Socket -> LocalNodeStateSTM -> IO () fediMainThreads sock nsSTM = do - (\x -> putStrLn $ "launching threads, ns: " <> show x) =<< readTVarIO nsSTM + ns <- readTVarIO nsSTM + putStrLn $ "launching threads, ns: " <> show ns sendQ <- newTQueueIO recvQ <- newTQueueIO -- concurrently launch all handler threads, if one of them throws an exception @@ -545,9 +546,10 @@ fediMainThreads sock nsSTM = do concurrently_ (stabiliseThread nsSTM) $ concurrently_ (nodeCacheVerifyThread nsSTM) $ concurrently_ (convergenceSampleThread nsSTM) $ - concurrently_ - (sendThread sock sendQ) - (recvThread sock recvQ) + concurrently_ (lookupCacheCleanup $ parentRealNode ns) $ + concurrently_ + (sendThread sock sendQ) + (recvThread sock recvQ) -- defining this here as, for now, the RequestMap is only used by fediMessageHandler. @@ -637,3 +639,65 @@ fediMessageHandler sendQ recvQ nsSTM = do aMsg pure () + + +-- ==== interface to service layer ==== + + +-- | Returns the hostname and port of the host responsible for a key. +-- Information is provided from a cache, only on a cache miss a new DHT lookup +-- is triggered. +getKeyResponsibility :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber)) +getKeyResponsibility nodeSTM lookupKey = do + node <- readTVarIO nodeSTM + cache <- readTVarIO $ lookupCacheSTM node + now <- getPOSIXTime + let cacheResult = Map.lookup lookupKey cache + case cacheResult of + Just (CacheEntry _ connInfo ts) + | now - ts < confMaxLookupCacheAge (nodeConfig node) -> pure (Just connInfo) + | otherwise -> updateLookupCache_ nodeSTM lookupKey + Nothing -> updateLookupCache_ nodeSTM lookupKey + + +-- | Triggers a new DHT lookup for a key, updates the lookup cache and returns the +-- new entry. +-- If no vserver is active in the DHT, 'Nothing' is returned. +updateLookupCache_ :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber)) +updateLookupCache_ nodeSTM lookupKey = do + (node, lookupSource) <- atomically $ do + node <- readTVar nodeSTM + let firstVs = headMay (vservers node) + lookupSource <- case firstVs of + Nothing -> pure Nothing + Just vs -> Just <$> readTVar vs + pure (node, lookupSource) + maybe (do + -- if no local node available, delete cache entry and return Nothing + atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey + pure Nothing + ) + (\n -> do + -- start a lookup from the node, update the cache with the lookup result and return it + newResponsible <- requestQueryID n lookupKey + let newEntry = (getDomain newResponsible, getServicePort newResponsible) + now <- getPOSIXTime + -- atomic update against lost updates + atomically $ modifyTVar' (lookupCacheSTM node) $ + Map.insert lookupKey (CacheEntry False newEntry now) + pure $ Just newEntry + ) lookupSource + + +-- | Periodically clean the lookup cache from expired entries. +lookupCacheCleanup :: RealNodeSTM -> IO () +lookupCacheCleanup nodeSTM = do + node <- readTVarIO nodeSTM + forever $ do + now <- getPOSIXTime + atomically $ modifyTVar' (lookupCacheSTM node) ( + Map.filter (\(CacheEntry _ _ ts) -> + now - ts < confMaxLookupCacheAge (nodeConfig node) + ) + ) + threadDelay $ round (confMaxLookupCacheAge $ nodeConfig node) * (10^5) diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 7fd5df4..f1ca5b6 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -607,6 +607,8 @@ data FediChordConf = FediChordConf -- ^ list of potential bootstrapping nodes , confBootstrapSamplingInterval :: Int -- ^ pause between sampling the own ID through bootstrap nodes, in seconds + , confMaxLookupCacheAge :: POSIXTime + -- ^ maximum age of lookup cache entries in seconds } deriving (Show, Eq)