From 5810c14b26cc19218a9af2f07ef1682b98c0ca29 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 13 May 2020 13:44:51 +0200 Subject: [PATCH] encapsulate NodeCache into IORef and initilise queue for writes --- Hash2Pub/Hash2Pub.cabal | 2 +- Hash2Pub/src/Hash2Pub/FediChord.hs | 108 ++++++++++++++++++----------- Hash2Pub/src/Hash2Pub/Main.hs | 1 + 3 files changed, 68 insertions(+), 43 deletions(-) diff --git a/Hash2Pub/Hash2Pub.cabal b/Hash2Pub/Hash2Pub.cabal index 7792198..084b096 100644 --- a/Hash2Pub/Hash2Pub.cabal +++ b/Hash2Pub/Hash2Pub.cabal @@ -46,7 +46,7 @@ category: Network extra-source-files: CHANGELOG.md common deps - build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute + build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute ghc-options: -Wall diff --git a/Hash2Pub/src/Hash2Pub/FediChord.hs b/Hash2Pub/src/Hash2Pub/FediChord.hs index 17013a6..87c9d5d 100644 --- a/Hash2Pub/src/Hash2Pub/FediChord.hs +++ b/Hash2Pub/src/Hash2Pub/FediChord.hs @@ -56,6 +56,10 @@ import qualified Data.ByteString.UTF8 as BSU import qualified Data.ByteArray as BA import qualified Network.ByteOrder as NetworkBytes import Data.IP (IPv6, fromHostAddress6, toHostAddress6) +import Data.IORef +import Control.Concurrent.STM +import Control.Concurrent.STM.TQueue +import Data.Typeable (Typeable(..), typeOf) import Hash2Pub.Utils @@ -134,9 +138,15 @@ data NodeState = NodeState { -- | encapsulates all data and parameters that are not present for remote nodes data InternalNodeState = InternalNodeState { - nodeCache :: NodeCache + nodeCache :: IORef NodeCache -- ^ EpiChord node cache with expiry times for nodes - -- as the map is ordered, lookups for the closes preceding node can be done using @lookupLT@ + -- as the map is ordered, lookups for the closes preceding node can be done using @lookupLT@. + -- encapsulated into an IORef for allowing concurrent reads without locking + , cacheWriteQueue :: TQueue (NodeCache -> IO NodeCache) + -- ^ cache updates are not written directly to the 'nodeCache' but queued and + -- only processed by a single writer thread to prevent lost updates. + -- All nodeCache modifying functions have to be partially applied enough before + -- being put into the queue. , successors :: [NodeID] -- could be a set instead as these are ordered as well -- ^ successor nodes in ascending order by distance , predecessors :: [NodeID] @@ -156,7 +166,14 @@ data InternalNodeState = InternalNodeState { -- ^ number of desired entries per cache slice -- needs to be parameterisable for simulation purposes } deriving (Show, Eq) --- + +-- | defining Show instances to be able to print NodeState for debug purposes +instance Typeable a => Show (IORef a) where + show x = show (typeOf x) + +instance Typeable a => Show (TQueue a) where + show x = show (typeOf x) + -- | extract a value from the internals of a 'NodeState' getInternals_ :: (InternalNodeState -> a) -> NodeState -> Maybe a getInternals_ func ns = func <$> internals ns @@ -170,13 +187,13 @@ putInternals_ func ns = let ns {internals = newInternals } -- | convenience function for extracting the 'NodeCache' from a 'NodeState' -getNodeCache :: NodeState -> Maybe NodeCache +getNodeCache :: NodeState -> Maybe (IORef NodeCache) getNodeCache = getInternals_ nodeCache -- | convenience function for updating the 'NodeCache' on 'NodeState' s that have -- internals. -- NodeStates without a cache (without internals) are returned unchanged -putNodeCache :: NodeCache -> NodeState -> NodeState +putNodeCache :: IORef NodeCache -> NodeState -> NodeState putNodeCache nc = putInternals_ (\i -> i {nodeCache = nc}) -- | convenience function for extracting the @successors@ from a 'NodeState' @@ -355,43 +372,43 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs --- TODO: needs testing +-- TODO: complete rewrite -- |checks wether the cache entries fulfill the logarithmic EpiChord invariant -- of having j entries per slice, and creates a list of necessary lookup actions. -- Should be invoked periodically. -checkCacheSlices :: NodeState -> [IO ()] -checkCacheSlices state = case getNodeCache state of - -- don't do anything on nodes without a cache - Nothing -> [return ()] - Just cache' -> checkSlice jEntries (nid state) startBound lastSucc cache' - -- TODO: do the same for predecessors - where - jEntries = fromMaybe 0 $ getInternals_ jEntriesPerSlice state - lastSucc = last <$> maybeEmpty (fromMaybe [] $ getSuccessors state) - startBound = NodeID 2^(255::Integer) + nid state - checkSlice :: Int -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [IO ()] - checkSlice _ _ _ Nothing _ = [] - checkSlice j ownID upperBound (Just lastSuccNode) cache - | upperBound < lastSuccNode = [] - | otherwise = - -- continuously half the DHT namespace, take the upper part as a slice, - -- check for existing entries in that slice and create a lookup action - -- and recursively do this on the lower half. - -- recursion edge case: all successors/ predecessors need to be in the - -- first slice. - let - diff = getNodeID $ upperBound - ownID - lowerBound = ownID + NodeID (diff `div` 2) - in - -- TODO: replace empty IO actions with actual lookups to middle of slice - -- TODO: validate ID before adding to cache - case Map.lookupLT upperBound cache of - Nothing -> return () : checkSlice j ownID lowerBound (Just lastSuccNode) cache - Just (matchID, _) -> - if - matchID <= lowerBound then return () : checkSlice j ownID lowerBound (Just lastSuccNode) cache - else - checkSlice j ownID lowerBound (Just lastSuccNode) cache +--checkCacheSlices :: NodeState -> IO [()] +--checkCacheSlices state = case getNodeCache state of +-- -- don't do anything on nodes without a cache +-- Nothing -> return [()] +-- Just cache' -> checkSlice jEntries (nid state) startBound lastSucc =<< readIORef cache' +-- -- TODO: do the same for predecessors +-- where +-- jEntries = fromMaybe 0 $ getInternals_ jEntriesPerSlice state +-- lastSucc = last <$> maybeEmpty (fromMaybe [] $ getSuccessors state) +-- startBound = NodeID 2^(255::Integer) + nid state +-- checkSlice :: Int -> NodeID -> NodeID -> Maybe NodeID -> NodeCache -> [IO ()] +-- checkSlice _ _ _ Nothing _ = [] +-- checkSlice j ownID upperBound (Just lastSuccNode) cache +-- | upperBound < lastSuccNode = [] +-- | otherwise = +-- -- continuously half the DHT namespace, take the upper part as a slice, +-- -- check for existing entries in that slice and create a lookup action +-- -- and recursively do this on the lower half. +-- -- recursion edge case: all successors/ predecessors need to be in the +-- -- first slice. +-- let +-- diff = getNodeID $ upperBound - ownID +-- lowerBound = ownID + NodeID (diff `div` 2) +-- in +-- -- TODO: replace empty IO actions with actual lookups to middle of slice +-- -- TODO: validate ID before adding to cache +-- case Map.lookupLT upperBound cache of +-- Nothing -> return () : checkSlice j ownID lowerBound (Just lastSuccNode) cache +-- Just (matchID, _) -> +-- if +-- matchID <= lowerBound then return () : checkSlice j ownID lowerBound (Just lastSuccNode) cache +-- else +-- checkSlice j ownID lowerBound (Just lastSuccNode) cache -- Todo: DHT backend can learn potential initial bootstrapping points through the instances mentioned in the received AP-relay messages @@ -408,6 +425,8 @@ data FediChordConf = FediChordConf { -- ToDo: load persisted state, thus this function already operates in IO fediChordInit :: FediChordConf -> IO (Socket, NodeState) fediChordInit conf = do + cacheRef <- newIORef initCache + cacheWriterQueue <- atomically newTQueue let initialState = NodeState { domain = confDomain conf @@ -419,7 +438,8 @@ fediChordInit conf = do , internals = Just internalsInit } internalsInit = InternalNodeState { - nodeCache = initCache + nodeCache = cacheRef + , cacheWriteQueue = cacheWriterQueue , successors = [] , predecessors = [] , kNeighbours = 3 @@ -434,10 +454,14 @@ fediChordInit conf = do --fediChordJoin :: NodeState -- ^ the local 'NodeState' -- -> (String, PortNumber) -- ^ domain and port of a bootstrapping node +-- -> Socket -- ^ socket used for sending and receiving the join message -- -> IO Either String NodeState -- ^ the joined 'NodeState' after a successful -- -- join, otherwise an error message ---fediChordJoin - +--fediChordJoin ns (joinHost, joinPort) sock = do +-- -- 1. get routed to destination until FOUND +-- -- 2. then send a join to the currently responsible node +-- -- ToDo: implement cache management, as already all received replies should be stored in cache +-- -- ====== network socket operations ====== diff --git a/Hash2Pub/src/Hash2Pub/Main.hs b/Hash2Pub/src/Hash2Pub/Main.hs index 67d75f2..7b55230 100644 --- a/Hash2Pub/src/Hash2Pub/Main.hs +++ b/Hash2Pub/src/Hash2Pub/Main.hs @@ -14,6 +14,7 @@ main = do (serverSock, thisNode) <- fediChordInit conf print thisNode print serverSock + -- idea: list of bootstrapping nodes, try joining within a timeout return () readConfig :: IO FediChordConf