simple post fetch worker thread

This commit is contained in:
Trolli Schmittlauch 2020-08-13 13:07:50 +02:00
parent c1ce386b65
commit 580410e0b4
2 changed files with 30 additions and 4 deletions

View file

@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md
common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types
ghc-options: -Wall

View file

@ -1,4 +1,4 @@
{-# LANGUAGE DataKinds #-}
{-# laNGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
@ -15,7 +15,7 @@ import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception (Exception (..))
import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.STM
@ -31,6 +31,7 @@ import Data.Text.Normalize (NormalizationMode (NFC),
import Data.Time.Clock.POSIX
import Data.Typeable (Typeable)
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTPT
import System.Random
import Text.Read (readEither)
@ -341,7 +342,7 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
Right _ -> do
atomically $
modifyTVar' (subscribers serv) $ \tagMap ->
foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags)
foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags)
pure . Right $ ()
where
channelGetAll :: TChan a -> STM [a]
@ -479,3 +480,28 @@ processIncomingPosts serv = forever $ do
_ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
Right yay -> putStrLn $ "Yay! " <> show yay
-- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID
fetchTagPosts :: DHT d => PostService d -> IO ()
fetchTagPosts serv = forever $ do
-- blocks until available
-- TODO: batching, retry
-- TODO: process multiple in parallel
pIdUri <- atomically . readTQueue $ postFetchQueue serv
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri
resp <- try $ HTTP.httpLbs fetchReq httpMan :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
case resp of
Right response ->
if HTTPT.statusCode (HTTP.responseStatus response) == 200
then
-- success, TODO: statistics
putStrLn "post fetch success"
else
-- TODO error handling, retry
pure ()
Left _ ->
-- TODO error handling, retry
pure ()