From 7f5dac55ea5418ddf21d24eb3b5770627ee11456 Mon Sep 17 00:00:00 2001
From: Trolli Schmittlauch
Date: Thu, 25 Jun 2020 01:24:25 +0200
Subject: [PATCH] close #29: periodic cache maintenance

periodically delete expired cache entries, check unverified ones and potentially use them as neighbour
---
 src/Hash2Pub/DHTProtocol.hs | 36 ++++++++++++++++++--
 src/Hash2Pub/FediChord.hs   | 68 +++++++++++++++++++++++++++++++++----
 2 files changed, 94 insertions(+), 10 deletions(-)

diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs
index f02bd59..7af7699 100644
--- a/src/Hash2Pub/DHTProtocol.hs
+++ b/src/Hash2Pub/DHTProtocol.hs
@@ -20,11 +20,16 @@ module Hash2Pub.DHTProtocol
     , requestPing
     , requestStabilise
     , queryIdLookupLoop
+    , queueAddEntries
+    , queueDeleteEntries
+    , queueDeleteEntry
     , resolve
     , mkSendSocket
     , mkServerSocket
     , handleIncomingRequest
     , ackRequest
+    , isPossibleSuccessor
+    , isPossiblePredecessor
     ) where
 
 
@@ -81,7 +86,7 @@ import Debug.Trace (trace)
 queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse
 queryLocalCache ownState nCache lBestNodes targetID
     -- as target ID falls between own ID and first predecessor, it is handled by this node
-    | isInOwnResponsibilitySlice ownState targetID = FOUND . toRemoteNodeState $ ownState
+    | targetID `isInOwnResponsibilitySlice` ownState = FOUND . toRemoteNodeState $ ownState
     -- my interpretation: the "l best next hops" are the l-1 closest preceding nodes and
     -- the closest succeeding node (like with the p initiated parallel queries
     | otherwise = FORWARD $ closestSuccessor `Set.union` closestPredecessors
@@ -110,8 +115,8 @@ queryLocalCache ownState nCache lBestNodes targetID
 -- Looks up the successor of the lookup key on a 'RingMap' representation of the
 -- predecessor list with the node itself added. If the result is the same as the node
 -- itself then it falls into the responsibility interval.
-isInOwnResponsibilitySlice :: HasKeyID a => LocalNodeState -> a -> Bool
-isInOwnResponsibilitySlice ownNs lookupTarget = (getKeyID <$> rMapLookupSucc (getKeyID lookupTarget) predecessorRMap) == pure (getNid ownNs)
+isInOwnResponsibilitySlice :: HasKeyID a => a -> LocalNodeState -> Bool
+isInOwnResponsibilitySlice lookupTarget ownNs = (getKeyID <$> rMapLookupSucc (getKeyID lookupTarget) predecessorRMap) == pure (getNid ownNs)
   where
     predecessorList = predecessors ownNs
     -- add node itself to RingMap representation, to distinguish between
@@ -119,6 +124,16 @@ isInOwnResponsibilitySlice ownNs lookupTarget = (getKeyID <$> rMapLookupSucc (ge
     predecessorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList predecessorList
     closestPredecessor = headMay predecessorList
 
+isPossiblePredecessor :: HasKeyID a => a -> LocalNodeState -> Bool
+isPossiblePredecessor = isInOwnResponsibilitySlice
+
+isPossibleSuccessor :: HasKeyID a => a -> LocalNodeState -> Bool
+isPossibleSuccessor lookupTarget ownNs = (getKeyID <$> rMapLookupPred (getKeyID lookupTarget) successorRMap) == pure (getNid ownNs)
+  where
+    successorList = successors ownNs
+    successorRMap = addRMapEntry (toRemoteNodeState ownNs) $ rMapFromList successorList
+    closestSuccessor = headMay successorList
+
 -- cache operations
 
 -- | update or insert a 'RemoteCacheEntry' into the cache,
@@ -662,6 +677,21 @@ queueAddEntries entries ns = do
     now <- getPOSIXTime
     forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry
 
+
+-- | enqueue a list of node IDs to be deleted from the global NodeCache
+queueDeleteEntries :: Foldable c
+                   => c NodeID
+                   -> LocalNodeState
+                   -> IO ()
+queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) .
+    deleteCacheEntry
+
+
+-- | enqueue a single node ID to be deleted from the global NodeCache
+queueDeleteEntry :: NodeID
+                 -> LocalNodeState
+                 -> IO ()
+queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
+
 -- | retry an IO action at most *i* times until it delivers a result
 attempts :: Int -- ^ number of retries *i*
          -> IO (Maybe a) -- ^ action to retry
diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index c425683..fe7fa83 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -63,6 +63,7 @@ import qualified Data.ByteString as BS
 import qualified Data.ByteString.UTF8 as BSU
 import Data.Either (rights)
 import Data.Foldable (foldr')
+import Data.Functor.Identity
 import Data.IP (IPv6, fromHostAddress6,
                 toHostAddress6)
 import Data.List ((\\))
@@ -183,6 +184,58 @@ cacheWriter nsSTM =
         modifyTVar' (nodeCacheSTM ns) cacheModifier
 
 
+-- TODO: make max entry age configurable
+maxEntryAge :: POSIXTime
+maxEntryAge = 600
+
+
+-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
+cacheVerifyThread :: LocalNodeStateSTM -> IO ()
+cacheVerifyThread nsSTM = forever $ do
+    -- get cache
+    (ns, cache) <- atomically $ do
+        ns <- readTVar nsSTM
+        cache <- readTVar $ nodeCacheSTM ns
+        pure (ns, cache)
+    -- iterate entries:
+    -- for avoiding too many time syscalls, get current time before iterating.
+    now <- getPOSIXTime
+    forM_ (cacheEntries cache) (\(CacheEntry validated node ts) ->
+        -- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
+        if (now - ts) > maxEntryAge
+           then
+               queueDeleteEntry (getNid node) ns
+           -- case unverified: try verifying, otherwise delete
+           else if not validated
+                   then do
+                       -- marking as verified is done by 'requestPing' as well
+                       pong <- requestPing ns node
+                       either (\_->
+                           queueDeleteEntry (getNid node) ns
+                              )
+                              (\vss ->
+                                  if node `notElem` vss
+                                     then queueDeleteEntry (getNid node) ns
+                                     -- after verifying a node, check whether it can be a closer neighbour
+                                     else do
+                                         if node `isPossiblePredecessor` ns
+                                            then atomically $ do
+                                                ns' <- readTVar nsSTM
+                                                writeTVar nsSTM $ addPredecessors [node] ns'
+                                            else pure ()
+                                         if node `isPossibleSuccessor` ns
+                                            then atomically $ do
+                                                ns' <- readTVar nsSTM
+                                                writeTVar nsSTM $ addSuccessors [node] ns'
+                                            else pure ()
+                              ) pong
+                   else pure ()
+                               )
+
+    threadDelay $ round (maxEntryAge / 20 * 1e6) -- pause maxEntryAge/20 seconds; threadDelay takes microseconds, fromEnum on POSIXTime yields picoseconds
+
+
+
 -- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
 -- one responds, and get their neighbours for maintaining the own neighbour lists.
 -- If necessary, request new neighbours.
@@ -308,9 +361,10 @@ fediMainThreads sock nsSTM = do
     concurrently_
         (fediMessageHandler sendQ recvQ nsSTM) $
         concurrently_ (stabiliseThread nsSTM) $
-            concurrently_
-                (sendThread sock sendQ)
-                (recvThread sock recvQ)
+            concurrently_ (cacheVerifyThread nsSTM) $
+                concurrently_
+                    (sendThread sock sendQ)
+                    (recvThread sock recvQ)
 
 
 -- defining this here as, for now, the RequestMap is only used by fediMessageHandler.
@@ -322,17 +376,17 @@ data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer
 
 -- TODO: make purge age configurable
 -- | periodically clean up old request parts
-purgeAge :: POSIXTime
-purgeAge = 60 -- seconds
+responsePurgeAge :: POSIXTime
+responsePurgeAge = 60 -- seconds
 
 requestMapPurge :: MVar RequestMap -> IO ()
 requestMapPurge mapVar = forever $ do
     rMapState <- takeMVar mapVar
     now <- getPOSIXTime
     putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) ->
-        now - ts < purgeAge
+        now - ts < responsePurgeAge
                                 ) rMapState
-    threadDelay $ fromEnum purgeAge * 2000
+    threadDelay $ round (responsePurgeAge * 2 * 1e6) -- pause twice the purge age; threadDelay takes microseconds, fromEnum on POSIXTime yields picoseconds
 
 
 -- | Wait for messages, deserialise them, manage parts and acknowledgement status,