Compare commits


No commits in common. "499c90e63af74499ca895e26e697cdf81b433011" and "12dfc56a7321ffeb204e3978b6010992ee4df8dc" have entirely different histories.

4 changed files with 216 additions and 245 deletions


@@ -18,10 +18,29 @@ main = do
     -- ToDo: parse and pass config
     -- probably use `tomland` for that
     (fConf, sConf) <- readConfig
-    -- TODO: first initialise 'RealNode', then the vservers
     -- ToDo: load persisted caches, bootstrapping nodes …
-    (fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-    -- wait for all DHT threads to terminate, this keeps the main thread running
-    wait fediThreads
+    (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
+    -- currently no masking is necessary, as there is nothing to clean up
+    nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
+    -- try joining the DHT using one of the provided bootstrapping nodes
+    joinedState <- tryBootstrapJoining thisNode
+    either (\err -> do
+        -- handle unsuccessful join
+        putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
+        print =<< readTVarIO thisNode
+        -- launch thread attempting to join on new cache entries
+        _ <- forkIO $ joinOnNewEntriesThread thisNode
+        wait =<< async (fediMainThreads serverSock thisNode)
+        )
+        (\joinedNS -> do
+            -- launch main eventloop with successfully joined state
+            putStrLn "successful join"
+            wait =<< async (fediMainThreads serverSock thisNode)
+        )
+        joinedState
+    pure ()

 readConfig :: IO (FediChordConf, ServiceConf)
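Both sides of this hunk dispatch on the Either value returned by tryBootstrapJoining, continuing either unjoined or with the joined state. A minimal, self-contained sketch of that dispatch pattern follows; tryJoin and the printed messages are illustrative stand-ins, not code from the repository:

-- illustrative stand-in for tryBootstrapJoining: Left on failure, Right on success
tryJoin :: IO (Either String Integer)
tryJoin = pure (Left "no bootstrap node responded")

main :: IO ()
main = do
    joinResult <- tryJoin
    either
        -- failure branch: report the error and carry on unjoined
        (\err -> putStrLn (err <> " - continuing unjoined"))
        -- success branch: continue with the joined state
        (\joinedState -> putStrLn ("joined as " <> show joinedState))
        joinResult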


@@ -241,16 +241,16 @@ sendMessageSize = 1200
 -- ====== message send and receive operations ======

 -- encode the response to a request that just signals successful receipt
-ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString
-ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response {
+ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
+ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
       requestID = requestID req
-    , senderID = receiverID req
+    , senderID = ownID
     , part = part req
     , isFinalPart = False
     , action = action req
     , payload = Nothing
     }
-ackRequest _ = Map.empty
+ackRequest _ _ = Map.empty

 -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
@@ -269,7 +269,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
         Nothing -> pure ()
         Just aPart -> do
             let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
-            queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns)
+            queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
             -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
             maybe (pure ()) (
                 mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
@@ -542,7 +542,7 @@ requestJoin toJoinOn ownStateSTM = do
             writeTVar ownStateSTM newState
             pure (cacheInsertQ, newState)
     -- execute the cache insertions
-    mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ
+    mapM_ (\f -> f joinedState) cacheInsertQ
     if responses == Set.empty
         then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
         else do
@@ -624,7 +624,7 @@ sendQueryIdMessages targetID ns lParam targets = do
                 _ -> Set.empty
         -- forward entries to global cache
-        queueAddEntries entrySet (cacheWriteQueue ns)
+        queueAddEntries entrySet ns
         -- return accumulated QueryResult
         pure $ case acc of
             -- once a FOUND as been encountered, return this as a result
@@ -670,7 +670,7 @@ requestStabilise ns neighbour = do
                 )
                 ([],[]) respSet
             -- update successfully responded neighbour in cache
-            maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet)
+            maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
             pure $ if null responsePreds && null responseSuccs
                 then Left "no neighbours returned"
                 else Right (responsePreds, responseSuccs)
@@ -832,24 +832,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do

 -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
 queueAddEntries :: Foldable c => c RemoteCacheEntry
-                -> TQueue (NodeCache -> NodeCache)
+                -> LocalNodeState s
                 -> IO ()
-queueAddEntries entries cacheQ = do
+queueAddEntries entries ns = do
     now <- getPOSIXTime
-    forM_ entries $ \entry -> atomically $ writeTQueue cacheQ $ addCacheEntryPure now entry
+    forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry

 -- | enque a list of node IDs to be deleted from the global NodeCache
 queueDeleteEntries :: Foldable c
                    => c NodeID
-                   -> TQueue (NodeCache -> NodeCache)
+                   -> LocalNodeState s
                    -> IO ()
-queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry
+queueDeleteEntries ids ns = forM_ ids $ atomically . writeTQueue (cacheWriteQueue ns) . deleteCacheEntry

 -- | enque a single node ID to be deleted from the global NodeCache
 queueDeleteEntry :: NodeID
-                 -> TQueue (NodeCache -> NodeCache)
+                 -> LocalNodeState s
                  -> IO ()
 queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
@@ -858,11 +858,11 @@ queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
 -- global 'NodeCache'.
 queueUpdateVerifieds :: Foldable c
                      => c NodeID
-                     -> TQueue (NodeCache -> NodeCache)
+                     -> LocalNodeState s
                      -> IO ()
-queueUpdateVerifieds nIds cacheQ = do
+queueUpdateVerifieds nIds ns = do
     now <- getPOSIXTime
-    forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $
+    forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
         markCacheEntryAsVerified (Just now) nid'

 -- | retry an IO action at most *i* times until it delivers a result
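Both sides of these hunks feed the same mechanism: the cacheWriteQueue holds pure NodeCache -> NodeCache modifier functions that a single writer thread later applies, so producers never touch the cache TVar directly. A minimal standalone sketch of that queue-of-modifiers pattern, assuming only the stm and containers packages (the Cache type and names here are illustrative, not from the repository):

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import Control.Monad (forever)
import qualified Data.Map.Strict as Map

type Cache = Map.Map Int String

-- single writer: apply queued modifier functions to the shared cache TVar
cacheWriter :: TQueue (Cache -> Cache) -> TVar Cache -> IO ()
cacheWriter queue cacheVar = forever $ atomically $ do
    modifier <- readTQueue queue
    modifyTVar' cacheVar modifier

main :: IO ()
main = do
    cacheVar <- newTVarIO Map.empty
    queue    <- newTQueueIO
    _ <- forkIO $ cacheWriter queue cacheVar
    -- producers only enqueue pure modifiers, as queueAddEntries/queueDeleteEntries do
    atomically $ writeTQueue queue (Map.insert 1 "some entry")
    threadDelay 100000
    print =<< readTVarIO cacheVar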


@@ -69,12 +69,12 @@ import qualified Data.ByteString.UTF8 as BSU
 import Data.Either (rights)
 import Data.Foldable (foldr')
 import Data.Functor.Identity
-import Data.HashMap.Strict (HashMap)
-import qualified Data.HashMap.Strict as HMap
 import Data.IP (IPv6, fromHostAddress6,
                 toHostAddress6)
 import Data.List ((\\))
 import qualified Data.Map.Strict as Map
+import qualified Data.HashMap.Strict as HMap
+import Data.HashMap.Strict (HashMap)
 import Data.Maybe (catMaybes, fromJust, fromMaybe,
                    isJust, isNothing, mapMaybe)
 import qualified Data.Set as Set
@@ -98,59 +98,38 @@ import Debug.Trace (trace)
 fediChordInit :: (Service s (RealNodeSTM s))
               => FediChordConf
               -> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service
-              -> IO (Async (), RealNodeSTM s)
+              -> IO (Socket, RealNodeSTM s)
 fediChordInit initConf serviceRunner = do
     emptyLookupCache <- newTVarIO Map.empty
-    cacheSTM <- newTVarIO initCache
-    cacheQ <- atomically newTQueue
-    let realNode = RealNode
-            { vservers = HMap.empty
+    let realNode = RealNode {
+            vservers = HMap.empty
           , nodeConfig = initConf
           , bootstrapNodes = confBootstrapNodes initConf
           , lookupCacheSTM = emptyLookupCache
           , nodeService = undefined
-          , globalNodeCacheSTM = cacheSTM
-          , globalCacheWriteQueue = cacheQ
           }
     realNodeSTM <- newTVarIO realNode
-    serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
     -- launch service and set the reference in the RealNode
     serv <- serviceRunner realNodeSTM
     atomically . modifyTVar' realNodeSTM $ \rn -> rn { nodeService = serv }
-    -- prepare for joining: start node cache writer thread
-    -- currently no masking is necessary, as there is nothing to clean up
-    nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM
     -- TODO: k-choices way of joining, so far just initialise a single vserver
-    firstVS <- nodeStateInit realNodeSTM 0
+    firstVS <- nodeStateInit realNodeSTM
     firstVSSTM <- newTVarIO firstVS
     -- add vserver to list at RealNode
     atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) }
-    -- try joining the DHT using one of the provided bootstrapping nodes
-    joinedState <- tryBootstrapJoining firstVSSTM
-    fediThreadsAsync <- either (\err -> do
-        -- handle unsuccessful join
-        putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
-        -- launch thread attempting to join on new cache entries
-        _ <- forkIO $ joinOnNewEntriesThread firstVSSTM
-        async (fediMainThreads serverSock realNodeSTM)
-        )
-        (\joinedNS -> do
-            -- launch main eventloop with successfully joined state
-            putStrLn "successful join"
-            async (fediMainThreads serverSock realNodeSTM)
-        )
-        joinedState
-    pure (fediThreadsAsync, realNodeSTM)
+    serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
+    pure (serverSock, realNodeSTM)

 -- | initialises the 'NodeState' for this local node.
 -- Separated from 'fediChordInit' to be usable in tests.
-nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> Integer -> IO (LocalNodeState s)
-nodeStateInit realNodeSTM vsID' = do
+nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (LocalNodeState s)
+nodeStateInit realNodeSTM = do
     realNode <- readTVarIO realNodeSTM
+    cacheSTM <- newTVarIO initCache
+    q <- atomically newTQueue
     let
         conf = nodeConfig realNode
-        vsID = vsID'
+        vsID = 0
         containedState = RemoteNodeState {
             domain = confDomain conf
           , ipAddr = confIP conf
@@ -161,8 +140,8 @@ nodeStateInit realNodeSTM vsID' = do
             }
         initialState = LocalNodeState {
             nodeState = containedState
-          , nodeCacheSTM = globalNodeCacheSTM realNode
-          , cacheWriteQueue = globalCacheWriteQueue realNode
+          , nodeCacheSTM = cacheSTM
+          , cacheWriteQueue = q
           , successors = []
          , predecessors = []
           , kNeighbours = 3
@@ -195,36 +174,33 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
 -- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
 -- Unjoined try joining instead.
-convergenceSampleThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
-convergenceSampleThread nodeSTM = forever $ do
-    node <- readTVarIO nodeSTM
-    forM_ (vservers node) $ \nsSTM -> do
-        nsSnap <- readTVarIO nsSTM
-        parentNode <- readTVarIO $ parentRealNode nsSnap
+convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
+convergenceSampleThread nsSTM = forever $ do
+    nsSnap <- readTVarIO nsSTM
+    parentNode <- readTVarIO $ parentRealNode nsSnap
     if isJoined nsSnap
         then
            runExceptT (do
                -- joined node: choose random node, do queryIDLoop, compare result with own responsibility
                let bss = bootstrapNodes parentNode
                randIndex <- liftIO $ randomRIO (0, length bss - 1)
                chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
                lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
                currentlyResponsible <- liftEither lookupResult
                if getNid currentlyResponsible /= getNid nsSnap
                    -- if mismatch, stabilise on the result, else do nothing
                    then do
                        stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
                        (preds, succs) <- liftEither stabResult
                        -- TODO: verify neighbours before adding, see #55
                        liftIO . atomically $ do
                            ns <- readTVar nsSTM
                            writeTVar nsSTM $ addPredecessors preds ns
                    else pure ()
                ) >> pure ()
        -- unjoined node: try joining through all bootstrapping nodes
        else tryBootstrapJoining nsSTM >> pure ()
-    let delaySecs = confBootstrapSamplingInterval . nodeConfig $ node
+    let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
     threadDelay delaySecs
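convergenceSampleThread combines IO with early-exit error handling via runExceptT, throwError and liftEither: any Left aborts the whole block. A minimal sketch of that combination, assuming only mtl, with a stubbed lookup standing in for bootstrapQueryId (all names here are illustrative, not from the repository):

import Control.Monad.Except (liftEither, runExceptT, throwError)
import Control.Monad.IO.Class (liftIO)

-- stubbed lookup returning Either, in the role of bootstrapQueryId
lookupResponsible :: Int -> IO (Either String String)
lookupResponsible key = pure $ if even key then Right "node-a" else Left "lookup failed"

sampleOnce :: Int -> IO (Either String ())
sampleOnce key = runExceptT $ do
    -- liftEither aborts the whole block with the Left value on failure
    responsible <- liftEither =<< liftIO (lookupResponsible key)
    if responsible == "node-a"
        then liftIO $ putStrLn "responsibility matches, nothing to do"
        else throwError "responsibility mismatch, should stabilise"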
@@ -360,81 +336,68 @@ joinOnNewEntriesThread nsSTM = loop
 -- | cache updater thread that waits for incoming NodeCache update instructions on
 -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
-nodeCacheWriter :: RealNodeSTM s -> IO ()
-nodeCacheWriter nodeSTM = do
-    node <- readTVarIO nodeSTM
+nodeCacheWriter :: LocalNodeStateSTM s -> IO ()
+nodeCacheWriter nsSTM =
     forever $ atomically $ do
-        cacheModifier <- readTQueue $ globalCacheWriteQueue node
-        modifyTVar' (globalNodeCacheSTM node) cacheModifier
+        ns <- readTVar nsSTM
+        cacheModifier <- readTQueue $ cacheWriteQueue ns
+        modifyTVar' (nodeCacheSTM ns) cacheModifier

 -- | Periodically iterate through cache, clean up expired entries and verify unverified ones
-nodeCacheVerifyThread :: RealNodeSTM s -> IO ()
-nodeCacheVerifyThread nodeSTM = forever $ do
-    (node, firstVSSTM) <- atomically $ do
-        node <- readTVar nodeSTM
-        case headMay (HMap.elems $ vservers node) of
-            -- wait until first VS is joined
-            Nothing -> retry
-            Just vs' -> pure (node, vs')
-    let
-        maxEntryAge = confMaxNodeCacheAge $ nodeConfig node
-        cacheQ = globalCacheWriteQueue node
-    cache <- readTVarIO $ globalNodeCacheSTM node
-    -- always use the first active VS as a sender for operations like Ping
-    firstVS <- readTVarIO firstVSSTM
+nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
+nodeCacheVerifyThread nsSTM = forever $ do
+    -- get cache
+    (ns, cache, maxEntryAge) <- atomically $ do
+        ns <- readTVar nsSTM
+        cache <- readTVar $ nodeCacheSTM ns
+        maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns)
+        pure (ns, cache, maxEntryAge)
     -- iterate entries:
     -- for avoiding too many time syscalls, get current time before iterating.
     now <- getPOSIXTime
-    forM_ (nodeCacheEntries cache) (\(CacheEntry validated cacheNode ts) ->
+    forM_ (nodeCacheEntries cache) (\(CacheEntry validated node ts) ->
         -- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
         if (now - ts) > maxEntryAge
             then
-                queueDeleteEntry (getNid cacheNode) cacheQ
+                queueDeleteEntry (getNid node) ns
             -- case unverified: try verifying, otherwise delete
             else if not validated
                 then do
                     -- marking as verified is done by 'requestPing' as well
-                    pong <- requestPing firstVS cacheNode
+                    pong <- requestPing ns node
                     either (\_->
-                        queueDeleteEntry (getNid cacheNode) cacheQ
+                        queueDeleteEntry (getNid node) ns
                         )
                        (\vss ->
-                            if cacheNode `notElem` vss
-                                then queueDeleteEntry (getNid cacheNode) cacheQ
+                            if node `notElem` vss
+                                then queueDeleteEntry (getNid node) ns
                                 -- after verifying a node, check whether it can be a closer neighbour
-                                -- do this for each node
-                                -- TODO: optimisation: place all LocalNodeStates on the cache ring and check whether any of them is the predecessor/ successor
-                                else forM_ (vservers node) (\nsSTM -> do
-                                    ns <- readTVarIO nsSTM
-                                    if cacheNode `isPossiblePredecessor` ns
+                                else do
+                                    if node `isPossiblePredecessor` ns
                                         then atomically $ do
                                             ns' <- readTVar nsSTM
-                                            writeTVar nsSTM $ addPredecessors [cacheNode] ns'
+                                            writeTVar nsSTM $ addPredecessors [node] ns'
                                         else pure ()
-                                    if cacheNode `isPossibleSuccessor` ns
+                                    if node `isPossibleSuccessor` ns
                                         then atomically $ do
                                             ns' <- readTVar nsSTM
-                                            writeTVar nsSTM $ addSuccessors [cacheNode] ns'
+                                            writeTVar nsSTM $ addSuccessors [node] ns'
                                         else pure ()
-                                    )
                        ) pong
                 else pure ()
        )
    -- check the cache invariant per slice and, if necessary, do a single lookup to the
    -- middle of each slice not verifying the invariant
-    latestNode <- readTVarIO nodeSTM
-    forM_ (vservers latestNode) (\nsSTM -> do
-        latestNs <- readTVarIO nsSTM
-        latestCache <- readTVarIO $ nodeCacheSTM latestNs
-        let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
-                FOUND node -> [node]
-                FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
-        forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
-            forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
-            )
-        )
+    latestNs <- readTVarIO nsSTM
+    latestCache <- readTVarIO $ nodeCacheSTM latestNs
+    let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
+            FOUND node -> [node]
+            FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
+    forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
+        forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
+        )

     threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to milliseconds
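The removed variant of nodeCacheVerifyThread blocks with STM retry until the first vserver exists: retry suspends the transaction and re-runs it once one of the read TVars changes. A small self-contained sketch of that wait pattern, assuming only stm and containers (the map of vservers here is purely illustrative):

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- block until the map inside the TVar has at least one element
waitForFirst :: TVar (Map.Map Int String) -> IO (Int, String)
waitForFirst var = atomically $ do
    m <- readTVar var
    case Map.lookupMin m of
        Nothing    -> retry          -- transaction sleeps until `var` is written
        Just entry -> pure entry

main :: IO ()
main = do
    var <- newTVarIO Map.empty
    _ <- forkIO $ threadDelay 100000 >> atomically (modifyTVar' var (Map.insert 0 "first vserver"))
    print =<< waitForFirst var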
@@ -498,93 +461,90 @@ checkCacheSliceInvariants ns
 -- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until
 -- one responds, and get their neighbours for maintaining the own neighbour lists.
 -- If necessary, request new neighbours.
-stabiliseThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
-stabiliseThread nodeSTM = forever $ do
-    node <- readTVarIO nodeSTM
-    forM_ (vservers node) (\nsSTM -> do
-        oldNs <- readTVarIO nsSTM
+stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
+stabiliseThread nsSTM = forever $ do
+    oldNs <- readTVarIO nsSTM

     -- iterate through the same snapshot, collect potential new neighbours
     -- and nodes to be deleted, and modify these changes only at the end of
     -- each stabilise run.
     -- This decision makes iterating through a potentially changing list easier.

     -- don't contact all neighbours unless the previous one failed/ Left ed

     predStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
     succStabilise <- stabiliseClosestResponder oldNs predecessors 1 []

     let
         (predDeletes, predNeighbours) = either (const ([], [])) id predStabilise
         (succDeletes, succNeighbours) = either (const ([], [])) id succStabilise
         allDeletes = predDeletes <> succDeletes
         allNeighbours = predNeighbours <> succNeighbours

     -- now actually modify the node state's neighbours
     updatedNs <- atomically $ do
         newerNsSnap <- readTVar nsSTM
         let
             -- sorting and taking only k neighbours is taken care of by the
             -- setSuccessors/ setPredecessors functions
             newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours
             newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours
             newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
         writeTVar nsSTM newNs
         pure newNs
     -- delete unresponding nodes from cache as well
     mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes

     -- try looking up additional neighbours if list too short
     forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
         ns' <- readTVarIO nsSTM
         nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
         either
             (const $ pure ())
             (\entry -> atomically $ do
                 latestNs <- readTVar nsSTM
                 writeTVar nsSTM $ addPredecessors [entry] latestNs
             )
             nextEntry
         )

     forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
         ns' <- readTVarIO nsSTM
         nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
         either
             (const $ pure ())
             (\entry -> atomically $ do
                 latestNs <- readTVar nsSTM
                 writeTVar nsSTM $ addSuccessors [entry] latestNs
             )
             nextEntry
         )

     newNs <- readTVarIO nsSTM

     let
         oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs
         newPredecessor = headMay $ predecessors newNs
     -- manage need for service data migration:
     maybe (pure ()) (\newPredecessor' ->
         when (
             isJust newPredecessor
             && oldPredecessor /= newPredecessor'
             -- case: predecessor has changed in some way => own responsibility has changed in some way
             -- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data
             -- If this is due to a node leaving without transfering its data, try getting it from a redundant copy
             -- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it
             && isInOwnResponsibilitySlice newPredecessor' oldNs) $ do
                 ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs)
                 migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor')
                 -- TODO: deal with migration failure, e.g retry
                 pure ()
         )
         newPredecessor
-        )
-    threadDelay . confStabiliseInterval . nodeConfig $ node
+
+    stabiliseDelay <- confStabiliseInterval . nodeConfig <$> readTVarIO (parentRealNode newNs)
+    threadDelay stabiliseDelay
   where
     -- | send a stabilise request to the n-th neighbour
     -- (specified by the provided getter function) and on failure retry
@@ -645,23 +605,20 @@ sendThread sock sendQ = forever $ do
     sendAllTo sock packet addr

 -- | Sets up and manages the main server threads of FediChord
-fediMainThreads :: Service s (RealNodeSTM s) => Socket -> RealNodeSTM s -> IO ()
-fediMainThreads sock nodeSTM = do
-    node <- readTVarIO nodeSTM
+fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO ()
+fediMainThreads sock nsSTM = do
+    ns <- readTVarIO nsSTM
     putStrLn "launching threads"
     sendQ <- newTQueueIO
     recvQ <- newTQueueIO
     -- concurrently launch all handler threads, if one of them throws an exception
     -- all get cancelled
     concurrently_
-        (fediMessageHandler sendQ recvQ nodeSTM) $
-        -- decision whether to [1] launch 1 thread per VS or [2] let a single
-        -- thread process all VSes sequentially:
-        -- choose option 2 for the sake of limiting concurrency in simulation scenario
-        concurrently_ (stabiliseThread nodeSTM) $
-        concurrently_ (nodeCacheVerifyThread nodeSTM) $
-        concurrently_ (convergenceSampleThread nodeSTM) $
-        concurrently_ (lookupCacheCleanup nodeSTM) $
+        (fediMessageHandler sendQ recvQ nsSTM) $
+        concurrently_ (stabiliseThread nsSTM) $
+        concurrently_ (nodeCacheVerifyThread nsSTM) $
+        concurrently_ (convergenceSampleThread nsSTM) $
+        concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
        concurrently_
            (sendThread sock sendQ)
            (recvThread sock recvQ)
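fediMainThreads chains concurrently_ so that all worker threads live and die together: if any one of them throws, the whole group is cancelled. A stripped-down sketch of that structure, assuming only the async package, with dummy workers in place of the real handler threads (names are illustrative):

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (concurrently_)
import Control.Monad (forever)

worker :: String -> Int -> IO ()
worker name delayUs = forever $ do
    putStrLn (name <> " tick")
    threadDelay delayUs

main :: IO ()
main =
    -- nesting concurrently_ keeps the whole group linked:
    -- an exception in any worker cancels all the others
    concurrently_ (worker "send" 300000) $
        concurrently_ (worker "recv" 500000)
                      (worker "stabilise" 700000)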
@@ -690,17 +647,20 @@ requestMapPurge purgeAge mapVar = forever $ do
 fediMessageHandler :: Service s (RealNodeSTM s)
                    => TQueue (BS.ByteString, SockAddr) -- ^ send queue
                    -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-                   -> RealNodeSTM s -- ^ node
+                   -> LocalNodeStateSTM s -- ^ acting NodeState
                    -> IO ()
-fediMessageHandler sendQ recvQ nodeSTM = do
-    nodeConf <- nodeConfig <$> readTVarIO nodeSTM
+fediMessageHandler sendQ recvQ nsSTM = do
+    -- Read node state just once, assuming that all relevant data for this function does
+    -- not change.
+    -- Other functions are passed the nsSTM reference and thus can get the latest state.
+    nsSnap <- readTVarIO nsSTM
+    nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap)
     -- handling multipart messages:
     -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
     requestMap <- newMVar (Map.empty :: RequestMap)
     -- run receive loop and requestMapPurge concurrently, so that an exception makes
     -- both of them fail
     concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do
-        node <- readTVarIO nodeSTM
         -- wait for incoming messages
         (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
         let aMsg = deserialiseMessage rawMsg
@@ -710,14 +670,12 @@ fediMessageHandler sendQ recvQ nodeSTM = do
             )
             (\validMsg ->
                 case validMsg of
-                    aRequest@Request{} -> case dispatchVS node aRequest of
-                        -- if no match to an active vserver ID, just ignore
-                        Nothing -> pure ()
+                    aRequest@Request{}
                         -- if not a multipart message, handle immediately. Response is at the same time an ACK
-                        Just nsSTM | part aRequest == 1 && isFinalPart aRequest ->
+                        | part aRequest == 1 && isFinalPart aRequest ->
                             forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
                         -- otherwise collect all message parts first before handling the whole request
-                        Just nsSTM | otherwise -> do
+                        | otherwise -> do
                             now <- getPOSIXTime
                             -- critical locking section of requestMap
                             rMapState <- takeMVar requestMap
@@ -735,7 +693,7 @@ fediMessageHandler sendQ recvQ nodeSTM = do
                             -- put map back into MVar, end of critical section
                             putMVar requestMap newMapState
                             -- ACK the received part
-                            forM_ (ackRequest aRequest) $
+                            forM_ (ackRequest (getNid nsSnap) aRequest) $
                                 \msg -> atomically $ writeTQueue sendQ (msg, sourceAddr)
                             -- if all parts received, then handle request.
                             let
@@ -751,8 +709,6 @@ fediMessageHandler sendQ recvQ nodeSTM = do
                 aMsg
     pure ()
-    where
-        dispatchVS node req = HMap.lookup (receiverID req) (vservers node)

 -- ==== interface to service layer ====


@@ -69,10 +69,10 @@ import Control.Exception
 import Data.Foldable (foldr')
 import Data.Function (on)
 import qualified Data.Hashable as Hashable
-import Data.HashMap.Strict (HashMap)
-import qualified Data.HashMap.Strict as HMap
 import Data.List (delete, nub, sortBy)
 import qualified Data.Map.Strict as Map
+import Data.HashMap.Strict (HashMap)
+import qualified Data.HashMap.Strict as HMap
 import Data.Maybe (fromJust, fromMaybe, isJust,
                    isNothing, mapMaybe)
 import qualified Data.Set as Set
@@ -153,19 +153,15 @@ a `localCompare` b
 -- Also contains shared data and config values.
 -- TODO: more data structures for k-choices bookkeeping
 data RealNode s = RealNode
     { vservers              :: HashMap NodeID (LocalNodeStateSTM s)
     -- ^ map of all active VServer node IDs to their node state
     , nodeConfig            :: FediChordConf
     -- ^ holds the initial configuration read at program start
     , bootstrapNodes        :: [(String, PortNumber)]
     -- ^ nodes to be used as bootstrapping points, new ones learned during operation
     , lookupCacheSTM        :: TVar LookupCache
     -- ^ a global cache of looked up keys and their associated nodes
-    , globalNodeCacheSTM    :: TVar NodeCache
-    -- ^ EpiChord node cache with expiry times for nodes.
-    , globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-    -- ^ cache updates are not written directly to the 'globalNodeCacheSTM'
     , nodeService           :: s (RealNodeSTM s)
     }

 type RealNodeSTM s = TVar (RealNode s)
@@ -194,9 +190,9 @@ data LocalNodeState s = LocalNodeState
     { nodeState       :: RemoteNodeState
     -- ^ represents common data present both in remote and local node representations
     , nodeCacheSTM    :: TVar NodeCache
-    -- ^ reference to the 'globalNodeCacheSTM'
+    -- ^ EpiChord node cache with expiry times for nodes
     , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-    -- ^ reference to the 'globalCacheWriteQueue
+    -- ^ cache updates are not written directly to the 'nodeCache' but queued and
     , successors      :: [RemoteNodeState] -- could be a set instead as these are ordered as well
     -- ^ successor nodes in ascending order by distance
     , predecessors    :: [RemoteNodeState]