Compare commits
No commits in common. "ebc0d54ddc37a7ef8d4b5adf3f1b8ae74f34c4cb" and "c37fe88b354644f4b497623d27a2b3e8cfa2f5f8" have entirely different histories.
ebc0d54ddc
...
c37fe88b35
|
@ -51,5 +51,4 @@ readConfig = do
|
||||||
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
|
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
|
||||||
--, confStabiliseInterval = 60
|
--, confStabiliseInterval = 60
|
||||||
, confBootstrapSamplingInterval = 180
|
, confBootstrapSamplingInterval = 180
|
||||||
, confMaxLookupCacheAge = 300
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -535,8 +535,7 @@ sendThread sock sendQ = forever $ do
|
||||||
-- | Sets up and manages the main server threads of FediChord
|
-- | Sets up and manages the main server threads of FediChord
|
||||||
fediMainThreads :: Socket -> LocalNodeStateSTM -> IO ()
|
fediMainThreads :: Socket -> LocalNodeStateSTM -> IO ()
|
||||||
fediMainThreads sock nsSTM = do
|
fediMainThreads sock nsSTM = do
|
||||||
ns <- readTVarIO nsSTM
|
(\x -> putStrLn $ "launching threads, ns: " <> show x) =<< readTVarIO nsSTM
|
||||||
putStrLn $ "launching threads, ns: " <> show ns
|
|
||||||
sendQ <- newTQueueIO
|
sendQ <- newTQueueIO
|
||||||
recvQ <- newTQueueIO
|
recvQ <- newTQueueIO
|
||||||
-- concurrently launch all handler threads, if one of them throws an exception
|
-- concurrently launch all handler threads, if one of them throws an exception
|
||||||
|
@ -546,10 +545,9 @@ fediMainThreads sock nsSTM = do
|
||||||
concurrently_ (stabiliseThread nsSTM) $
|
concurrently_ (stabiliseThread nsSTM) $
|
||||||
concurrently_ (nodeCacheVerifyThread nsSTM) $
|
concurrently_ (nodeCacheVerifyThread nsSTM) $
|
||||||
concurrently_ (convergenceSampleThread nsSTM) $
|
concurrently_ (convergenceSampleThread nsSTM) $
|
||||||
concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
|
concurrently_
|
||||||
concurrently_
|
(sendThread sock sendQ)
|
||||||
(sendThread sock sendQ)
|
(recvThread sock recvQ)
|
||||||
(recvThread sock recvQ)
|
|
||||||
|
|
||||||
|
|
||||||
-- defining this here as, for now, the RequestMap is only used by fediMessageHandler.
|
-- defining this here as, for now, the RequestMap is only used by fediMessageHandler.
|
||||||
|
@ -639,65 +637,3 @@ fediMessageHandler sendQ recvQ nsSTM = do
|
||||||
aMsg
|
aMsg
|
||||||
|
|
||||||
pure ()
|
pure ()
|
||||||
|
|
||||||
|
|
||||||
-- ==== interface to service layer ====
|
|
||||||
|
|
||||||
|
|
||||||
-- | Returns the hostname and port of the host responsible for a key.
|
|
||||||
-- Information is provided from a cache, only on a cache miss a new DHT lookup
|
|
||||||
-- is triggered.
|
|
||||||
getKeyResponsibility :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber))
|
|
||||||
getKeyResponsibility nodeSTM lookupKey = do
|
|
||||||
node <- readTVarIO nodeSTM
|
|
||||||
cache <- readTVarIO $ lookupCacheSTM node
|
|
||||||
now <- getPOSIXTime
|
|
||||||
let cacheResult = Map.lookup lookupKey cache
|
|
||||||
case cacheResult of
|
|
||||||
Just (CacheEntry _ connInfo ts)
|
|
||||||
| now - ts < confMaxLookupCacheAge (nodeConfig node) -> pure (Just connInfo)
|
|
||||||
| otherwise -> updateLookupCache_ nodeSTM lookupKey
|
|
||||||
Nothing -> updateLookupCache_ nodeSTM lookupKey
|
|
||||||
|
|
||||||
|
|
||||||
-- | Triggers a new DHT lookup for a key, updates the lookup cache and returns the
|
|
||||||
-- new entry.
|
|
||||||
-- If no vserver is active in the DHT, 'Nothing' is returned.
|
|
||||||
updateLookupCache_ :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber))
|
|
||||||
updateLookupCache_ nodeSTM lookupKey = do
|
|
||||||
(node, lookupSource) <- atomically $ do
|
|
||||||
node <- readTVar nodeSTM
|
|
||||||
let firstVs = headMay (vservers node)
|
|
||||||
lookupSource <- case firstVs of
|
|
||||||
Nothing -> pure Nothing
|
|
||||||
Just vs -> Just <$> readTVar vs
|
|
||||||
pure (node, lookupSource)
|
|
||||||
maybe (do
|
|
||||||
-- if no local node available, delete cache entry and return Nothing
|
|
||||||
atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey
|
|
||||||
pure Nothing
|
|
||||||
)
|
|
||||||
(\n -> do
|
|
||||||
-- start a lookup from the node, update the cache with the lookup result and return it
|
|
||||||
newResponsible <- requestQueryID n lookupKey
|
|
||||||
let newEntry = (getDomain newResponsible, getServicePort newResponsible)
|
|
||||||
now <- getPOSIXTime
|
|
||||||
-- atomic update against lost updates
|
|
||||||
atomically $ modifyTVar' (lookupCacheSTM node) $
|
|
||||||
Map.insert lookupKey (CacheEntry False newEntry now)
|
|
||||||
pure $ Just newEntry
|
|
||||||
) lookupSource
|
|
||||||
|
|
||||||
|
|
||||||
-- | Periodically clean the lookup cache from expired entries.
|
|
||||||
lookupCacheCleanup :: RealNodeSTM -> IO ()
|
|
||||||
lookupCacheCleanup nodeSTM = do
|
|
||||||
node <- readTVarIO nodeSTM
|
|
||||||
forever $ do
|
|
||||||
now <- getPOSIXTime
|
|
||||||
atomically $ modifyTVar' (lookupCacheSTM node) (
|
|
||||||
Map.filter (\(CacheEntry _ _ ts) ->
|
|
||||||
now - ts < confMaxLookupCacheAge (nodeConfig node)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
threadDelay $ round (confMaxLookupCacheAge $ nodeConfig node) * (10^5)
|
|
||||||
|
|
|
@ -19,8 +19,6 @@ module Hash2Pub.FediChordTypes (
|
||||||
, setPredecessors
|
, setPredecessors
|
||||||
, NodeCache
|
, NodeCache
|
||||||
, NodeCacheEntry
|
, NodeCacheEntry
|
||||||
, LookupCache
|
|
||||||
, LookupCacheEntry
|
|
||||||
, CacheEntry(..)
|
, CacheEntry(..)
|
||||||
, RingEntry(..)
|
, RingEntry(..)
|
||||||
, RingMap(..)
|
, RingMap(..)
|
||||||
|
@ -607,8 +605,6 @@ data FediChordConf = FediChordConf
|
||||||
-- ^ list of potential bootstrapping nodes
|
-- ^ list of potential bootstrapping nodes
|
||||||
, confBootstrapSamplingInterval :: Int
|
, confBootstrapSamplingInterval :: Int
|
||||||
-- ^ pause between sampling the own ID through bootstrap nodes, in seconds
|
-- ^ pause between sampling the own ID through bootstrap nodes, in seconds
|
||||||
, confMaxLookupCacheAge :: POSIXTime
|
|
||||||
-- ^ maximum age of lookup cache entries in seconds
|
|
||||||
}
|
}
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue