worker thread for processing incoming posts in background, started together with web server

This commit is contained in:
Trolli Schmittlauch 2020-07-28 02:12:03 +02:00
parent 736815ea83
commit 3b65757406

View file

@ -9,6 +9,7 @@
module Hash2Pub.PostService where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TQueue
@ -83,7 +84,14 @@ instance DHT d => Service PostService d where
}
port' = fromIntegral port
warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings
servThreadID <- forkIO $ Warp.runSettings warpSettings $ postServiceApplication thisService
-- Run 'concurrently_' from another thread to be able to return the
-- 'PostService'.
-- Terminating that parent thread will make all child threads terminate as well.
servThreadID <- forkIO $
concurrently_
-- web server
(Warp.runSettings warpSettings $ postServiceApplication thisService)
(processIncomingPosts thisService)
-- update thread ID after fork
atomically $ writeTVar threadVar servThreadID
pure thisService
@ -175,3 +183,21 @@ tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription fro
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
instance {-# OVERLAPPABLE #-} Show a => MimeRender PlainText a where
mimeRender _ = BSU.fromString . show
-- ====== worker threads ======
-- | process the pending relays of incoming posts from the internal queue:
-- Look up responsible relay node for given hashtag and forward post to it
processIncomingPosts :: DHT d => PostService d -> IO ()
processIncomingPosts serv = forever $ do
-- blocks until available
-- TODO: process multiple in parallel
(t, pID, pC) <- atomically . readTQueue $ relayInQueue serv
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack t)
case lookupRes of
-- no vserver active => wait and retry
Nothing -> threadDelay $ 10 * 10^6
Just (responsibleHost, responsiblePort) -> do
-- TODO: do actual HTTP requests
pure ()