writer thread for processing all cache modification through a queue

This commit is contained in:
Trolli Schmittlauch 2020-05-13 19:54:02 +02:00
parent 5810c14b26
commit f5d0777cc4
3 changed files with 42 additions and 10 deletions

View file

@ -25,8 +25,6 @@ import Network.Socket.ByteString
import Hash2Pub.FediChord import Hash2Pub.FediChord
( NodeID ( NodeID
, NodeState (..) , NodeState (..)
, getNodeCache
, putNodeCache
, getSuccessors , getSuccessors
, putSuccessors , putSuccessors
, getPredecessors , getPredecessors
@ -168,8 +166,16 @@ remoteNode_ (RemoteCacheEntry ns _) = ns
addCacheEntry :: RemoteCacheEntry -- ^ a remote cache entry received from network addCacheEntry :: RemoteCacheEntry -- ^ a remote cache entry received from network
-> NodeCache -- ^ node cache to insert to -> NodeCache -- ^ node cache to insert to
-> IO NodeCache -- ^ new node cache with the element inserted -> IO NodeCache -- ^ new node cache with the element inserted
addCacheEntry (RemoteCacheEntry ns ts) cache = do addCacheEntry entry cache = do
now <- getPOSIXTime now <- getPOSIXTime
return $ addCacheEntryPure now entry cache
-- | pure version of 'addCacheEntry' with current time explicitly specified as argument
addCacheEntryPure :: POSIXTime -- ^ current time
-> RemoteCacheEntry -- ^ a remote cache entry received from network
-> NodeCache -- ^ node cache to insert to
-> NodeCache -- ^ new node cache with the element inserted
addCacheEntryPure now (RemoteCacheEntry ns ts) cache =
let let
-- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity -- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity
timestamp' = if ts <= now then ts else now timestamp' = if ts <= now then ts else now
@ -178,7 +184,8 @@ addCacheEntry (RemoteCacheEntry ns ts) cache = do
case oldVal of case oldVal of
ProxyEntry n _ -> ProxyEntry n (Just newVal) ProxyEntry n _ -> ProxyEntry n (Just newVal)
NodeEntry oldValidationState _ oldTimestamp -> NodeEntry oldValidationState newNode (max oldTimestamp newTimestamp) NodeEntry oldValidationState _ oldTimestamp -> NodeEntry oldValidationState newNode (max oldTimestamp newTimestamp)
return newCache in
newCache
-- | delete the node with given ID from cache -- | delete the node with given ID from cache
deleteCacheEntry :: NodeID -- ^ID of the node to be deleted deleteCacheEntry :: NodeID -- ^ID of the node to be deleted

View file

@ -15,7 +15,7 @@ module Hash2Pub.FediChord (
, toNodeID , toNodeID
, NodeState (..) , NodeState (..)
, InternalNodeState (..) , InternalNodeState (..)
, getNodeCache , getNodeCacheRef
, putNodeCache , putNodeCache
, getSuccessors , getSuccessors
, putSuccessors , putSuccessors
@ -40,6 +40,7 @@ module Hash2Pub.FediChord (
, fediChordInit , fediChordInit
, mkServerSocket , mkServerSocket
, resolve , resolve
, cacheWriter
) where ) where
import qualified Data.Map.Strict as Map import qualified Data.Map.Strict as Map
@ -59,6 +60,7 @@ import Data.IP (IPv6, fromHostAddress6, toHostAddress6)
import Data.IORef import Data.IORef
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TQueue
import Control.Monad (forever)
import Data.Typeable (Typeable(..), typeOf) import Data.Typeable (Typeable(..), typeOf)
import Hash2Pub.Utils import Hash2Pub.Utils
@ -142,11 +144,12 @@ data InternalNodeState = InternalNodeState {
-- ^ EpiChord node cache with expiry times for nodes -- ^ EpiChord node cache with expiry times for nodes
-- as the map is ordered, lookups for the closes preceding node can be done using @lookupLT@. -- as the map is ordered, lookups for the closes preceding node can be done using @lookupLT@.
-- encapsulated into an IORef for allowing concurrent reads without locking -- encapsulated into an IORef for allowing concurrent reads without locking
, cacheWriteQueue :: TQueue (NodeCache -> IO NodeCache) , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ cache updates are not written directly to the 'nodeCache' but queued and -- ^ cache updates are not written directly to the 'nodeCache' but queued and
-- only processed by a single writer thread to prevent lost updates. -- only processed by a single writer thread to prevent lost updates.
-- All nodeCache modifying functions have to be partially applied enough before -- All nodeCache modifying functions have to be partially applied enough before
-- being put into the queue. -- being put into the queue.
--
, successors :: [NodeID] -- could be a set instead as these are ordered as well , successors :: [NodeID] -- could be a set instead as these are ordered as well
-- ^ successor nodes in ascending order by distance -- ^ successor nodes in ascending order by distance
, predecessors :: [NodeID] , predecessors :: [NodeID]
@ -187,8 +190,8 @@ putInternals_ func ns = let
ns {internals = newInternals } ns {internals = newInternals }
-- | convenience function for extracting the 'NodeCache' from a 'NodeState' -- | convenience function for extracting the 'NodeCache' from a 'NodeState'
getNodeCache :: NodeState -> Maybe (IORef NodeCache) getNodeCacheRef :: NodeState -> Maybe (IORef NodeCache)
getNodeCache = getInternals_ nodeCache getNodeCacheRef = getInternals_ nodeCache
-- | convenience function for updating the 'NodeCache' on 'NodeState' s that have -- | convenience function for updating the 'NodeCache' on 'NodeState' s that have
-- internals. -- internals.
@ -196,6 +199,9 @@ getNodeCache = getInternals_ nodeCache
putNodeCache :: IORef NodeCache -> NodeState -> NodeState putNodeCache :: IORef NodeCache -> NodeState -> NodeState
putNodeCache nc = putInternals_ (\i -> i {nodeCache = nc}) putNodeCache nc = putInternals_ (\i -> i {nodeCache = nc})
getCacheWriteQueue :: NodeState -> Maybe (TQueue (NodeCache -> NodeCache))
getCacheWriteQueue = getInternals_ cacheWriteQueue
-- | convenience function for extracting the @successors@ from a 'NodeState' -- | convenience function for extracting the @successors@ from a 'NodeState'
getSuccessors :: NodeState -> Maybe [NodeID] getSuccessors :: NodeState -> Maybe [NodeID]
getSuccessors = getInternals_ successors getSuccessors = getInternals_ successors
@ -426,7 +432,7 @@ data FediChordConf = FediChordConf {
fediChordInit :: FediChordConf -> IO (Socket, NodeState) fediChordInit :: FediChordConf -> IO (Socket, NodeState)
fediChordInit conf = do fediChordInit conf = do
cacheRef <- newIORef initCache cacheRef <- newIORef initCache
cacheWriterQueue <- atomically newTQueue q <- atomically newTQueue
let let
initialState = NodeState { initialState = NodeState {
domain = confDomain conf domain = confDomain conf
@ -439,7 +445,7 @@ fediChordInit conf = do
} }
internalsInit = InternalNodeState { internalsInit = InternalNodeState {
nodeCache = cacheRef nodeCache = cacheRef
, cacheWriteQueue = cacheWriterQueue , cacheWriteQueue = q
, successors = [] , successors = []
, predecessors = [] , predecessors = []
, kNeighbours = 3 , kNeighbours = 3
@ -463,6 +469,20 @@ fediChordInit conf = do
-- -- ToDo: implement cache management, as already all received replies should be stored in cache -- -- ToDo: implement cache management, as already all received replies should be stored in cache
-- --
cacheWriter :: NodeState -> IO ()
cacheWriter ns = do
let writeQueue' = getCacheWriteQueue ns
case writeQueue' of
Nothing -> return ()
Just writeQueue -> forever $ do
f <- atomically $ readTQueue writeQueue
let
refModifier :: NodeCache -> (NodeCache, ())
refModifier nc = (f nc, ())
maybe (return ()) (
\ref -> atomicModifyIORef' ref refModifier
) $ getNodeCacheRef ns
-- ====== network socket operations ====== -- ====== network socket operations ======
-- | resolve a specified host and return the 'AddrInfo' for it. -- | resolve a specified host and return the 'AddrInfo' for it.

View file

@ -2,6 +2,7 @@ module Main where
import System.Environment import System.Environment
import Data.IP (IPv6, toHostAddress6) -- iproute, just for IPv6 string parsing import Data.IP (IPv6, toHostAddress6) -- iproute, just for IPv6 string parsing
import Control.Concurrent
import Hash2Pub.FediChord import Hash2Pub.FediChord
@ -14,7 +15,11 @@ main = do
(serverSock, thisNode) <- fediChordInit conf (serverSock, thisNode) <- fediChordInit conf
print thisNode print thisNode
print serverSock print serverSock
-- currently no masking is necessary, as there is nothing to clean up
cacheWriterThread <- forkIO $ cacheWriter thisNode
-- idea: list of bootstrapping nodes, try joining within a timeout -- idea: list of bootstrapping nodes, try joining within a timeout
-- stop main thread from terminating during development
getChar
return () return ()
readConfig :: IO FediChordConf readConfig :: IO FediChordConf