Compare commits: 12dfc56a73 ... 499c90e63a

6 commits:
- 499c90e63a
- 1a7afed062
- 8e8ea41dc4
- 33ae904d17
- 68de73d919
- 0ab6ee9c8f
4 changed files with 245 additions and 216 deletions
app/Main.hs (25 changed lines)
@@ -18,29 +18,10 @@ main = do
     -- ToDo: parse and pass config
     -- probably use `tomland` for that
     (fConf, sConf) <- readConfig
     -- TODO: first initialise 'RealNode', then the vservers
     -- ToDo: load persisted caches, bootstrapping nodes …
-    (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-    -- currently no masking is necessary, as there is nothing to clean up
-    nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
-    -- try joining the DHT using one of the provided bootstrapping nodes
-    joinedState <- tryBootstrapJoining thisNode
-    either (\err -> do
-        -- handle unsuccessful join
-        putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
-        print =<< readTVarIO thisNode
-        -- launch thread attempting to join on new cache entries
-        _ <- forkIO $ joinOnNewEntriesThread thisNode
-        wait =<< async (fediMainThreads serverSock thisNode)
-        )
-        (\joinedNS -> do
-        -- launch main eventloop with successfully joined state
-        putStrLn "successful join"
-        wait =<< async (fediMainThreads serverSock thisNode)
-        )
-        joinedState
-    pure ()
+    (fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
+    -- wait for all DHT threads to terminate, this keeps the main thread running
+    wait fediThreads


 readConfig :: IO (FediChordConf, ServiceConf)
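With this change the DHT's main threads are started inside `fediChordInit` itself and handed back as an Async handle, so the caller only has to block on it. A minimal sketch of such a caller, assuming `fediChordInit`, `readConfig` and `runService` behave as shown in this diff; the explicit cancel on the way out is an illustrative addition, not part of this change:

    import Control.Concurrent.Async (cancel, wait)
    import Control.Exception (finally)

    main :: IO ()
    main = do
        (fConf, sConf) <- readConfig
        -- fediChordInit now forks the cache writer, attempts the join and
        -- launches the main threads itself, returning their Async handle
        (fediThreads, _thisNode) <- fediChordInit fConf (runService sConf)
        -- blocking on the handle keeps the main thread alive; cancelling it
        -- tears down all DHT threads when main is left for any reason
        wait fediThreads `finally` cancel fediThreads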
@@ -241,16 +241,16 @@ sendMessageSize = 1200
 -- ====== message send and receive operations ======

 -- encode the response to a request that just signals successful receipt
-ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
-ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
+ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString
+ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response {
       requestID = requestID req
-    , senderID = ownID
+    , senderID = receiverID req
     , part = part req
     , isFinalPart = False
     , action = action req
     , payload = Nothing
     }
-ackRequest _ _ = Map.empty
+ackRequest _ = Map.empty


 -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
@@ -269,7 +269,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
         Nothing -> pure ()
         Just aPart -> do
             let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
-            queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
+            queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns)
             -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
             maybe (pure ()) (
                 mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
@@ -542,7 +542,7 @@ requestJoin toJoinOn ownStateSTM = do
                 writeTVar ownStateSTM newState
                 pure (cacheInsertQ, newState)
             -- execute the cache insertions
-            mapM_ (\f -> f joinedState) cacheInsertQ
+            mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ
             if responses == Set.empty
                 then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
                 else do
@@ -624,7 +624,7 @@ sendQueryIdMessages targetID ns lParam targets = do
                 _ -> Set.empty

             -- forward entries to global cache
-            queueAddEntries entrySet ns
+            queueAddEntries entrySet (cacheWriteQueue ns)
             -- return accumulated QueryResult
             pure $ case acc of
                 -- once a FOUND as been encountered, return this as a result
@@ -670,7 +670,7 @@ requestStabilise ns neighbour = do
                 )
                 ([],[]) respSet
         -- update successfully responded neighbour in cache
-        maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
+        maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet)
         pure $ if null responsePreds && null responseSuccs
             then Left "no neighbours returned"
             else Right (responsePreds, responseSuccs)
@@ -832,24 +832,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do

 -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
 queueAddEntries :: Foldable c => c RemoteCacheEntry
-                -> LocalNodeState s
+                -> TQueue (NodeCache -> NodeCache)
                 -> IO ()
-queueAddEntries entries ns = do
+queueAddEntries entries cacheQ = do
     now <- getPOSIXTime
-    forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry
+    forM_ entries $ \entry -> atomically $ writeTQueue cacheQ $ addCacheEntryPure now entry


 -- | enque a list of node IDs to be deleted from the global NodeCache
 queueDeleteEntries :: Foldable c
                    => c NodeID
-                   -> LocalNodeState s
+                   -> TQueue (NodeCache -> NodeCache)
                    -> IO ()
-queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry
+queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry


 -- | enque a single node ID to be deleted from the global NodeCache
 queueDeleteEntry :: NodeID
-                 -> LocalNodeState s
+                 -> TQueue (NodeCache -> NodeCache)
                  -> IO ()
 queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete

@@ -858,11 +858,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
 -- global 'NodeCache'.
 queueUpdateVerifieds :: Foldable c
                      => c NodeID
-                     -> LocalNodeState s
+                     -> TQueue (NodeCache -> NodeCache)
                      -> IO ()
-queueUpdateVerifieds nIds ns = do
+queueUpdateVerifieds nIds cacheQ = do
     now <- getPOSIXTime
-    forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
+    forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $
         markCacheEntryAsVerified (Just now) nid'

 -- | retry an IO action at most *i* times until it delivers a result
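All of these helpers now receive the target write queue itself instead of a whole LocalNodeState, so the same functions work against any TQueue of cache modifiers, whether per vserver or node-global. The underlying idea is a single-writer queue: producers only enqueue pure NodeCache -> NodeCache functions, and one writer thread applies them to the shared TVar. A self-contained sketch of that pattern with simplified stand-in types (not the project's own):

    import Control.Concurrent.STM
    import Control.Monad (forever)

    -- stand-in for NodeCache: any shared structure updated by pure functions
    type Cache = [Int]

    -- producers never touch the TVar directly, they only enqueue modifiers ...
    queueAdd :: Int -> TQueue (Cache -> Cache) -> IO ()
    queueAdd x cacheQ = atomically $ writeTQueue cacheQ (x :)

    -- ... and a single writer thread applies them, serialising all writes
    cacheWriter :: TVar Cache -> TQueue (Cache -> Cache) -> IO ()
    cacheWriter cacheVar cacheQ = forever $ atomically $ do
        modifier <- readTQueue cacheQ
        modifyTVar' cacheVar modifier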
@@ -69,12 +69,12 @@ import qualified Data.ByteString.UTF8 as BSU
 import Data.Either (rights)
 import Data.Foldable (foldr')
 import Data.Functor.Identity
+import Data.HashMap.Strict (HashMap)
+import qualified Data.HashMap.Strict as HMap
 import Data.IP (IPv6, fromHostAddress6,
                 toHostAddress6)
 import Data.List ((\\))
 import qualified Data.Map.Strict as Map
-import qualified Data.HashMap.Strict as HMap
-import Data.HashMap.Strict (HashMap)
 import Data.Maybe (catMaybes, fromJust, fromMaybe,
                    isJust, isNothing, mapMaybe)
 import qualified Data.Set as Set
@@ -98,38 +98,59 @@ import Debug.Trace (trace)
 fediChordInit :: (Service s (RealNodeSTM s))
               => FediChordConf
               -> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service
-              -> IO (Socket, RealNodeSTM s)
+              -> IO (Async (), RealNodeSTM s)
 fediChordInit initConf serviceRunner = do
     emptyLookupCache <- newTVarIO Map.empty
-    let realNode = RealNode {
-            vservers = HMap.empty
+    cacheSTM <- newTVarIO initCache
+    cacheQ <- atomically newTQueue
+    let realNode = RealNode
+            { vservers = HMap.empty
             , nodeConfig = initConf
             , bootstrapNodes = confBootstrapNodes initConf
             , lookupCacheSTM = emptyLookupCache
             , nodeService = undefined
-            }
+            , globalNodeCacheSTM = cacheSTM
+            , globalCacheWriteQueue = cacheQ
+            }
     realNodeSTM <- newTVarIO realNode
+    serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
     -- launch service and set the reference in the RealNode
     serv <- serviceRunner realNodeSTM
     atomically . modifyTVar' realNodeSTM $ \rn -> rn { nodeService = serv }
+    -- prepare for joining: start node cache writer thread
+    -- currently no masking is necessary, as there is nothing to clean up
+    nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM
     -- TODO: k-choices way of joining, so far just initialise a single vserver
-    firstVS <- nodeStateInit realNodeSTM
+    firstVS <- nodeStateInit realNodeSTM 0
     firstVSSTM <- newTVarIO firstVS
     -- add vserver to list at RealNode
     atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) }
-    serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
-    pure (serverSock, realNodeSTM)
+    -- try joining the DHT using one of the provided bootstrapping nodes
+    joinedState <- tryBootstrapJoining firstVSSTM
+    fediThreadsAsync <- either (\err -> do
+        -- handle unsuccessful join
+        putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
+        -- launch thread attempting to join on new cache entries
+        _ <- forkIO $ joinOnNewEntriesThread firstVSSTM
+        async (fediMainThreads serverSock realNodeSTM)
+        )
+        (\joinedNS -> do
+        -- launch main eventloop with successfully joined state
+        putStrLn "successful join"
+        async (fediMainThreads serverSock realNodeSTM)
+        )
+        joinedState
+    pure (fediThreadsAsync, realNodeSTM)

 -- | initialises the 'NodeState' for this local node.
 -- Separated from 'fediChordInit' to be usable in tests.
-nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (LocalNodeState s)
-nodeStateInit realNodeSTM = do
+nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> Integer -> IO (LocalNodeState s)
+nodeStateInit realNodeSTM vsID' = do
     realNode <- readTVarIO realNodeSTM
-    cacheSTM <- newTVarIO initCache
-    q <- atomically newTQueue
     let
         conf = nodeConfig realNode
-        vsID = 0
+        vsID = vsID'
         containedState = RemoteNodeState {
             domain = confDomain conf
           , ipAddr = confIP conf
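nodeStateInit now takes the vserver ID as an explicit parameter instead of hard-coding 0, preparing for several vservers per RealNode (the k-choices TODO above). A hypothetical sketch of how multiple vservers could then be initialised and registered, using only the functions and record fields visible in this diff; initVservers itself is illustrative and not part of this change:

    -- hypothetical helper, assuming nodeStateInit and the vservers field as in this diff
    initVservers :: Service s (RealNodeSTM s) => RealNodeSTM s -> Integer -> IO ()
    initVservers realNodeSTM count =
        forM_ [0 .. count - 1] $ \vsID -> do
            vs <- nodeStateInit realNodeSTM vsID
            vsSTM <- newTVarIO vs
            -- register the new vserver in the node-wide map, keyed by its NodeID
            atomically . modifyTVar' realNodeSTM $ \rn ->
                rn { vservers = HMap.insert (getNid vs) vsSTM (vservers rn) }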
@@ -140,8 +161,8 @@ nodeStateInit realNodeSTM = do
             }
         initialState = LocalNodeState {
             nodeState = containedState
-          , nodeCacheSTM = cacheSTM
-          , cacheWriteQueue = q
+          , nodeCacheSTM = globalNodeCacheSTM realNode
+          , cacheWriteQueue = globalCacheWriteQueue realNode
           , successors = []
           , predecessors = []
          , kNeighbours = 3
@@ -174,33 +195,36 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do

 -- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
 -- Unjoined try joining instead.
-convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
-convergenceSampleThread nsSTM = forever $ do
-    nsSnap <- readTVarIO nsSTM
-    parentNode <- readTVarIO $ parentRealNode nsSnap
-    if isJoined nsSnap
-        then
-            runExceptT (do
-                -- joined node: choose random node, do queryIDLoop, compare result with own responsibility
-                let bss = bootstrapNodes parentNode
-                randIndex <- liftIO $ randomRIO (0, length bss - 1)
-                chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
-                lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
-                currentlyResponsible <- liftEither lookupResult
-                if getNid currentlyResponsible /= getNid nsSnap
-                    -- if mismatch, stabilise on the result, else do nothing
-                    then do
-                        stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
-                        (preds, succs) <- liftEither stabResult
-                        -- TODO: verify neighbours before adding, see #55
-                        liftIO . atomically $ do
-                            ns <- readTVar nsSTM
-                            writeTVar nsSTM $ addPredecessors preds ns
-                    else pure ()
-                ) >> pure ()
-        -- unjoined node: try joining through all bootstrapping nodes
-        else tryBootstrapJoining nsSTM >> pure ()
-    let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
+convergenceSampleThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
+convergenceSampleThread nodeSTM = forever $ do
+    node <- readTVarIO nodeSTM
+    forM_ (vservers node) $ \nsSTM -> do
+        nsSnap <- readTVarIO nsSTM
+        parentNode <- readTVarIO $ parentRealNode nsSnap
+        if isJoined nsSnap
+            then
+                runExceptT (do
+                    -- joined node: choose random node, do queryIDLoop, compare result with own responsibility
+                    let bss = bootstrapNodes parentNode
+                    randIndex <- liftIO $ randomRIO (0, length bss - 1)
+                    chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
+                    lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
+                    currentlyResponsible <- liftEither lookupResult
+                    if getNid currentlyResponsible /= getNid nsSnap
+                        -- if mismatch, stabilise on the result, else do nothing
+                        then do
+                            stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
+                            (preds, succs) <- liftEither stabResult
+                            -- TODO: verify neighbours before adding, see #55
+                            liftIO . atomically $ do
+                                ns <- readTVar nsSTM
+                                writeTVar nsSTM $ addPredecessors preds ns
+                        else pure ()
+                    ) >> pure ()
+            -- unjoined node: try joining through all bootstrapping nodes
+            else tryBootstrapJoining nsSTM >> pure ()
+
+    let delaySecs = confBootstrapSamplingInterval . nodeConfig $ node
     threadDelay delaySecs

@@ -336,68 +360,81 @@ joinOnNewEntriesThread nsSTM = loop

 -- | cache updater thread that waits for incoming NodeCache update instructions on
 -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
-nodeCacheWriter :: LocalNodeStateSTM s -> IO ()
-nodeCacheWriter nsSTM =
+nodeCacheWriter :: RealNodeSTM s -> IO ()
+nodeCacheWriter nodeSTM = do
+    node <- readTVarIO nodeSTM
     forever $ atomically $ do
-        ns <- readTVar nsSTM
-        cacheModifier <- readTQueue $ cacheWriteQueue ns
-        modifyTVar' (nodeCacheSTM ns) cacheModifier
+        cacheModifier <- readTQueue $ globalCacheWriteQueue node
+        modifyTVar' (globalNodeCacheSTM node) cacheModifier


 -- | Periodically iterate through cache, clean up expired entries and verify unverified ones
-nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
-nodeCacheVerifyThread nsSTM = forever $ do
-    -- get cache
-    (ns, cache, maxEntryAge) <- atomically $ do
-        ns <- readTVar nsSTM
-        cache <- readTVar $ nodeCacheSTM ns
-        maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns)
-        pure (ns, cache, maxEntryAge)
+nodeCacheVerifyThread :: RealNodeSTM s -> IO ()
+nodeCacheVerifyThread nodeSTM = forever $ do
+    (node, firstVSSTM) <- atomically $ do
+        node <- readTVar nodeSTM
+        case headMay (HMap.elems $ vservers node) of
+            -- wait until first VS is joined
+            Nothing -> retry
+            Just vs' -> pure (node, vs')
+    let
+        maxEntryAge = confMaxNodeCacheAge $ nodeConfig node
+        cacheQ = globalCacheWriteQueue node
+    cache <- readTVarIO $ globalNodeCacheSTM node
+    -- always use the first active VS as a sender for operations like Ping
+    firstVS <- readTVarIO firstVSSTM
     -- iterate entries:
     -- for avoiding too many time syscalls, get current time before iterating.
     now <- getPOSIXTime
-    forM_ (nodeCacheEntries cache) (\(CacheEntry validated node ts) ->
+    forM_ (nodeCacheEntries cache) (\(CacheEntry validated cacheNode ts) ->
         -- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
         if (now - ts) > maxEntryAge
             then
-                queueDeleteEntry (getNid node) ns
-            -- case unverified: try verifying, otherwise delete
+                queueDeleteEntry (getNid cacheNode) cacheQ
+            -- case unverified: try verifying, otherwise delete
             else if not validated
                 then do
                     -- marking as verified is done by 'requestPing' as well
-                    pong <- requestPing ns node
+                    pong <- requestPing firstVS cacheNode
                     either (\_->
-                        queueDeleteEntry (getNid node) ns
+                        queueDeleteEntry (getNid cacheNode) cacheQ
                         )
                         (\vss ->
-                            if node `notElem` vss
-                                then queueDeleteEntry (getNid node) ns
+                            if cacheNode `notElem` vss
+                                then queueDeleteEntry (getNid cacheNode) cacheQ
                                 -- after verifying a node, check whether it can be a closer neighbour
-                                else do
-                                    if node `isPossiblePredecessor` ns
+                                -- do this for each node
+                                -- TODO: optimisation: place all LocalNodeStates on the cache ring and check whether any of them is the predecessor/ successor
+                                else forM_ (vservers node) (\nsSTM -> do
+                                    ns <- readTVarIO nsSTM
+                                    if cacheNode `isPossiblePredecessor` ns
                                         then atomically $ do
                                             ns' <- readTVar nsSTM
-                                            writeTVar nsSTM $ addPredecessors [node] ns'
+                                            writeTVar nsSTM $ addPredecessors [cacheNode] ns'
                                         else pure ()
-                                    if node `isPossibleSuccessor` ns
+                                    if cacheNode `isPossibleSuccessor` ns
                                         then atomically $ do
                                             ns' <- readTVar nsSTM
-                                            writeTVar nsSTM $ addSuccessors [node] ns'
+                                            writeTVar nsSTM $ addSuccessors [cacheNode] ns'
                                         else pure ()
+                                    )
                             ) pong
                 else pure ()
         )

     -- check the cache invariant per slice and, if necessary, do a single lookup to the
     -- middle of each slice not verifying the invariant
-    latestNs <- readTVarIO nsSTM
-    latestCache <- readTVarIO $ nodeCacheSTM latestNs
-    let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
-            FOUND node -> [node]
-            FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
-    forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
-        forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
-        )
+    latestNode <- readTVarIO nodeSTM
+    forM_ (vservers latestNode) (\nsSTM -> do
+        latestNs <- readTVarIO nsSTM
+        latestCache <- readTVarIO $ nodeCacheSTM latestNs
+        let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
+                FOUND node -> [node]
+                FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
+        forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
+            forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
+            )
+        )

     threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to milliseconds
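The verify thread now waits until the node has at least one vserver before doing anything, using STM's retry so the transaction simply re-runs once the vservers map changes. A self-contained sketch of that wait-until-non-empty pattern with simplified types (not the project's own):

    import Control.Concurrent.STM
    import qualified Data.HashMap.Strict as HMap

    -- block until the shared map contains at least one element, then return one of them
    waitForFirst :: TVar (HMap.HashMap k v) -> IO v
    waitForFirst mapVar = atomically $ do
        m <- readTVar mapVar
        case HMap.elems m of
            []      -> retry   -- nothing there yet: blocks until mapVar is written again
            (v : _) -> pure v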
@@ -461,90 +498,93 @@ checkCacheSliceInvariants ns
-- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
-- one responds, and get their neighbours for maintaining the own neighbour lists.
-- If necessary, request new neighbours.
stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
stabiliseThread nsSTM = forever $ do
oldNs <- readTVarIO nsSTM
stabiliseThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
stabiliseThread nodeSTM = forever $ do
node <- readTVarIO nodeSTM
forM_ (vservers node) (\nsSTM -> do
oldNs <- readTVarIO nsSTM

-- iterate through the same snapshot, collect potential new neighbours
-- and nodes to be deleted, and modify these changes only at the end of
-- each stabilise run.
-- This decision makes iterating through a potentially changing list easier.
-- iterate through the same snapshot, collect potential new neighbours
-- and nodes to be deleted, and modify these changes only at the end of
-- each stabilise run.
-- This decision makes iterating through a potentially changing list easier.

-- don't contact all neighbours unless the previous one failed/ Left ed
-- don't contact all neighbours unless the previous one failed/ Left ed

predStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
succStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
predStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
succStabilise <- stabiliseClosestResponder oldNs predecessors 1 []

let
(predDeletes, predNeighbours) = either (const ([], [])) id predStabilise
(succDeletes, succNeighbours) = either (const ([], [])) id succStabilise
allDeletes = predDeletes <> succDeletes
allNeighbours = predNeighbours <> succNeighbours

-- now actually modify the node state's neighbours
updatedNs <- atomically $ do
newerNsSnap <- readTVar nsSTM
let
-- sorting and taking only k neighbours is taken care of by the
-- setSuccessors/ setPredecessors functions
newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours
newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours
newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
writeTVar nsSTM newNs
pure newNs
-- delete unresponding nodes from cache as well
mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes
(predDeletes, predNeighbours) = either (const ([], [])) id predStabilise
(succDeletes, succNeighbours) = either (const ([], [])) id succStabilise
allDeletes = predDeletes <> succDeletes
allNeighbours = predNeighbours <> succNeighbours

-- try looking up additional neighbours if list too short
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM
nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
either
(const $ pure ())
(\entry -> atomically $ do
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addPredecessors [entry] latestNs
)
nextEntry
)
-- now actually modify the node state's neighbours
updatedNs <- atomically $ do
newerNsSnap <- readTVar nsSTM
let
-- sorting and taking only k neighbours is taken care of by the
-- setSuccessors/ setPredecessors functions
newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours
newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours
newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
writeTVar nsSTM newNs
pure newNs
-- delete unresponding nodes from cache as well
mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes

forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM
nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
either
(const $ pure ())
(\entry -> atomically $ do
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addSuccessors [entry] latestNs
)
nextEntry
)
-- try looking up additional neighbours if list too short
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM
nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
either
(const $ pure ())
(\entry -> atomically $ do
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addPredecessors [entry] latestNs
)
nextEntry
)

newNs <- readTVarIO nsSTM
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM
nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
either
(const $ pure ())
(\entry -> atomically $ do
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addSuccessors [entry] latestNs
)
nextEntry
)

let
oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs
newPredecessor = headMay $ predecessors newNs
-- manage need for service data migration:
maybe (pure ()) (\newPredecessor' ->
when (
isJust newPredecessor
&& oldPredecessor /= newPredecessor'
-- case: predecessor has changed in some way => own responsibility has changed in some way
-- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data
-- If this is due to a node leaving without transfering its data, try getting it from a redundant copy
-- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it
&& isInOwnResponsibilitySlice newPredecessor' oldNs) $ do
ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs)
migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor')
-- TODO: deal with migration failure, e.g retry
pure ()
)
newPredecessor
newNs <- readTVarIO nsSTM

stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs)
threadDelay stabiliseDelay
let
oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs
newPredecessor = headMay $ predecessors newNs
-- manage need for service data migration:
maybe (pure ()) (\newPredecessor' ->
when (
isJust newPredecessor
&& oldPredecessor /= newPredecessor'
-- case: predecessor has changed in some way => own responsibility has changed in some way
-- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data
-- If this is due to a node leaving without transfering its data, try getting it from a redundant copy
-- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it
&& isInOwnResponsibilitySlice newPredecessor' oldNs) $ do
ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs)
migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor')
-- TODO: deal with migration failure, e.g retry
pure ()
)
newPredecessor

)

threadDelay . confStabiliseInterval . nodeConfig $ node
where
-- | send a stabilise request to the n-th neighbour
-- (specified by the provided getter function) and on failure retry
@@ -605,20 +645,23 @@ sendThread sock sendQ = forever $ do
     sendAllTo sock packet addr

 -- | Sets up and manages the main server threads of FediChord
-fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO ()
-fediMainThreads sock nsSTM = do
-    ns <- readTVarIO nsSTM
+fediMainThreads :: Service s (RealNodeSTM s) => Socket -> RealNodeSTM s -> IO ()
+fediMainThreads sock nodeSTM = do
+    node <- readTVarIO nodeSTM
     putStrLn "launching threads"
     sendQ <- newTQueueIO
     recvQ <- newTQueueIO
     -- concurrently launch all handler threads, if one of them throws an exception
     -- all get cancelled
     concurrently_
-        (fediMessageHandler sendQ recvQ nsSTM) $
-        concurrently_ (stabiliseThread nsSTM) $
-        concurrently_ (nodeCacheVerifyThread nsSTM) $
-        concurrently_ (convergenceSampleThread nsSTM) $
-        concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
+        (fediMessageHandler sendQ recvQ nodeSTM) $
+        -- decision whether to [1] launch 1 thread per VS or [2] let a single
+        -- thread process all VSes sequentially:
+        -- choose option 2 for the sake of limiting concurrency in simulation scenario
+        concurrently_ (stabiliseThread nodeSTM) $
+        concurrently_ (nodeCacheVerifyThread nodeSTM) $
+        concurrently_ (convergenceSampleThread nodeSTM) $
+        concurrently_ (lookupCacheCleanup nodeSTM) $
         concurrently_
             (sendThread sock sendQ)
             (recvThread sock recvQ)
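The comments added here record the concurrency decision: there is still only one stabilise, cache-verify and convergence-sampling thread per node, and each of them walks all vservers sequentially. For contrast, a hedged sketch of the rejected option 1 (one concurrent loop per vserver); forConcurrently_ is from the async package and is not used by this change, and stabiliseSingleVS is a hypothetical stand-in:

    import Control.Concurrent.Async (forConcurrently_)

    -- option 1 (not taken): run one stabilise loop per vserver concurrently
    stabiliseAllConcurrently node = forConcurrently_ (vservers node) stabiliseSingleVS
      where
        -- hypothetical per-vserver stabilise loop
        stabiliseSingleVS _nsSTM = pure ()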
@@ -647,20 +690,17 @@ requestMapPurge purgeAge mapVar = forever $ do
 fediMessageHandler :: Service s (RealNodeSTM s)
                    => TQueue (BS.ByteString, SockAddr) -- ^ send queue
                    -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-                   -> LocalNodeStateSTM s -- ^ acting NodeState
+                   -> RealNodeSTM s -- ^ node
                    -> IO ()
-fediMessageHandler sendQ recvQ nsSTM = do
-    -- Read node state just once, assuming that all relevant data for this function does
-    -- not change.
-    -- Other functions are passed the nsSTM reference and thus can get the latest state.
-    nsSnap <- readTVarIO nsSTM
-    nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap)
+fediMessageHandler sendQ recvQ nodeSTM = do
+    nodeConf <- nodeConfig <$> readTVarIO nodeSTM
     -- handling multipart messages:
     -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
     requestMap <- newMVar (Map.empty :: RequestMap)
     -- run receive loop and requestMapPurge concurrently, so that an exception makes
     -- both of them fail
     concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do
+        node <- readTVarIO nodeSTM
         -- wait for incoming messages
         (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
         let aMsg = deserialiseMessage rawMsg
@@ -670,12 +710,14 @@ fediMessageHandler sendQ recvQ nsSTM = do
             )
             (\validMsg ->
                 case validMsg of
-                    aRequest@Request{}
+                    aRequest@Request{} -> case dispatchVS node aRequest of
+                        -- if no match to an active vserver ID, just ignore
+                        Nothing -> pure ()
                         -- if not a multipart message, handle immediately. Response is at the same time an ACK
-                        | part aRequest == 1 && isFinalPart aRequest ->
+                        Just nsSTM | part aRequest == 1 && isFinalPart aRequest ->
                             forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
                         -- otherwise collect all message parts first before handling the whole request
-                        | otherwise -> do
+                        Just nsSTM | otherwise -> do
                             now <- getPOSIXTime
                             -- critical locking section of requestMap
                             rMapState <- takeMVar requestMap
@@ -693,7 +735,7 @@ fediMessageHandler sendQ recvQ nsSTM = do
                             -- put map back into MVar, end of critical section
                             putMVar requestMap newMapState
                             -- ACK the received part
-                            forM_ (ackRequest (getNid nsSnap) aRequest) $
+                            forM_ (ackRequest aRequest) $
                                 \msg -> atomically $ writeTQueue sendQ (msg, sourceAddr)
                             -- if all parts received, then handle request.
                             let
@@ -709,6 +751,8 @@ fediMessageHandler sendQ recvQ nsSTM = do
                 aMsg

     pure ()
+  where
+    dispatchVS node req = HMap.lookup (receiverID req) (vservers node)


 -- ==== interface to service layer ====
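Incoming requests are now routed to the vserver whose NodeID equals the message's receiverID; anything addressed to an ID this node does not host is silently dropped. A self-contained sketch of that lookup with simplified stand-in types (not the project's own):

    import qualified Data.HashMap.Strict as HMap

    -- simplified stand-ins: vserver IDs are plain Ints, requests carry their receiver's ID
    data Request = Request { receiverID :: Int, requestBody :: String }

    -- route a request to the addressed vserver, if this node hosts it
    dispatch :: HMap.HashMap Int vs -> Request -> Maybe vs
    dispatch vsMap req = HMap.lookup (receiverID req) vsMap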
@@ -69,10 +69,10 @@ import Control.Exception
 import Data.Foldable (foldr')
 import Data.Function (on)
 import qualified Data.Hashable as Hashable
-import Data.List (delete, nub, sortBy)
-import qualified Data.Map.Strict as Map
+import Data.HashMap.Strict (HashMap)
+import qualified Data.HashMap.Strict as HMap
+import Data.List (delete, nub, sortBy)
+import qualified Data.Map.Strict as Map
 import Data.Maybe (fromJust, fromMaybe, isJust,
                    isNothing, mapMaybe)
 import qualified Data.Set as Set
@@ -153,15 +153,19 @@ a `localCompare` b
 -- Also contains shared data and config values.
 -- TODO: more data structures for k-choices bookkeeping
 data RealNode s = RealNode
-    { vservers       :: HashMap NodeID (LocalNodeStateSTM s)
+    { vservers              :: HashMap NodeID (LocalNodeStateSTM s)
     -- ^ map of all active VServer node IDs to their node state
-    , nodeConfig     :: FediChordConf
+    , nodeConfig            :: FediChordConf
     -- ^ holds the initial configuration read at program start
-    , bootstrapNodes :: [(String, PortNumber)]
+    , bootstrapNodes        :: [(String, PortNumber)]
     -- ^ nodes to be used as bootstrapping points, new ones learned during operation
-    , lookupCacheSTM :: TVar LookupCache
+    , lookupCacheSTM        :: TVar LookupCache
     -- ^ a global cache of looked up keys and their associated nodes
-    , nodeService    :: s (RealNodeSTM s)
+    , globalNodeCacheSTM    :: TVar NodeCache
+    -- ^ EpiChord node cache with expiry times for nodes.
+    , globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
+    -- ^ cache updates are not written directly to the 'globalNodeCacheSTM'
+    , nodeService           :: s (RealNodeSTM s)
     }

 type RealNodeSTM s = TVar (RealNode s)
@@ -190,9 +194,9 @@ data LocalNodeState s = LocalNodeState
     { nodeState :: RemoteNodeState
     -- ^ represents common data present both in remote and local node representations
     , nodeCacheSTM :: TVar NodeCache
-    -- ^ EpiChord node cache with expiry times for nodes
+    -- ^ reference to the 'globalNodeCacheSTM'
     , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-    -- ^ cache updates are not written directly to the 'nodeCache' but queued and
+    -- ^ reference to the 'globalCacheWriteQueue
     , successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
     -- ^ successor nodes in ascending order by distance
     , predecessors :: [RemoteNodeState]