close #29: periodic cache maintenance

periodically delete expired cache entries, verify unverified ones and
potentially use them as neighbours
This commit is contained in:
Trolli Schmittlauch 2020-06-25 01:24:25 +02:00
parent 5e8cfb0ccd
commit 7f5dac55ea
2 changed files with 94 additions and 10 deletions


@@ -20,11 +20,16 @@ module Hash2Pub.DHTProtocol
     , requestPing
     , requestStabilise
     , queryIdLookupLoop
+    , queueAddEntries
+    , queueDeleteEntries
+    , queueDeleteEntry
     , resolve
     , mkSendSocket
     , mkServerSocket
     , handleIncomingRequest
     , ackRequest
+    , isPossibleSuccessor
+    , isPossiblePredecessor
     )
     where
@@ -81,7 +86,7 @@ import Debug.Trace (trace)
 queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse
 queryLocalCache ownState nCache lBestNodes targetID
   -- as target ID falls between own ID and first predecessor, it is handled by this node
-  | isInOwnResponsibilitySlice ownState targetID = FOUND . toRemoteNodeState $ ownState
+  | targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
   -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
   -- the closest succeeding node (like with the p initiated parallel queries
   | otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors
@@ -110,8 +115,8 @@ queryLocalCache ownState nCache lBestNodes targetID
 -- Looks up the successor of the lookup key on a 'RingMap' representation of the
 -- predecessor list with the node itself added. If the result is the same as the node
 -- itself then it falls into the responsibility interval.
-isInOwnResponsibilitySlice :: HasKeyID a => LocalNodeState -> a -> Bool
-isInOwnResponsibilitySlice ownNs lookupTarget = (getKeyID <$> rMapLookupSucc (getKeyID lookupTarget) predecessorRMap) == pure (getNid ownNs)
+isInOwnResponsibilitySlice :: HasKeyID a => a -> LocalNodeState -> Bool
+isInOwnResponsibilitySlice lookupTarget ownNs = (getKeyID <$> rMapLookupSucc (getKeyID lookupTarget) predecessorRMap) == pure (getNid ownNs)
   where
     predecessorList = predecessors ownNs
     -- add node itself to RingMap representation, to distinguish between
@@ -119,6 +124,16 @@ isInOwnResponsibilitySlice ownNs lookupTarget = (getKeyID <$> rMapLookupSucc (ge
     predecessorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList predecessorList
     closestPredecessor = headMay predecessorList

+isPossiblePredecessor :: HasKeyID a => a -> LocalNodeState -> Bool
+isPossiblePredecessor = isInOwnResponsibilitySlice
+
+isPossibleSuccessor :: HasKeyID a => a -> LocalNodeState -> Bool
+isPossibleSuccessor lookupTarget ownNs = (getKeyID <$> rMapLookupPred (getKeyID lookupTarget) successorRMap) == pure (getNid ownNs)
+  where
+    successorList = successors ownNs
+    successorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList successorList
+    closestSuccessor = headMay successorList
+
 -- cache operations

 -- | update or insert a 'RemoteCacheEntry' into the cache,
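As a rough illustration of how the two new predicates complement each other, the following sketch (not part of this commit; the helper name is hypothetical and it assumes RemoteNodeState has a HasKeyID instance, as its use in the cache verification loop below implies) classifies a verified remote node as a neighbour candidate:

-- Hypothetical helper, not part of this commit: decide which neighbour list(s)
-- a freshly verified node might belong to.
neighbourCandidates :: RemoteNodeState -> LocalNodeState -> [String]
neighbourCandidates node ns =
       [ "predecessor" | node `isPossiblePredecessor` ns ]
    ++ [ "successor"   | node `isPossibleSuccessor` ns ]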
@@ -662,6 +677,21 @@ queueAddEntries entries ns = do
     now <- getPOSIXTime
     forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry

+-- | enqueue a list of node IDs to be deleted from the global NodeCache
+queueDeleteEntries :: Foldable c
+                   => c NodeID
+                   -> LocalNodeState
+                   -> IO ()
+queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
+
+
+-- | enqueue a single node ID to be deleted from the global NodeCache
+queueDeleteEntry :: NodeID
+                 -> LocalNodeState
+                 -> IO ()
+queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
+
+
 -- | retry an IO action at most *i* times until it delivers a result
 attempts :: Int          -- ^ number of retries *i*
          -> IO (Maybe a) -- ^ action to retry
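A minimal usage sketch for the new queueing helpers (the caller name and the stale-ID list are assumptions for illustration; only the two helpers themselves come from this commit):

-- Hypothetical caller, not part of this commit: enqueue deletions for a batch
-- of stale node IDs plus one extra ID. Any Foldable container works for the
-- batch variant; the single-entry variant just wraps its argument in Identity.
purgeStaleIDs :: [NodeID] -> NodeID -> LocalNodeState -> IO ()
purgeStaleIDs staleIDs extraID ns = do
    queueDeleteEntries staleIDs ns
    queueDeleteEntry extraID ns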


@@ -63,6 +63,7 @@ import qualified Data.ByteString as BS
 import qualified Data.ByteString.UTF8 as BSU
 import Data.Either (rights)
 import Data.Foldable (foldr')
+import Data.Functor.Identity
 import Data.IP (IPv6, fromHostAddress6,
                 toHostAddress6)
 import Data.List ((\\))
@@ -183,6 +184,58 @@ cacheWriter nsSTM =
             modifyTVar' (nodeCacheSTM ns) cacheModifier

+-- TODO: make max entry age configurable
+maxEntryAge :: POSIXTime
+maxEntryAge = 600
+
+
+-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
+cacheVerifyThread :: LocalNodeStateSTM -> IO ()
+cacheVerifyThread nsSTM = forever $ do
+    -- get cache
+    (ns, cache) <- atomically $ do
+        ns <- readTVar nsSTM
+        cache <- readTVar $ nodeCacheSTM ns
+        pure (ns, cache)
+
+    -- iterate entries:
+    -- for avoiding too many time syscalls, get current time before iterating.
+    now <- getPOSIXTime
+    forM_ (cacheEntries cache) (\(CacheEntry validated node ts) ->
+        -- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
+        if (now - ts) > maxEntryAge
+            then
+                queueDeleteEntry (getNid node) ns
+            -- case unverified: try verifying, otherwise delete
+            else if not validated
+                then do
+                    -- marking as verified is done by 'requestPing' as well
+                    pong <- requestPing ns node
+                    either (\_ ->
+                            queueDeleteEntry (getNid node) ns
+                        )
+                        (\vss ->
+                            if node `notElem` vss
+                                then queueDeleteEntry (getNid node) ns
+                                -- after verifying a node, check whether it can be a closer neighbour
+                                else do
+                                    if node `isPossiblePredecessor` ns
+                                        then atomically $ do
+                                            ns' <- readTVar nsSTM
+                                            writeTVar nsSTM $ addPredecessors [node] ns'
+                                        else pure ()
+                                    if node `isPossibleSuccessor` ns
+                                        then atomically $ do
+                                            ns' <- readTVar nsSTM
+                                            writeTVar nsSTM $ addSuccessors [node] ns'
+                                        else pure ()
+                        ) pong
+                else pure ()
+        )
+
+    threadDelay $ toEnum (fromEnum maxEntryAge `div` 20)
+
+
 -- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
 -- one responds, and get their neighbours for maintaining the own neighbour lists.
 -- If necessary, request new neighbours.
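The expiry rule used in the loop above can be read as a small pure predicate. This sketch is not part of the commit (the function name is mine; it assumes the CacheEntry constructor is in scope, as it is in the pattern match above) and is only meant to make the condition explicit:

-- Sketch, not part of this commit: an entry counts as expired once its
-- timestamp is older than maxEntryAge relative to the current time.
isExpiredEntry :: POSIXTime -> CacheEntry -> Bool
isExpiredEntry now (CacheEntry _ _ ts) = now - ts > maxEntryAge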
@@ -308,9 +361,10 @@ fediMainThreads sock nsSTM = do
     concurrently_
         (fediMessageHandler sendQ recvQ nsSTM) $
         concurrently_ (stabiliseThread nsSTM) $
-            concurrently_
-                (sendThread sock sendQ)
-                (recvThread sock recvQ)
+            concurrently_ (cacheVerifyThread nsSTM) $
+                concurrently_
+                    (sendThread sock sendQ)
+                    (recvThread sock recvQ)


 -- defining this here as, for now, the RequestMap is only used by fediMessageHandler.
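For reference, the nesting used in fediMainThreads reduces to the shape below. concurrently_ from Control.Concurrent.Async runs both actions, waits for both, and cancels the remaining one if either throws, so adding cacheVerifyThread to the chain ties its lifetime to the other main threads. This is only a sketch of the pattern, not code from this commit:

import Control.Concurrent.Async (concurrently_)

-- Sketch, not part of this commit: run four actions together; if any of them
-- throws, the others are cancelled, otherwise all are waited for.
runAllFour :: IO () -> IO () -> IO () -> IO () -> IO ()
runAllFour a b c d =
    concurrently_ a $
        concurrently_ b $
            concurrently_ c d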
@@ -322,17 +376,17 @@ data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer
 -- TODO: make purge age configurable
 -- | periodically clean up old request parts
-purgeAge :: POSIXTime
-purgeAge = 60 -- seconds
+responsePurgeAge :: POSIXTime
+responsePurgeAge = 60 -- seconds

 requestMapPurge :: MVar RequestMap -> IO ()
 requestMapPurge mapVar = forever $ do
     rMapState <- takeMVar mapVar
     now <- getPOSIXTime
     putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) ->
-        now - ts < purgeAge
+        now - ts < responsePurgeAge
         ) rMapState
-    threadDelay $ fromEnum purgeAge * 2000
+    threadDelay $ fromEnum responsePurgeAge * 2000


 -- | Wait for messages, deserialise them, manage parts and acknowledgement status,