encapsulate NodeCache into IORef and initilise queue for writes

This commit is contained in:
Trolli Schmittlauch 2020-05-13 13:44:51 +02:00
parent 0682bf4bad
commit 5810c14b26
3 changed files with 68 additions and 43 deletions

View file

@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md
common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute
ghc-options: -Wall

View file

@ -56,6 +56,10 @@ import qualified Data.ByteString.UTF8 as BSU
import qualified Data.ByteArray as BA
import qualified Network.ByteOrder as NetworkBytes
import Data.IP (IPv6, fromHostAddress6, toHostAddress6)
import Data.IORef
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Data.Typeable (Typeable(..), typeOf)
import Hash2Pub.Utils
@ -134,9 +138,15 @@ data NodeState = NodeState {
-- | encapsulates all data and parameters that are not present for remote nodes
data InternalNodeState = InternalNodeState {
nodeCache :: NodeCache
nodeCache :: IORef NodeCache
-- ^ EpiChord node cache with expiry times for nodes
-- as the map is ordered, lookups for the closes preceding node can be done using @lookupLT@
-- as the map is ordered, lookups for the closes preceding node can be done using @lookupLT@.
-- encapsulated into an IORef for allowing concurrent reads without locking
, cacheWriteQueue :: TQueue (NodeCache -> IO NodeCache)
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
-- only processed by a single writer thread to prevent lost updates.
-- All nodeCache modifying functions have to be partially applied enough before
-- being put into the queue.
, successors :: [NodeID] -- could be a set instead as these are ordered as well
-- ^ successor nodes in ascending order by distance
, predecessors :: [NodeID]
@ -156,7 +166,14 @@ data InternalNodeState = InternalNodeState {
-- ^ number of desired entries per cache slice
-- needs to be parameterisable for simulation purposes
} deriving (Show, Eq)
--
-- | defining Show instances to be able to print NodeState for debug purposes
instance Typeable a => Show (IORef a) where
show x = show (typeOf x)
instance Typeable a => Show (TQueue a) where
show x = show (typeOf x)
-- | extract a value from the internals of a 'NodeState'
getInternals_ :: (InternalNodeState -> a) -> NodeState -> Maybe a
getInternals_ func ns = func <$> internals ns
@ -170,13 +187,13 @@ putInternals_ func ns = let
ns {internals = newInternals }
-- | convenience function for extracting the 'NodeCache' from a 'NodeState'
getNodeCache :: NodeState -> Maybe NodeCache
getNodeCache :: NodeState -> Maybe (IORef NodeCache)
getNodeCache = getInternals_ nodeCache
-- | convenience function for updating the 'NodeCache' on 'NodeState' s that have
-- internals.
-- NodeStates without a cache (without internals) are returned unchanged
putNodeCache :: NodeCache -> NodeState -> NodeState
putNodeCache :: IORef NodeCache -> NodeState -> NodeState
putNodeCache nc = putInternals_ (\i -> i {nodeCache = nc})
-- | convenience function for extracting the @successors@ from a 'NodeState'
@ -355,43 +372,43 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
-- TODO: needs testing
-- TODO: complete rewrite
-- |checks wether the cache entries fulfill the logarithmic EpiChord invariant
-- of having j entries per slice, and creates a list of necessary lookup actions.
-- Should be invoked periodically.
checkCacheSlices :: NodeState -> [IO ()]
checkCacheSlices state = case getNodeCache state of
-- don't do anything on nodes without a cache
Nothing -> [return ()]
Just cache' -> checkSlice jEntries (nid state) startBound lastSucc cache'
-- TODO: do the same for predecessors
where
jEntries = fromMaybe 0 $ getInternals_ jEntriesPerSlice state
lastSucc = last <$> maybeEmpty (fromMaybe [] $ getSuccessors state)
startBound = NodeID 2^(255::Integer) + nid state
checkSlice :: Int -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [IO ()]
checkSlice _ _ _ Nothing _ = []
checkSlice j ownID upperBound (Just lastSuccNode) cache
| upperBound < lastSuccNode = []
| otherwise =
-- continuously half the DHT namespace, take the upper part as a slice,
-- check for existing entries in that slice and create a lookup action
-- and recursively do this on the lower half.
-- recursion edge case: all successors/ predecessors need to be in the
-- first slice.
let
diff = getNodeID $ upperBound - ownID
lowerBound = ownID + NodeID (diff `div` 2)
in
-- TODO: replace empty IO actions with actual lookups to middle of slice
-- TODO: validate ID before adding to cache
case Map.lookupLT upperBound cache of
Nothing -> return () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
Just (matchID, _) ->
if
matchID <= lowerBound then return () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
else
checkSlice j ownID lowerBound (Just lastSuccNode) cache
--checkCacheSlices :: NodeState -> IO [()]
--checkCacheSlices state = case getNodeCache state of
-- -- don't do anything on nodes without a cache
-- Nothing -> return [()]
-- Just cache' -> checkSlice jEntries (nid state) startBound lastSucc =<< readIORef cache'
-- -- TODO: do the same for predecessors
-- where
-- jEntries = fromMaybe 0 $ getInternals_ jEntriesPerSlice state
-- lastSucc = last <$> maybeEmpty (fromMaybe [] $ getSuccessors state)
-- startBound = NodeID 2^(255::Integer) + nid state
-- checkSlice :: Int -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [IO ()]
-- checkSlice _ _ _ Nothing _ = []
-- checkSlice j ownID upperBound (Just lastSuccNode) cache
-- | upperBound < lastSuccNode = []
-- | otherwise =
-- -- continuously half the DHT namespace, take the upper part as a slice,
-- -- check for existing entries in that slice and create a lookup action
-- -- and recursively do this on the lower half.
-- -- recursion edge case: all successors/ predecessors need to be in the
-- -- first slice.
-- let
-- diff = getNodeID $ upperBound - ownID
-- lowerBound = ownID + NodeID (diff `div` 2)
-- in
-- -- TODO: replace empty IO actions with actual lookups to middle of slice
-- -- TODO: validate ID before adding to cache
-- case Map.lookupLT upperBound cache of
-- Nothing -> return () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
-- Just (matchID, _) ->
-- if
-- matchID <= lowerBound then return () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
-- else
-- checkSlice j ownID lowerBound (Just lastSuccNode) cache
-- Todo: DHT backend can learn potential initial bootstrapping points through the instances mentioned in the received AP-relay messages
@ -408,6 +425,8 @@ data FediChordConf = FediChordConf {
-- ToDo: load persisted state, thus this function already operates in IO
fediChordInit :: FediChordConf -> IO (Socket, NodeState)
fediChordInit conf = do
cacheRef <- newIORef initCache
cacheWriterQueue <- atomically newTQueue
let
initialState = NodeState {
domain = confDomain conf
@ -419,7 +438,8 @@ fediChordInit conf = do
, internals = Just internalsInit
}
internalsInit = InternalNodeState {
nodeCache = initCache
nodeCache = cacheRef
, cacheWriteQueue = cacheWriterQueue
, successors = []
, predecessors = []
, kNeighbours = 3
@ -434,10 +454,14 @@ fediChordInit conf = do
--fediChordJoin :: NodeState -- ^ the local 'NodeState'
-- -> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-- -> Socket -- ^ socket used for sending and receiving the join message
-- -> IO Either String NodeState -- ^ the joined 'NodeState' after a successful
-- -- join, otherwise an error message
--fediChordJoin
--fediChordJoin ns (joinHost, joinPort) sock = do
-- -- 1. get routed to destination until FOUND
-- -- 2. then send a join to the currently responsible node
-- -- ToDo: implement cache management, as already all received replies should be stored in cache
--
-- ====== network socket operations ======

View file

@ -14,6 +14,7 @@ main = do
(serverSock, thisNode) <- fediChordInit conf
print thisNode
print serverSock
-- idea: list of bootstrapping nodes, try joining within a timeout
return ()
readConfig :: IO FediChordConf