use a shared HTTP manager for requests
This commit is contained in:
parent
dcd4a7b563
commit
375014812a
|
@ -1,4 +1,4 @@
|
||||||
{-# laNGUAGE DataKinds #-}
|
{-# LANGUAGE DataKinds #-}
|
||||||
{-# LANGUAGE FlexibleInstances #-}
|
{-# LANGUAGE FlexibleInstances #-}
|
||||||
{-# LANGUAGE InstanceSigs #-}
|
{-# LANGUAGE InstanceSigs #-}
|
||||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||||
|
@ -58,6 +58,7 @@ data PostService d = PostService
|
||||||
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
|
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
|
||||||
-- ^ Queue for processing incoming posts of own instance asynchronously
|
-- ^ Queue for processing incoming posts of own instance asynchronously
|
||||||
, postFetchQueue :: TQueue PostID
|
, postFetchQueue :: TQueue PostID
|
||||||
|
, httpMan :: HTTP.Manager
|
||||||
}
|
}
|
||||||
deriving (Typeable)
|
deriving (Typeable)
|
||||||
|
|
||||||
|
@ -84,6 +85,7 @@ instance DHT d => Service PostService d where
|
||||||
ownPostVar <- newTVarIO HSet.empty
|
ownPostVar <- newTVarIO HSet.empty
|
||||||
relayInQueue' <- newTQueueIO
|
relayInQueue' <- newTQueueIO
|
||||||
postFetchQueue' <- newTQueueIO
|
postFetchQueue' <- newTQueueIO
|
||||||
|
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
|
||||||
let
|
let
|
||||||
thisService = PostService {
|
thisService = PostService {
|
||||||
serviceConf = conf
|
serviceConf = conf
|
||||||
|
@ -94,6 +96,7 @@ instance DHT d => Service PostService d where
|
||||||
, ownPosts = ownPostVar
|
, ownPosts = ownPostVar
|
||||||
, relayInQueue = relayInQueue'
|
, relayInQueue = relayInQueue'
|
||||||
, postFetchQueue = postFetchQueue'
|
, postFetchQueue = postFetchQueue'
|
||||||
|
, httpMan = httpMan'
|
||||||
}
|
}
|
||||||
port' = fromIntegral (confServicePort conf)
|
port' = fromIntegral (confServicePort conf)
|
||||||
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
|
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
|
||||||
|
@ -334,8 +337,7 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
|
||||||
""
|
""
|
||||||
intervalTags
|
intervalTags
|
||||||
-- send subscribers
|
-- send subscribers
|
||||||
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
|
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
|
||||||
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv httpMan (BaseUrl Http toHost (fromIntegral toPort) ""))
|
|
||||||
-- on failure return a Left, otherwise delete subscription entry
|
-- on failure return a Left, otherwise delete subscription entry
|
||||||
case resp of
|
case resp of
|
||||||
Left err -> pure . Left . show $ err
|
Left err -> pure . Left . show $ err
|
||||||
|
@ -469,8 +471,7 @@ processIncomingPosts serv = forever $ do
|
||||||
-- no vserver active => wait and retry
|
-- no vserver active => wait and retry
|
||||||
Nothing -> threadDelay $ 10 * 10^6
|
Nothing -> threadDelay $ 10 * 10^6
|
||||||
Just (responsibleHost, responsiblePort) -> do
|
Just (responsibleHost, responsiblePort) -> do
|
||||||
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
|
resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
|
||||||
resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
|
|
||||||
case resp of
|
case resp of
|
||||||
Left err -> do
|
Left err -> do
|
||||||
putStrLn $ "Error: " <> show err
|
putStrLn $ "Error: " <> show err
|
||||||
|
@ -489,9 +490,8 @@ fetchTagPosts serv = forever $ do
|
||||||
-- TODO: batching, retry
|
-- TODO: batching, retry
|
||||||
-- TODO: process multiple in parallel
|
-- TODO: process multiple in parallel
|
||||||
pIdUri <- atomically . readTQueue $ postFetchQueue serv
|
pIdUri <- atomically . readTQueue $ postFetchQueue serv
|
||||||
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
|
|
||||||
fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri
|
fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri
|
||||||
resp <- try $ HTTP.httpLbs fetchReq httpMan :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
|
resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
|
||||||
case resp of
|
case resp of
|
||||||
Right response ->
|
Right response ->
|
||||||
if HTTPT.statusCode (HTTP.responseStatus response) == 200
|
if HTTPT.statusCode (HTTP.responseStatus response) == 200
|
||||||
|
|
Loading…
Reference in a new issue