Compare commits
2 commits
c536994afe
...
2b39648a77
Author | SHA1 | Date | |
---|---|---|---|
|
2b39648a77 | ||
|
df479982fa |
|
@ -587,7 +587,6 @@ sendQueryIdMessages targetID ns lParam targets = do
|
|||
|
||||
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
|
||||
let srcAddr = confIP nodeConf
|
||||
-- ToDo: make attempts and timeout configurable
|
||||
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
|
||||
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
|
||||
)) targets
|
||||
|
|
|
@ -17,6 +17,7 @@ import Control.Monad.IO.Class (liftIO)
|
|||
import Data.Bifunctor
|
||||
import qualified Data.ByteString.Lazy.UTF8 as BSUL
|
||||
import qualified Data.ByteString.UTF8 as BSU
|
||||
import Data.Either (rights)
|
||||
import qualified Data.HashMap.Strict as HMap
|
||||
import qualified Data.HashSet as HSet
|
||||
import Data.Maybe (fromJust, isJust)
|
||||
|
@ -611,6 +612,33 @@ fetchTagPosts serv = forever $ do
|
|||
pure ()
|
||||
|
||||
|
||||
-- TODO: paralellelisation
|
||||
-- TODO: make sure it doesn't busy-wait
|
||||
relayWorker :: PostService d -> IO ()
|
||||
relayWorker serv = forever $ do
|
||||
subscriptionMap <- readTVarIO $ subscribers serv
|
||||
-- for each tag, try to deliver some posts to subscriber
|
||||
forM_ subscriptionMap (\(subscriberMapSTM, _, tag) -> do
|
||||
subscriberMap <- readTVarIO subscriberMapSTM
|
||||
forM_ (HMap.toList subscriberMap) (\((subHost, subPort), (postChan, _)) -> do
|
||||
postsToDeliver <- readUpTo 500 postChan
|
||||
response <- runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
|
||||
-- so far just dropping failed attempts, TODO: retry mechanism
|
||||
-- TODO: stats
|
||||
pure ()
|
||||
)
|
||||
)
|
||||
where
|
||||
readUpTo :: Int -> TChan a -> IO [a]
|
||||
readUpTo 0 _ = pure []
|
||||
readUpTo n chan = do
|
||||
readFromChan <- atomically (tryReadTChan chan)
|
||||
case readFromChan of
|
||||
Nothing -> pure []
|
||||
Just val -> do
|
||||
moreReads <- readUpTo (pred n) chan
|
||||
pure (val:moreReads)
|
||||
|
||||
-- ======= statistics/measurement and logging =======
|
||||
|
||||
data StatsEventType = PostPublishEvent
|
||||
|
@ -713,8 +741,8 @@ evaluateStats timeInterval summedStats =
|
|||
-- first sum all event numbers, then divide through number of seconds passed to
|
||||
-- get rate per second
|
||||
RelayStats
|
||||
{ relayReceiveRates = mapRMap (/ intervalSeconds) $ relayReceiveRates summedStats
|
||||
, relayDeliveryRates = mapRMap (/ intervalSeconds) $ relayDeliveryRates summedStats
|
||||
{ relayReceiveRates = (/ intervalSeconds) <$> relayReceiveRates summedStats
|
||||
, relayDeliveryRates = (/ intervalSeconds) <$> relayDeliveryRates summedStats
|
||||
, postPublishRate = postPublishRate summedStats / intervalSeconds
|
||||
, postFetchRate = postFetchRate summedStats / intervalSeconds
|
||||
}
|
||||
|
|
|
@ -25,6 +25,29 @@ instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where
|
|||
instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where
|
||||
show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap)
|
||||
|
||||
|
||||
instance (Bounded k, Ord k) => Functor (RingMap k) where
|
||||
-- | map a function over all payload values of a 'RingMap'
|
||||
fmap f = RingMap . Map.map traversingF . getRingMap
|
||||
where
|
||||
traversingF (KeyEntry a) = KeyEntry (f a)
|
||||
traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry)
|
||||
traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing
|
||||
|
||||
|
||||
instance (Bounded k, Ord k) => Foldable (RingMap k) where
|
||||
foldr f initVal = Map.foldr traversingFR initVal . getRingMap
|
||||
where
|
||||
traversingFR (KeyEntry a) acc = f a acc
|
||||
traversingFR (ProxyEntry _ Nothing) acc = acc
|
||||
traversingFR (ProxyEntry _ (Just entry)) acc = traversingFR entry acc
|
||||
foldl f initVal = Map.foldl traversingFL initVal . getRingMap
|
||||
where
|
||||
traversingFL acc (KeyEntry a) = f acc a
|
||||
traversingFL acc (ProxyEntry _ Nothing) = acc
|
||||
traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry
|
||||
|
||||
|
||||
-- | entry of a 'RingMap' that holds a value and can also
|
||||
-- wrap around the lookup direction at the edges of the name space.
|
||||
data RingEntry k a = KeyEntry a
|
||||
|
@ -247,14 +270,3 @@ takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k)
|
|||
-> RingMap k a
|
||||
-> [a]
|
||||
takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []
|
||||
|
||||
|
||||
-- | map a function over all payload values of a 'RingMap'
|
||||
mapRMap :: (Bounded k, Ord k, Num k)
|
||||
=> (a -> b) -> RingMap k a -> RingMap k b
|
||||
mapRMap f = RingMap . Map.map traversingF . getRingMap
|
||||
where
|
||||
--traversingF :: RingEntry k a -> RingEntry k b
|
||||
traversingF (KeyEntry a) = KeyEntry (f a)
|
||||
traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry)
|
||||
traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing
|
||||
|
|
Loading…
Reference in a new issue