add reference from RealNode to Service

This required to make both RealNode(STM) and LocalNodeState(STM) parameterisable
polymorphic types
This commit is contained in:
Trolli Schmittlauch 2020-07-30 02:19:26 +02:00
parent 4bf8091143
commit 5ffe1b074e
5 changed files with 68 additions and 67 deletions

View file

@ -92,7 +92,7 @@ import Debug.Trace (trace)
-- TODO: evaluate more fine-grained argument passing to allow granular locking -- TODO: evaluate more fine-grained argument passing to allow granular locking
-- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache -- | look up an ID to either claim responsibility for it or return the closest l nodes from the local cache
queryLocalCache :: LocalNodeState -> NodeCache -> Int -> NodeID -> QueryResponse queryLocalCache :: LocalNodeState s -> NodeCache -> Int -> NodeID -> QueryResponse
queryLocalCache ownState nCache lBestNodes targetID queryLocalCache ownState nCache lBestNodes targetID
-- as target ID falls between own ID and first predecessor, it is handled by this node -- as target ID falls between own ID and first predecessor, it is handled by this node
-- This only makes sense if the node is part of the DHT by having joined. -- This only makes sense if the node is part of the DHT by having joined.
@ -130,7 +130,7 @@ closestCachePredecessors remainingLookups lastID nCache
-- Looks up the successor of the lookup key on a 'RingMap' representation of the -- Looks up the successor of the lookup key on a 'RingMap' representation of the
-- predecessor list with the node itself added. If the result is the same as the node -- predecessor list with the node itself added. If the result is the same as the node
-- itself then it falls into the responsibility interval. -- itself then it falls into the responsibility interval.
isInOwnResponsibilitySlice :: HasKeyID NodeID a => a -> LocalNodeState -> Bool isInOwnResponsibilitySlice :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isInOwnResponsibilitySlice lookupTarget ownNs = (fst <$> rMapLookupSucc (getKeyID lookupTarget :: NodeID) predecessorRMap) == pure (getNid ownNs) isInOwnResponsibilitySlice lookupTarget ownNs = (fst <$> rMapLookupSucc (getKeyID lookupTarget :: NodeID) predecessorRMap) == pure (getNid ownNs)
where where
predecessorList = predecessors ownNs predecessorList = predecessors ownNs
@ -140,10 +140,10 @@ isInOwnResponsibilitySlice lookupTarget ownNs = (fst <$> rMapLookupSucc (getKeyI
ownRemote = toRemoteNodeState ownNs ownRemote = toRemoteNodeState ownNs
closestPredecessor = headMay predecessorList closestPredecessor = headMay predecessorList
isPossiblePredecessor :: HasKeyID NodeID a => a -> LocalNodeState -> Bool isPossiblePredecessor :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isPossiblePredecessor = isInOwnResponsibilitySlice isPossiblePredecessor = isInOwnResponsibilitySlice
isPossibleSuccessor :: HasKeyID NodeID a => a -> LocalNodeState -> Bool isPossibleSuccessor :: HasKeyID NodeID a => a -> LocalNodeState s -> Bool
isPossibleSuccessor lookupTarget ownNs = (fst <$> rMapLookupPred (getKeyID lookupTarget :: NodeID) successorRMap) == pure (getNid ownNs) isPossibleSuccessor lookupTarget ownNs = (fst <$> rMapLookupPred (getKeyID lookupTarget :: NodeID) successorRMap) == pure (getNid ownNs)
where where
successorList = successors ownNs successorList = successors ownNs
@ -224,7 +224,7 @@ markCacheEntryAsVerified timestamp nid = RingMap . Map.adjust adjustFunc nid . g
-- | uses the successor and predecessor list of a node as an indicator for whether a -- | uses the successor and predecessor list of a node as an indicator for whether a
-- node has properly joined the DHT -- node has properly joined the DHT
isJoined :: LocalNodeState -> Bool isJoined :: LocalNodeState s -> Bool
isJoined ns = not . all null $ [successors ns, predecessors ns] isJoined ns = not . all null $ [successors ns, predecessors ns]
-- | the size limit to be used when serialising messages for sending -- | the size limit to be used when serialising messages for sending
@ -248,7 +248,7 @@ ackRequest _ _ = Map.empty
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
-- the response to be sent. -- the response to be sent.
handleIncomingRequest :: LocalNodeStateSTM -- ^ the handling node handleIncomingRequest :: LocalNodeStateSTM s -- ^ the handling node
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue -> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> Set.Set FediChordMessage -- ^ all parts of the request to handle -> Set.Set FediChordMessage -- ^ all parts of the request to handle
-> SockAddr -- ^ source address of the request -> SockAddr -- ^ source address of the request
@ -287,7 +287,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
-- | execute a key ID lookup on local cache and respond with the result -- | execute a key ID lookup on local cache and respond with the result
respondQueryID :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondQueryID :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryID nsSTM msgSet = do respondQueryID nsSTM msgSet = do
putStrLn "responding to a QueryID request" putStrLn "responding to a QueryID request"
-- this message cannot be split reasonably, so just -- this message cannot be split reasonably, so just
@ -328,7 +328,7 @@ respondQueryID nsSTM msgSet = do
-- | Respond to a Leave request by removing the leaving node from local data structures -- | Respond to a Leave request by removing the leaving node from local data structures
-- and confirming with response. -- and confirming with response.
-- TODO: copy over key data from leaver and confirm -- TODO: copy over key data from leaver and confirm
respondLeave :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondLeave :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondLeave nsSTM msgSet = do respondLeave nsSTM msgSet = do
-- combine payload of all parts -- combine payload of all parts
let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) -> let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) ->
@ -359,7 +359,7 @@ respondLeave nsSTM msgSet = do
pure $ serialiseMessage sendMessageSize responseMsg pure $ serialiseMessage sendMessageSize responseMsg
-- | respond to stabilise requests by returning successor and predecessor list -- | respond to stabilise requests by returning successor and predecessor list
respondStabilise :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondStabilise :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondStabilise nsSTM msgSet = do respondStabilise nsSTM msgSet = do
nsSnap <- readTVarIO nsSTM nsSnap <- readTVarIO nsSTM
let let
@ -381,7 +381,7 @@ respondStabilise nsSTM msgSet = do
-- | respond to Ping request by returning all active vserver NodeStates -- | respond to Ping request by returning all active vserver NodeStates
respondPing :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondPing :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondPing nsSTM msgSet = do respondPing nsSTM msgSet = do
-- TODO: respond with all active VS when implementing k-choices -- TODO: respond with all active VS when implementing k-choices
nsSnap <- readTVarIO nsSTM nsSnap <- readTVarIO nsSTM
@ -400,7 +400,7 @@ respondPing nsSTM msgSet = do
-- this modifies node state, so locking and IO seems to be necessary. -- this modifies node state, so locking and IO seems to be necessary.
-- Still try to keep as much code as possible pure -- Still try to keep as much code as possible pure
respondJoin :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondJoin :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin nsSTM msgSet = do respondJoin nsSTM msgSet = do
-- atomically read and modify the node state according to the parsed request -- atomically read and modify the node state according to the parsed request
responseMsg <- atomically $ do responseMsg <- atomically $ do
@ -451,8 +451,8 @@ respondJoin nsSTM msgSet = do
-- | send a join request and return the joined 'LocalNodeState' including neighbours -- | send a join request and return the joined 'LocalNodeState' including neighbours
requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted
-> LocalNodeStateSTM -- ^ joining NodeState -> LocalNodeStateSTM s -- ^ joining NodeState
-> IO (Either String LocalNodeStateSTM) -- ^ node after join with all its new information -> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information
requestJoin toJoinOn ownStateSTM = do requestJoin toJoinOn ownStateSTM = do
ownState <- readTVarIO ownStateSTM ownState <- readTVarIO ownStateSTM
prn <- readTVarIO $ parentRealNode ownState prn <- readTVarIO $ parentRealNode ownState
@ -500,7 +500,7 @@ requestJoin toJoinOn ownStateSTM = do
-- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
requestQueryID :: LocalNodeState -- ^ NodeState of the querying node requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node
-> NodeID -- ^ target key ID to look up -> NodeID -- ^ target key ID to look up
-> IO RemoteNodeState -- ^ the node responsible for handling that key -> IO RemoteNodeState -- ^ the node responsible for handling that key
-- 1. do a local lookup for the l closest nodes -- 1. do a local lookup for the l closest nodes
@ -515,7 +515,7 @@ requestQueryID ns targetID = do
queryIdLookupLoop firstCacheSnapshot ns 50 targetID queryIdLookupLoop firstCacheSnapshot ns 50 targetID
-- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining -- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
queryIdLookupLoop :: NodeCache -> LocalNodeState -> Int -> NodeID -> IO RemoteNodeState queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState
-- return node itself as default fallback value against infinite recursion. -- return node itself as default fallback value against infinite recursion.
-- TODO: consider using an Either instead of a default value -- TODO: consider using an Either instead of a default value
queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns
@ -541,7 +541,7 @@ queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
sendQueryIdMessages :: (Integral i) sendQueryIdMessages :: (Integral i)
=> NodeID -- ^ target key ID to look up => NodeID -- ^ target key ID to look up
-> LocalNodeState -- ^ node state of the node doing the query -> LocalNodeState s -- ^ node state of the node doing the query
-> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned -> Maybe i -- ^ optionally provide an explicit @l@ parameter of number of nodes to be returned
-> [RemoteNodeState] -- ^ nodes to query -> [RemoteNodeState] -- ^ nodes to query
-> IO QueryResponse -- ^ accumulated response -> IO QueryResponse -- ^ accumulated response
@ -579,7 +579,7 @@ sendQueryIdMessages targetID ns lParam targets = do
-- | Create a QueryID message to be supplied to 'sendRequestTo' -- | Create a QueryID message to be supplied to 'sendRequestTo'
lookupMessage :: Integral i lookupMessage :: Integral i
=> NodeID -- ^ target ID => NodeID -- ^ target ID
-> LocalNodeState -- ^ sender node state -> LocalNodeState s -- ^ sender node state
-> Maybe i -- ^ optionally provide a different l parameter -> Maybe i -- ^ optionally provide a different l parameter
-> (Integer -> FediChordMessage) -> (Integer -> FediChordMessage)
lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID) lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
@ -589,7 +589,7 @@ lookupMessage targetID ns lParam = \rID -> Request rID (toRemoteNodeState ns) 1
-- | Send a stabilise request to provided 'RemoteNode' and, if successful, -- | Send a stabilise request to provided 'RemoteNode' and, if successful,
-- return parsed neighbour lists -- return parsed neighbour lists
requestStabilise :: LocalNodeState -- ^ sending node requestStabilise :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ neighbour node to send to -> RemoteNodeState -- ^ neighbour node to send to
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
requestStabilise ns neighbour = do requestStabilise ns neighbour = do
@ -624,7 +624,7 @@ requestStabilise ns neighbour = do
) responses ) responses
requestPing :: LocalNodeState -- ^ sending node requestPing :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ node to be PINGed -> RemoteNodeState -- ^ node to be PINGed
-> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node -> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
requestPing ns target = do requestPing ns target = do
@ -723,7 +723,7 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
queueAddEntries :: Foldable c => c RemoteCacheEntry queueAddEntries :: Foldable c => c RemoteCacheEntry
-> LocalNodeState -> LocalNodeState s
-> IO () -> IO ()
queueAddEntries entries ns = do queueAddEntries entries ns = do
now <- getPOSIXTime now <- getPOSIXTime
@ -733,14 +733,14 @@ queueAddEntries entries ns = do
-- | enque a list of node IDs to be deleted from the global NodeCache -- | enque a list of node IDs to be deleted from the global NodeCache
queueDeleteEntries :: Foldable c queueDeleteEntries :: Foldable c
=> c NodeID => c NodeID
-> LocalNodeState -> LocalNodeState s
-> IO () -> IO ()
queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
-- | enque a single node ID to be deleted from the global NodeCache -- | enque a single node ID to be deleted from the global NodeCache
queueDeleteEntry :: NodeID queueDeleteEntry :: NodeID
-> LocalNodeState -> LocalNodeState s
-> IO () -> IO ()
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete

View file

@ -78,7 +78,6 @@ import Data.Maybe (catMaybes, fromJust, fromMaybe,
isJust, isNothing, mapMaybe) isJust, isNothing, mapMaybe)
import qualified Data.Set as Set import qualified Data.Set as Set
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Data.Typeable (Typeable (..), typeOf)
import Data.Word import Data.Word
import qualified Network.ByteOrder as NetworkBytes import qualified Network.ByteOrder as NetworkBytes
import Network.Socket hiding (recv, recvFrom, send, import Network.Socket hiding (recv, recvFrom, send,
@ -95,10 +94,10 @@ import Debug.Trace (trace)
-- | initialise data structures, compute own IDs and bind to listening socket -- | initialise data structures, compute own IDs and bind to listening socket
-- ToDo: load persisted state, thus this function already operates in IO -- ToDo: load persisted state, thus this function already operates in IO
fediChordInit :: (Service s RealNodeSTM) fediChordInit :: (Service s (RealNodeSTM s))
=> FediChordConf => FediChordConf
-> (RealNodeSTM -> IO (s RealNodeSTM)) -- ^ runner function for service -> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service
-> IO (Socket, LocalNodeStateSTM) -> IO (Socket, LocalNodeStateSTM s)
fediChordInit initConf serviceRunner = do fediChordInit initConf serviceRunner = do
emptyLookupCache <- newTVarIO Map.empty emptyLookupCache <- newTVarIO Map.empty
let realNode = RealNode { let realNode = RealNode {
@ -119,7 +118,7 @@ fediChordInit initConf serviceRunner = do
-- | initialises the 'NodeState' for this local node. -- | initialises the 'NodeState' for this local node.
-- Separated from 'fediChordInit' to be usable in tests. -- Separated from 'fediChordInit' to be usable in tests.
nodeStateInit :: RealNodeSTM -> IO LocalNodeState nodeStateInit :: RealNodeSTM s -> IO (LocalNodeState s)
nodeStateInit realNodeSTM = do nodeStateInit realNodeSTM = do
realNode <- readTVarIO realNodeSTM realNode <- readTVarIO realNodeSTM
cacheSTM <- newTVarIO initCache cacheSTM <- newTVarIO initCache
@ -151,9 +150,9 @@ nodeStateInit realNodeSTM = do
-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed -- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
-- for resolving the new node's position. -- for resolving the new node's position.
fediChordBootstrapJoin :: LocalNodeStateSTM -- ^ the local 'NodeState' fediChordBootstrapJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState'
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node -> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message -- successful join, otherwise an error message
fediChordBootstrapJoin nsSTM bootstrapNode = do fediChordBootstrapJoin nsSTM bootstrapNode = do
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined -- can be invoked multiple times with all known bootstrapping nodes until successfully joined
@ -169,7 +168,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters. -- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
-- Unjoined try joining instead. -- Unjoined try joining instead.
convergenceSampleThread :: LocalNodeStateSTM -> IO () convergenceSampleThread :: LocalNodeStateSTM s -> IO ()
convergenceSampleThread nsSTM = forever $ do convergenceSampleThread nsSTM = forever $ do
nsSnap <- readTVarIO nsSTM nsSnap <- readTVarIO nsSTM
parentNode <- readTVarIO $ parentRealNode nsSnap parentNode <- readTVarIO $ parentRealNode nsSnap
@ -200,7 +199,7 @@ convergenceSampleThread nsSTM = forever $ do
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
tryBootstrapJoining :: LocalNodeStateSTM -> IO (Either String LocalNodeStateSTM) tryBootstrapJoining :: LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s))
tryBootstrapJoining nsSTM = do tryBootstrapJoining nsSTM = do
bss <- atomically $ do bss <- atomically $ do
nsSnap <- readTVar nsSTM nsSnap <- readTVar nsSTM
@ -217,7 +216,7 @@ tryBootstrapJoining nsSTM = do
-- | Look up a key just based on the responses of a single bootstrapping node. -- | Look up a key just based on the responses of a single bootstrapping node.
bootstrapQueryId :: LocalNodeStateSTM -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState) bootstrapQueryId :: LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> IO (Either String RemoteNodeState)
bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
ns <- readTVarIO nsSTM ns <- readTVarIO nsSTM
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
@ -248,8 +247,8 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
-- | join a node to the DHT using the global node cache -- | join a node to the DHT using the global node cache
-- node's position. -- node's position.
fediChordJoin :: LocalNodeStateSTM -- ^ the local 'NodeState' fediChordJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState'
-> IO (Either String LocalNodeStateSTM) -- ^ the joined 'NodeState' after a -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message -- successful join, otherwise an error message
fediChordJoin nsSTM = do fediChordJoin nsSTM = do
ns <- readTVarIO nsSTM ns <- readTVarIO nsSTM
@ -265,7 +264,7 @@ fediChordJoin nsSTM = do
-- | Wait for new cache entries to appear and then try joining on them. -- | Wait for new cache entries to appear and then try joining on them.
-- Exits after successful joining. -- Exits after successful joining.
joinOnNewEntriesThread :: LocalNodeStateSTM -> IO () joinOnNewEntriesThread :: LocalNodeStateSTM s -> IO ()
joinOnNewEntriesThread nsSTM = loop joinOnNewEntriesThread nsSTM = loop
where where
loop = do loop = do
@ -278,8 +277,7 @@ joinOnNewEntriesThread nsSTM = loop
result -> pure (result, cache) result -> pure (result, cache)
case lookupResult of case lookupResult of
-- already joined -- already joined
FOUND _ -> do FOUND _ ->
print =<< readTVarIO nsSTM
pure () pure ()
-- otherwise try joining -- otherwise try joining
FORWARD _ -> do FORWARD _ -> do
@ -295,7 +293,7 @@ joinOnNewEntriesThread nsSTM = loop
-- | cache updater thread that waits for incoming NodeCache update instructions on -- | cache updater thread that waits for incoming NodeCache update instructions on
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
nodeCacheWriter :: LocalNodeStateSTM -> IO () nodeCacheWriter :: LocalNodeStateSTM s -> IO ()
nodeCacheWriter nsSTM = nodeCacheWriter nsSTM =
forever $ atomically $ do forever $ atomically $ do
ns <- readTVar nsSTM ns <- readTVar nsSTM
@ -309,7 +307,7 @@ maxEntryAge = 600
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones -- | Periodically iterate through cache, clean up expired entries and verify unverified ones
nodeCacheVerifyThread :: LocalNodeStateSTM -> IO () nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
nodeCacheVerifyThread nsSTM = forever $ do nodeCacheVerifyThread nsSTM = forever $ do
putStrLn "cache verify run: begin" putStrLn "cache verify run: begin"
-- get cache -- get cache
@ -370,7 +368,7 @@ nodeCacheVerifyThread nsSTM = forever $ do
-- | Checks the invariant of at least @jEntries@ per cache slice. -- | Checks the invariant of at least @jEntries@ per cache slice.
-- If this invariant does not hold, the middle of the slice is returned for -- If this invariant does not hold, the middle of the slice is returned for
-- making lookups to that ID -- making lookups to that ID
checkCacheSliceInvariants :: LocalNodeState checkCacheSliceInvariants :: LocalNodeState s
-> NodeCache -> NodeCache
-> [NodeID] -- ^ list of middle IDs of slices not -> [NodeID] -- ^ list of middle IDs of slices not
-- ^ fulfilling the invariant -- ^ fulfilling the invariant
@ -426,12 +424,11 @@ checkCacheSliceInvariants ns
-- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until -- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
-- one responds, and get their neighbours for maintaining the own neighbour lists. -- one responds, and get their neighbours for maintaining the own neighbour lists.
-- If necessary, request new neighbours. -- If necessary, request new neighbours.
stabiliseThread :: LocalNodeStateSTM -> IO () stabiliseThread :: LocalNodeStateSTM s -> IO ()
stabiliseThread nsSTM = forever $ do stabiliseThread nsSTM = forever $ do
ns <- readTVarIO nsSTM ns <- readTVarIO nsSTM
putStrLn "stabilise run: begin" putStrLn "stabilise run: begin"
print ns
-- iterate through the same snapshot, collect potential new neighbours -- iterate through the same snapshot, collect potential new neighbours
-- and nodes to be deleted, and modify these changes only at the end of -- and nodes to be deleted, and modify these changes only at the end of
@ -489,8 +486,8 @@ stabiliseThread nsSTM = forever $ do
-- with the n+1-th neighbour. -- with the n+1-th neighbour.
-- On success, return 2 lists: The failed nodes and the potential neighbours -- On success, return 2 lists: The failed nodes and the potential neighbours
-- returned by the queried node. -- returned by the queried node.
stabiliseClosestResponder :: LocalNodeState -- ^ own node stabiliseClosestResponder :: LocalNodeState s -- ^ own node
-> (LocalNodeState -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors -> (LocalNodeState s -> [RemoteNodeState]) -- ^ getter function for either predecessors or successors
-> Int -- ^ index of neighbour to query -> Int -- ^ index of neighbour to query
-> [RemoteNodeState] -- ^ delete accumulator -> [RemoteNodeState] -- ^ delete accumulator
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours) -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours)
@ -514,7 +511,7 @@ stabiliseThread nsSTM = forever $ do
currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns
checkReachability :: LocalNodeState -- ^ this node checkReachability :: LocalNodeState s -- ^ this node
-> RemoteNodeState -- ^ node to Ping for reachability -> RemoteNodeState -- ^ node to Ping for reachability
-> IO (Maybe RemoteNodeState) -- ^ if the Pinged node handles the requested node state then that one -> IO (Maybe RemoteNodeState) -- ^ if the Pinged node handles the requested node state then that one
checkReachability ns toCheck = do checkReachability ns toCheck = do
@ -543,10 +540,10 @@ sendThread sock sendQ = forever $ do
sendAllTo sock packet addr sendAllTo sock packet addr
-- | Sets up and manages the main server threads of FediChord -- | Sets up and manages the main server threads of FediChord
fediMainThreads :: Socket -> LocalNodeStateSTM -> IO () fediMainThreads :: Socket -> LocalNodeStateSTM s -> IO ()
fediMainThreads sock nsSTM = do fediMainThreads sock nsSTM = do
ns <- readTVarIO nsSTM ns <- readTVarIO nsSTM
putStrLn $ "launching threads, ns: " <> show ns putStrLn $ "launching threads"
sendQ <- newTQueueIO sendQ <- newTQueueIO
recvQ <- newTQueueIO recvQ <- newTQueueIO
-- concurrently launch all handler threads, if one of them throws an exception -- concurrently launch all handler threads, if one of them throws an exception
@ -588,7 +585,7 @@ requestMapPurge mapVar = forever $ do
-- and pass them to their specific handling function. -- and pass them to their specific handling function.
fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-> LocalNodeStateSTM -- ^ acting NodeState -> LocalNodeStateSTM s -- ^ acting NodeState
-> IO () -> IO ()
fediMessageHandler sendQ recvQ nsSTM = do fediMessageHandler sendQ recvQ nsSTM = do
-- Read node state just once, assuming that all relevant data for this function does -- Read node state just once, assuming that all relevant data for this function does
@ -653,14 +650,14 @@ fediMessageHandler sendQ recvQ nsSTM = do
-- ==== interface to service layer ==== -- ==== interface to service layer ====
instance DHT RealNodeSTM where instance DHT (RealNodeSTM s) where
lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring
forceLookupKey nodeSTM keystring = updateLookupCache nodeSTM $ genKeyID keystring forceLookupKey nodeSTM keystring = updateLookupCache nodeSTM $ genKeyID keystring
-- | Returns the hostname and port of the host responsible for a key. -- | Returns the hostname and port of the host responsible for a key.
-- Information is provided from a cache, only on a cache miss a new DHT lookup -- Information is provided from a cache, only on a cache miss a new DHT lookup
-- is triggered. -- is triggered.
getKeyResponsibility :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber)) getKeyResponsibility :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
getKeyResponsibility nodeSTM lookupKey = do getKeyResponsibility nodeSTM lookupKey = do
node <- readTVarIO nodeSTM node <- readTVarIO nodeSTM
cache <- readTVarIO $ lookupCacheSTM node cache <- readTVarIO $ lookupCacheSTM node
@ -676,7 +673,7 @@ getKeyResponsibility nodeSTM lookupKey = do
-- | Triggers a new DHT lookup for a key, updates the lookup cache and returns the -- | Triggers a new DHT lookup for a key, updates the lookup cache and returns the
-- new entry. -- new entry.
-- If no vserver is active in the DHT, 'Nothing' is returned. -- If no vserver is active in the DHT, 'Nothing' is returned.
updateLookupCache :: RealNodeSTM -> NodeID -> IO (Maybe (String, PortNumber)) updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
updateLookupCache nodeSTM lookupKey = do updateLookupCache nodeSTM lookupKey = do
(node, lookupSource) <- atomically $ do (node, lookupSource) <- atomically $ do
node <- readTVar nodeSTM node <- readTVar nodeSTM
@ -703,7 +700,7 @@ updateLookupCache nodeSTM lookupKey = do
-- | Periodically clean the lookup cache from expired entries. -- | Periodically clean the lookup cache from expired entries.
lookupCacheCleanup :: RealNodeSTM -> IO () lookupCacheCleanup :: RealNodeSTM s -> IO ()
lookupCacheCleanup nodeSTM = do lookupCacheCleanup nodeSTM = do
node <- readTVarIO nodeSTM node <- readTVarIO nodeSTM
forever $ do forever $ do

View file

@ -147,9 +147,8 @@ a `localCompare` b
-- | Data for managing the virtual server nodes of this real node. -- | Data for managing the virtual server nodes of this real node.
-- Also contains shared data and config values. -- Also contains shared data and config values.
-- TODO: more data structures for k-choices bookkeeping -- TODO: more data structures for k-choices bookkeeping
--data RealNode s = RealNode data RealNode s = RealNode
data RealNode = RealNode { vservers :: [LocalNodeStateSTM s]
{ vservers :: [LocalNodeStateSTM]
-- ^ references to all active versers -- ^ references to all active versers
, nodeConfig :: FediChordConf , nodeConfig :: FediChordConf
-- ^ holds the initial configuration read at program start -- ^ holds the initial configuration read at program start
@ -157,10 +156,10 @@ data RealNode = RealNode
-- ^ nodes to be used as bootstrapping points, new ones learned during operation -- ^ nodes to be used as bootstrapping points, new ones learned during operation
, lookupCacheSTM :: TVar LookupCache , lookupCacheSTM :: TVar LookupCache
-- ^ a global cache of looked up keys and their associated nodes -- ^ a global cache of looked up keys and their associated nodes
, nodeService :: s (RealNodeSTM s)
} }
--type RealNodeSTM s = TVar (RealNode s) type RealNodeSTM s = TVar (RealNode s)
type RealNodeSTM = TVar RealNode
-- | represents a node and all its important state -- | represents a node and all its important state
data RemoteNodeState = RemoteNodeState data RemoteNodeState = RemoteNodeState
@ -182,7 +181,7 @@ instance Ord RemoteNodeState where
a `compare` b = nid a `compare` nid b a `compare` b = nid a `compare` nid b
-- | represents a node and encapsulates all data and parameters that are not present for remote nodes -- | represents a node and encapsulates all data and parameters that are not present for remote nodes
data LocalNodeState = LocalNodeState data LocalNodeState s = LocalNodeState
{ nodeState :: RemoteNodeState { nodeState :: RemoteNodeState
-- ^ represents common data present both in remote and local node representations -- ^ represents common data present both in remote and local node representations
, nodeCacheSTM :: TVar NodeCache , nodeCacheSTM :: TVar NodeCache
@ -201,13 +200,13 @@ data LocalNodeState = LocalNodeState
-- ^ number of parallel sent queries -- ^ number of parallel sent queries
, jEntriesPerSlice :: Int , jEntriesPerSlice :: Int
-- ^ number of desired entries per cache slice -- ^ number of desired entries per cache slice
, parentRealNode :: RealNodeSTM , parentRealNode :: RealNodeSTM s
-- ^ the parent node managing this vserver instance -- ^ the parent node managing this vserver instance
} }
deriving (Show, Eq) deriving (Show, Eq)
-- | for concurrent access, LocalNodeState is wrapped in a TVar -- | for concurrent access, LocalNodeState is wrapped in a TVar
type LocalNodeStateSTM = TVar LocalNodeState type LocalNodeStateSTM s = TVar (LocalNodeState s)
-- | class for various NodeState representations, providing -- | class for various NodeState representations, providing
-- getters and setters for common values -- getters and setters for common values
@ -244,14 +243,14 @@ instance NodeState RemoteNodeState where
toRemoteNodeState = id toRemoteNodeState = id
-- | helper function for setting values on the 'RemoteNodeState' contained in the 'LocalNodeState' -- | helper function for setting values on the 'RemoteNodeState' contained in the 'LocalNodeState'
propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState -> LocalNodeState propagateNodeStateSet_ :: (RemoteNodeState -> RemoteNodeState) -> LocalNodeState s -> LocalNodeState s
propagateNodeStateSet_ func ns = let propagateNodeStateSet_ func ns = let
newNs = func $ nodeState ns newNs = func $ nodeState ns
in in
ns {nodeState = newNs} ns {nodeState = newNs}
instance NodeState LocalNodeState where instance NodeState (LocalNodeState s) where
getNid = getNid . nodeState getNid = getNid . nodeState
getDomain = getDomain . nodeState getDomain = getDomain . nodeState
getIpAddr = getIpAddr . nodeState getIpAddr = getIpAddr . nodeState
@ -273,21 +272,24 @@ instance Typeable a => Show (TVar a) where
instance Typeable a => Show (TQueue a) where instance Typeable a => Show (TQueue a) where
show x = show (typeOf x) show x = show (typeOf x)
instance Typeable a => Show (TChan a) where
show x = show (typeOf x)
-- | convenience function that replaces the predecessors of a 'LocalNodeState' with the k closest nodes from the provided list -- | convenience function that replaces the predecessors of a 'LocalNodeState' with the k closest nodes from the provided list
setPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState setPredecessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ preds} setPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ preds}
-- | convenience function that replaces the successors of a 'LocalNodeState' with the k closest nodes from the provided list -- | convenience function that replaces the successors of a 'LocalNodeState' with the k closest nodes from the provided list
setSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState setSuccessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ succs} setSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . rMapFromList . fmap keyValuePair . filter ((/=) (getNid ns) . getNid) $ succs}
-- | sets the predecessors of a 'LocalNodeState' to the closest k nodes of the current predecessors and the provided list, combined -- | sets the predecessors of a 'LocalNodeState' to the closest k nodes of the current predecessors and the provided list, combined
addPredecessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState addPredecessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) preds) . rMapFromList . fmap keyValuePair $ predecessors ns} addPredecessors preds ns = ns {predecessors = takeRMapPredecessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) preds) . rMapFromList . fmap keyValuePair $ predecessors ns}
-- | sets the successors of a 'LocalNodeState' to the closest k nodes of the current successors and the provided list, combined -- | sets the successors of a 'LocalNodeState' to the closest k nodes of the current successors and the provided list, combined
addSuccessors :: [RemoteNodeState] -> LocalNodeState -> LocalNodeState addSuccessors :: [RemoteNodeState] -> LocalNodeState s -> LocalNodeState s
addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) succs) . rMapFromList . fmap keyValuePair $ successors ns} addSuccessors succs ns = ns {successors = takeRMapSuccessors (getNid ns) (kNeighbours ns) . addRMapEntries (keyValuePair <$> filter ((/=) (getNid ns) . getNid) succs) . rMapFromList . fmap keyValuePair $ successors ns}
instance HasKeyID NodeID RemoteNodeState where instance HasKeyID NodeID RemoteNodeState where

View file

@ -25,6 +25,7 @@ import qualified Data.Text.Lazy as Txt
import Data.Text.Normalize (NormalizationMode (NFC), import Data.Text.Normalize (NormalizationMode (NFC),
normalize) normalize)
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Data.Typeable (Typeable)
import System.Random import System.Random
import qualified Network.Wai.Handler.Warp as Warp import qualified Network.Wai.Handler.Warp as Warp
@ -48,6 +49,7 @@ data PostService d = PostService
, relayInQueue :: TQueue (Hashtag, PostID, PostContent) , relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously -- ^ Queue for processing incoming posts of own instance asynchronously
} }
deriving (Typeable)
type Hashtag = Txt.Text type Hashtag = Txt.Text
type PostID = Txt.Text type PostID = Txt.Text

View file

@ -292,7 +292,7 @@ exampleNodeState = RemoteNodeState {
, vServerID = 0 , vServerID = 0
} }
exampleLocalNode :: IO LocalNodeState exampleLocalNode :: IO (LocalNodeState s)
exampleLocalNode = nodeStateInit =<< (newTVarIO $ RealNode { exampleLocalNode = nodeStateInit =<< (newTVarIO $ RealNode {
vservers = [] vservers = []
, nodeConfig = exampleFediConf , nodeConfig = exampleFediConf