Compare commits
2 commits
dcd4a7b563
...
e9ae258dde
Author | SHA1 | Date | |
---|---|---|---|
|
e9ae258dde | ||
|
375014812a |
|
@ -1,4 +1,4 @@
|
|||
{-# laNGUAGE DataKinds #-}
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE FlexibleInstances #-}
|
||||
{-# LANGUAGE InstanceSigs #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
|
@ -58,6 +58,7 @@ data PostService d = PostService
|
|||
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
|
||||
-- ^ Queue for processing incoming posts of own instance asynchronously
|
||||
, postFetchQueue :: TQueue PostID
|
||||
, httpMan :: HTTP.Manager
|
||||
}
|
||||
deriving (Typeable)
|
||||
|
||||
|
@ -84,6 +85,7 @@ instance DHT d => Service PostService d where
|
|||
ownPostVar <- newTVarIO HSet.empty
|
||||
relayInQueue' <- newTQueueIO
|
||||
postFetchQueue' <- newTQueueIO
|
||||
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
|
||||
let
|
||||
thisService = PostService {
|
||||
serviceConf = conf
|
||||
|
@ -94,6 +96,7 @@ instance DHT d => Service PostService d where
|
|||
, ownPosts = ownPostVar
|
||||
, relayInQueue = relayInQueue'
|
||||
, postFetchQueue = postFetchQueue'
|
||||
, httpMan = httpMan'
|
||||
}
|
||||
port' = fromIntegral (confServicePort conf)
|
||||
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
|
||||
|
@ -334,8 +337,7 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
|
|||
""
|
||||
intervalTags
|
||||
-- send subscribers
|
||||
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
|
||||
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv httpMan (BaseUrl Http toHost (fromIntegral toPort) ""))
|
||||
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
|
||||
-- on failure return a Left, otherwise delete subscription entry
|
||||
case resp of
|
||||
Left err -> pure . Left . show $ err
|
||||
|
@ -352,6 +354,21 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
|
|||
haveRead <- tryReadTChan chan
|
||||
maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead
|
||||
|
||||
|
||||
-- | Subscribe the client to the given hashtag. On success it returns the given lease time.
|
||||
clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer)
|
||||
clientSubscribeTo serv tag = do
|
||||
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
|
||||
maybe
|
||||
(pure . Left $ "No node found")
|
||||
(\(foundHost, foundPort) -> do
|
||||
resp <- runClientM (tagSubscribeClient tag (Just . fromString . confServiceHost . serviceConf $ serv)) (mkClientEnv (httpMan serv) (BaseUrl Http foundHost (fromIntegral foundPort) ""))
|
||||
case resp of
|
||||
Left err -> pure . Left . show $ err
|
||||
Right lease -> pure . Right $ lease
|
||||
)
|
||||
lookupRes
|
||||
|
||||
-- currently this is unused code
|
||||
getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI
|
||||
getClients hostname' port' httpMan = hoistClient clientAPI
|
||||
|
@ -469,8 +486,7 @@ processIncomingPosts serv = forever $ do
|
|||
-- no vserver active => wait and retry
|
||||
Nothing -> threadDelay $ 10 * 10^6
|
||||
Just (responsibleHost, responsiblePort) -> do
|
||||
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
|
||||
resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
|
||||
resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
|
||||
case resp of
|
||||
Left err -> do
|
||||
putStrLn $ "Error: " <> show err
|
||||
|
@ -489,9 +505,8 @@ fetchTagPosts serv = forever $ do
|
|||
-- TODO: batching, retry
|
||||
-- TODO: process multiple in parallel
|
||||
pIdUri <- atomically . readTQueue $ postFetchQueue serv
|
||||
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
|
||||
fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri
|
||||
resp <- try $ HTTP.httpLbs fetchReq httpMan :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
|
||||
resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
|
||||
case resp of
|
||||
Right response ->
|
||||
if HTTPT.statusCode (HTTP.responseStatus response) == 200
|
||||
|
|
Loading…
Reference in a new issue