diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 5e8d25d..f7a1676 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md common deps build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays, dlist - ghc-options: -Wall -Wpartial-fields -O2 + ghc-options: -Wall -Wpartial-fields diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 608551f..7a082d0 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -121,8 +121,8 @@ instance DHT d => Service PostService d where -- web server (Warp.runSettings warpSettings $ postServiceApplication thisService) $ concurrently - -- post queue processing - (processIncomingPosts thisService) + -- background processing workers + (launchWorkerThreads thisService) -- statistics/ measurements (launchStatsThreads thisService) -- update thread ID after fork @@ -559,9 +559,9 @@ lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a lookupTagSubscriptions tag = rMapLookup (hashtagToId tag) --- normalise the unicode representation of a string to NFC +-- normalise the unicode representation of a string to NFC and convert to lower case normaliseTag :: Text -> Text -normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict +normaliseTag = Txt.toLower . Txt.fromStrict . normalize NFC . Txt.toStrict -- | convert a hashtag to its representation on the DHT @@ -604,6 +604,13 @@ instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where -- TODO: make configurable numParallelDeliveries = 10 +launchWorkerThreads :: DHT d => PostService d -> IO () +launchWorkerThreads serv = concurrently_ + (processIncomingPosts serv) + $ concurrently_ + (fetchTagPosts serv) + (relayWorker serv) + -- | process the pending relay inbox of incoming posts from the internal queue: -- Look up responsible relay node for given hashtag and forward post to it @@ -636,7 +643,7 @@ processIncomingPosts serv = forever $ do now <- getPOSIXTime subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv) -- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag - when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $ + when (maybe True (\subLease -> now - subLease < 120) subscriptionStatus) $ void $ clientSubscribeTo serv tag -- for evaluation, return the tag of the successfully forwarded post