From 0ab6ee9c8fc6a4e2c3e3c27d723830303221d4fd Mon Sep 17 00:00:00 2001
From: Trolli Schmittlauch
Date: Sun, 20 Sep 2020 19:30:35 +0200
Subject: [PATCH 1/6] re-structure fediChordInit flow to also do the
 bootstrapping

---
 app/Main.hs                    | 25 ++------------
 src/Hash2Pub/FediChord.hs      | 59 +++++++++++++++++++++++-----------
 src/Hash2Pub/FediChordTypes.hs | 11 +++++--
 3 files changed, 52 insertions(+), 43 deletions(-)

diff --git a/app/Main.hs b/app/Main.hs
index eac223d..ed599f8 100644
--- a/app/Main.hs
+++ b/app/Main.hs
@@ -18,29 +18,10 @@ main = do
     -- ToDo: parse and pass config
     -- probably use `tomland` for that
     (fConf, sConf) <- readConfig
-    -- TODO: first initialise 'RealNode', then the vservers
     -- ToDo: load persisted caches, bootstrapping nodes …
-    (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-    -- currently no masking is necessary, as there is nothing to clean up
-    nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
-    -- try joining the DHT using one of the provided bootstrapping nodes
-    joinedState <- tryBootstrapJoining thisNode
-    either (\err -> do
-        -- handle unsuccessful join
-
-        putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
-        print =<< readTVarIO thisNode
-        -- launch thread attempting to join on new cache entries
-        _ <- forkIO $ joinOnNewEntriesThread thisNode
-        wait =<< async (fediMainThreads serverSock thisNode)
-        )
-        (\joinedNS -> do
-            -- launch main eventloop with successfully joined state
-            putStrLn "successful join"
-            wait =<< async (fediMainThreads serverSock thisNode)
-        )
-        joinedState
-    pure ()
+    (fediThreads, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
+    -- wait for all DHT threads to terminate; this keeps the main thread running
+    wait fediThreads
 
 
 readConfig :: IO (FediChordConf, ServiceConf)
diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index 0dcba44..6f9caf6 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -98,38 +98,59 @@ import Debug.Trace (trace)
 fediChordInit :: (Service s (RealNodeSTM s))
     => FediChordConf
     -> (RealNodeSTM s -> IO (s (RealNodeSTM s))) -- ^ runner function for service
-    -> IO (Socket, RealNodeSTM s)
+    -> IO (Async (), RealNodeSTM s)
 fediChordInit initConf serviceRunner = do
     emptyLookupCache <- newTVarIO Map.empty
+    cacheSTM <- newTVarIO initCache
+    cacheQ <- atomically newTQueue
-    let realNode = RealNode {
-          vservers = HMap.empty
+    let realNode = RealNode
+          { vservers = HMap.empty
          , nodeConfig = initConf
          , bootstrapNodes = confBootstrapNodes initConf
          , lookupCacheSTM = emptyLookupCache
          , nodeService = undefined
-         }
+         , globalNodeCacheSTM = cacheSTM
+         , globalCacheWriteQueue = cacheQ
+         }
     realNodeSTM <- newTVarIO realNode
+    serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
     -- launch service and set the reference in the RealNode
     serv <- serviceRunner realNodeSTM
     atomically . modifyTVar' realNodeSTM $ \rn -> rn { nodeService = serv }
+    -- prepare for joining: start node cache writer thread
+    -- currently no masking is necessary, as there is nothing to clean up
+    nodeCacheWriterThread <- forkIO $ nodeCacheWriter realNodeSTM
     -- TODO: k-choices way of joining, so far just initialise a single vserver
-    firstVS <- nodeStateInit realNodeSTM
+    firstVS <- nodeStateInit realNodeSTM 0
     firstVSSTM <- newTVarIO firstVS
     -- add vserver to list at RealNode
     atomically . modifyTVar' realNodeSTM $ \rn -> rn { vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) }
-    serverSock <- mkServerSocket (confIP initConf) (fromIntegral $ confDhtPort initConf)
-    pure (serverSock, realNodeSTM)
+    -- try joining the DHT using one of the provided bootstrapping nodes
+    joinedState <- tryBootstrapJoining firstVSSTM
+    fediThreadsAsync <- either (\err -> do
+        -- handle unsuccessful join
+
+        putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
+        -- launch thread attempting to join on new cache entries
+        _ <- forkIO $ joinOnNewEntriesThread firstVSSTM
+        async (fediMainThreads serverSock firstVSSTM)
+        )
+        (\joinedNS -> do
+            -- launch main eventloop with successfully joined state
+            putStrLn "successful join"
+            async (fediMainThreads serverSock firstVSSTM)
+        )
+        joinedState
+    pure (fediThreadsAsync, realNodeSTM)
 
 
 -- | initialises the 'NodeState' for this local node.
 -- Separated from 'fediChordInit' to be usable in tests.
-nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO (LocalNodeState s)
-nodeStateInit realNodeSTM = do
+nodeStateInit :: Service s (RealNodeSTM s) => RealNodeSTM s -> Integer -> IO (LocalNodeState s)
+nodeStateInit realNodeSTM vsID' = do
     realNode <- readTVarIO realNodeSTM
-    cacheSTM <- newTVarIO initCache
-    q <- atomically newTQueue
     let
         conf = nodeConfig realNode
-        vsID = 0
+        vsID = vsID'
         containedState = RemoteNodeState {
             domain = confDomain conf
           , ipAddr = confIP conf
@@ -140,8 +161,8 @@ nodeStateInit realNodeSTM = do
           }
         initialState = LocalNodeState {
             nodeState = containedState
-          , nodeCacheSTM = cacheSTM
-          , cacheWriteQueue = q
+          , nodeCacheSTM = globalNodeCacheSTM realNode
+          , cacheWriteQueue = globalCacheWriteQueue realNode
          , successors = []
          , predecessors = []
          , kNeighbours = 3
@@ -336,12 +357,12 @@ joinOnNewEntriesThread nsSTM = loop
 
 -- | cache updater thread that waits for incoming NodeCache update instructions on
 -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
-nodeCacheWriter :: LocalNodeStateSTM s -> IO ()
-nodeCacheWriter nsSTM =
+nodeCacheWriter :: RealNodeSTM s -> IO ()
+nodeCacheWriter nodeSTM = do
+    node <- readTVarIO nodeSTM
     forever $ atomically $ do
-        ns <- readTVar nsSTM
-        cacheModifier <- readTQueue $ cacheWriteQueue ns
-        modifyTVar' (nodeCacheSTM ns) cacheModifier
+        cacheModifier <- readTQueue $ globalCacheWriteQueue node
+        modifyTVar' (globalNodeCacheSTM node) cacheModifier
 
 
 -- | Periodically iterate through cache, clean up expired entries and verify unverified ones
diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs
index fd9d0f9..a1c0937 100644
--- a/src/Hash2Pub/FediChordTypes.hs
+++ b/src/Hash2Pub/FediChordTypes.hs
@@ -161,6 +161,13 @@ data RealNode s = RealNode
     -- ^ nodes to be used as bootstrapping points, new ones learned during operation
   , lookupCacheSTM :: TVar LookupCache
     -- ^ a global cache of looked up keys and their associated nodes
+  , globalNodeCacheSTM :: TVar NodeCache
+    -- ^ EpiChord node cache with expiry times for nodes.
+    -- Shared between all vservers, each 'LocalNodeState' holds a reference to
+    -- the same TVar for avoiding unnecessary reads of parent node
+  , globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
+    -- ^ cache updates are not written directly to the 'globalNodeCacheSTM'
+    -- but queued and processed by a single thread
   , nodeService :: s (RealNodeSTM s)
   }
 
@@ -190,9 +197,9 @@ data LocalNodeState s = LocalNodeState
     { nodeState :: RemoteNodeState
     -- ^ represents common data present both in remote and local node representations
   , nodeCacheSTM :: TVar NodeCache
-    -- ^ EpiChord node cache with expiry times for nodes
+    -- ^ reference to the 'globalNodeCacheSTM'
   , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-    -- ^ cache updates are not written directly to the 'nodeCache' but queued and
+    -- ^ reference to the 'globalCacheWriteQueue'
   , successors :: [RemoteNodeState] -- could be a set instead as these are ordered as well
     -- ^ successor nodes in ascending order by distance
   , predecessors :: [RemoteNodeState]

From 68de73d919b54dc7e4a8248e3c689f1c37be10d5 Mon Sep 17 00:00:00 2001
From: Trolli Schmittlauch
Date: Sun, 20 Sep 2020 21:19:17 +0200
Subject: [PATCH 2/6] re-structure fediChordMessageHandler to dispatch requests
 to the responsible vserver

contributes to #34
---
 src/Hash2Pub/DHTProtocol.hs |  8 ++++----
 src/Hash2Pub/FediChord.hs   | 33 +++++++++++++++++----------------
 2 files changed, 21 insertions(+), 20 deletions(-)

diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs
index 38c0dcb..eca145a 100644
--- a/src/Hash2Pub/DHTProtocol.hs
+++ b/src/Hash2Pub/DHTProtocol.hs
@@ -241,16 +241,16 @@ sendMessageSize = 1200
 
 -- ====== message send and receive operations ======
 
 -- encode the response to a request that just signals successful receipt
-ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
-ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
+ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString
+ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response {
       requestID = requestID req
-    , senderID = ownID
+    , senderID = receiverID req
     , part = part req
     , isFinalPart = False
     , action = action req
     , payload = Nothing
     }
-ackRequest _ _ = Map.empty
+ackRequest _ = Map.empty
 
 
 -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index 6f9caf6..ee5e9b6 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -138,7 +138,7 @@ fediChordInit initConf serviceRunner = do
         (\joinedNS -> do
             -- launch main eventloop with successfully joined state
             putStrLn "successful join"
-            async (fediMainThreads serverSock firstVSSTM)
+            async (fediMainThreads serverSock realNodeSTM)
         )
         joinedState
     pure (fediThreadsAsync, realNodeSTM)
@@ -626,16 +626,16 @@ sendThread sock sendQ = forever $ do
         sendAllTo sock packet addr
 
 -- | Sets up and manages the main server threads of FediChord
-fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO ()
-fediMainThreads sock nsSTM = do
-    ns <- readTVarIO nsSTM
+fediMainThreads :: Service s (RealNodeSTM s) => Socket -> RealNodeSTM s -> IO ()
+fediMainThreads sock nodeSTM = do
+    node <- readTVarIO nodeSTM
     putStrLn "launching threads"
     sendQ <- newTQueueIO
     recvQ <- newTQueueIO
     -- concurrently launch all handler threads, if one of them throws an exception
     -- all get cancelled
     concurrently_
         (fediMessageHandler sendQ recvQ nodeSTM) $
         concurrently_ (stabiliseThread nsSTM) $
         concurrently_ (nodeCacheVerifyThread nsSTM) $
         concurrently_ (convergenceSampleThread nsSTM) $
@@ -668,20 +668,17 @@ requestMapPurge purgeAge mapVar = forever $ do
 fediMessageHandler :: Service s (RealNodeSTM s)
     => TQueue (BS.ByteString, SockAddr) -- ^ send queue
     -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-    -> LocalNodeStateSTM s              -- ^ acting NodeState
+    -> RealNodeSTM s                    -- ^ node
     -> IO ()
-fediMessageHandler sendQ recvQ nsSTM = do
-    -- Read node state just once, assuming that all relevant data for this function does
-    -- not change.
-    -- Other functions are passed the nsSTM reference and thus can get the latest state.
-    nsSnap <- readTVarIO nsSTM
-    nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap)
+fediMessageHandler sendQ recvQ nodeSTM = do
+    nodeConf <- nodeConfig <$> readTVarIO nodeSTM
     -- handling multipart messages:
     -- Request parts can be inserted into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
     requestMap <- newMVar (Map.empty :: RequestMap)
     -- run receive loop and requestMapPurge concurrently, so that an exception makes
     -- both of them fail
     concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do
+        node <- readTVarIO nodeSTM
         -- wait for incoming messages
         (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
         let aMsg = deserialiseMessage rawMsg
                 )
                 (\validMsg ->
                     case validMsg of
-                        aRequest@Request{}
+                        aRequest@Request{} -> case dispatchVS node aRequest of
+                            -- if no match to an active vserver ID, just ignore
+                            Nothing -> pure ()
                             -- if not a multipart message, handle immediately. Response is at the same time an ACK
-                            | part aRequest == 1 && isFinalPart aRequest ->
+                            Just nsSTM | part aRequest == 1 && isFinalPart aRequest ->
                                 forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
                             -- otherwise collect all message parts first before handling the whole request
-                            | otherwise -> do
+                            Just nsSTM | otherwise -> do
                                 now <- getPOSIXTime
                                 -- critical locking section of requestMap
                                 rMapState <- takeMVar requestMap
                                 -- insert new message and get set
                                 -- put map back into MVar, end of critical section
                                 putMVar requestMap newMapState
                                 -- ACK the received part
-                                forM_ (ackRequest (getNid nsSnap) aRequest) $
+                                forM_ (ackRequest aRequest) $
                                     \msg -> atomically $ writeTQueue sendQ (msg, sourceAddr)
                                 -- if all parts received, then handle request.
                                 let
 
                 )
                 aMsg
         pure ()
+    where
+        dispatchVS node req = HMap.lookup (receiverID req) (vservers node)
 
 
 -- ==== interface to service layer ====

From 33ae904d17055c37225b165440058d3e816aff42 Mon Sep 17 00:00:00 2001
From: Trolli Schmittlauch
Date: Mon, 21 Sep 2020 02:11:43 +0200
Subject: [PATCH 3/6] re-structure cacheVerifyThread to work on a RealNode and
 iterate over all joined vservers

contributes to #34
---
 src/Hash2Pub/DHTProtocol.hs |  26 ++---
 src/Hash2Pub/FediChord.hs   | 225 +++++++++++++++++++-----------------
 2 files changed, 135 insertions(+), 116 deletions(-)

diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs
index eca145a..f462a26 100644
--- a/src/Hash2Pub/DHTProtocol.hs
+++ b/src/Hash2Pub/DHTProtocol.hs
@@ -269,7 +269,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
         Nothing -> pure ()
         Just aPart -> do
             let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
-            queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
+            queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) (cacheWriteQueue ns)
             -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
             maybe (pure ()) (
                 mapM_ (\resp -> atomically $ writeTQueue sendQ (resp, sourceAddr))
@@ -542,7 +542,7 @@ requestJoin toJoinOn ownStateSTM = do
                 writeTVar ownStateSTM newState
                 pure (cacheInsertQ, newState)
             -- execute the cache insertions
-            mapM_ (\f -> f joinedState) cacheInsertQ
+            mapM_ (\f -> f (cacheWriteQueue joinedState)) cacheInsertQ
             if responses == Set.empty
                 then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
                 else do
@@ -624,7 +624,7 @@ sendQueryIdMessages targetID ns lParam targets = do
                 _ -> Set.empty
         -- forward entries to global cache
-        queueAddEntries entrySet ns
+        queueAddEntries entrySet (cacheWriteQueue ns)
         -- return accumulated QueryResult
         pure $ case acc of
             -- once a FOUND has been encountered, return this as a result
@@ -670,7 +670,7 @@ requestStabilise ns neighbour = do
             )
             ([],[]) respSet
         -- update successfully responded neighbour in cache
-        maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
+        maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) (cacheWriteQueue ns)) $ headMay (Set.elems respSet)
         pure $ if null responsePreds && null responseSuccs
             then Left "no neighbours returned"
             else Right (responsePreds, responseSuccs)
@@ -832,24 +832,24 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
 
 -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
 queueAddEntries :: Foldable c
                 => c RemoteCacheEntry
-                -> LocalNodeState s
+                -> TQueue (NodeCache -> NodeCache)
                 -> IO ()
-queueAddEntries entries ns = do
+queueAddEntries entries cacheQ = do
     now <- getPOSIXTime
-    forM_ entries $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) $ addCacheEntryPure now entry
+    forM_ entries $ \entry -> atomically $ writeTQueue cacheQ $ addCacheEntryPure now entry
 
 
 -- | enqueue a list of node IDs to be deleted from the global NodeCache
 queueDeleteEntries :: Foldable c
                    => c NodeID
-                   -> LocalNodeState s
+                   -> TQueue (NodeCache -> NodeCache)
                    -> IO ()
 queueDeleteEntries ids cacheQ = forM_ ids $ atomically . writeTQueue cacheQ . deleteCacheEntry
 
 
 -- | enqueue a single node ID to be deleted from the global NodeCache
 queueDeleteEntry :: NodeID
-                 -> LocalNodeState s
+                 -> TQueue (NodeCache -> NodeCache)
                  -> IO ()
 queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
 
 
 -- | enqueue the timestamp update and verification marking of an entry in the
 -- global 'NodeCache'.
 queueUpdateVerifieds :: Foldable c
                      => c NodeID
-                     -> LocalNodeState s
+                     -> TQueue (NodeCache -> NodeCache)
                      -> IO ()
-queueUpdateVerifieds nIds ns = do
+queueUpdateVerifieds nIds cacheQ = do
     now <- getPOSIXTime
-    forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
+    forM_ nIds $ \nid' -> atomically $ writeTQueue cacheQ $
         markCacheEntryAsVerified (Just now) nid'
 
 -- | retry an IO action at most *i* times until it delivers a result
diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index ee5e9b6..9565bae 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -366,59 +366,72 @@ nodeCacheWriter nodeSTM = do
 
 
 -- | Periodically iterate through cache, clean up expired entries and verify unverified ones
-nodeCacheVerifyThread :: LocalNodeStateSTM s -> IO ()
-nodeCacheVerifyThread nsSTM = forever $ do
-    -- get cache
-    (ns, cache, maxEntryAge) <- atomically $ do
-        ns <- readTVar nsSTM
-        cache <- readTVar $ nodeCacheSTM ns
-        maxEntryAge <- confMaxNodeCacheAge . nodeConfig <$> readTVar (parentRealNode ns)
-        pure (ns, cache, maxEntryAge)
+nodeCacheVerifyThread :: RealNodeSTM s -> IO ()
+nodeCacheVerifyThread nodeSTM = forever $ do
+    (node, firstVSSTM) <- atomically $ do
+        node <- readTVar nodeSTM
+        case headMay (HMap.elems $ vservers node) of
+            -- wait until first VS is joined
+            Nothing -> retry
+            Just vs' -> pure (node, vs')
+    let
+        maxEntryAge = confMaxNodeCacheAge $ nodeConfig node
+        cacheQ = globalCacheWriteQueue node
+    cache <- readTVarIO $ globalNodeCacheSTM node
+    -- always use the first active VS as a sender for operations like Ping
+    firstVS <- readTVarIO firstVSSTM
     -- iterate entries:
     -- for avoiding too many time syscalls, get current time before iterating.
     now <- getPOSIXTime
-    forM_ (nodeCacheEntries cache) (\(CacheEntry validated node ts) ->
+    forM_ (nodeCacheEntries cache) (\(CacheEntry validated cacheNode ts) ->
         -- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
         if (now - ts) > maxEntryAge
             then
-                queueDeleteEntry (getNid node) ns
+                queueDeleteEntry (getNid cacheNode) cacheQ
             -- case unverified: try verifying, otherwise delete
             else if not validated
                 then do
                     -- marking as verified is done by 'requestPing' as well
-                    pong <- requestPing ns node
+                    pong <- requestPing firstVS cacheNode
                     either (\_->
-                        queueDeleteEntry (getNid node) ns
+                        queueDeleteEntry (getNid cacheNode) cacheQ
                         )
                         (\vss ->
-                            if node `notElem` vss
-                                then queueDeleteEntry (getNid node) ns
+                            if cacheNode `notElem` vss
+                                then queueDeleteEntry (getNid cacheNode) firstVS
                                 -- after verifying a node, check whether it can be a closer neighbour
-                                else do
-                                    if node `isPossiblePredecessor` ns
+                                -- do this for each node
+                                -- TODO: optimisation: place all LocalNodeStates on the cache ring and check whether any of them is the predecessor/ successor
+                                else forM_ (vservers node) (\nsSTM -> do
+                                    ns <- readTVarIO nsSTM
+                                    if cacheNode `isPossiblePredecessor` ns
                                         then atomically $ do
                                             ns' <- readTVar nsSTM
-                                            writeTVar nsSTM $ addPredecessors [node] ns'
+                                            writeTVar nsSTM $ addPredecessors [cacheNode] ns'
                                         else pure ()
-                                    if node `isPossibleSuccessor` ns
+                                    if cacheNode `isPossibleSuccessor` ns
                                         then atomically $ do
                                             ns' <- readTVar nsSTM
-                                            writeTVar nsSTM $ addSuccessors [node] ns'
+                                            writeTVar nsSTM $ addSuccessors [cacheNode] ns'
                                         else pure ()
+                                    )
                         ) pong
                 else pure ()
         )
 
     -- check the cache invariant per slice and, if necessary, do a single lookup to the
     -- middle of each slice not verifying the invariant
-    latestNs <- readTVarIO nsSTM
-    latestCache <- readTVarIO $ nodeCacheSTM latestNs
-    let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
-            FOUND node -> [node]
-            FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
-    forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
-        forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
-        )
+    latestNode <- readTVarIO nodeSTM
+    forM_ (vservers latestNode) (\nsSTM -> do
+        latestNs <- readTVarIO nsSTM
+        latestCache <- readTVarIO $ nodeCacheSTM latestNs
+        let nodesToQuery targetID = case queryLocalCache latestNs latestCache (lNumBestNodes latestNs) targetID of
+                FOUND node -> [node]
+                FORWARD nodeSet -> remoteNode <$> Set.elems nodeSet
+        forM_ (checkCacheSliceInvariants latestNs latestCache) (\targetID ->
+            forkIO $ sendQueryIdMessages targetID latestNs (Just (1 + jEntriesPerSlice latestNs)) (nodesToQuery targetID) >> pure () -- ask for 1 entry more than j because of querying the middle
+            )
+        )
 
     threadDelay $ fromEnum (maxEntryAge / 20) `div` 10^6 -- convert from pico to microseconds
 
@@ -482,90 +495,93 @@ checkCacheSliceInvariants ns
 -- | Periodically send 'StabiliseRequest's to the closest neighbour nodes, until
 -- one responds, and get their neighbours for maintaining the own neighbour lists.
 -- If necessary, request new neighbours.
-stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
-stabiliseThread nsSTM = forever $ do
-    oldNs <- readTVarIO nsSTM
+stabiliseThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
+stabiliseThread nodeSTM = forever $ do
+    node <- readTVarIO nodeSTM
+    forM_ (vservers node) (\nsSTM -> do
+        oldNs <- readTVarIO nsSTM
 
-    -- iterate through the same snapshot, collect potential new neighbours
-    -- and nodes to be deleted, and modify these changes only at the end of
-    -- each stabilise run.
-    -- This decision makes iterating through a potentially changing list easier.
+        -- iterate through the same snapshot, collect potential new neighbours
+        -- and nodes to be deleted, and modify these changes only at the end of
+        -- each stabilise run.
+        -- This decision makes iterating through a potentially changing list easier.
 
-    -- don't contact all neighbours unless the previous one failed/ Left ed
+        -- don't contact all neighbours unless the previous one failed/ returned a Left
 
-    predStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
-    succStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
+        predStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
+        succStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
 
+        let
+            (predDeletes, predNeighbours) = either (const ([], [])) id predStabilise
+            (succDeletes, succNeighbours) = either (const ([], [])) id succStabilise
+            allDeletes = predDeletes <> succDeletes
+            allNeighbours = predNeighbours <> succNeighbours
 
+        -- now actually modify the node state's neighbours
+        updatedNs <- atomically $ do
+            newerNsSnap <- readTVar nsSTM
+            let
+                -- sorting and taking only k neighbours is taken care of by the
+                -- setSuccessors/ setPredecessors functions
+                newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours
+                newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours
+                newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
+            writeTVar nsSTM newNs
+            pure newNs
+        -- delete unresponding nodes from cache as well
+        mapM_ (atomically . writeTQueue (cacheWriteQueue updatedNs) . deleteCacheEntry . getNid) allDeletes
 
+        -- try looking up additional neighbours if list too short
+        forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
+            ns' <- readTVarIO nsSTM
+            nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
+            either
+                (const $ pure ())
+                (\entry -> atomically $ do
+                    latestNs <- readTVar nsSTM
+                    writeTVar nsSTM $ addPredecessors [entry] latestNs
+                )
+                nextEntry
+            )
 
+        forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
+            ns' <- readTVarIO nsSTM
+            nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
+            either
+                (const $ pure ())
+                (\entry -> atomically $ do
+                    latestNs <- readTVar nsSTM
+                    writeTVar nsSTM $ addSuccessors [entry] latestNs
+                )
+                nextEntry
+            )
 
+        newNs <- readTVarIO nsSTM
 
+        let
+            oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs
+            newPredecessor = headMay $ predecessors newNs
+        -- manage need for service data migration:
+        maybe (pure ()) (\newPredecessor' ->
+            when (
+                isJust newPredecessor
+                && oldPredecessor /= newPredecessor'
+                -- case: predecessor has changed in some way => own responsibility has changed in some way
+                -- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data
+                --         If this is due to a node leaving without transferring its data, try getting it from a redundant copy
+                -- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it
+                && isInOwnResponsibilitySlice newPredecessor' oldNs) $ do
+                ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs)
+                migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor')
+                -- TODO: deal with migration failure, e.g. retry
+                pure ()
+            )
+            newPredecessor
 
+        )
 
+    threadDelay . confStabiliseInterval . nodeConfig $ node
     where
         -- | send a stabilise request to the n-th neighbour
         -- (specified by the provided getter function) and on failure retry
 
     concurrently_
         (fediMessageHandler sendQ recvQ nodeSTM) $
-        concurrently_ (stabiliseThread nsSTM) $
-        concurrently_ (nodeCacheVerifyThread nsSTM) $
+        -- decision whether to [1] launch 1 thread per VS or [2] let a single
+        -- thread process all VSes sequentially:
+        -- choose option 2 for the sake of limiting concurrency in simulation scenario
+        concurrently_ (stabiliseThread nodeSTM) $
+        concurrently_ (nodeCacheVerifyThread nodeSTM) $
         concurrently_ (convergenceSampleThread nsSTM) $
         concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
         concurrently_
             (sendThread sock sendQ)

From 8e8ea41dc4836a3c450785a5d5a7a2c72dca0a04 Mon Sep 17 00:00:00 2001
From: Trolli Schmittlauch
Date: Mon, 21 Sep 2020 02:18:28 +0200
Subject: [PATCH 4/6] re-structure convergenceSampleThread to work on a
 RealNode and iterate over all joined vservers

contributes to #34
---
 src/Hash2Pub/FediChord.hs | 61 ++++++++++++++++++++-------------------
 1 file changed, 32 insertions(+), 29 deletions(-)

diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index 9565bae..307dc51 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -133,7 +133,7 @@ fediChordInit initConf serviceRunner = do
         putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
         -- launch thread attempting to join on new cache entries
         _ <- forkIO $ joinOnNewEntriesThread firstVSSTM
-        async (fediMainThreads serverSock firstVSSTM)
+        async (fediMainThreads serverSock realNodeSTM)
         )
     (\joinedNS -> do
         -- launch main eventloop with successfully joined state
@@ -195,33 +195,36 @@ fediChordBootstrapJoin nsSTM bootstrapNode =
 
 -- Periodically look up own ID through a random bootstrapping node to discover and merge separated DHT clusters.
 -- Unjoined nodes try joining instead.
-convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
-convergenceSampleThread nsSTM = forever $ do
-    nsSnap <- readTVarIO nsSTM
-    parentNode <- readTVarIO $ parentRealNode nsSnap
-    if isJoined nsSnap
-        then
-            runExceptT (do
-                -- joined node: choose random node, do queryIDLoop, compare result with own responsibility
-                let bss = bootstrapNodes parentNode
-                randIndex <- liftIO $ randomRIO (0, length bss - 1)
-                chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
-                lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
-                currentlyResponsible <- liftEither lookupResult
-                if getNid currentlyResponsible /= getNid nsSnap
-                -- if mismatch, stabilise on the result, else do nothing
-                    then do
-                        stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
-                        (preds, succs) <- liftEither stabResult
-                        -- TODO: verify neighbours before adding, see #55
-                        liftIO . atomically $ do
-                            ns <- readTVar nsSTM
-                            writeTVar nsSTM $ addPredecessors preds ns
-                    else pure ()
-                ) >> pure ()
-        -- unjoined node: try joining through all bootstrapping nodes
-        else tryBootstrapJoining nsSTM >> pure ()
+convergenceSampleThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
+convergenceSampleThread nodeSTM = forever $ do
+    node <- readTVarIO nodeSTM
+    forM_ (vservers node) $ \nsSTM -> do
+        nsSnap <- readTVarIO nsSTM
+        parentNode <- readTVarIO $ parentRealNode nsSnap
+        if isJoined nsSnap
+            then
+                runExceptT (do
+                    -- joined node: choose random node, do queryIDLoop, compare result with own responsibility
+                    let bss = bootstrapNodes parentNode
+                    randIndex <- liftIO $ randomRIO (0, length bss - 1)
+                    chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex
+                    lookupResult <- liftIO $ bootstrapQueryId nsSTM chosenNode (getNid nsSnap)
+                    currentlyResponsible <- liftEither lookupResult
+                    if getNid currentlyResponsible /= getNid nsSnap
+                    -- if mismatch, stabilise on the result, else do nothing
+                        then do
+                            stabResult <- liftIO $ requestStabilise nsSnap currentlyResponsible
+                            (preds, succs) <- liftEither stabResult
+                            -- TODO: verify neighbours before adding, see #55
+                            liftIO . atomically $ do
+                                ns <- readTVar nsSTM
+                                writeTVar nsSTM $ addPredecessors preds ns
+                        else pure ()
+                    ) >> pure ()
+            -- unjoined node: try joining through all bootstrapping nodes
+            else tryBootstrapJoining nsSTM >> pure ()
 
-    let delaySecs = confBootstrapSamplingInterval . nodeConfig $ parentNode
+    let delaySecs = confBootstrapSamplingInterval . nodeConfig $ node
     threadDelay delaySecs
 

From 1a7afed06223129c28f33165cd2abff856aca2b8 Mon Sep 17 00:00:00 2001
From: Trolli Schmittlauch
Date: Mon, 21 Sep 2020 02:22:46 +0200
Subject: [PATCH 5/6] finish restructuring fediMainThreads

contributes to #34
---
 src/Hash2Pub/FediChord.hs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index 307dc51..0624abd 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -401,7 +401,7 @@ nodeCacheVerifyThread nodeSTM = forever $ do
                         (\vss ->
                             if cacheNode `notElem` vss
-                                then queueDeleteEntry (getNid cacheNode) firstVS
+                                then queueDeleteEntry (getNid cacheNode) cacheQ
                                 -- after verifying a node, check whether it can be a closer neighbour
                                 -- do this for each node
                                 -- TODO: optimisation: place all LocalNodeStates on the cache ring and check whether any of them is the predecessor/ successor
@@ -661,7 +661,7 @@ fediMainThreads sock nodeSTM = do
         concurrently_ (stabiliseThread nodeSTM) $
         concurrently_ (nodeCacheVerifyThread nodeSTM) $
         concurrently_ (convergenceSampleThread nodeSTM) $
-        concurrently_ (lookupCacheCleanup $ parentRealNode ns) $
+        concurrently_ (lookupCacheCleanup nodeSTM) $
         concurrently_
             (sendThread sock sendQ)
             (recvThread sock recvQ)

From 499c90e63af74499ca895e26e697cdf81b433011 Mon Sep 17 00:00:00 2001
From: Trolli Schmittlauch
Date: Mon, 21 Sep 2020 02:23:06 +0200
Subject: [PATCH 6/6] stylish run

---
 src/Hash2Pub/FediChord.hs      |  6 +++---
 src/Hash2Pub/FediChordTypes.hs | 21 +++++++++------------
 2 files changed, 12 insertions(+), 15 deletions(-)

diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index 0624abd..9baf160 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -69,12 +69,12 @@ import qualified Data.ByteString.UTF8 as BSU
 import Data.Either (rights)
 import Data.Foldable (foldr')
 import Data.Functor.Identity
+import Data.HashMap.Strict (HashMap)
+import qualified Data.HashMap.Strict as HMap
 import Data.IP (IPv6, fromHostAddress6, toHostAddress6)
 import Data.List ((\\))
 import qualified Data.Map.Strict as Map
-import qualified Data.HashMap.Strict as HMap
-import Data.HashMap.Strict (HashMap)
 import Data.Maybe (catMaybes, fromJust, fromMaybe, isJust, isNothing, mapMaybe)
 import qualified Data.Set as Set
@@ -375,7 +375,7 @@ nodeCacheVerifyThread nodeSTM = forever $ do
         node <- readTVar nodeSTM
         case headMay (HMap.elems $ vservers node) of
             -- wait until first VS is joined
-            Nothing -> retry
+            Nothing  -> retry
             Just vs' -> pure (node, vs')
     let
         maxEntryAge = confMaxNodeCacheAge $ nodeConfig node
diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs
index a1c0937..04396d6 100644
--- a/src/Hash2Pub/FediChordTypes.hs
+++ b/src/Hash2Pub/FediChordTypes.hs
@@ -69,10 +69,10 @@ import Control.Exception
 import Data.Foldable (foldr')
 import Data.Function (on)
 import qualified Data.Hashable as Hashable
-import Data.List (delete, nub, sortBy)
-import qualified Data.Map.Strict as Map
 import Data.HashMap.Strict (HashMap)
 import qualified Data.HashMap.Strict as HMap
+import Data.List (delete, nub, sortBy)
+import qualified Data.Map.Strict as Map
 import Data.Maybe (fromJust, fromMaybe, isJust, isNothing, mapMaybe)
 import qualified Data.Set as Set
@@ -153,22 +153,19 @@ a `localCompare` b
 -- Also contains shared data and config values.
 -- TODO: more data structures for k-choices bookkeeping
 data RealNode s = RealNode
-    { vservers :: HashMap NodeID (LocalNodeStateSTM s)
+    { vservers              :: HashMap NodeID (LocalNodeStateSTM s)
     -- ^ map of all active VServer node IDs to their node state
-    , nodeConfig :: FediChordConf
+    , nodeConfig            :: FediChordConf
     -- ^ holds the initial configuration read at program start
-    , bootstrapNodes :: [(String, PortNumber)]
+    , bootstrapNodes        :: [(String, PortNumber)]
     -- ^ nodes to be used as bootstrapping points, new ones learned during operation
-    , lookupCacheSTM :: TVar LookupCache
+    , lookupCacheSTM        :: TVar LookupCache
     -- ^ a global cache of looked up keys and their associated nodes
-    , globalNodeCacheSTM :: TVar NodeCache
+    , globalNodeCacheSTM    :: TVar NodeCache
     -- ^ EpiChord node cache with expiry times for nodes.
-    -- Shared between all vservers, each 'LocalNodeState' holds a reference to
-    -- the same TVar for avoiding unnecessary reads of parent node
-    , globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
+    , globalCacheWriteQueue :: TQueue (NodeCache -> NodeCache)
     -- ^ cache updates are not written directly to the 'globalNodeCacheSTM'
-    , nodeService :: s (RealNodeSTM s)
+    , nodeService           :: s (RealNodeSTM s)
     }
 
 type RealNodeSTM s = TVar (RealNode s)
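
The single-writer node cache that PATCH 1/6 introduces can be reproduced in isolation: all vservers share one TVar holding the cache and push pure modifier functions into one TQueue, which a single writer thread applies. A minimal sketch, assuming simplified stand-in types (the cache as a plain Data.Map keyed by integer node IDs) instead of the real Hash2Pub definitions; the names cacheWriter and queueAdd are illustrative, not the project's API:

module Main where

import           Control.Concurrent     (forkIO, threadDelay)
import           Control.Concurrent.STM
import           Control.Monad          (forever)
import qualified Data.Map.Strict        as Map

type NodeID    = Int                       -- stand-in for the real NodeID
type NodeCache = Map.Map NodeID String     -- stand-in for the real cache entries

-- | apply queued modifiers as the cache's single writer,
-- mirroring the restructured nodeCacheWriter
cacheWriter :: TVar NodeCache -> TQueue (NodeCache -> NodeCache) -> IO ()
cacheWriter cacheVar cacheQ = forever $ atomically $ do
    modifier <- readTQueue cacheQ
    modifyTVar' cacheVar modifier

-- | enqueue an insertion instead of writing the cache directly,
-- in the style of the refactored queueAddEntries
queueAdd :: TQueue (NodeCache -> NodeCache) -> NodeID -> String -> IO ()
queueAdd cacheQ nid entry = atomically . writeTQueue cacheQ $ Map.insert nid entry

main :: IO ()
main = do
    cacheVar <- newTVarIO Map.empty
    cacheQ   <- newTQueueIO
    _ <- forkIO $ cacheWriter cacheVar cacheQ
    queueAdd cacheQ 42 "example node"
    threadDelay 100000                     -- give the writer thread time to run
    print =<< readTVarIO cacheVar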
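
PATCH 2/6's dispatch step is essentially a HashMap lookup keyed by the request's receiver ID; requests addressed to IDs this node does not host are silently dropped. A small sketch under the same simplifying assumptions (Int node IDs and a String stand-in for the per-vserver state reference):

module Main where

import qualified Data.HashMap.Strict as HMap

type NodeID = Int
data Request = Request { receiverID :: NodeID, payload :: String }
    deriving Show
type VServer = String                      -- stand-in for LocalNodeStateSTM

-- | mirrors the new dispatchVS helper:
-- Nothing means no vserver of ours is addressed, so the request is ignored
dispatchVS :: HMap.HashMap NodeID VServer -> Request -> Maybe VServer
dispatchVS vservers req = HMap.lookup (receiverID req) vservers

main :: IO ()
main = do
    let vservers = HMap.fromList [(1, "vs-one"), (7, "vs-seven")]
    print $ dispatchVS vservers (Request 7 "ping")   -- Just "vs-seven"
    print $ dispatchVS vservers (Request 3 "ping")   -- Nothing: dropped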
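
PATCH 3/6 and 4/6 share one control-flow pattern: a maintenance thread takes the whole RealNode, blocks via STM retry until the first vserver exists (using it as the sender for operations like Ping), and then processes every joined vserver sequentially. A hedged sketch of that pattern with stand-in types, not the real thread bodies:

module Main where

import           Control.Concurrent     (forkIO, threadDelay)
import           Control.Concurrent.STM
import           Control.Monad          (forM_, forever)
import qualified Data.HashMap.Strict    as HMap
import           Data.Maybe             (listToMaybe)

type NodeID = Int
data RealNode = RealNode { vservers :: HMap.HashMap NodeID (TVar String) }

maintenanceThread :: TVar RealNode -> IO ()
maintenanceThread nodeSTM = forever $ do
    -- block until the first vserver is present, as nodeCacheVerifyThread does
    (node, firstVSSTM) <- atomically $ do
        node <- readTVar nodeSTM
        case listToMaybe (HMap.elems $ vservers node) of
            Nothing  -> retry
            Just vs' -> pure (node, vs')
    firstVS <- readTVarIO firstVSSTM
    putStrLn $ "using " <> firstVS <> " as sender for pings"
    -- then iterate over all joined vservers in a single thread
    forM_ (vservers node) $ \vsSTM -> do
        vs <- readTVarIO vsSTM
        putStrLn $ "maintaining vserver " <> vs
    threadDelay 1000000

main :: IO ()
main = do
    vs1 <- newTVarIO "vs-one"
    vs2 <- newTVarIO "vs-two"
    nodeSTM <- newTVarIO . RealNode $ HMap.fromList [(1, vs1), (2, vs2)]
    _ <- forkIO $ maintenanceThread nodeSTM
    threadDelay 2500000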
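
Finally, the thread layout after PATCH 1/6 and 5/6 hands main a single Async: the server threads are chained with concurrently_, so any one of them failing cancels the rest, and main merely waits on the returned handle. A sketch with illustrative worker names standing in for the real thread functions:

module Main where

import Control.Concurrent       (threadDelay)
import Control.Concurrent.Async (Async, async, concurrently_, wait)
import Control.Monad            (forever)

-- | stand-in for fediMessageHandler, stabiliseThread, nodeCacheVerifyThread, …
worker :: String -> IO ()
worker name = forever $ do
    putStrLn $ name <> " running"
    threadDelay 500000

-- | like the restructured fediChordInit: launch everything and return
-- one Async covering the whole supervision chain
initThreads :: IO (Async ())
initThreads = async $
    concurrently_ (worker "message handler") $
    concurrently_ (worker "stabilise")
                  (worker "cache verify")

main :: IO ()
main = do
    fediThreads <- initThreads
    -- keeps the main thread alive, as app/Main.hs now does with wait fediThreads
    wait fediThreads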