From bcd1c34c7cf100aa5634e6c910ae338b4146d324 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sun, 31 May 2020 01:13:34 +0200 Subject: [PATCH 1/3] manage incoming request parts before handling --- src/Hash2Pub/DHTProtocol.hs | 7 +-- src/Hash2Pub/FediChord.hs | 82 ++++++++++++++++++++++++++--------- src/Hash2Pub/ProtocolTypes.hs | 20 ++++----- 3 files changed, 76 insertions(+), 33 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index fb50c98..ed1e5d4 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -152,15 +152,16 @@ isJoined_ ns = not . all null $ [successors ns, predecessors ns] -- ====== message send and receive operations ====== + handleIncomingRequest :: LocalNodeState -- ^ the handling node -> TQueue (BS.ByteString, SockAddr) -- ^ send queue - -> FediChordMessage -- ^ request to handle + -> Set.Set FediChordMessage -- ^ all parts of the request to handle -> SockAddr -- ^ source address of the request -> IO () -handleIncomingRequest ns sendQ msg sourceAddr = do +handleIncomingRequest ns sendQ msgSet sourceAddr = do -- add nodestate to cache now <- getPOSIXTime - queueAddEntries (Identity . RemoteCacheEntry (sender msg) $ now) ns + queueAddEntries (Identity . RemoteCacheEntry (sender . head . Set.elems $ msgSet) $ now) ns -- distinguish on whether and how to respond -- create and enqueue ACK -- Idea: only respond with payload on last part (part == parts), problem: need to know partnumber of response from first part on diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 8777cb8..2a8e151 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -47,10 +47,12 @@ module Hash2Pub.FediChord ( , cacheWriter ) where +import Control.Applicative ((<|>)) import Control.Exception import Data.Foldable (foldr') import qualified Data.Map.Strict as Map -import Data.Maybe (fromMaybe, isJust, mapMaybe) +import Data.Maybe (fromJust, fromMaybe, isJust, + mapMaybe) import qualified Data.Set as Set import Data.Time.Clock.POSIX import Network.Socket hiding (recv, recvFrom, send, @@ -207,30 +209,70 @@ fediMainThreads sock ns = do (recvThread sock recvQ) +-- defining this here as, for now, the RequestMap is only used by fediMessageHandler. +-- Once that changes, move to FediChordTypes +type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry + +data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer) + POSIXTime + +requestMapPurge :: MVar RequestMap -> IO () +-- PLACEHOLDER +requestMapPurge mapVar = pure () + -- | Wait for messages, deserialise them, manage parts and acknowledgement status, -- and pass them to their specific handling function. fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue -> LocalNodeState -- ^ acting NodeState -> IO () -fediMessageHandler sendQ recvQ ns = forever $ do - -- wait for incoming messages - (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ - let aMsg = deserialiseMessage rawMsg +fediMessageHandler sendQ recvQ ns = do -- handling multipart messages: - -- So far I handle the effects of each message part immedeiately, before making sure that and whether all parts have been received, based on the idea that even incomplete information is beneficial and handled idempotent. - -- If this turns out not to be the case, request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. - either (\_ -> - -- drop invalid messages - pure () - ) - (\validMsg -> - case validMsg of - aRequest@Request{} -> forkIO (handleIncomingRequest ns sendQ aRequest sourceAddr) >> pure () - -- Responses should never arrive on the main server port, as they are always - -- responses to requests sent from dedicated sockets on another port - _ -> pure () - ) - aMsg + -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. + requestMap <- newMVar (Map.empty :: RequestMap) + forever $ do + -- wait for incoming messages + (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ + let aMsg = deserialiseMessage rawMsg + either (\_ -> + -- drop invalid messages + pure () + ) + (\validMsg -> + case validMsg of + aRequest@Request{} + -- if not a multipart message, handle immediately. Response is at the same time a confirmation + | part aRequest == 1 && isFinalPart aRequest -> + forkIO (handleIncomingRequest ns sendQ (Set.singleton aRequest) sourceAddr) >> pure () + -- otherwise collect all message parts first before handling the whole request + | otherwise -> do + now <- getPOSIXTime + -- critical locking section of requestMap + rMapState <- takeMVar requestMap + -- insert new message and get set + let + theseMaxParts = if isFinalPart aRequest then Just (part aRequest) else Nothing + thisKey = (sourceAddr, requestID aRequest) + newMapState = Map.insertWith (\ + (RequestMapEntry thisMsgSet p' ts) (RequestMapEntry oldMsgSet p'' _) -> + RequestMapEntry (thisMsgSet `Set.union` oldMsgSet) (p' <|> p'') ts + ) + thisKey + (RequestMapEntry (Set.singleton aRequest) theseMaxParts now) + rMapState + -- put map back into MVar, end of critical section + putMVar requestMap newMapState + -- if all parts received, then handle request. + let + (RequestMapEntry theseParts mayMaxParts _) = fromJust $ Map.lookup thisKey newMapState + numParts = Set.size theseParts + if maybe False (numParts ==) (fromIntegral <$> mayMaxParts) + then forkIO (handleIncomingRequest ns sendQ theseParts sourceAddr) >> pure() + else pure() + -- Responses should never arrive on the main server port, as they are always + -- responses to requests sent from dedicated sockets on another port + _ -> pure () + ) + aMsg - pure () + pure () diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs index bab3866..5a594ca 100644 --- a/src/Hash2Pub/ProtocolTypes.hs +++ b/src/Hash2Pub/ProtocolTypes.hs @@ -19,21 +19,21 @@ data Action = QueryID deriving (Show, Eq, Enum) data FediChordMessage = Request - { requestID :: Integer - , sender :: RemoteNodeState - , part :: Integer + { requestID :: Integer + , sender :: RemoteNodeState + , part :: Integer , isFinalPart :: Bool -- ^ part starts at 1 - , action :: Action - , payload :: Maybe ActionPayload + , action :: Action + , payload :: Maybe ActionPayload } | Response - { responseTo :: Integer - , senderID :: NodeID - , part :: Integer + { responseTo :: Integer + , senderID :: NodeID + , part :: Integer , isFinalPart :: Bool - , action :: Action - , payload :: Maybe ActionPayload + , action :: Action + , payload :: Maybe ActionPayload } deriving (Show, Eq) From 88104de9bffca080bd3dcf987cc5015493fad478 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sun, 31 May 2020 21:07:40 +0200 Subject: [PATCH 2/3] periodically purge request parts --- src/Hash2Pub/FediChord.hs | 20 +++++++++++++++++--- src/Hash2Pub/Main.hs | 2 -- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 2a8e151..8a367f2 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -216,9 +216,21 @@ type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer) POSIXTime +-- TODO: make purge age configurable +-- | periodically clean up old request parts +purgeAge :: POSIXTime +purgeAge = 60 -- seconds + requestMapPurge :: MVar RequestMap -> IO () --- PLACEHOLDER -requestMapPurge mapVar = pure () +requestMapPurge mapVar = forever $ do + rMapState <- takeMVar mapVar + now <- getPOSIXTime + putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) -> + now - ts < purgeAge + ) rMapState + threadDelay $ fromEnum purgeAge * 2000 + + -- | Wait for messages, deserialise them, manage parts and acknowledgement status, -- and pass them to their specific handling function. @@ -230,7 +242,9 @@ fediMessageHandler sendQ recvQ ns = do -- handling multipart messages: -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. requestMap <- newMVar (Map.empty :: RequestMap) - forever $ do + -- run receive loop and requestMapPurge concurrently, so that an exception makes + -- both of them fail + concurrently_ (requestMapPurge requestMap) $ forever $ do -- wait for incoming messages (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ let aMsg = deserialiseMessage rawMsg diff --git a/src/Hash2Pub/Main.hs b/src/Hash2Pub/Main.hs index a837cc5..554585f 100644 --- a/src/Hash2Pub/Main.hs +++ b/src/Hash2Pub/Main.hs @@ -42,8 +42,6 @@ main = do wait =<< async (fediMainThreads serverSock thisNode) ) joinedState - -- stop main thread from terminating during development - getChar pure () From 0660bce29949b7237bbdf403a146a1910799c28a Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sun, 31 May 2020 23:21:27 +0200 Subject: [PATCH 3/3] acknowledge parts when receiving partial requests --- src/Hash2Pub/DHTProtocol.hs | 20 +++++++++++++++++++- src/Hash2Pub/FediChord.hs | 7 +++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index ed1e5d4..5ff87b0 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -21,6 +21,7 @@ module Hash2Pub.DHTProtocol , mkSendSocket , mkServerSocket , handleIncomingRequest + , ackRequest ) where @@ -150,8 +151,23 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc isJoined_ :: LocalNodeState -> Bool isJoined_ ns = not . all null $ [successors ns, predecessors ns] +-- | the size limit to be used when serialising messages for sending +sendMessageSize :: Num i => i +sendMessageSize = 1200 + -- ====== message send and receive operations ====== +-- encode the response to a request that just signals successful receipt +ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString +ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response { + responseTo = requestID req + , senderID = ownID + , part = part req + , isFinalPart = False + , action = action req + , payload = Nothing + } + handleIncomingRequest :: LocalNodeState -- ^ the handling node -> TQueue (BS.ByteString, SockAddr) -- ^ send queue @@ -165,6 +181,8 @@ handleIncomingRequest ns sendQ msgSet sourceAddr = do -- distinguish on whether and how to respond -- create and enqueue ACK -- Idea: only respond with payload on last part (part == parts), problem: need to know partnumber of response from first part on + -- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash + -- TODO: test case: mixed message types of parts -- PLACEHOLDER pure () @@ -273,7 +291,7 @@ sendRequestTo :: Int -- ^ timeout in seconds sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do -- give the message a random request ID randomID <- randomRIO (0, 2^32-1) - let requests = serialiseMessage 1200 $ msgIncomplete randomID + let requests = serialiseMessage sendMessageSize $ msgIncomplete randomID -- create a queue for passing received response messages back, even after a timeout responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets -- start sendAndAck with timeout diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 8a367f2..c8b2b2e 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -64,7 +64,7 @@ import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TQueue -import Control.Monad (forever) +import Control.Monad (forM_, forever) import Crypto.Hash import qualified Data.ByteArray as BA import qualified Data.ByteString as BS @@ -255,7 +255,7 @@ fediMessageHandler sendQ recvQ ns = do (\validMsg -> case validMsg of aRequest@Request{} - -- if not a multipart message, handle immediately. Response is at the same time a confirmation + -- if not a multipart message, handle immediately. Response is at the same time an ACK | part aRequest == 1 && isFinalPart aRequest -> forkIO (handleIncomingRequest ns sendQ (Set.singleton aRequest) sourceAddr) >> pure () -- otherwise collect all message parts first before handling the whole request @@ -276,6 +276,9 @@ fediMessageHandler sendQ recvQ ns = do rMapState -- put map back into MVar, end of critical section putMVar requestMap newMapState + -- ACK the received part + forM_ (ackRequest (getNid ns) aRequest) $ + \msg -> atomically $ writeTQueue sendQ (msg, sourceAddr) -- if all parts received, then handle request. let (RequestMapEntry theseParts mayMaxParts _) = fromJust $ Map.lookup thisKey newMapState