Compare commits

...

3 commits

4 changed files with 113 additions and 37 deletions

View file

@ -21,6 +21,7 @@ module Hash2Pub.DHTProtocol
, mkSendSocket , mkSendSocket
, mkServerSocket , mkServerSocket
, handleIncomingRequest , handleIncomingRequest
, ackRequest
) )
where where
@ -150,20 +151,38 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
isJoined_ :: LocalNodeState -> Bool isJoined_ :: LocalNodeState -> Bool
isJoined_ ns = not . all null $ [successors ns, predecessors ns] isJoined_ ns = not . all null $ [successors ns, predecessors ns]
-- | the size limit to be used when serialising messages for sending
sendMessageSize :: Num i => i
sendMessageSize = 1200
-- ====== message send and receive operations ====== -- ====== message send and receive operations ======
-- encode the response to a request that just signals successful receipt
ackRequest :: NodeID -> FediChordMessage -> Map.Map Integer BS.ByteString
ackRequest ownID req@Request{} = serialiseMessage sendMessageSize $ Response {
responseTo = requestID req
, senderID = ownID
, part = part req
, isFinalPart = False
, action = action req
, payload = Nothing
}
handleIncomingRequest :: LocalNodeState -- ^ the handling node handleIncomingRequest :: LocalNodeState -- ^ the handling node
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue -> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> FediChordMessage -- ^ request to handle -> Set.Set FediChordMessage -- ^ all parts of the request to handle
-> SockAddr -- ^ source address of the request -> SockAddr -- ^ source address of the request
-> IO () -> IO ()
handleIncomingRequest ns sendQ msg sourceAddr = do handleIncomingRequest ns sendQ msgSet sourceAddr = do
-- add nodestate to cache -- add nodestate to cache
now <- getPOSIXTime now <- getPOSIXTime
queueAddEntries (Identity . RemoteCacheEntry (sender msg) $ now) ns queueAddEntries (Identity . RemoteCacheEntry (sender . head . Set.elems $ msgSet) $ now) ns
-- distinguish on whether and how to respond -- distinguish on whether and how to respond
-- create and enqueue ACK -- create and enqueue ACK
-- Idea: only respond with payload on last part (part == parts), problem: need to know partnumber of response from first part on -- Idea: only respond with payload on last part (part == parts), problem: need to know partnumber of response from first part on
-- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash
-- TODO: test case: mixed message types of parts
-- PLACEHOLDER -- PLACEHOLDER
pure () pure ()
@ -272,7 +291,7 @@ sendRequestTo :: Int -- ^ timeout in seconds
sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
-- give the message a random request ID -- give the message a random request ID
randomID <- randomRIO (0, 2^32-1) randomID <- randomRIO (0, 2^32-1)
let requests = serialiseMessage 1200 $ msgIncomplete randomID let requests = serialiseMessage sendMessageSize $ msgIncomplete randomID
-- create a queue for passing received response messages back, even after a timeout -- create a queue for passing received response messages back, even after a timeout
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
-- start sendAndAck with timeout -- start sendAndAck with timeout

View file

@ -47,10 +47,12 @@ module Hash2Pub.FediChord (
, cacheWriter , cacheWriter
) where ) where
import Control.Applicative ((<|>))
import Control.Exception import Control.Exception
import Data.Foldable (foldr') import Data.Foldable (foldr')
import qualified Data.Map.Strict as Map import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe, isJust, mapMaybe) import Data.Maybe (fromJust, fromMaybe, isJust,
mapMaybe)
import qualified Data.Set as Set import qualified Data.Set as Set
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Network.Socket hiding (recv, recvFrom, send, import Network.Socket hiding (recv, recvFrom, send,
@ -62,7 +64,7 @@ import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TQueue
import Control.Monad (forever) import Control.Monad (forM_, forever)
import Crypto.Hash import Crypto.Hash
import qualified Data.ByteArray as BA import qualified Data.ByteArray as BA
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
@ -207,26 +209,83 @@ fediMainThreads sock ns = do
(recvThread sock recvQ) (recvThread sock recvQ)
-- defining this here as, for now, the RequestMap is only used by fediMessageHandler.
-- Once that changes, move to FediChordTypes
type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry
data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer)
POSIXTime
-- TODO: make purge age configurable
-- | periodically clean up old request parts
purgeAge :: POSIXTime
purgeAge = 60 -- seconds
requestMapPurge :: MVar RequestMap -> IO ()
requestMapPurge mapVar = forever $ do
rMapState <- takeMVar mapVar
now <- getPOSIXTime
putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) ->
now - ts < purgeAge
) rMapState
threadDelay $ fromEnum purgeAge * 2000
-- | Wait for messages, deserialise them, manage parts and acknowledgement status, -- | Wait for messages, deserialise them, manage parts and acknowledgement status,
-- and pass them to their specific handling function. -- and pass them to their specific handling function.
fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-> LocalNodeState -- ^ acting NodeState -> LocalNodeState -- ^ acting NodeState
-> IO () -> IO ()
fediMessageHandler sendQ recvQ ns = forever $ do fediMessageHandler sendQ recvQ ns = do
-- handling multipart messages:
-- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
requestMap <- newMVar (Map.empty :: RequestMap)
-- run receive loop and requestMapPurge concurrently, so that an exception makes
-- both of them fail
concurrently_ (requestMapPurge requestMap) $ forever $ do
-- wait for incoming messages -- wait for incoming messages
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
let aMsg = deserialiseMessage rawMsg let aMsg = deserialiseMessage rawMsg
-- handling multipart messages:
-- So far I handle the effects of each message part immedeiately, before making sure that and whether all parts have been received, based on the idea that even incomplete information is beneficial and handled idempotent.
-- If this turns out not to be the case, request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
either (\_ -> either (\_ ->
-- drop invalid messages -- drop invalid messages
pure () pure ()
) )
(\validMsg -> (\validMsg ->
case validMsg of case validMsg of
aRequest@Request{} -> forkIO (handleIncomingRequest ns sendQ aRequest sourceAddr) >> pure () aRequest@Request{}
-- if not a multipart message, handle immediately. Response is at the same time an ACK
| part aRequest == 1 && isFinalPart aRequest ->
forkIO (handleIncomingRequest ns sendQ (Set.singleton aRequest) sourceAddr) >> pure ()
-- otherwise collect all message parts first before handling the whole request
| otherwise -> do
now <- getPOSIXTime
-- critical locking section of requestMap
rMapState <- takeMVar requestMap
-- insert new message and get set
let
theseMaxParts = if isFinalPart aRequest then Just (part aRequest) else Nothing
thisKey = (sourceAddr, requestID aRequest)
newMapState = Map.insertWith (\
(RequestMapEntry thisMsgSet p' ts) (RequestMapEntry oldMsgSet p'' _) ->
RequestMapEntry (thisMsgSet `Set.union` oldMsgSet) (p' <|> p'') ts
)
thisKey
(RequestMapEntry (Set.singleton aRequest) theseMaxParts now)
rMapState
-- put map back into MVar, end of critical section
putMVar requestMap newMapState
-- ACK the received part
forM_ (ackRequest (getNid ns) aRequest) $
\msg -> atomically $ writeTQueue sendQ (msg, sourceAddr)
-- if all parts received, then handle request.
let
(RequestMapEntry theseParts mayMaxParts _) = fromJust $ Map.lookup thisKey newMapState
numParts = Set.size theseParts
if maybe False (numParts ==) (fromIntegral <$> mayMaxParts)
then forkIO (handleIncomingRequest ns sendQ theseParts sourceAddr) >> pure()
else pure()
-- Responses should never arrive on the main server port, as they are always -- Responses should never arrive on the main server port, as they are always
-- responses to requests sent from dedicated sockets on another port -- responses to requests sent from dedicated sockets on another port
_ -> pure () _ -> pure ()

View file

@ -42,8 +42,6 @@ main = do
wait =<< async (fediMainThreads serverSock thisNode) wait =<< async (fediMainThreads serverSock thisNode)
) )
joinedState joinedState
-- stop main thread from terminating during development
getChar
pure () pure ()