Compare commits

..

No commits in common. "e9ae258ddeec73b424528639c379c18d7c3d3e2c" and "dcd4a7b563a046f6e05cac27f6abc4c2160189b9" have entirely different histories.

View file

@ -1,4 +1,4 @@
{-# LANGUAGE DataKinds #-}
{-# laNGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
@ -58,7 +58,6 @@ data PostService d = PostService
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID
, httpMan :: HTTP.Manager
}
deriving (Typeable)
@ -85,7 +84,6 @@ instance DHT d => Service PostService d where
ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO
postFetchQueue' <- newTQueueIO
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
let
thisService = PostService {
serviceConf = conf
@ -96,7 +94,6 @@ instance DHT d => Service PostService d where
, ownPosts = ownPostVar
, relayInQueue = relayInQueue'
, postFetchQueue = postFetchQueue'
, httpMan = httpMan'
}
port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
@ -337,7 +334,8 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
""
intervalTags
-- send subscribers
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv httpMan (BaseUrl Http toHost (fromIntegral toPort) ""))
-- on failure return a Left, otherwise delete subscription entry
case resp of
Left err -> pure . Left . show $ err
@ -354,21 +352,6 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
haveRead <- tryReadTChan chan
maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead
-- | Subscribe the client to the given hashtag. On success it returns the given lease time.
clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer)
clientSubscribeTo serv tag = do
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
maybe
(pure . Left $ "No node found")
(\(foundHost, foundPort) -> do
resp <- runClientM (tagSubscribeClient tag (Just . fromString . confServiceHost . serviceConf $ serv)) (mkClientEnv (httpMan serv) (BaseUrl Http foundHost (fromIntegral foundPort) ""))
case resp of
Left err -> pure . Left . show $ err
Right lease -> pure . Right $ lease
)
lookupRes
-- currently this is unused code
getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI
getClients hostname' port' httpMan = hoistClient clientAPI
@ -486,7 +469,8 @@ processIncomingPosts serv = forever $ do
-- no vserver active => wait and retry
Nothing -> threadDelay $ 10 * 10^6
Just (responsibleHost, responsiblePort) -> do
resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
case resp of
Left err -> do
putStrLn $ "Error: " <> show err
@ -505,8 +489,9 @@ fetchTagPosts serv = forever $ do
-- TODO: batching, retry
-- TODO: process multiple in parallel
pIdUri <- atomically . readTQueue $ postFetchQueue serv
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri
resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
resp <- try $ HTTP.httpLbs fetchReq httpMan :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
case resp of
Right response ->
if HTTPT.statusCode (HTTP.responseStatus response) == 200