From df479982fa15b463a2c0f0daab99cad4755eb32c Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Tue, 8 Sep 2020 08:46:36 +0200 Subject: [PATCH 1/2] make RingMap instance of Functor and Foldable --- src/Hash2Pub/PostService.hs | 4 ++-- src/Hash2Pub/RingMap.hs | 34 +++++++++++++++++++++++----------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index c1ea936..b111455 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -713,8 +713,8 @@ evaluateStats timeInterval summedStats = -- first sum all event numbers, then divide through number of seconds passed to -- get rate per second RelayStats - { relayReceiveRates = mapRMap (/ intervalSeconds) $ relayReceiveRates summedStats - , relayDeliveryRates = mapRMap (/ intervalSeconds) $ relayDeliveryRates summedStats + { relayReceiveRates = (/ intervalSeconds) <$> relayReceiveRates summedStats + , relayDeliveryRates = (/ intervalSeconds) <$> relayDeliveryRates summedStats , postPublishRate = postPublishRate summedStats / intervalSeconds , postFetchRate = postFetchRate summedStats / intervalSeconds } diff --git a/src/Hash2Pub/RingMap.hs b/src/Hash2Pub/RingMap.hs index 8416278..a2fe3ae 100644 --- a/src/Hash2Pub/RingMap.hs +++ b/src/Hash2Pub/RingMap.hs @@ -25,6 +25,29 @@ instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap) + +instance (Bounded k, Ord k) => Functor (RingMap k) where + -- | map a function over all payload values of a 'RingMap' + fmap f = RingMap . Map.map traversingF . getRingMap + where + traversingF (KeyEntry a) = KeyEntry (f a) + traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry) + traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing + + +instance (Bounded k, Ord k) => Foldable (RingMap k) where + foldr f initVal = Map.foldr traversingFR initVal . getRingMap + where + traversingFR (KeyEntry a) acc = f a acc + traversingFR (ProxyEntry _ Nothing) acc = acc + traversingFR (ProxyEntry _ (Just entry)) acc = traversingFR entry acc + foldl f initVal = Map.foldl traversingFL initVal . getRingMap + where + traversingFL acc (KeyEntry a) = f acc a + traversingFL acc (ProxyEntry _ Nothing) = acc + traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry + + -- | entry of a 'RingMap' that holds a value and can also -- wrap around the lookup direction at the edges of the name space. data RingEntry k a = KeyEntry a @@ -247,14 +270,3 @@ takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k) -> RingMap k a -> [a] takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing [] - - --- | map a function over all payload values of a 'RingMap' -mapRMap :: (Bounded k, Ord k, Num k) - => (a -> b) -> RingMap k a -> RingMap k b -mapRMap f = RingMap . Map.map traversingF . getRingMap - where - --traversingF :: RingEntry k a -> RingEntry k b - traversingF (KeyEntry a) = KeyEntry (f a) - traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry) - traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing From 2b39648a77dcc16c5689020d26dcd6abc8218a89 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 9 Sep 2020 11:39:48 +0200 Subject: [PATCH 2/2] actually implement simple relaying of posts was still missing for #41 --- src/Hash2Pub/DHTProtocol.hs | 1 - src/Hash2Pub/PostService.hs | 28 ++++++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 8258ca3..3639c08 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -587,7 +587,6 @@ sendQueryIdMessages targetID ns lParam targets = do nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) let srcAddr = confIP nodeConf - -- ToDo: make attempts and timeout configurable queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) )) targets diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index b111455..099855d 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -17,6 +17,7 @@ import Control.Monad.IO.Class (liftIO) import Data.Bifunctor import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.UTF8 as BSU +import Data.Either (rights) import qualified Data.HashMap.Strict as HMap import qualified Data.HashSet as HSet import Data.Maybe (fromJust, isJust) @@ -611,6 +612,33 @@ fetchTagPosts serv = forever $ do pure () +-- TODO: paralellelisation +-- TODO: make sure it doesn't busy-wait +relayWorker :: PostService d -> IO () +relayWorker serv = forever $ do + subscriptionMap <- readTVarIO $ subscribers serv + -- for each tag, try to deliver some posts to subscriber + forM_ subscriptionMap (\(subscriberMapSTM, _, tag) -> do + subscriberMap <- readTVarIO subscriberMapSTM + forM_ (HMap.toList subscriberMap) (\((subHost, subPort), (postChan, _)) -> do + postsToDeliver <- readUpTo 500 postChan + response <- runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) "")) + -- so far just dropping failed attempts, TODO: retry mechanism + -- TODO: stats + pure () + ) + ) + where + readUpTo :: Int -> TChan a -> IO [a] + readUpTo 0 _ = pure [] + readUpTo n chan = do + readFromChan <- atomically (tryReadTChan chan) + case readFromChan of + Nothing -> pure [] + Just val -> do + moreReads <- readUpTo (pred n) chan + pure (val:moreReads) + -- ======= statistics/measurement and logging ======= data StatsEventType = PostPublishEvent