parent
df479982fa
commit
2b39648a77
|
@ -587,7 +587,6 @@ sendQueryIdMessages targetID ns lParam targets = do
|
||||||
|
|
||||||
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
||||||
let srcAddr = confIP nodeConf
|
let srcAddr = confIP nodeConf
|
||||||
-- ToDo: make attempts and timeout configurable
|
|
||||||
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
|
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
|
||||||
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
|
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
|
||||||
)) targets
|
)) targets
|
||||||
|
|
|
@ -17,6 +17,7 @@ import Control.Monad.IO.Class (liftIO)
|
||||||
import Data.Bifunctor
|
import Data.Bifunctor
|
||||||
import qualified Data.ByteString.Lazy.UTF8 as BSUL
|
import qualified Data.ByteString.Lazy.UTF8 as BSUL
|
||||||
import qualified Data.ByteString.UTF8 as BSU
|
import qualified Data.ByteString.UTF8 as BSU
|
||||||
|
import Data.Either (rights)
|
||||||
import qualified Data.HashMap.Strict as HMap
|
import qualified Data.HashMap.Strict as HMap
|
||||||
import qualified Data.HashSet as HSet
|
import qualified Data.HashSet as HSet
|
||||||
import Data.Maybe (fromJust, isJust)
|
import Data.Maybe (fromJust, isJust)
|
||||||
|
@ -611,6 +612,33 @@ fetchTagPosts serv = forever $ do
|
||||||
pure ()
|
pure ()
|
||||||
|
|
||||||
|
|
||||||
|
-- TODO: paralellelisation
|
||||||
|
-- TODO: make sure it doesn't busy-wait
|
||||||
|
relayWorker :: PostService d -> IO ()
|
||||||
|
relayWorker serv = forever $ do
|
||||||
|
subscriptionMap <- readTVarIO $ subscribers serv
|
||||||
|
-- for each tag, try to deliver some posts to subscriber
|
||||||
|
forM_ subscriptionMap (\(subscriberMapSTM, _, tag) -> do
|
||||||
|
subscriberMap <- readTVarIO subscriberMapSTM
|
||||||
|
forM_ (HMap.toList subscriberMap) (\((subHost, subPort), (postChan, _)) -> do
|
||||||
|
postsToDeliver <- readUpTo 500 postChan
|
||||||
|
response <- runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
|
||||||
|
-- so far just dropping failed attempts, TODO: retry mechanism
|
||||||
|
-- TODO: stats
|
||||||
|
pure ()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
where
|
||||||
|
readUpTo :: Int -> TChan a -> IO [a]
|
||||||
|
readUpTo 0 _ = pure []
|
||||||
|
readUpTo n chan = do
|
||||||
|
readFromChan <- atomically (tryReadTChan chan)
|
||||||
|
case readFromChan of
|
||||||
|
Nothing -> pure []
|
||||||
|
Just val -> do
|
||||||
|
moreReads <- readUpTo (pred n) chan
|
||||||
|
pure (val:moreReads)
|
||||||
|
|
||||||
-- ======= statistics/measurement and logging =======
|
-- ======= statistics/measurement and logging =======
|
||||||
|
|
||||||
data StatsEventType = PostPublishEvent
|
data StatsEventType = PostPublishEvent
|
||||||
|
|
Loading…
Reference in a new issue