change NodeCache protection to STM
- putting the NodeCache behind an IORef had been chose because those could've been read non-blocking - the same is true for TVars. The performance characteristics are likely worse, but at the advantage of composability within STM monads
This commit is contained in:
parent
dc2e399d64
commit
914e07a412
|
@ -27,6 +27,7 @@ module Hash2Pub.DHTProtocol
|
||||||
|
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
|
import Control.Concurrent.STM.TVar
|
||||||
import Control.Concurrent.STM.TBQueue
|
import Control.Concurrent.STM.TBQueue
|
||||||
import Control.Concurrent.STM.TQueue
|
import Control.Concurrent.STM.TQueue
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
|
@ -35,7 +36,6 @@ import qualified Data.ByteString as BS
|
||||||
import Data.Either (rights)
|
import Data.Either (rights)
|
||||||
import Data.Foldable (foldl', foldr')
|
import Data.Foldable (foldl', foldr')
|
||||||
import Data.Functor.Identity
|
import Data.Functor.Identity
|
||||||
import Data.IORef
|
|
||||||
import Data.IP (IPv6, fromHostAddress6,
|
import Data.IP (IPv6, fromHostAddress6,
|
||||||
toHostAddress6)
|
toHostAddress6)
|
||||||
import Data.List (sortBy)
|
import Data.List (sortBy)
|
||||||
|
@ -272,7 +272,7 @@ requestQueryID :: LocalNodeState -- ^ NodeState of the querying node
|
||||||
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
|
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
|
||||||
-- TODO: deal with lookup failures
|
-- TODO: deal with lookup failures
|
||||||
requestQueryID ns targetID = do
|
requestQueryID ns targetID = do
|
||||||
firstCacheSnapshot <- readIORef . nodeCacheRef $ ns
|
firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns
|
||||||
queryIdLookupLoop firstCacheSnapshot ns targetID
|
queryIdLookupLoop firstCacheSnapshot ns targetID
|
||||||
|
|
||||||
-- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
|
-- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
|
||||||
|
|
|
@ -70,7 +70,6 @@ import Crypto.Hash
|
||||||
import qualified Data.ByteArray as BA
|
import qualified Data.ByteArray as BA
|
||||||
import qualified Data.ByteString as BS
|
import qualified Data.ByteString as BS
|
||||||
import qualified Data.ByteString.UTF8 as BSU
|
import qualified Data.ByteString.UTF8 as BSU
|
||||||
import Data.IORef
|
|
||||||
import Data.IP (IPv6, fromHostAddress6,
|
import Data.IP (IPv6, fromHostAddress6,
|
||||||
toHostAddress6)
|
toHostAddress6)
|
||||||
import Data.Typeable (Typeable (..), typeOf)
|
import Data.Typeable (Typeable (..), typeOf)
|
||||||
|
@ -96,7 +95,7 @@ fediChordInit conf = do
|
||||||
-- Separated from 'fediChordInit' to be usable in tests.
|
-- Separated from 'fediChordInit' to be usable in tests.
|
||||||
nodeStateInit :: FediChordConf -> IO LocalNodeState
|
nodeStateInit :: FediChordConf -> IO LocalNodeState
|
||||||
nodeStateInit conf = do
|
nodeStateInit conf = do
|
||||||
cacheRef <- newIORef initCache
|
cacheSTM <- newTVarIO initCache
|
||||||
q <- atomically newTQueue
|
q <- atomically newTQueue
|
||||||
let
|
let
|
||||||
containedState = RemoteNodeState {
|
containedState = RemoteNodeState {
|
||||||
|
@ -109,7 +108,7 @@ nodeStateInit conf = do
|
||||||
}
|
}
|
||||||
initialState = LocalNodeState {
|
initialState = LocalNodeState {
|
||||||
nodeState = containedState
|
nodeState = containedState
|
||||||
, nodeCacheRef = cacheRef
|
, nodeCacheSTM = cacheSTM
|
||||||
, cacheWriteQueue = q
|
, cacheWriteQueue = q
|
||||||
, successors = []
|
, successors = []
|
||||||
, predecessors = []
|
, predecessors = []
|
||||||
|
@ -172,15 +171,11 @@ fediChordJoin cacheSnapshot nsSTM = do
|
||||||
-- | cache updater thread that waits for incoming NodeCache update instructions on
|
-- | cache updater thread that waits for incoming NodeCache update instructions on
|
||||||
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
|
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
|
||||||
cacheWriter :: LocalNodeStateSTM -> IO ()
|
cacheWriter :: LocalNodeStateSTM -> IO ()
|
||||||
cacheWriter nsSTM = do
|
cacheWriter nsSTM =
|
||||||
ns <- readTVarIO nsSTM
|
forever $ atomically $ do
|
||||||
let writeQueue' = cacheWriteQueue ns
|
ns <- readTVar nsSTM
|
||||||
forever $ do
|
cacheModifier <- readTQueue $ cacheWriteQueue ns
|
||||||
f <- atomically $ readTQueue writeQueue'
|
modifyTVar' (nodeCacheSTM ns) cacheModifier
|
||||||
let
|
|
||||||
refModifier :: NodeCache -> (NodeCache, ())
|
|
||||||
refModifier nc = (f nc, ())
|
|
||||||
atomicModifyIORef' (nodeCacheRef ns) refModifier
|
|
||||||
|
|
||||||
-- | Receives UDP packets and passes them to other threads via the given TQueue.
|
-- | Receives UDP packets and passes them to other threads via the given TQueue.
|
||||||
-- Shall be used as the single receiving thread on the server socket, as multiple
|
-- Shall be used as the single receiving thread on the server socket, as multiple
|
||||||
|
|
|
@ -47,7 +47,6 @@ import Crypto.Hash
|
||||||
import qualified Data.ByteArray as BA
|
import qualified Data.ByteArray as BA
|
||||||
import qualified Data.ByteString as BS
|
import qualified Data.ByteString as BS
|
||||||
import qualified Data.ByteString.UTF8 as BSU
|
import qualified Data.ByteString.UTF8 as BSU
|
||||||
import Data.IORef
|
|
||||||
import Data.IP (IPv6, fromHostAddress6,
|
import Data.IP (IPv6, fromHostAddress6,
|
||||||
toHostAddress6)
|
toHostAddress6)
|
||||||
import Data.Typeable (Typeable (..), typeOf)
|
import Data.Typeable (Typeable (..), typeOf)
|
||||||
|
@ -127,7 +126,7 @@ data RemoteNodeState = RemoteNodeState
|
||||||
data LocalNodeState = LocalNodeState
|
data LocalNodeState = LocalNodeState
|
||||||
{ nodeState :: RemoteNodeState
|
{ nodeState :: RemoteNodeState
|
||||||
-- ^ represents common data present both in remote and local node representations
|
-- ^ represents common data present both in remote and local node representations
|
||||||
, nodeCacheRef :: IORef NodeCache
|
, nodeCacheSTM :: TVar NodeCache
|
||||||
-- ^ EpiChord node cache with expiry times for nodes
|
-- ^ EpiChord node cache with expiry times for nodes
|
||||||
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
||||||
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
|
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
|
||||||
|
@ -206,7 +205,7 @@ instance NodeState LocalNodeState where
|
||||||
toRemoteNodeState = nodeState
|
toRemoteNodeState = nodeState
|
||||||
|
|
||||||
-- | defining Show instances to be able to print NodeState for debug purposes
|
-- | defining Show instances to be able to print NodeState for debug purposes
|
||||||
instance Typeable a => Show (IORef a) where
|
instance Typeable a => Show (TVar a) where
|
||||||
show x = show (typeOf x)
|
show x = show (typeOf x)
|
||||||
|
|
||||||
instance Typeable a => Show (TQueue a) where
|
instance Typeable a => Show (TQueue a) where
|
||||||
|
|
|
@ -4,7 +4,6 @@ module FediChordSpec where
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
import Data.ASN1.Parse (runParseASN1)
|
import Data.ASN1.Parse (runParseASN1)
|
||||||
import qualified Data.ByteString as BS
|
import qualified Data.ByteString as BS
|
||||||
import Data.IORef
|
|
||||||
import qualified Data.Map.Strict as Map
|
import qualified Data.Map.Strict as Map
|
||||||
import Data.Maybe (fromJust)
|
import Data.Maybe (fromJust)
|
||||||
import qualified Data.Set as Set
|
import qualified Data.Set as Set
|
||||||
|
|
Loading…
Reference in a new issue