From 68de73d919b54dc7e4a8248e3c689f1c37be10d5 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sun, 20 Sep 2020 21:19:17 +0200 Subject: [PATCH] re-structure fediChordMessageHandler to dispatch requests to the responsible vserver contributes to #34 --- src/Hash2Pub/DHTProtocol.hs | 8 ++++---- src/Hash2Pub/FediChord.hs | 33 +++++++++++++++++---------------- 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 38c0dcb..eca145a 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -241,16 +241,16 @@ sendMessageSize = 1200 -- ====== message send and receive operations ====== -- encode the response to a request that just signals successful receipt -ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString -ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response { +ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString +ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response { requestID = requestID req - , senderID = ownID + , senderID = receiverID req , part = part req , isFinalPart = False , action = action req , payload = Nothing } -ackRequest _ _ = Map.empty +ackRequest _ = Map.empty -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 6f9caf6..ee5e9b6 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -138,7 +138,7 @@ fediChordInit initConf serviceRunner = do (\joinedNS -> do -- launch main eventloop with successfully joined state putStrLn "successful join" - async (fediMainThreads serverSock firstVSSTM) + async (fediMainThreads serverSock realNodeSTM) ) joinedState pure (fediThreadsAsync, realNodeSTM) @@ -626,16 +626,16 @@ sendThread sock sendQ = forever $ do sendAllTo sock packet addr -- | Sets up and manages the main server threads of FediChord -fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO () -fediMainThreads sock nsSTM = do - ns <- readTVarIO nsSTM +fediMainThreads :: Service s (RealNodeSTM s) => Socket -> RealNodeSTM s -> IO () +fediMainThreads sock nodeSTM = do + node <- readTVarIO nodeSTM putStrLn "launching threads" sendQ <- newTQueueIO recvQ <- newTQueueIO -- concurrently launch all handler threads, if one of them throws an exception -- all get cancelled concurrently_ - (fediMessageHandler sendQ recvQ nsSTM) $ + (fediMessageHandler sendQ recvQ nodeSTM) $ concurrently_ (stabiliseThread nsSTM) $ concurrently_ (nodeCacheVerifyThread nsSTM) $ concurrently_ (convergenceSampleThread nsSTM) $ @@ -668,20 +668,17 @@ requestMapPurge purgeAge mapVar = forever $ do fediMessageHandler :: Service s (RealNodeSTM s) => TQueue (BS.ByteString, SockAddr) -- ^ send queue -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue - -> LocalNodeStateSTM s -- ^ acting NodeState + -> RealNodeSTM s -- ^ node -> IO () -fediMessageHandler sendQ recvQ nsSTM = do - -- Read node state just once, assuming that all relevant data for this function does - -- not change. - -- Other functions are passed the nsSTM reference and thus can get the latest state. - nsSnap <- readTVarIO nsSTM - nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap) +fediMessageHandler sendQ recvQ nodeSTM = do + nodeConf <- nodeConfig <$> readTVarIO nodeSTM -- handling multipart messages: -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. requestMap <- newMVar (Map.empty :: RequestMap) -- run receive loop and requestMapPurge concurrently, so that an exception makes -- both of them fail concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do + node <- readTVarIO nodeSTM -- wait for incoming messages (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ let aMsg = deserialiseMessage rawMsg @@ -691,12 +688,14 @@ fediMessageHandler sendQ recvQ nsSTM = do ) (\validMsg -> case validMsg of - aRequest@Request{} + aRequest@Request{} -> case dispatchVS node aRequest of + -- if no match to an active vserver ID, just ignore + Nothing -> pure () -- if not a multipart message, handle immediately. Response is at the same time an ACK - | part aRequest == 1 && isFinalPart aRequest -> + Just nsSTM | part aRequest == 1 && isFinalPart aRequest -> forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure () -- otherwise collect all message parts first before handling the whole request - | otherwise -> do + Just nsSTM | otherwise -> do now <- getPOSIXTime -- critical locking section of requestMap rMapState <- takeMVar requestMap @@ -714,7 +713,7 @@ fediMessageHandler sendQ recvQ nsSTM = do -- put map back into MVar, end of critical section putMVar requestMap newMapState -- ACK the received part - forM_ (ackRequest (getNid nsSnap) aRequest) $ + forM_ (ackRequest aRequest) $ \msg -> atomically $ writeTQueue sendQ (msg, sourceAddr) -- if all parts received, then handle request. let @@ -730,6 +729,8 @@ fediMessageHandler sendQ recvQ nsSTM = do aMsg pure () + where + dispatchVS node req = HMap.lookup (receiverID req) (vservers node) -- ==== interface to service layer ====