re-structure fediChordMessageHandler to dispatch requests to the responsible vserver
contributes to #34
This commit is contained in:
parent
0ab6ee9c8f
commit
68de73d919
|
@ -241,16 +241,16 @@ sendMessageSize = 1200
|
||||||
-- ====== message send and receive operations ======
|
-- ====== message send and receive operations ======
|
||||||
|
|
||||||
-- encode the response to a request that just signals successful receipt
|
-- encode the response to a request that just signals successful receipt
|
||||||
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
|
ackRequest :: FediChordMessage -> Map.Map Integer BS.ByteString
|
||||||
ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
|
ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response {
|
||||||
requestID = requestID req
|
requestID = requestID req
|
||||||
, senderID = ownID
|
, senderID = receiverID req
|
||||||
, part = part req
|
, part = part req
|
||||||
, isFinalPart = False
|
, isFinalPart = False
|
||||||
, action = action req
|
, action = action req
|
||||||
, payload = Nothing
|
, payload = Nothing
|
||||||
}
|
}
|
||||||
ackRequest _ _ = Map.empty
|
ackRequest _ = Map.empty
|
||||||
|
|
||||||
|
|
||||||
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
|
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
|
||||||
|
|
|
@ -138,7 +138,7 @@ fediChordInit initConf serviceRunner = do
|
||||||
(\joinedNS -> do
|
(\joinedNS -> do
|
||||||
-- launch main eventloop with successfully joined state
|
-- launch main eventloop with successfully joined state
|
||||||
putStrLn "successful join"
|
putStrLn "successful join"
|
||||||
async (fediMainThreads serverSock firstVSSTM)
|
async (fediMainThreads serverSock realNodeSTM)
|
||||||
)
|
)
|
||||||
joinedState
|
joinedState
|
||||||
pure (fediThreadsAsync, realNodeSTM)
|
pure (fediThreadsAsync, realNodeSTM)
|
||||||
|
@ -626,16 +626,16 @@ sendThread sock sendQ = forever $ do
|
||||||
sendAllTo sock packet addr
|
sendAllTo sock packet addr
|
||||||
|
|
||||||
-- | Sets up and manages the main server threads of FediChord
|
-- | Sets up and manages the main server threads of FediChord
|
||||||
fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO ()
|
fediMainThreads :: Service s (RealNodeSTM s) => Socket -> RealNodeSTM s -> IO ()
|
||||||
fediMainThreads sock nsSTM = do
|
fediMainThreads sock nodeSTM = do
|
||||||
ns <- readTVarIO nsSTM
|
node <- readTVarIO nodeSTM
|
||||||
putStrLn "launching threads"
|
putStrLn "launching threads"
|
||||||
sendQ <- newTQueueIO
|
sendQ <- newTQueueIO
|
||||||
recvQ <- newTQueueIO
|
recvQ <- newTQueueIO
|
||||||
-- concurrently launch all handler threads, if one of them throws an exception
|
-- concurrently launch all handler threads, if one of them throws an exception
|
||||||
-- all get cancelled
|
-- all get cancelled
|
||||||
concurrently_
|
concurrently_
|
||||||
(fediMessageHandler sendQ recvQ nsSTM) $
|
(fediMessageHandler sendQ recvQ nodeSTM) $
|
||||||
concurrently_ (stabiliseThread nsSTM) $
|
concurrently_ (stabiliseThread nsSTM) $
|
||||||
concurrently_ (nodeCacheVerifyThread nsSTM) $
|
concurrently_ (nodeCacheVerifyThread nsSTM) $
|
||||||
concurrently_ (convergenceSampleThread nsSTM) $
|
concurrently_ (convergenceSampleThread nsSTM) $
|
||||||
|
@ -668,20 +668,17 @@ requestMapPurge purgeAge mapVar = forever $ do
|
||||||
fediMessageHandler :: Service s (RealNodeSTM s)
|
fediMessageHandler :: Service s (RealNodeSTM s)
|
||||||
=> TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
=> TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
||||||
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
|
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
|
||||||
-> LocalNodeStateSTM s -- ^ acting NodeState
|
-> RealNodeSTM s -- ^ node
|
||||||
-> IO ()
|
-> IO ()
|
||||||
fediMessageHandler sendQ recvQ nsSTM = do
|
fediMessageHandler sendQ recvQ nodeSTM = do
|
||||||
-- Read node state just once, assuming that all relevant data for this function does
|
nodeConf <- nodeConfig <$> readTVarIO nodeSTM
|
||||||
-- not change.
|
|
||||||
-- Other functions are passed the nsSTM reference and thus can get the latest state.
|
|
||||||
nsSnap <- readTVarIO nsSTM
|
|
||||||
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode nsSnap)
|
|
||||||
-- handling multipart messages:
|
-- handling multipart messages:
|
||||||
-- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
|
-- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
|
||||||
requestMap <- newMVar (Map.empty :: RequestMap)
|
requestMap <- newMVar (Map.empty :: RequestMap)
|
||||||
-- run receive loop and requestMapPurge concurrently, so that an exception makes
|
-- run receive loop and requestMapPurge concurrently, so that an exception makes
|
||||||
-- both of them fail
|
-- both of them fail
|
||||||
concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do
|
concurrently_ (requestMapPurge (confResponsePurgeAge nodeConf) requestMap) $ forever $ do
|
||||||
|
node <- readTVarIO nodeSTM
|
||||||
-- wait for incoming messages
|
-- wait for incoming messages
|
||||||
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
|
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
|
||||||
let aMsg = deserialiseMessage rawMsg
|
let aMsg = deserialiseMessage rawMsg
|
||||||
|
@ -691,12 +688,14 @@ fediMessageHandler sendQ recvQ nsSTM = do
|
||||||
)
|
)
|
||||||
(\validMsg ->
|
(\validMsg ->
|
||||||
case validMsg of
|
case validMsg of
|
||||||
aRequest@Request{}
|
aRequest@Request{} -> case dispatchVS node aRequest of
|
||||||
|
-- if no match to an active vserver ID, just ignore
|
||||||
|
Nothing -> pure ()
|
||||||
-- if not a multipart message, handle immediately. Response is at the same time an ACK
|
-- if not a multipart message, handle immediately. Response is at the same time an ACK
|
||||||
| part aRequest == 1 && isFinalPart aRequest ->
|
Just nsSTM | part aRequest == 1 && isFinalPart aRequest ->
|
||||||
forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
|
forkIO (handleIncomingRequest nsSTM sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
|
||||||
-- otherwise collect all message parts first before handling the whole request
|
-- otherwise collect all message parts first before handling the whole request
|
||||||
| otherwise -> do
|
Just nsSTM | otherwise -> do
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
-- critical locking section of requestMap
|
-- critical locking section of requestMap
|
||||||
rMapState <- takeMVar requestMap
|
rMapState <- takeMVar requestMap
|
||||||
|
@ -714,7 +713,7 @@ fediMessageHandler sendQ recvQ nsSTM = do
|
||||||
-- put map back into MVar, end of critical section
|
-- put map back into MVar, end of critical section
|
||||||
putMVar requestMap newMapState
|
putMVar requestMap newMapState
|
||||||
-- ACK the received part
|
-- ACK the received part
|
||||||
forM_ (ackRequest (getNid nsSnap) aRequest) $
|
forM_ (ackRequest aRequest) $
|
||||||
\msg -> atomically $ writeTQueue sendQ (msg, sourceAddr)
|
\msg -> atomically $ writeTQueue sendQ (msg, sourceAddr)
|
||||||
-- if all parts received, then handle request.
|
-- if all parts received, then handle request.
|
||||||
let
|
let
|
||||||
|
@ -730,6 +729,8 @@ fediMessageHandler sendQ recvQ nsSTM = do
|
||||||
aMsg
|
aMsg
|
||||||
|
|
||||||
pure ()
|
pure ()
|
||||||
|
where
|
||||||
|
dispatchVS node req = HMap.lookup (receiverID req) (vservers node)
|
||||||
|
|
||||||
|
|
||||||
-- ==== interface to service layer ====
|
-- ==== interface to service layer ====
|
||||||
|
|
Loading…
Reference in a new issue