From 580410e0b4c78150bedfe51d3e5c27a24d02b550 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Thu, 13 Aug 2020 13:07:50 +0200 Subject: [PATCH] simple post fetch worker thread --- Hash2Pub.cabal | 2 +- src/Hash2Pub/PostService.hs | 32 +++++++++++++++++++++++++++++--- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 251c60d..5ffff0d 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -46,7 +46,7 @@ category: Network extra-source-files: CHANGELOG.md common deps - build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client + build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types ghc-options: -Wall diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 17d585b..47a4059 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -1,4 +1,4 @@ -{-# LANGUAGE DataKinds #-} +{-# laNGUAGE DataKinds #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE InstanceSigs #-} {-# LANGUAGE MultiParamTypeClasses #-} @@ -15,7 +15,7 @@ import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar -import Control.Exception (Exception (..)) +import Control.Exception (Exception (..), try) import Control.Monad (foldM, forM, forM_, forever) import Control.Monad.IO.Class (liftIO) import Control.Monad.STM @@ -31,6 +31,7 @@ import Data.Text.Normalize (NormalizationMode (NFC), import Data.Time.Clock.POSIX import Data.Typeable (Typeable) import qualified Network.HTTP.Client as HTTP +import qualified Network.HTTP.Types as HTTPT import System.Random import Text.Read (readEither) @@ -341,7 +342,7 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do Right _ -> do atomically $ modifyTVar' (subscribers serv) $ \tagMap -> - foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags) + foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags) pure . Right $ () where channelGetAll :: TChan a -> STM [a] @@ -479,3 +480,28 @@ processIncomingPosts serv = forever $ do _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag) atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent) Right yay -> putStrLn $ "Yay! " <> show yay + + +-- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID +fetchTagPosts :: DHT d => PostService d -> IO () +fetchTagPosts serv = forever $ do + -- blocks until available + -- TODO: batching, retry + -- TODO: process multiple in parallel + pIdUri <- atomically . readTQueue $ postFetchQueue serv + httpMan <- HTTP.newManager HTTP.defaultManagerSettings + fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri + resp <- try $ HTTP.httpLbs fetchReq httpMan :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString)) + case resp of + Right response -> + if HTTPT.statusCode (HTTP.responseStatus response) == 200 + then + -- success, TODO: statistics + putStrLn "post fetch success" + else + -- TODO error handling, retry + pure () + Left _ -> + -- TODO error handling, retry + pure () +