manage incoming request parts before handling
This commit is contained in:
parent
2542091379
commit
bcd1c34c7c
|
@ -152,15 +152,16 @@ isJoined_ ns = not . all null $ [successors ns, predecessors ns]
|
||||||
|
|
||||||
-- ====== message send and receive operations ======
|
-- ====== message send and receive operations ======
|
||||||
|
|
||||||
|
|
||||||
handleIncomingRequest :: LocalNodeState -- ^ the handling node
|
handleIncomingRequest :: LocalNodeState -- ^ the handling node
|
||||||
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
||||||
-> FediChordMessage -- ^ request to handle
|
-> Set.Set FediChordMessage -- ^ all parts of the request to handle
|
||||||
-> SockAddr -- ^ source address of the request
|
-> SockAddr -- ^ source address of the request
|
||||||
-> IO ()
|
-> IO ()
|
||||||
handleIncomingRequest ns sendQ msg sourceAddr = do
|
handleIncomingRequest ns sendQ msgSet sourceAddr = do
|
||||||
-- add nodestate to cache
|
-- add nodestate to cache
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
queueAddEntries (Identity . RemoteCacheEntry (sender msg) $ now) ns
|
queueAddEntries (Identity . RemoteCacheEntry (sender . head . Set.elems $ msgSet) $ now) ns
|
||||||
-- distinguish on whether and how to respond
|
-- distinguish on whether and how to respond
|
||||||
-- create and enqueue ACK
|
-- create and enqueue ACK
|
||||||
-- Idea: only respond with payload on last part (part == parts), problem: need to know partnumber of response from first part on
|
-- Idea: only respond with payload on last part (part == parts), problem: need to know partnumber of response from first part on
|
||||||
|
|
|
@ -47,10 +47,12 @@ module Hash2Pub.FediChord (
|
||||||
, cacheWriter
|
, cacheWriter
|
||||||
) where
|
) where
|
||||||
|
|
||||||
|
import Control.Applicative ((<|>))
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
import Data.Foldable (foldr')
|
import Data.Foldable (foldr')
|
||||||
import qualified Data.Map.Strict as Map
|
import qualified Data.Map.Strict as Map
|
||||||
import Data.Maybe (fromMaybe, isJust, mapMaybe)
|
import Data.Maybe (fromJust, fromMaybe, isJust,
|
||||||
|
mapMaybe)
|
||||||
import qualified Data.Set as Set
|
import qualified Data.Set as Set
|
||||||
import Data.Time.Clock.POSIX
|
import Data.Time.Clock.POSIX
|
||||||
import Network.Socket hiding (recv, recvFrom, send,
|
import Network.Socket hiding (recv, recvFrom, send,
|
||||||
|
@ -207,26 +209,66 @@ fediMainThreads sock ns = do
|
||||||
(recvThread sock recvQ)
|
(recvThread sock recvQ)
|
||||||
|
|
||||||
|
|
||||||
|
-- defining this here as, for now, the RequestMap is only used by fediMessageHandler.
|
||||||
|
-- Once that changes, move to FediChordTypes
|
||||||
|
type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry
|
||||||
|
|
||||||
|
data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer)
|
||||||
|
POSIXTime
|
||||||
|
|
||||||
|
requestMapPurge :: MVar RequestMap -> IO ()
|
||||||
|
-- PLACEHOLDER
|
||||||
|
requestMapPurge mapVar = pure ()
|
||||||
|
|
||||||
-- | Wait for messages, deserialise them, manage parts and acknowledgement status,
|
-- | Wait for messages, deserialise them, manage parts and acknowledgement status,
|
||||||
-- and pass them to their specific handling function.
|
-- and pass them to their specific handling function.
|
||||||
fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
||||||
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
|
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
|
||||||
-> LocalNodeState -- ^ acting NodeState
|
-> LocalNodeState -- ^ acting NodeState
|
||||||
-> IO ()
|
-> IO ()
|
||||||
fediMessageHandler sendQ recvQ ns = forever $ do
|
fediMessageHandler sendQ recvQ ns = do
|
||||||
|
-- handling multipart messages:
|
||||||
|
-- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
|
||||||
|
requestMap <- newMVar (Map.empty :: RequestMap)
|
||||||
|
forever $ do
|
||||||
-- wait for incoming messages
|
-- wait for incoming messages
|
||||||
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
|
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
|
||||||
let aMsg = deserialiseMessage rawMsg
|
let aMsg = deserialiseMessage rawMsg
|
||||||
-- handling multipart messages:
|
|
||||||
-- So far I handle the effects of each message part immedeiately, before making sure that and whether all parts have been received, based on the idea that even incomplete information is beneficial and handled idempotent.
|
|
||||||
-- If this turns out not to be the case, request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
|
|
||||||
either (\_ ->
|
either (\_ ->
|
||||||
-- drop invalid messages
|
-- drop invalid messages
|
||||||
pure ()
|
pure ()
|
||||||
)
|
)
|
||||||
(\validMsg ->
|
(\validMsg ->
|
||||||
case validMsg of
|
case validMsg of
|
||||||
aRequest@Request{} -> forkIO (handleIncomingRequest ns sendQ aRequest sourceAddr) >> pure ()
|
aRequest@Request{}
|
||||||
|
-- if not a multipart message, handle immediately. Response is at the same time a confirmation
|
||||||
|
| part aRequest == 1 && isFinalPart aRequest ->
|
||||||
|
forkIO (handleIncomingRequest ns sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
|
||||||
|
-- otherwise collect all message parts first before handling the whole request
|
||||||
|
| otherwise -> do
|
||||||
|
now <- getPOSIXTime
|
||||||
|
-- critical locking section of requestMap
|
||||||
|
rMapState <- takeMVar requestMap
|
||||||
|
-- insert new message and get set
|
||||||
|
let
|
||||||
|
theseMaxParts = if isFinalPart aRequest then Just (part aRequest) else Nothing
|
||||||
|
thisKey = (sourceAddr, requestID aRequest)
|
||||||
|
newMapState = Map.insertWith (\
|
||||||
|
(RequestMapEntry thisMsgSet p' ts) (RequestMapEntry oldMsgSet p'' _) ->
|
||||||
|
RequestMapEntry (thisMsgSet `Set.union` oldMsgSet) (p' <|> p'') ts
|
||||||
|
)
|
||||||
|
thisKey
|
||||||
|
(RequestMapEntry (Set.singleton aRequest) theseMaxParts now)
|
||||||
|
rMapState
|
||||||
|
-- put map back into MVar, end of critical section
|
||||||
|
putMVar requestMap newMapState
|
||||||
|
-- if all parts received, then handle request.
|
||||||
|
let
|
||||||
|
(RequestMapEntry theseParts mayMaxParts _) = fromJust $ Map.lookup thisKey newMapState
|
||||||
|
numParts = Set.size theseParts
|
||||||
|
if maybe False (numParts ==) (fromIntegral <$> mayMaxParts)
|
||||||
|
then forkIO (handleIncomingRequest ns sendQ theseParts sourceAddr) >> pure()
|
||||||
|
else pure()
|
||||||
-- Responses should never arrive on the main server port, as they are always
|
-- Responses should never arrive on the main server port, as they are always
|
||||||
-- responses to requests sent from dedicated sockets on another port
|
-- responses to requests sent from dedicated sockets on another port
|
||||||
_ -> pure ()
|
_ -> pure ()
|
||||||
|
|
Loading…
Reference in a new issue