diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 47a4059..15901e0 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -1,4 +1,4 @@ -{-# laNGUAGE DataKinds #-} +{-# LANGUAGE DataKinds #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE InstanceSigs #-} {-# LANGUAGE MultiParamTypeClasses #-} @@ -58,6 +58,7 @@ data PostService d = PostService , relayInQueue :: TQueue (Hashtag, PostID, PostContent) -- ^ Queue for processing incoming posts of own instance asynchronously , postFetchQueue :: TQueue PostID + , httpMan :: HTTP.Manager } deriving (Typeable) @@ -84,6 +85,7 @@ instance DHT d => Service PostService d where ownPostVar <- newTVarIO HSet.empty relayInQueue' <- newTQueueIO postFetchQueue' <- newTQueueIO + httpMan' <- HTTP.newManager HTTP.defaultManagerSettings let thisService = PostService { serviceConf = conf @@ -94,6 +96,7 @@ instance DHT d => Service PostService d where , ownPosts = ownPostVar , relayInQueue = relayInQueue' , postFetchQueue = postFetchQueue' + , httpMan = httpMan' } port' = fromIntegral (confServicePort conf) warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings @@ -334,8 +337,7 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do "" intervalTags -- send subscribers - httpMan <- HTTP.newManager HTTP.defaultManagerSettings - resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv httpMan (BaseUrl Http toHost (fromIntegral toPort) "")) + resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) "")) -- on failure return a Left, otherwise delete subscription entry case resp of Left err -> pure . Left . show $ err @@ -352,6 +354,21 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do haveRead <- tryReadTChan chan maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead + +-- | Subscribe the client to the given hashtag. On success it returns the given lease time. +clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer) +clientSubscribeTo serv tag = do + lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) + maybe + (pure . Left $ "No node found") + (\(foundHost, foundPort) -> do + resp <- runClientM (tagSubscribeClient tag (Just . fromString . confServiceHost . serviceConf $ serv)) (mkClientEnv (httpMan serv) (BaseUrl Http foundHost (fromIntegral foundPort) "")) + case resp of + Left err -> pure . Left . show $ err + Right lease -> pure . Right $ lease + ) + lookupRes + -- currently this is unused code getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI getClients hostname' port' httpMan = hoistClient clientAPI @@ -469,8 +486,7 @@ processIncomingPosts serv = forever $ do -- no vserver active => wait and retry Nothing -> threadDelay $ 10 * 10^6 Just (responsibleHost, responsiblePort) -> do - httpMan <- HTTP.newManager HTTP.defaultManagerSettings - resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) + resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) case resp of Left err -> do putStrLn $ "Error: " <> show err @@ -489,9 +505,8 @@ fetchTagPosts serv = forever $ do -- TODO: batching, retry -- TODO: process multiple in parallel pIdUri <- atomically . readTQueue $ postFetchQueue serv - httpMan <- HTTP.newManager HTTP.defaultManagerSettings fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri - resp <- try $ HTTP.httpLbs fetchReq httpMan :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString)) + resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString)) case resp of Right response -> if HTTPT.statusCode (HTTP.responseStatus response) == 200