From 5ffe1b074e723f8ffaa29716baf066fe08f79a7d Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Thu, 30 Jul 2020 02:19:26 +0200 Subject: [PATCH] add reference from RealNode to Service This required to make both RealNode(STM) and LocalNodeState(STM) parameterisable polymorphic types --- src/Hash2Pub/DHTProtocol.hs | 44 +++++++++++++------------- src/Hash2Pub/FediChord.hs | 57 ++++++++++++++++------------------ src/Hash2Pub/FediChordTypes.hs | 30 +++++++++--------- src/Hash2Pub/PostService.hs | 2 ++ test/FediChordSpec.hs | 2 +- 5 files changed, 68 insertions(+), 67 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 546c10f..f962d58 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -92,7 +92,7 @@ import Debug.Trace (trace) -- TODO: evaluate more fine-grained argument passing to allow granular locking -- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache -queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse +queryLocalCache :: LocalNodeState s -> NodeCache -> Int -> NodeID -> QueryResponse queryLocalCache ownState nCache lBestNodes targetID -- as target ID falls between own ID and first predecessor, it is handled by this node -- This only makes sense if the node is part of the DHT by having joined. @@ -130,7 +130,7 @@ closestCachePredecessors remainingLookups lastID nCache -- Looks up the successor of the lookup key on a 'RingMap' representation of the -- predecessor list with the node itself added. If the result is the same as the node -- itself then it falls into the responsibility interval. -isInOwnResponsibilitySlice :: HasKeyID NodeID a => a -> LocalNodeState -> Bool +isInOwnResponsibilitySlice :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool isInOwnResponsibilitySlice lookupTarget ownNs = (fst <$> rMapLookupSucc (getKeyID lookupTarget :: NodeID) predecessorRMap) == pure (getNid ownNs) where predecessorList = predecessors ownNs @@ -140,10 +140,10 @@ isInOwnResponsibilitySlice lookupTarget ownNs = (fst <$> rMapLookupSucc (getKeyI ownRemote = toRemoteNodeState ownNs closestPredecessor = headMay predecessorList -isPossiblePredecessor :: HasKeyID NodeID a => a -> LocalNodeState -> Bool +isPossiblePredecessor :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool isPossiblePredecessor = isInOwnResponsibilitySlice -isPossibleSuccessor :: HasKeyID NodeID a => a -> LocalNodeState -> Bool +isPossibleSuccessor :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool isPossibleSuccessor lookupTarget ownNs = (fst <$> rMapLookupPred (getKeyID lookupTarget :: NodeID) successorRMap) == pure (getNid ownNs) where successorList = successors ownNs @@ -224,7 +224,7 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g -- | uses the successor and predecessor list of a node as an indicator for whether a -- node has properly joined the DHT -isJoined :: LocalNodeState -> Bool +isJoined :: LocalNodeState s -> Bool isJoined ns = not . all null $ [successors ns, predecessors ns] -- | the size limit to be used when serialising messages for sending @@ -248,7 +248,7 @@ ackRequest _ _ = Map.empty -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue -- the response to be sent. -handleIncomingRequest :: LocalNodeStateSTM -- ^ the handling node +handleIncomingRequest :: LocalNodeStateSTM s -- ^ the handling node -> TQueue (BS.ByteString, SockAddr) -- ^ send queue -> Set.Set FediChordMessage -- ^ all parts of the request to handle -> SockAddr -- ^ source address of the request @@ -287,7 +287,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do -- | execute a key ID lookup on local cache and respond with the result -respondQueryID :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondQueryID :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondQueryID nsSTM msgSet = do putStrLn "responding to a QueryID request" -- this message cannot be split reasonably, so just @@ -328,7 +328,7 @@ respondQueryID nsSTM msgSet = do -- | Respond to a Leave request by removing the leaving node from local data structures -- and confirming with response. -- TODO: copy over key data from leaver and confirm -respondLeave :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondLeave :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondLeave nsSTM msgSet = do -- combine payload of all parts let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) -> @@ -359,7 +359,7 @@ respondLeave nsSTM msgSet = do pure $ serialiseMessage sendMessageSize responseMsg -- | respond to stabilise requests by returning successor and predecessor list -respondStabilise :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondStabilise :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondStabilise nsSTM msgSet = do nsSnap <- readTVarIO nsSTM let @@ -381,7 +381,7 @@ respondStabilise nsSTM msgSet = do -- | respond to Ping request by returning all active vserver NodeStates -respondPing :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondPing :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondPing nsSTM msgSet = do -- TODO: respond with all active VS when implementing k-choices nsSnap <- readTVarIO nsSTM @@ -400,7 +400,7 @@ respondPing nsSTM msgSet = do -- this modifies node state, so locking and IO seems to be necessary. -- Still try to keep as much code as possible pure -respondJoin :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondJoin :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondJoin nsSTM msgSet = do -- atomically read and modify the node state according to the parsed request responseMsg <- atomically $ do @@ -451,8 +451,8 @@ respondJoin nsSTM msgSet = do -- | send a join request and return the joined 'LocalNodeState' including neighbours requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted - -> LocalNodeStateSTM -- ^ joining NodeState - -> IO (Either String LocalNodeStateSTM) -- ^ node after join with all its new information + -> LocalNodeStateSTM s -- ^ joining NodeState + -> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information requestJoin toJoinOn ownStateSTM = do ownState <- readTVarIO ownStateSTM prn <- readTVarIO $ parentRealNode ownState @@ -500,7 +500,7 @@ requestJoin toJoinOn ownStateSTM = do -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. -requestQueryID :: LocalNodeState -- ^ NodeState of the querying node +requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node -> NodeID -- ^ target key ID to look up -> IO RemoteNodeState -- ^ the node responsible for handling that key -- 1. do a local lookup for the l closest nodes @@ -515,7 +515,7 @@ requestQueryID ns targetID = do queryIdLookupLoop firstCacheSnapshot ns 50 targetID -- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining -queryIdLookupLoop :: NodeCache -> LocalNodeState -> Int -> NodeID -> IO RemoteNodeState +queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState -- return node itself as default fallback value against infinite recursion. -- TODO: consider using an Either instead of a default value queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns @@ -541,7 +541,7 @@ queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do sendQueryIdMessages :: (Integral i) => NodeID -- ^ target key ID to look up - -> LocalNodeState -- ^ node state of the node doing the query + -> LocalNodeState s -- ^ node state of the node doing the query -> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned -> [RemoteNodeState] -- ^ nodes to query -> IO QueryResponse -- ^ accumulated response @@ -579,7 +579,7 @@ sendQueryIdMessages targetID ns lParam targets = do -- | Create a QueryID message to be supplied to 'sendRequestTo' lookupMessage :: Integral i => NodeID -- ^ target ID - -> LocalNodeState -- ^ sender node state + -> LocalNodeState s -- ^ sender node state -> Maybe i -- ^ optionally provide a different l parameter -> (Integer -> FediChordMessage) lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID) @@ -589,7 +589,7 @@ lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 -- | Send a stabilise request to provided 'RemoteNode' and, if successful, -- return parsed neighbour lists -requestStabilise :: LocalNodeState -- ^ sending node +requestStabilise :: LocalNodeState s -- ^ sending node -> RemoteNodeState -- ^ neighbour node to send to -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node requestStabilise ns neighbour = do @@ -624,7 +624,7 @@ requestStabilise ns neighbour = do ) responses -requestPing :: LocalNodeState -- ^ sending node +requestPing :: LocalNodeState s -- ^ sending node -> RemoteNodeState -- ^ node to be PINGed -> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node requestPing ns target = do @@ -723,7 +723,7 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache queueAddEntries :: Foldable c => c RemoteCacheEntry - -> LocalNodeState + -> LocalNodeState s -> IO () queueAddEntries entries ns = do now <- getPOSIXTime @@ -733,14 +733,14 @@ queueAddEntries entries ns = do -- | enque a list of node IDs to be deleted from the global NodeCache queueDeleteEntries :: Foldable c => c NodeID - -> LocalNodeState + -> LocalNodeState s -> IO () queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry -- | enque a single node ID to be deleted from the global NodeCache queueDeleteEntry :: NodeID - -> LocalNodeState + -> LocalNodeState s -> IO () queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 70c9ff7..914ea57 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -78,7 +78,6 @@ import Data.Maybe (catMaybes, fromJust, fromMaybe, isJust, isNothing, mapMaybe) import qualified Data.Set as Set import Data.Time.Clock.POSIX -import Data.Typeable (Typeable (..), typeOf) import Data.Word import qualified Network.ByteOrder as NetworkBytes import Network.Socket hiding (recv, recvFrom, send, @@ -95,10 +94,10 @@ import Debug.Trace (trace) -- | initialise data structures, compute own IDs and bind to listening socket -- ToDo: load persisted state, thus this function already operates in IO -fediChordInit :: (Service s RealNodeSTM) +fediChordInit :: (Service s (RealNodeSTM s)) => FediChordConf - -> (RealNodeSTM -> IO (s RealNodeSTM)) -- ^ runner function for service - -> IO (Socket, LocalNodeStateSTM) + -> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service + -> IO (Socket, LocalNodeStateSTM s) fediChordInit initConf serviceRunner = do emptyLookupCache <- newTVarIO Map.empty let realNode = RealNode { @@ -119,7 +118,7 @@ fediChordInit initConf serviceRunner = do -- | initialises the 'NodeState' for this local node. -- Separated from 'fediChordInit' to be usable in tests. -nodeStateInit :: RealNodeSTM -> IO LocalNodeState +nodeStateInit :: RealNodeSTM s -> IO (LocalNodeState s) nodeStateInit realNodeSTM = do realNode <- readTVarIO realNodeSTM cacheSTM <- newTVarIO initCache @@ -151,9 +150,9 @@ nodeStateInit realNodeSTM = do -- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed -- for resolving the new node's position. -fediChordBootstrapJoin :: LocalNodeStateSTM -- ^ the local 'NodeState' +fediChordBootstrapJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState' -> (String, PortNumber) -- ^ domain and port of a bootstrapping node - -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a + -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordBootstrapJoin nsSTM bootstrapNode = do -- can be invoked multiple times with all known bootstrapping nodes until successfully joined @@ -169,7 +168,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do -- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters. -- Unjoined try joining instead. -convergenceSampleThread :: LocalNodeStateSTM -> IO () +convergenceSampleThread :: LocalNodeStateSTM s -> IO () convergenceSampleThread nsSTM = forever $ do nsSnap <- readTVarIO nsSTM parentNode <- readTVarIO $ parentRealNode nsSnap @@ -200,7 +199,7 @@ convergenceSampleThread nsSTM = forever $ do -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. -tryBootstrapJoining :: LocalNodeStateSTM -> IO (Either String LocalNodeStateSTM) +tryBootstrapJoining :: LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s)) tryBootstrapJoining nsSTM = do bss <- atomically $ do nsSnap <- readTVar nsSTM @@ -217,7 +216,7 @@ tryBootstrapJoining nsSTM = do -- | Look up a key just based on the responses of a single bootstrapping node. -bootstrapQueryId :: LocalNodeStateSTM -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState) +bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState) bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do ns <- readTVarIO nsSTM srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) @@ -248,8 +247,8 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do -- | join a node to the DHT using the global node cache -- node's position. -fediChordJoin :: LocalNodeStateSTM -- ^ the local 'NodeState' - -> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a +fediChordJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState' + -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordJoin nsSTM = do ns <- readTVarIO nsSTM @@ -265,7 +264,7 @@ fediChordJoin nsSTM = do -- | Wait for new cache entries to appear and then try joining on them. -- Exits after successful joining. -joinOnNewEntriesThread :: LocalNodeStateSTM -> IO () +joinOnNewEntriesThread :: LocalNodeStateSTM s -> IO () joinOnNewEntriesThread nsSTM = loop where loop = do @@ -278,8 +277,7 @@ joinOnNewEntriesThread nsSTM = loop result -> pure (result, cache) case lookupResult of -- already joined - FOUND _ -> do - print =<< readTVarIO nsSTM + FOUND _ -> pure () -- otherwise try joining FORWARD _ -> do @@ -295,7 +293,7 @@ joinOnNewEntriesThread nsSTM = loop -- | cache updater thread that waits for incoming NodeCache update instructions on -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. -nodeCacheWriter :: LocalNodeStateSTM -> IO () +nodeCacheWriter :: LocalNodeStateSTM s -> IO () nodeCacheWriter nsSTM = forever $ atomically $ do ns <- readTVar nsSTM @@ -309,7 +307,7 @@ maxEntryAge = 600 -- | Periodically iterate through cache, clean up expired entries and verify unverified ones -nodeCacheVerifyThread :: LocalNodeStateSTM -> IO () +nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO () nodeCacheVerifyThread nsSTM = forever $ do putStrLn "cache verify run: begin" -- get cache @@ -370,7 +368,7 @@ nodeCacheVerifyThread nsSTM = forever $ do -- | Checks the invariant of at least @jEntries@ per cache slice. -- If this invariant does not hold, the middle of the slice is returned for -- making lookups to that ID -checkCacheSliceInvariants :: LocalNodeState +checkCacheSliceInvariants :: LocalNodeState s -> NodeCache -> [NodeID] -- ^ list of middle IDs of slices not -- ^ fulfilling the invariant @@ -426,12 +424,11 @@ checkCacheSliceInvariants ns -- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until -- one responds, and get their neighbours for maintaining the own neighbour lists. -- If necessary, request new neighbours. -stabiliseThread :: LocalNodeStateSTM -> IO () +stabiliseThread :: LocalNodeStateSTM s -> IO () stabiliseThread nsSTM = forever $ do ns <- readTVarIO nsSTM putStrLn "stabilise run: begin" - print ns -- iterate through the same snapshot, collect potential new neighbours -- and nodes to be deleted, and modify these changes only at the end of @@ -489,8 +486,8 @@ stabiliseThread nsSTM = forever $ do -- with the n+1-th neighbour. -- On success, return 2 lists: The failed nodes and the potential neighbours -- returned by the queried node. - stabiliseClosestResponder :: LocalNodeState -- ^ own node - -> (LocalNodeState -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors + stabiliseClosestResponder :: LocalNodeState s -- ^ own node + -> (LocalNodeState s -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors -> Int -- ^ index of neighbour to query -> [RemoteNodeState] -- ^ delete accumulator -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours) @@ -514,7 +511,7 @@ stabiliseThread nsSTM = forever $ do currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns - checkReachability :: LocalNodeState -- ^ this node + checkReachability :: LocalNodeState s -- ^ this node -> RemoteNodeState -- ^ node to Ping for reachability -> IO (Maybe RemoteNodeState) -- ^ if the Pinged node handles the requested node state then that one checkReachability ns toCheck = do @@ -543,10 +540,10 @@ sendThread sock sendQ = forever $ do sendAllTo sock packet addr -- | Sets up and manages the main server threads of FediChord -fediMainThreads :: Socket -> LocalNodeStateSTM -> IO () +fediMainThreads :: Socket -> LocalNodeStateSTM s -> IO () fediMainThreads sock nsSTM = do ns <- readTVarIO nsSTM - putStrLn $ "launching threads, ns: " <> show ns + putStrLn $ "launching threads" sendQ <- newTQueueIO recvQ <- newTQueueIO -- concurrently launch all handler threads, if one of them throws an exception @@ -588,7 +585,7 @@ requestMapPurge mapVar = forever $ do -- and pass them to their specific handling function. fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue - -> LocalNodeStateSTM -- ^ acting NodeState + -> LocalNodeStateSTM s -- ^ acting NodeState -> IO () fediMessageHandler sendQ recvQ nsSTM = do -- Read node state just once, assuming that all relevant data for this function does @@ -653,14 +650,14 @@ fediMessageHandler sendQ recvQ nsSTM = do -- ==== interface to service layer ==== -instance DHT RealNodeSTM where +instance DHT (RealNodeSTM s) where lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring forceLookupKey nodeSTM keystring = updateLookupCache nodeSTM $ genKeyID keystring -- | Returns the hostname and port of the host responsible for a key. -- Information is provided from a cache, only on a cache miss a new DHT lookup -- is triggered. -getKeyResponsibility :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber)) +getKeyResponsibility :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber)) getKeyResponsibility nodeSTM lookupKey = do node <- readTVarIO nodeSTM cache <- readTVarIO $ lookupCacheSTM node @@ -676,7 +673,7 @@ getKeyResponsibility nodeSTM lookupKey = do -- | Triggers a new DHT lookup for a key, updates the lookup cache and returns the -- new entry. -- If no vserver is active in the DHT, 'Nothing' is returned. -updateLookupCache :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber)) +updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber)) updateLookupCache nodeSTM lookupKey = do (node, lookupSource) <- atomically $ do node <- readTVar nodeSTM @@ -703,7 +700,7 @@ updateLookupCache nodeSTM lookupKey = do -- | Periodically clean the lookup cache from expired entries. -lookupCacheCleanup :: RealNodeSTM -> IO () +lookupCacheCleanup :: RealNodeSTM s -> IO () lookupCacheCleanup nodeSTM = do node <- readTVarIO nodeSTM forever $ do diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 604519e..5b8ef17 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -147,9 +147,8 @@ a `localCompare` b -- | Data for managing the virtual server nodes of this real node. -- Also contains shared data and config values. -- TODO: more data structures for k-choices bookkeeping ---data RealNode s = RealNode -data RealNode = RealNode - { vservers :: [LocalNodeStateSTM] +data RealNode s = RealNode + { vservers :: [LocalNodeStateSTM s] -- ^ references to all active versers , nodeConfig :: FediChordConf -- ^ holds the initial configuration read at program start @@ -157,10 +156,10 @@ data RealNode = RealNode -- ^ nodes to be used as bootstrapping points, new ones learned during operation , lookupCacheSTM :: TVar LookupCache -- ^ a global cache of looked up keys and their associated nodes + , nodeService :: s (RealNodeSTM s) } ---type RealNodeSTM s = TVar (RealNode s) -type RealNodeSTM = TVar RealNode +type RealNodeSTM s = TVar (RealNode s) -- | represents a node and all its important state data RemoteNodeState = RemoteNodeState @@ -182,7 +181,7 @@ instance Ord RemoteNodeState where a `compare` b = nid a `compare` nid b -- | represents a node and encapsulates all data and parameters that are not present for remote nodes -data LocalNodeState = LocalNodeState +data LocalNodeState s = LocalNodeState { nodeState :: RemoteNodeState -- ^ represents common data present both in remote and local node representations , nodeCacheSTM :: TVar NodeCache @@ -201,13 +200,13 @@ data LocalNodeState = LocalNodeState -- ^ number of parallel sent queries , jEntriesPerSlice :: Int -- ^ number of desired entries per cache slice - , parentRealNode :: RealNodeSTM + , parentRealNode :: RealNodeSTM s -- ^ the parent node managing this vserver instance } deriving (Show, Eq) -- | for concurrent access, LocalNodeState is wrapped in a TVar -type LocalNodeStateSTM = TVar LocalNodeState +type LocalNodeStateSTM s = TVar (LocalNodeState s) -- | class for various NodeState representations, providing -- getters and setters for common values @@ -244,14 +243,14 @@ instance NodeState RemoteNodeState where toRemoteNodeState = id -- | helper function for setting values on the 'RemoteNodeState' contained in the 'LocalNodeState' -propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState -> LocalNodeState +propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState s -> LocalNodeState s propagateNodeStateSet_ func ns = let newNs = func $ nodeState ns in ns {nodeState = newNs} -instance NodeState LocalNodeState where +instance NodeState (LocalNodeState s) where getNid = getNid . nodeState getDomain = getDomain . nodeState getIpAddr = getIpAddr . nodeState @@ -273,21 +272,24 @@ instance Typeable a => Show (TVar a) where instance Typeable a => Show (TQueue a) where show x = show (typeOf x) +instance Typeable a => Show (TChan a) where + show x = show (typeOf x) + -- | convenience function that replaces the predecessors of a 'LocalNodeState' with the k closest nodes from the provided list -setPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState +setPredecessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ preds} -- | convenience function that replaces the successors of a 'LocalNodeState' with the k closest nodes from the provided list -setSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState +setSuccessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ succs} -- | sets the predecessors of a 'LocalNodeState' to the closest k nodes of the current predecessors and the provided list, combined -addPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState +addPredecessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) preds) . rMapFromList . fmap keyValuePair $ predecessors ns} -- | sets the successors of a 'LocalNodeState' to the closest k nodes of the current successors and the provided list, combined -addSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState +addSuccessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) succs) . rMapFromList . fmap keyValuePair $ successors ns} instance HasKeyID NodeID RemoteNodeState where diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 264bccb..d5dd30d 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -25,6 +25,7 @@ import qualified Data.Text.Lazy as Txt import Data.Text.Normalize (NormalizationMode (NFC), normalize) import Data.Time.Clock.POSIX +import Data.Typeable (Typeable) import System.Random import qualified Network.Wai.Handler.Warp as Warp @@ -48,6 +49,7 @@ data PostService d = PostService , relayInQueue :: TQueue (Hashtag, PostID, PostContent) -- ^ Queue for processing incoming posts of own instance asynchronously } + deriving (Typeable) type Hashtag = Txt.Text type PostID = Txt.Text diff --git a/test/FediChordSpec.hs b/test/FediChordSpec.hs index 1cace7a..bcc2eaf 100644 --- a/test/FediChordSpec.hs +++ b/test/FediChordSpec.hs @@ -292,7 +292,7 @@ exampleNodeState = RemoteNodeState { , vServerID = 0 } -exampleLocalNode :: IO LocalNodeState +exampleLocalNode :: IO (LocalNodeState s) exampleLocalNode = nodeStateInit =<< (newTVarIO $ RealNode { vservers = [] , nodeConfig = exampleFediConf