parent
0ea5402231
commit
ebc0d54ddc
|
@ -51,4 +51,5 @@ readConfig = do
|
||||||
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
|
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
|
||||||
--, confStabiliseInterval = 60
|
--, confStabiliseInterval = 60
|
||||||
, confBootstrapSamplingInterval = 180
|
, confBootstrapSamplingInterval = 180
|
||||||
|
, confMaxLookupCacheAge = 300
|
||||||
}
|
}
|
||||||
|
|
|
@ -535,7 +535,8 @@ sendThread sock sendQ = forever $ do
|
||||||
-- | Sets up and manages the main server threads of FediChord
|
-- | Sets up and manages the main server threads of FediChord
|
||||||
fediMainThreads :: Socket -> LocalNodeStateSTM -> IO ()
|
fediMainThreads :: Socket -> LocalNodeStateSTM -> IO ()
|
||||||
fediMainThreads sock nsSTM = do
|
fediMainThreads sock nsSTM = do
|
||||||
(\x -> putStrLn $ "launching threads, ns: " <> show x) =<< readTVarIO nsSTM
|
ns <- readTVarIO nsSTM
|
||||||
|
putStrLn $ "launching threads, ns: " <> show ns
|
||||||
sendQ <- newTQueueIO
|
sendQ <- newTQueueIO
|
||||||
recvQ <- newTQueueIO
|
recvQ <- newTQueueIO
|
||||||
-- concurrently launch all handler threads, if one of them throws an exception
|
-- concurrently launch all handler threads, if one of them throws an exception
|
||||||
|
@ -545,9 +546,10 @@ fediMainThreads sock nsSTM = do
|
||||||
concurrently_ (stabiliseThread nsSTM) $
|
concurrently_ (stabiliseThread nsSTM) $
|
||||||
concurrently_ (nodeCacheVerifyThread nsSTM) $
|
concurrently_ (nodeCacheVerifyThread nsSTM) $
|
||||||
concurrently_ (convergenceSampleThread nsSTM) $
|
concurrently_ (convergenceSampleThread nsSTM) $
|
||||||
concurrently_
|
concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
|
||||||
(sendThread sock sendQ)
|
concurrently_
|
||||||
(recvThread sock recvQ)
|
(sendThread sock sendQ)
|
||||||
|
(recvThread sock recvQ)
|
||||||
|
|
||||||
|
|
||||||
-- defining this here as, for now, the RequestMap is only used by fediMessageHandler.
|
-- defining this here as, for now, the RequestMap is only used by fediMessageHandler.
|
||||||
|
@ -637,3 +639,65 @@ fediMessageHandler sendQ recvQ nsSTM = do
|
||||||
aMsg
|
aMsg
|
||||||
|
|
||||||
pure ()
|
pure ()
|
||||||
|
|
||||||
|
|
||||||
|
-- ==== interface to service layer ====
|
||||||
|
|
||||||
|
|
||||||
|
-- | Returns the hostname and port of the host responsible for a key.
|
||||||
|
-- Information is provided from a cache, only on a cache miss a new DHT lookup
|
||||||
|
-- is triggered.
|
||||||
|
getKeyResponsibility :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber))
|
||||||
|
getKeyResponsibility nodeSTM lookupKey = do
|
||||||
|
node <- readTVarIO nodeSTM
|
||||||
|
cache <- readTVarIO $ lookupCacheSTM node
|
||||||
|
now <- getPOSIXTime
|
||||||
|
let cacheResult = Map.lookup lookupKey cache
|
||||||
|
case cacheResult of
|
||||||
|
Just (CacheEntry _ connInfo ts)
|
||||||
|
| now - ts < confMaxLookupCacheAge (nodeConfig node) -> pure (Just connInfo)
|
||||||
|
| otherwise -> updateLookupCache_ nodeSTM lookupKey
|
||||||
|
Nothing -> updateLookupCache_ nodeSTM lookupKey
|
||||||
|
|
||||||
|
|
||||||
|
-- | Triggers a new DHT lookup for a key, updates the lookup cache and returns the
|
||||||
|
-- new entry.
|
||||||
|
-- If no vserver is active in the DHT, 'Nothing' is returned.
|
||||||
|
updateLookupCache_ :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber))
|
||||||
|
updateLookupCache_ nodeSTM lookupKey = do
|
||||||
|
(node, lookupSource) <- atomically $ do
|
||||||
|
node <- readTVar nodeSTM
|
||||||
|
let firstVs = headMay (vservers node)
|
||||||
|
lookupSource <- case firstVs of
|
||||||
|
Nothing -> pure Nothing
|
||||||
|
Just vs -> Just <$> readTVar vs
|
||||||
|
pure (node, lookupSource)
|
||||||
|
maybe (do
|
||||||
|
-- if no local node available, delete cache entry and return Nothing
|
||||||
|
atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey
|
||||||
|
pure Nothing
|
||||||
|
)
|
||||||
|
(\n -> do
|
||||||
|
-- start a lookup from the node, update the cache with the lookup result and return it
|
||||||
|
newResponsible <- requestQueryID n lookupKey
|
||||||
|
let newEntry = (getDomain newResponsible, getServicePort newResponsible)
|
||||||
|
now <- getPOSIXTime
|
||||||
|
-- atomic update against lost updates
|
||||||
|
atomically $ modifyTVar' (lookupCacheSTM node) $
|
||||||
|
Map.insert lookupKey (CacheEntry False newEntry now)
|
||||||
|
pure $ Just newEntry
|
||||||
|
) lookupSource
|
||||||
|
|
||||||
|
|
||||||
|
-- | Periodically clean the lookup cache from expired entries.
|
||||||
|
lookupCacheCleanup :: RealNodeSTM -> IO ()
|
||||||
|
lookupCacheCleanup nodeSTM = do
|
||||||
|
node <- readTVarIO nodeSTM
|
||||||
|
forever $ do
|
||||||
|
now <- getPOSIXTime
|
||||||
|
atomically $ modifyTVar' (lookupCacheSTM node) (
|
||||||
|
Map.filter (\(CacheEntry _ _ ts) ->
|
||||||
|
now - ts < confMaxLookupCacheAge (nodeConfig node)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
threadDelay $ round (confMaxLookupCacheAge $ nodeConfig node) * (10^5)
|
||||||
|
|
|
@ -607,6 +607,8 @@ data FediChordConf = FediChordConf
|
||||||
-- ^ list of potential bootstrapping nodes
|
-- ^ list of potential bootstrapping nodes
|
||||||
, confBootstrapSamplingInterval :: Int
|
, confBootstrapSamplingInterval :: Int
|
||||||
-- ^ pause between sampling the own ID through bootstrap nodes, in seconds
|
-- ^ pause between sampling the own ID through bootstrap nodes, in seconds
|
||||||
|
, confMaxLookupCacheAge :: POSIXTime
|
||||||
|
-- ^ maximum age of lookup cache entries in seconds
|
||||||
}
|
}
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue