diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 3639c08..8258ca3 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -587,6 +587,7 @@ sendQueryIdMessages targetID ns lParam targets = do nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns) let srcAddr = confIP nodeConf + -- ToDo: make attempts and timeout configurable queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing) )) targets diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 099855d..c1ea936 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -17,7 +17,6 @@ import Control.Monad.IO.Class (liftIO) import Data.Bifunctor import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.UTF8 as BSU -import Data.Either (rights) import qualified Data.HashMap.Strict as HMap import qualified Data.HashSet as HSet import Data.Maybe (fromJust, isJust) @@ -612,33 +611,6 @@ fetchTagPosts serv = forever $ do pure () --- TODO: paralellelisation --- TODO: make sure it doesn't busy-wait -relayWorker :: PostService d -> IO () -relayWorker serv = forever $ do - subscriptionMap <- readTVarIO $ subscribers serv - -- for each tag, try to deliver some posts to subscriber - forM_ subscriptionMap (\(subscriberMapSTM, _, tag) -> do - subscriberMap <- readTVarIO subscriberMapSTM - forM_ (HMap.toList subscriberMap) (\((subHost, subPort), (postChan, _)) -> do - postsToDeliver <- readUpTo 500 postChan - response <- runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) "")) - -- so far just dropping failed attempts, TODO: retry mechanism - -- TODO: stats - pure () - ) - ) - where - readUpTo :: Int -> TChan a -> IO [a] - readUpTo 0 _ = pure [] - readUpTo n chan = do - readFromChan <- atomically (tryReadTChan chan) - case readFromChan of - Nothing -> pure [] - Just val -> do - moreReads <- readUpTo (pred n) chan - pure (val:moreReads) - -- ======= statistics/measurement and logging ======= data StatsEventType = PostPublishEvent @@ -741,8 +713,8 @@ evaluateStats timeInterval summedStats = -- first sum all event numbers, then divide through number of seconds passed to -- get rate per second RelayStats - { relayReceiveRates = (/ intervalSeconds) <$> relayReceiveRates summedStats - , relayDeliveryRates = (/ intervalSeconds) <$> relayDeliveryRates summedStats + { relayReceiveRates = mapRMap (/ intervalSeconds) $ relayReceiveRates summedStats + , relayDeliveryRates = mapRMap (/ intervalSeconds) $ relayDeliveryRates summedStats , postPublishRate = postPublishRate summedStats / intervalSeconds , postFetchRate = postFetchRate summedStats / intervalSeconds } diff --git a/src/Hash2Pub/RingMap.hs b/src/Hash2Pub/RingMap.hs index a2fe3ae..8416278 100644 --- a/src/Hash2Pub/RingMap.hs +++ b/src/Hash2Pub/RingMap.hs @@ -25,29 +25,6 @@ instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap) - -instance (Bounded k, Ord k) => Functor (RingMap k) where - -- | map a function over all payload values of a 'RingMap' - fmap f = RingMap . Map.map traversingF . getRingMap - where - traversingF (KeyEntry a) = KeyEntry (f a) - traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry) - traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing - - -instance (Bounded k, Ord k) => Foldable (RingMap k) where - foldr f initVal = Map.foldr traversingFR initVal . getRingMap - where - traversingFR (KeyEntry a) acc = f a acc - traversingFR (ProxyEntry _ Nothing) acc = acc - traversingFR (ProxyEntry _ (Just entry)) acc = traversingFR entry acc - foldl f initVal = Map.foldl traversingFL initVal . getRingMap - where - traversingFL acc (KeyEntry a) = f acc a - traversingFL acc (ProxyEntry _ Nothing) = acc - traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry - - -- | entry of a 'RingMap' that holds a value and can also -- wrap around the lookup direction at the edges of the name space. data RingEntry k a = KeyEntry a @@ -270,3 +247,14 @@ takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k) -> RingMap k a -> [a] takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing [] + + +-- | map a function over all payload values of a 'RingMap' +mapRMap :: (Bounded k, Ord k, Num k) + => (a -> b) -> RingMap k a -> RingMap k b +mapRMap f = RingMap . Map.map traversingF . getRingMap + where + --traversingF :: RingEntry k a -> RingEntry k b + traversingF (KeyEntry a) = KeyEntry (f a) + traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry) + traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing