diff --git a/Hash2Pub/src/Hash2Pub/DHTProtocol.hs b/Hash2Pub/src/Hash2Pub/DHTProtocol.hs index b5fc77c..3defeac 100644 --- a/Hash2Pub/src/Hash2Pub/DHTProtocol.hs +++ b/Hash2Pub/src/Hash2Pub/DHTProtocol.hs @@ -25,8 +25,6 @@ import Network.Socket.ByteString import Hash2Pub.FediChord ( NodeID , NodeState (..) - , getNodeCache - , putNodeCache , getSuccessors , putSuccessors , getPredecessors @@ -168,8 +166,16 @@ remoteNode_ (RemoteCacheEntry ns _) = ns addCacheEntry :: RemoteCacheEntry -- ^ a remote cache entry received from network -> NodeCache -- ^ node cache to insert to -> IO NodeCache -- ^ new node cache with the element inserted -addCacheEntry (RemoteCacheEntry ns ts) cache = do +addCacheEntry entry cache = do now <- getPOSIXTime + return $ addCacheEntryPure now entry cache + +-- | pure version of 'addCacheEntry' with current time explicitly specified as argument +addCacheEntryPure :: POSIXTime -- ^ current time + -> RemoteCacheEntry -- ^ a remote cache entry received from network + -> NodeCache -- ^ node cache to insert to + -> NodeCache -- ^ new node cache with the element inserted +addCacheEntryPure now (RemoteCacheEntry ns ts) cache = let -- TODO: limit diffSeconds to some maximum value to prevent malicious nodes from inserting entries valid nearly until eternity timestamp' = if ts <= now then ts else now @@ -178,7 +184,8 @@ addCacheEntry (RemoteCacheEntry ns ts) cache = do case oldVal of ProxyEntry n _ -> ProxyEntry n (Just newVal) NodeEntry oldValidationState _ oldTimestamp -> NodeEntry oldValidationState newNode (max oldTimestamp newTimestamp) - return newCache + in + newCache -- | delete the node with given ID from cache deleteCacheEntry :: NodeID -- ^ID of the node to be deleted diff --git a/Hash2Pub/src/Hash2Pub/FediChord.hs b/Hash2Pub/src/Hash2Pub/FediChord.hs index 87c9d5d..70d7fc0 100644 --- a/Hash2Pub/src/Hash2Pub/FediChord.hs +++ b/Hash2Pub/src/Hash2Pub/FediChord.hs @@ -15,7 +15,7 @@ module Hash2Pub.FediChord ( , toNodeID , NodeState (..) , InternalNodeState (..) - , getNodeCache + , getNodeCacheRef , putNodeCache , getSuccessors , putSuccessors @@ -40,6 +40,7 @@ module Hash2Pub.FediChord ( , fediChordInit , mkServerSocket , resolve + , cacheWriter ) where import qualified Data.Map.Strict as Map @@ -59,6 +60,7 @@ import Data.IP (IPv6, fromHostAddress6, toHostAddress6) import Data.IORef import Control.Concurrent.STM import Control.Concurrent.STM.TQueue +import Control.Monad (forever) import Data.Typeable (Typeable(..), typeOf) import Hash2Pub.Utils @@ -142,11 +144,12 @@ data InternalNodeState = InternalNodeState { -- ^ EpiChord node cache with expiry times for nodes -- as the map is ordered, lookups for the closes preceding node can be done using @lookupLT@. -- encapsulated into an IORef for allowing concurrent reads without locking - , cacheWriteQueue :: TQueue (NodeCache -> IO NodeCache) + , cacheWriteQueue :: TQueue (NodeCache -> NodeCache) -- ^ cache updates are not written directly to the 'nodeCache' but queued and -- only processed by a single writer thread to prevent lost updates. -- All nodeCache modifying functions have to be partially applied enough before -- being put into the queue. + -- , successors :: [NodeID] -- could be a set instead as these are ordered as well -- ^ successor nodes in ascending order by distance , predecessors :: [NodeID] @@ -187,8 +190,8 @@ putInternals_ func ns = let ns {internals = newInternals } -- | convenience function for extracting the 'NodeCache' from a 'NodeState' -getNodeCache :: NodeState -> Maybe (IORef NodeCache) -getNodeCache = getInternals_ nodeCache +getNodeCacheRef :: NodeState -> Maybe (IORef NodeCache) +getNodeCacheRef = getInternals_ nodeCache -- | convenience function for updating the 'NodeCache' on 'NodeState' s that have -- internals. @@ -196,6 +199,9 @@ getNodeCache = getInternals_ nodeCache putNodeCache :: IORef NodeCache -> NodeState -> NodeState putNodeCache nc = putInternals_ (\i -> i {nodeCache = nc}) +getCacheWriteQueue :: NodeState -> Maybe (TQueue (NodeCache -> NodeCache)) +getCacheWriteQueue = getInternals_ cacheWriteQueue + -- | convenience function for extracting the @successors@ from a 'NodeState' getSuccessors :: NodeState -> Maybe [NodeID] getSuccessors = getInternals_ successors @@ -426,7 +432,7 @@ data FediChordConf = FediChordConf { fediChordInit :: FediChordConf -> IO (Socket, NodeState) fediChordInit conf = do cacheRef <- newIORef initCache - cacheWriterQueue <- atomically newTQueue + q <- atomically newTQueue let initialState = NodeState { domain = confDomain conf @@ -439,7 +445,7 @@ fediChordInit conf = do } internalsInit = InternalNodeState { nodeCache = cacheRef - , cacheWriteQueue = cacheWriterQueue + , cacheWriteQueue = q , successors = [] , predecessors = [] , kNeighbours = 3 @@ -463,6 +469,20 @@ fediChordInit conf = do -- -- ToDo: implement cache management, as already all received replies should be stored in cache -- +cacheWriter :: NodeState -> IO () +cacheWriter ns = do + let writeQueue' = getCacheWriteQueue ns + case writeQueue' of + Nothing -> return () + Just writeQueue -> forever $ do + f <- atomically $ readTQueue writeQueue + let + refModifier :: NodeCache -> (NodeCache, ()) + refModifier nc = (f nc, ()) + maybe (return ()) ( + \ref -> atomicModifyIORef' ref refModifier + ) $ getNodeCacheRef ns + -- ====== network socket operations ====== -- | resolve a specified host and return the 'AddrInfo' for it. diff --git a/Hash2Pub/src/Hash2Pub/Main.hs b/Hash2Pub/src/Hash2Pub/Main.hs index 7b55230..9c212b2 100644 --- a/Hash2Pub/src/Hash2Pub/Main.hs +++ b/Hash2Pub/src/Hash2Pub/Main.hs @@ -2,6 +2,7 @@ module Main where import System.Environment import Data.IP (IPv6, toHostAddress6) -- iproute, just for IPv6 string parsing +import Control.Concurrent import Hash2Pub.FediChord @@ -14,7 +15,11 @@ main = do (serverSock, thisNode) <- fediChordInit conf print thisNode print serverSock + -- currently no masking is necessary, as there is nothing to clean up + cacheWriterThread <- forkIO $ cacheWriter thisNode -- idea: list of bootstrapping nodes, try joining within a timeout + -- stop main thread from terminating during development + getChar return () readConfig :: IO FediChordConf