Compare commits

..

6 commits

4 changed files with 245 additions and 216 deletions

View file

@@ -18,29 +18,10 @@ main = do
-- ToDo: parse and pass config
-- probably use `tomland` for that
(fConf, sConf) <- readConfig
-- TODO: first initialise 'RealNode', then the vservers
-- ToDo: load persisted caches, bootstrapping nodes …
(serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-- currently no masking is necessary, as there is nothing to clean up
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
-- try joining the DHT using one of the provided bootstrapping nodes
joinedState <- tryBootstrapJoining thisNode
either (\err -> do
-- handle unsuccessful join
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
print =<< readTVarIO thisNode
-- launch thread attempting to join on new cache entries
_ <- forkIO $ joinOnNewEntriesThread thisNode
wait =<< async (fediMainThreads serverSock thisNode)
)
(\joinedNS -> do
-- launch main eventloop with successfully joined state
putStrLn "successful join"
wait =<< async (fediMainThreads serverSock thisNode)
)
joinedState
pure ()
(fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-- wait for all DHT threads to terminate; this keeps the main thread running
wait fediThreads
readConfig :: IO (FediChordConf, ServiceConf)

View file

@@ -241,16 +241,16 @@ sendMessageSize = 1200
-- ====== message send and receive operations ======
-- encode the response to a request that just signals successful receipt
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString
ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response {
requestID = requestID req
, senderID = ownID
, senderID = receiverID req
, part = part req
, isFinalPart = False
, action = action req
, payload = Nothing
}
ackRequest _ _ = Map.empty
ackRequest _ = Map.empty
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
@@ -269,7 +269,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
Nothing -> pure ()
Just aPart -> do
let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns)
-- decide whether and how to respond. If responding, pass the message to the response-generating function and write the responses to the send queue
maybe (pure ()) (
mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
@@ -542,7 +542,7 @@ requestJoin toJoinOn ownStateSTM = do
writeTVar ownStateSTM newState
pure (cacheInsertQ, newState)
-- execute the cache insertions
mapM_ (\f -> f joinedState) cacheInsertQ
mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ
if responses == Set.empty
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
else do
@@ -624,7 +624,7 @@ sendQueryIdMessages targetID ns lParam targets = do
_ -> Set.empty
-- forward entries to global cache
queueAddEntries entrySet ns
queueAddEntries entrySet (cacheWriteQueue ns)
-- return accumulated QueryResult
pure $ case acc of
-- once a FOUND has been encountered, return this as a result
@@ -670,7 +670,7 @@ requestStabilise ns neighbour = do
)
([],[]) respSet
-- update successfully responded neighbour in cache
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet)
pure $ if null responsePreds && null responseSuccs
then Left "no neighbours returned"
else Right (responsePreds, responseSuccs)
@@ -832,24 +832,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
queueAddEntries :: Foldable c => c RemoteCacheEntry
-> LocalNodeState s
-> TQueue (NodeCache -> NodeCache)
-> IO ()
queueAddEntries entries ns = do
queueAddEntries entries cacheQ = do
now <- getPOSIXTime
forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry
forM_ entries $ \entry -> atomically $ writeTQueue cacheQ $ addCacheEntryPure now entry
-- | enqueue a list of node IDs to be deleted from the global NodeCache
queueDeleteEntries :: Foldable c
=> c NodeID
-> LocalNodeState s
-> TQueue (NodeCache -> NodeCache)
-> IO ()
queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry
-- | enqueue a single node ID to be deleted from the global NodeCache
queueDeleteEntry :: NodeID
-> LocalNodeState s
-> TQueue (NodeCache -> NodeCache)
-> IO ()
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
@@ -858,11 +858,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
-- global 'NodeCache'.
queueUpdateVerifieds :: Foldable c
=> c NodeID
-> LocalNodeState s
-> TQueue (NodeCache -> NodeCache)
-> IO ()
queueUpdateVerifieds nIds ns = do
queueUpdateVerifieds nIds cacheQ = do
now <- getPOSIXTime
forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $
markCacheEntryAsVerified (Just now) nid'
-- | retry an IO action at most *i* times until it delivers a result
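The queue helpers above now take the shared write queue directly instead of a whole LocalNodeState. A minimal usage sketch, not part of the diff and assuming only the signatures shown in these hunks, of how a vserver could enqueue a freshly learned peer through its cacheWriteQueue reference:

-- sketch only: enqueue a discovered peer into the node-global cache
-- by handing a pure cache modification to the shared write queue
cacheDiscoveredPeer :: LocalNodeState s -> RemoteCacheEntry -> IO ()
cacheDiscoveredPeer ns entry = queueAddEntries (Identity entry) (cacheWriteQueue ns)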

View file

@@ -69,12 +69,12 @@ import qualified Data.ByteString.UTF8 as BSU
import Data.Either (rights)
import Data.Foldable (foldr')
import Data.Functor.Identity
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HMap
import Data.IP (IPv6, fromHostAddress6,
toHostAddress6)
import Data.List ((\\))
import qualified Data.Map.Strict as Map
import qualified Data.HashMap.Strict as HMap
import Data.HashMap.Strict (HashMap)
import Data.Maybe (catMaybes, fromJust, fromMaybe,
isJust, isNothing, mapMaybe)
import qualified Data.Set as Set
@@ -98,38 +98,59 @@ import Debug.Trace (trace)
fediChordInit :: (Service s (RealNodeSTM s))
=> FediChordConf
-> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service
-> IO (Socket, RealNodeSTM s)
-> IO (Async (), RealNodeSTM s)
fediChordInit initConf serviceRunner = do
emptyLookupCache <- newTVarIO Map.empty
let realNode = RealNode {
vservers = HMap.empty
cacheSTM <- newTVarIO initCache
cacheQ <- atomically newTQueue
let realNode = RealNode
{ vservers = HMap.empty
, nodeConfig = initConf
, bootstrapNodes = confBootstrapNodes initConf
, lookupCacheSTM = emptyLookupCache
, nodeService = undefined
, globalNodeCacheSTM = cacheSTM
, globalCacheWriteQueue = cacheQ
}
realNodeSTM <- newTVarIO realNode
serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
-- launch service and set the reference in the RealNode
serv <- serviceRunner realNodeSTM
atomically . modifyTVar' realNodeSTM $ \rn -> rn { nodeService = serv }
-- prepare for joining: start node cache writer thread
-- currently no masking is necessary, as there is nothing to clean up
nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM
-- TODO: k-choices way of joining, so far just initialise a single vserver
firstVS <- nodeStateInit realNodeSTM
firstVS <- nodeStateInit realNodeSTM 0
firstVSSTM <- newTVarIO firstVS
-- add vserver to list at RealNode
atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) }
serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
pure (serverSock, realNodeSTM)
-- try joining the DHT using one of the provided bootstrapping nodes
joinedState <- tryBootstrapJoining firstVSSTM
fediThreadsAsync <- either (\err -> do
-- handle unsuccessful join
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
-- launch thread attempting to join on new cache entries
_ <- forkIO $ joinOnNewEntriesThread firstVSSTM
async (fediMainThreads serverSock realNodeSTM)
)
(\joinedNS -> do
-- launch main eventloop with successfully joined state
putStrLn "successful join"
async (fediMainThreads serverSock realNodeSTM)
)
joinedState
pure (fediThreadsAsync, realNodeSTM)
-- | initialises the 'NodeState' for this local node.
-- Separated from 'fediChordInit' to be usable in tests.
nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (LocalNodeState s)
nodeStateInit realNodeSTM = do
nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> Integer -> IO (LocalNodeState s)
nodeStateInit realNodeSTM vsID' = do
realNode <- readTVarIO realNodeSTM
cacheSTM <- newTVarIO initCache
q <- atomically newTQueue
let
conf = nodeConfig realNode
vsID = 0
vsID = vsID'
containedState = RemoteNodeState {
domain = confDomain conf
, ipAddr = confIP conf
@@ -140,8 +161,8 @@ nodeStateInit realNodeSTM = do
}
initialState = LocalNodeState {
nodeState = containedState
, nodeCacheSTM = cacheSTM
, cacheWriteQueue = q
, nodeCacheSTM = globalNodeCacheSTM realNode
, cacheWriteQueue = globalCacheWriteQueue realNode
, successors = []
, predecessors = []
, kNeighbours = 3
@@ -174,8 +195,10 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
-- Periodically look up the own ID through a random bootstrapping node to discover and merge separated DHT clusters.
-- Unjoined nodes try joining instead.
convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
convergenceSampleThread nsSTM = forever $ do
convergenceSampleThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
convergenceSampleThread nodeSTM = forever $ do
node <- readTVarIO nodeSTM
forM_ (vservers node) $ \nsSTM -> do
nsSnap <- readTVarIO nsSTM
parentNode <- readTVarIO $ parentRealNode nsSnap
if isJoined nsSnap
@@ -200,7 +223,8 @@ convergenceSampleThread nsSTM = forever $ do
) >> pure ()
-- unjoined node: try joining through all bootstrapping nodes
else tryBootstrapJoining nsSTM >> pure ()
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
let delaySecs = confBootstrapSamplingInterval . nodeConfig $ node
threadDelay delaySecs
@@ -336,60 +360,72 @@ joinOnNewEntriesThread nsSTM = loop
-- | cache updater thread that waits for incoming NodeCache update instructions on
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
nodeCacheWriter :: LocalNodeStateSTM s -> IO ()
nodeCacheWriter nsSTM =
nodeCacheWriter :: RealNodeSTM s -> IO ()
nodeCacheWriter nodeSTM = do
node <- readTVarIO nodeSTM
forever $ atomically $ do
ns <- readTVar nsSTM
cacheModifier <- readTQueue $ cacheWriteQueue ns
modifyTVar' (nodeCacheSTM ns) cacheModifier
cacheModifier <- readTQueue $ globalCacheWriteQueue node
modifyTVar' (globalNodeCacheSTM node) cacheModifier
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
nodeCacheVerifyThread nsSTM = forever $ do
-- get cache
(ns, cache, maxEntryAge) <- atomically $ do
ns <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM ns
maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns)
pure (ns, cache, maxEntryAge)
nodeCacheVerifyThread :: RealNodeSTM s -> IO ()
nodeCacheVerifyThread nodeSTM = forever $ do
(node, firstVSSTM) <- atomically $ do
node <- readTVar nodeSTM
case headMay (HMap.elems $ vservers node) of
-- wait until first VS is joined
Nothing -> retry
Just vs' -> pure (node, vs')
let
maxEntryAge = confMaxNodeCacheAge $ nodeConfig node
cacheQ = globalCacheWriteQueue node
cache <- readTVarIO $ globalNodeCacheSTM node
-- always use the first active VS as a sender for operations like Ping
firstVS <- readTVarIO firstVSSTM
-- iterate entries:
-- to avoid too many time syscalls, get the current time once before iterating.
now <- getPOSIXTime
forM_ (nodeCacheEntries cache) (\(CacheEntry validated node ts) ->
forM_ (nodeCacheEntries cache) (\(CacheEntry validated cacheNode ts) ->
-- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
if (now - ts) > maxEntryAge
then
queueDeleteEntry (getNid node) ns
queueDeleteEntry (getNid cacheNode) cacheQ
-- case unverified: try verifying, otherwise delete
else if not validated
then do
-- marking as verified is done by 'requestPing' as well
pong <- requestPing ns node
pong <- requestPing firstVS cacheNode
either (\_->
queueDeleteEntry (getNid node) ns
queueDeleteEntry (getNid cacheNode) cacheQ
)
(\vss ->
if node `notElem` vss
then queueDeleteEntry (getNid node) ns
if cacheNode `notElem` vss
then queueDeleteEntry (getNid cacheNode) cacheQ
-- after verifying a node, check whether it can be a closer neighbour
else do
if node `isPossiblePredecessor` ns
-- do this for each node
-- TODO: optimisation: place all LocalNodeStates on the cache ring and check whether any of them is the predecessor/successor
else forM_ (vservers node) (\nsSTM -> do
ns <- readTVarIO nsSTM
if cacheNode `isPossiblePredecessor` ns
then atomically $ do
ns' <- readTVar nsSTM
writeTVar nsSTM $ addPredecessors [node] ns'
writeTVar nsSTM $ addPredecessors [cacheNode] ns'
else pure ()
if node `isPossibleSuccessor` ns
if cacheNode `isPossibleSuccessor` ns
then atomically $ do
ns' <- readTVar nsSTM
writeTVar nsSTM $ addSuccessors [node] ns'
writeTVar nsSTM $ addSuccessors [cacheNode] ns'
else pure ()
)
) pong
else pure ()
)
-- check the cache invariant per slice and, if necessary, do a single lookup to the
-- middle of each slice that does not satisfy the invariant
latestNode <- readTVarIO nodeSTM
forM_ (vservers latestNode) (\nsSTM -> do
latestNs <- readTVarIO nsSTM
latestCache <- readTVarIO $ nodeCacheSTM latestNs
let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
@@ -398,6 +434,7 @@ nodeCacheVerifyThread nsSTM = forever $ do
forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
)
)
threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to milliseconds
@@ -461,8 +498,10 @@ checkCacheSliceInvariants ns
-- | Periodically send 'StabiliseRequest's to the closest neighbour nodes, until
-- one responds, and get their neighbours for maintaining the own neighbour lists.
-- If necessary, request new neighbours.
stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
stabiliseThread nsSTM = forever $ do
stabiliseThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
stabiliseThread nodeSTM = forever $ do
node <- readTVarIO nodeSTM
forM_ (vservers node) (\nsSTM -> do
oldNs <- readTVarIO nsSTM
@@ -543,8 +582,9 @@ stabiliseThread nsSTM = forever $ do
)
newPredecessor
stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs)
threadDelay stabiliseDelay
)
threadDelay . confStabiliseInterval . nodeConfig $ node
where
-- | send a stabilise request to the n-th neighbour
-- (specified by the provided getter function) and on failure retry
@@ -605,20 +645,23 @@ sendThread sock sendQ = forever $ do
sendAllTo sock packet addr
-- | Sets up and manages the main server threads of FediChord
fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO ()
fediMainThreads sock nsSTM = do
ns <- readTVarIO nsSTM
fediMainThreads :: Service s (RealNodeSTM s) => Socket -> RealNodeSTM s -> IO ()
fediMainThreads sock nodeSTM = do
node <- readTVarIO nodeSTM
putStrLn "launching threads"
sendQ <- newTQueueIO
recvQ <- newTQueueIO
-- concurrently launch all handler threads; if one of them throws an exception,
-- all get cancelled
concurrently_
(fediMessageHandler sendQ recvQ nsSTM) $
concurrently_ (stabiliseThread nsSTM) $
concurrently_ (nodeCacheVerifyThread nsSTM) $
concurrently_ (convergenceSampleThread nsSTM) $
concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
(fediMessageHandler sendQ recvQ nodeSTM) $
-- decision whether to [1] launch 1 thread per VS or [2] let a single
-- thread process all VSes sequentially:
-- choose option 2 for the sake of limiting concurrency in simulation scenario
concurrently_ (stabiliseThread nodeSTM) $
concurrently_ (nodeCacheVerifyThread nodeSTM) $
concurrently_ (convergenceSampleThread nodeSTM) $
concurrently_ (lookupCacheCleanup nodeSTM) $
concurrently_
(sendThread sock sendQ)
(recvThread sock recvQ)
@@ -647,20 +690,17 @@ requestMapPurge purgeAge mapVar = forever $ do
fediMessageHandler :: Service s (RealNodeSTM s)
=> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-> LocalNodeStateSTM s -- ^ acting NodeState
-> RealNodeSTM s -- ^ node
-> IO ()
fediMessageHandler sendQ recvQ nsSTM = do
-- Read node state just once, assuming that all relevant data for this function does
-- not change.
-- Other functions are passed the nsSTM reference and thus can get the latest state.
nsSnap <- readTVarIO nsSTM
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap)
fediMessageHandler sendQ recvQ nodeSTM = do
nodeConf <- nodeConfig <$> readTVarIO nodeSTM
-- handling multipart messages:
-- Request parts can be inserted into a map (key: (sender IP against spoofing, request ID); value: timestamp + set of message parts; handle all of them once the set size equals parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
requestMap <- newMVar (Map.empty :: RequestMap)
-- run receive loop and requestMapPurge concurrently, so that an exception makes
-- both of them fail
concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do
node <- readTVarIO nodeSTM
-- wait for incoming messages
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
let aMsg = deserialiseMessage rawMsg
@@ -670,12 +710,14 @@ fediMessageHandler sendQ recvQ nsSTM = do
)
(\validMsg ->
case validMsg of
aRequest@Request{}
aRequest@Request{} -> case dispatchVS node aRequest of
-- if no match to an active vserver ID, just ignore
Nothing -> pure ()
-- if not a multipart message, handle immediately. Response is at the same time an ACK
| part aRequest == 1 && isFinalPart aRequest ->
Just nsSTM | part aRequest == 1 && isFinalPart aRequest ->
forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
-- otherwise collect all message parts first before handling the whole request
| otherwise -> do
Just nsSTM | otherwise -> do
now <- getPOSIXTime
-- critical locking section of requestMap
rMapState <- takeMVar requestMap
@@ -693,7 +735,7 @@ fediMessageHandler sendQ recvQ nsSTM = do
-- put map back into MVar, end of critical section
putMVar requestMap newMapState
-- ACK the received part
forM_ (ackRequest (getNid nsSnap) aRequest) $
forM_ (ackRequest aRequest) $
\msg -> atomically $ writeTQueue sendQ (msg, sourceAddr)
-- if all parts received, then handle request.
let
@@ -709,6 +751,8 @@ fediMessageHandler sendQ recvQ nsSTM = do
aMsg
pure ()
where
dispatchVS node req = HMap.lookup (receiverID req) (vservers node)
-- ==== interface to service layer ====
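With this refactor, all cache writes from every vserver funnel through the single queue owned by RealNode, and nodeCacheWriter is the only thread applying them to the global cache. A hedged sketch of the producer side, not part of the diff, using the globalCacheWriteQueue field added in the types module below:

-- sketch only: any vserver thread enqueues a pure NodeCache modification;
-- nodeCacheWriter later applies it to globalNodeCacheSTM as the single writer
enqueueCacheUpdate :: RealNode s -> (NodeCache -> NodeCache) -> IO ()
enqueueCacheUpdate node f = atomically $ writeTQueue (globalCacheWriteQueue node) f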

View file

@@ -69,10 +69,10 @@ import Control.Exception
import Data.Foldable (foldr')
import Data.Function (on)
import qualified Data.Hashable as Hashable
import Data.List (delete, nub, sortBy)
import qualified Data.Map.Strict as Map
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HMap
import Data.List (delete, nub, sortBy)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, fromMaybe, isJust,
isNothing, mapMaybe)
import qualified Data.Set as Set
@@ -161,6 +161,10 @@ data RealNode s = RealNode
-- ^ nodes to be used as bootstrapping points, new ones learned during operation
, lookupCacheSTM :: TVar LookupCache
-- ^ a global cache of looked up keys and their associated nodes
, globalNodeCacheSTM :: TVar NodeCache
-- ^ EpiChord node cache with expiry times for nodes.
, globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ cache updates are not written directly to the 'globalNodeCacheSTM' but queued and applied by a single writer thread
, nodeService :: s (RealNodeSTM s)
}
@@ -190,9 +194,9 @@ data LocalNodeState s = LocalNodeState
{ nodeState :: RemoteNodeState
-- ^ represents common data present both in remote and local node representations
, nodeCacheSTM :: TVar NodeCache
-- ^ EpiChord node cache with expiry times for nodes
-- ^ reference to the 'globalNodeCacheSTM'
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
-- ^ reference to the 'globalCacheWriteQueue'
, successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
-- ^ successor nodes in ascending order by distance
, predecessors :: [RemoteNodeState]