diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 8258ca3..3639c08 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -587,7 +587,6 @@ sendQueryIdMessages targetID ns lParam targets = do nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) let srcAddr = confIP nodeConf - -- ToDo: make attempts and timeout configurable queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) )) targets diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index b111455..099855d 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -17,6 +17,7 @@ import Control.Monad.IO.Class (liftIO) import Data.Bifunctor import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.UTF8 as BSU +import Data.Either (rights) import qualified Data.HashMap.Strict as HMap import qualified Data.HashSet as HSet import Data.Maybe (fromJust, isJust) @@ -611,6 +612,33 @@ fetchTagPosts serv = forever $ do pure () +-- TODO: paralellelisation +-- TODO: make sure it doesn't busy-wait +relayWorker :: PostService d -> IO () +relayWorker serv = forever $ do + subscriptionMap <- readTVarIO $ subscribers serv + -- for each tag, try to deliver some posts to subscriber + forM_ subscriptionMap (\(subscriberMapSTM, _, tag) -> do + subscriberMap <- readTVarIO subscriberMapSTM + forM_ (HMap.toList subscriberMap) (\((subHost, subPort), (postChan, _)) -> do + postsToDeliver <- readUpTo 500 postChan + response <- runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) "")) + -- so far just dropping failed attempts, TODO: retry mechanism + -- TODO: stats + pure () + ) + ) + where + readUpTo :: Int -> TChan a -> IO [a] + readUpTo 0 _ = pure [] + readUpTo n chan = do + readFromChan <- atomically (tryReadTChan chan) + case readFromChan of + Nothing -> pure [] + Just val -> do + moreReads <- readUpTo (pred n) chan + pure (val:moreReads) + -- ======= statistics/measurement and logging ======= data StatsEventType = PostPublishEvent