main server thread structure

This commit is contained in:
Trolli Schmittlauch 2020-05-29 17:39:35 +02:00
parent b4ecf8b0aa
commit 61818c58a9
2 changed files with 59 additions and 3 deletions

View file

@ -39,6 +39,7 @@ module Hash2Pub.FediChord (
, fediChordInit , fediChordInit
, fediChordJoin , fediChordJoin
, fediChordBootstrapJoin , fediChordBootstrapJoin
, fediMainThreads
, nodeStateInit , nodeStateInit
, mkServerSocket , mkServerSocket
, mkSendSocket , mkSendSocket
@ -52,9 +53,11 @@ import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe, isJust, mapMaybe) import Data.Maybe (fromMaybe, isJust, mapMaybe)
import qualified Data.Set as Set import qualified Data.Set as Set
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Network.Socket import Network.Socket hiding (recv, recvFrom, send, sendTo)
import Network.Socket.ByteString
-- for hashing and ID conversion -- for hashing and ID conversion
import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TQueue
import Control.Monad (forever) import Control.Monad (forever)
@ -169,3 +172,53 @@ cacheWriter ns = do
refModifier :: NodeCache -> (NodeCache, ()) refModifier :: NodeCache -> (NodeCache, ())
refModifier nc = (f nc, ()) refModifier nc = (f nc, ())
atomicModifyIORef' (nodeCacheRef ns) refModifier atomicModifyIORef' (nodeCacheRef ns) refModifier
-- | Receives UDP packets and passes them to other threads via the given TQueue.
-- Shall be used as the single receiving thread on the server socket, as multiple
-- threads blocking on the same socket degrades performance.
recvThread :: Socket -- ^ server socket to receive packets from
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-> IO ()
recvThread sock recvQ = forever $ do
packet <- recvFrom sock 65535
atomically $ writeTQueue recvQ packet
-- | Only thread to send data it gets from a TQueue through the server socket.
sendThread :: Socket -- ^ server socket used for sending
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> IO ()
sendThread sock sendQ = forever $ do
(packet, addr) <- atomically $ readTQueue sendQ
sendAllTo sock packet addr
-- | Sets up and manages the main server threads of FediChord
fediMainThreads :: Socket -> LocalNodeState -> IO ()
fediMainThreads sock ns = do
sendQ <- newTQueueIO
recvQ <- newTQueueIO
-- concurrently launch all handler threads, if one of them throws an exception
-- all get cancelled
concurrently_
(fediMessageHandler sendQ recvQ ns) $
concurrently
(sendThread sock sendQ)
(recvThread sock recvQ)
-- | Wait for messages, deserialise them, manage parts and acknowledgement status,
-- and pass them to their specific handling function.
fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-> LocalNodeState -- ^ acting NodeState
-> IO ()
fediMessageHandler sendQ recvQ ns = forever $ do
-- wait for incoming messages
-- newMsg <- deserialiseMessage <$> recvFrom sock
-- either (\_ ->
-- -- ignore invalid messages
-- pure ()
-- )
-- (\aMsg ->
-- case aMsg of
-- aRequest@Request{} -> handleRequest
pure ()

View file

@ -1,6 +1,7 @@
module Main where module Main where
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception import Control.Exception
import Data.Either import Data.Either
import Data.IP (IPv6, toHostAddress6) import Data.IP (IPv6, toHostAddress6)
@ -28,15 +29,17 @@ main = do
Right joined -> pure $ Right joined Right joined -> pure $ Right joined
tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining." tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining."
joinedState <- tryJoining $ confBootstrapNodes conf joinedState <- tryJoining $ confBootstrapNodes conf
either (\err -> either (\err -> do
-- handle unsuccessful join -- handle unsuccessful join
putStrLn $ err <> " Error joining, start listening for incoming requests anyways" putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
wait =<< async (fediMainThreads serverSock thisNode)
-- TODO: periodic retry -- TODO: periodic retry
) )
(\joinedNS -> (\joinedNS -> do
-- launch main eventloop with successfully joined state -- launch main eventloop with successfully joined state
putStrLn ("successful join at " <> (show . getNid $ joinedNS)) putStrLn ("successful join at " <> (show . getNid $ joinedNS))
wait =<< async (fediMainThreads serverSock thisNode)
) )
joinedState joinedState
-- stop main thread from terminating during development -- stop main thread from terminating during development