From 61818c58a9429422538e3ae7786cc28595818017 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Fri, 29 May 2020 17:39:35 +0200 Subject: [PATCH] main server thread structure --- src/Hash2Pub/FediChord.hs | 55 ++++++++++++++++++++++++++++++++++++++- src/Hash2Pub/Main.hs | 7 +++-- 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 76ab1a2..fe6d0aa 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -39,6 +39,7 @@ module Hash2Pub.FediChord ( , fediChordInit , fediChordJoin , fediChordBootstrapJoin + , fediMainThreads , nodeStateInit , mkServerSocket , mkSendSocket @@ -52,9 +53,11 @@ import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe, isJust, mapMaybe) import qualified Data.Set as Set import Data.Time.Clock.POSIX -import Network.Socket +import Network.Socket hiding (recv, recvFrom, send, sendTo) +import Network.Socket.ByteString -- for hashing and ID conversion +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TQueue import Control.Monad (forever) @@ -169,3 +172,53 @@ cacheWriter ns = do refModifier :: NodeCache -> (NodeCache, ()) refModifier nc = (f nc, ()) atomicModifyIORef' (nodeCacheRef ns) refModifier + +-- | Receives UDP packets and passes them to other threads via the given TQueue. +-- Shall be used as the single receiving thread on the server socket, as multiple +-- threads blocking on the same socket degrades performance. +recvThread :: Socket -- ^ server socket to receive packets from + -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue + -> IO () +recvThread sock recvQ = forever $ do + packet <- recvFrom sock 65535 + atomically $ writeTQueue recvQ packet + +-- | Only thread to send data it gets from a TQueue through the server socket. +sendThread :: Socket -- ^ server socket used for sending + -> TQueue (BS.ByteString, SockAddr) -- ^ send queue + -> IO () +sendThread sock sendQ = forever $ do + (packet, addr) <- atomically $ readTQueue sendQ + sendAllTo sock packet addr + +-- | Sets up and manages the main server threads of FediChord +fediMainThreads :: Socket -> LocalNodeState -> IO () +fediMainThreads sock ns = do + sendQ <- newTQueueIO + recvQ <- newTQueueIO + -- concurrently launch all handler threads, if one of them throws an exception + -- all get cancelled + concurrently_ + (fediMessageHandler sendQ recvQ ns) $ + concurrently + (sendThread sock sendQ) + (recvThread sock recvQ) + + +-- | Wait for messages, deserialise them, manage parts and acknowledgement status, +-- and pass them to their specific handling function. +fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue + -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue + -> LocalNodeState -- ^ acting NodeState + -> IO () +fediMessageHandler sendQ recvQ ns = forever $ do + -- wait for incoming messages +-- newMsg <- deserialiseMessage <$> recvFrom sock +-- either (\_ -> +-- -- ignore invalid messages +-- pure () +-- ) +-- (\aMsg -> +-- case aMsg of +-- aRequest@Request{} -> handleRequest + pure () diff --git a/src/Hash2Pub/Main.hs b/src/Hash2Pub/Main.hs index 4ab3a48..5b7fad6 100644 --- a/src/Hash2Pub/Main.hs +++ b/src/Hash2Pub/Main.hs @@ -1,6 +1,7 @@ module Main where import Control.Concurrent +import Control.Concurrent.Async import Control.Exception import Data.Either import Data.IP (IPv6, toHostAddress6) @@ -28,15 +29,17 @@ main = do Right joined -> pure $ Right joined tryJoining [] = pure $ Left "Exhausted all bootstrap points for joining." joinedState <- tryJoining $ confBootstrapNodes conf - either (\err -> + either (\err -> do -- handle unsuccessful join putStrLn $ err <> " Error joining, start listening for incoming requests anyways" + wait =<< async (fediMainThreads serverSock thisNode) -- TODO: periodic retry ) - (\joinedNS -> + (\joinedNS -> do -- launch main eventloop with successfully joined state putStrLn ("successful join at " <> (show . getNid $ joinedNS)) + wait =<< async (fediMainThreads serverSock thisNode) ) joinedState -- stop main thread from terminating during development