diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index e44c8c6..8811080 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -9,6 +9,7 @@ module Hash2Pub.PostService where import Control.Concurrent +import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TQueue @@ -83,7 +84,14 @@ instance DHT d => Service PostService d where } port' = fromIntegral port warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings - servThreadID <- forkIO $ Warp.runSettings warpSettings $ postServiceApplication thisService + -- Run 'concurrently_' from another thread to be able to return the + -- 'PostService'. + -- Terminating that parent thread will make all child threads terminate as well. + servThreadID <- forkIO $ + concurrently_ + -- web server + (Warp.runSettings warpSettings $ postServiceApplication thisService) + (processIncomingPosts thisService) -- update thread ID after fork atomically $ writeTVar threadVar servThreadID pure thisService @@ -175,3 +183,21 @@ tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription fro -- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap instance {-# OVERLAPPABLE #-} Show a => MimeRender PlainText a where mimeRender _ = BSU.fromString . show + + +-- ====== worker threads ====== + +-- | process the pending relays of incoming posts from the internal queue: +-- Look up responsible relay node for given hashtag and forward post to it +processIncomingPosts :: DHT d => PostService d -> IO () +processIncomingPosts serv = forever $ do + -- blocks until available + -- TODO: process multiple in parallel + (t, pID, pC) <- atomically . readTQueue $ relayInQueue serv + lookupRes <- lookupKey (baseDHT serv) (Txt.unpack t) + case lookupRes of + -- no vserver active => wait and retry + Nothing -> threadDelay $ 10 * 10^6 + Just (responsibleHost, responsiblePort) -> do + -- TODO: do actual HTTP requests + pure ()