Compare commits

...

2 commits

Author SHA1 Message Date
Trolli Schmittlauch 2b39648a77 actually implement simple relaying of posts
was still missing for #41
2020-09-09 11:51:09 +02:00
Trolli Schmittlauch df479982fa make RingMap instance of Functor and Foldable 2020-09-08 08:46:36 +02:00
3 changed files with 53 additions and 14 deletions

View file

@ -587,7 +587,6 @@ sendQueryIdMessages targetID ns lParam targets = do
nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
let srcAddr = confIP nodeConf
-- ToDo: make attempts and timeout configurable
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
)) targets

View file

@ -17,6 +17,7 @@ import Control.Monad.IO.Class (liftIO)
import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU
import Data.Either (rights)
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (fromJust, isJust)
@ -611,6 +612,33 @@ fetchTagPosts serv = forever $ do
pure ()
-- TODO: paralellelisation
-- TODO: make sure it doesn't busy-wait
relayWorker :: PostService d -> IO ()
relayWorker serv = forever $ do
subscriptionMap <- readTVarIO $ subscribers serv
-- for each tag, try to deliver some posts to subscriber
forM_ subscriptionMap (\(subscriberMapSTM, _, tag) -> do
subscriberMap <- readTVarIO subscriberMapSTM
forM_ (HMap.toList subscriberMap) (\((subHost, subPort), (postChan, _)) -> do
postsToDeliver <- readUpTo 500 postChan
response <- runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
-- so far just dropping failed attempts, TODO: retry mechanism
-- TODO: stats
pure ()
)
)
where
readUpTo :: Int -> TChan a -> IO [a]
readUpTo 0 _ = pure []
readUpTo n chan = do
readFromChan <- atomically (tryReadTChan chan)
case readFromChan of
Nothing -> pure []
Just val -> do
moreReads <- readUpTo (pred n) chan
pure (val:moreReads)
-- ======= statistics/measurement and logging =======
data StatsEventType = PostPublishEvent
@ -713,8 +741,8 @@ evaluateStats timeInterval summedStats =
-- first sum all event numbers, then divide through number of seconds passed to
-- get rate per second
RelayStats
{ relayReceiveRates = mapRMap (/ intervalSeconds) $ relayReceiveRates summedStats
, relayDeliveryRates = mapRMap (/ intervalSeconds) $ relayDeliveryRates summedStats
{ relayReceiveRates = (/ intervalSeconds) <$> relayReceiveRates summedStats
, relayDeliveryRates = (/ intervalSeconds) <$> relayDeliveryRates summedStats
, postPublishRate = postPublishRate summedStats / intervalSeconds
, postFetchRate = postFetchRate summedStats / intervalSeconds
}

View file

@ -25,6 +25,29 @@ instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where
instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where
show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap)
instance (Bounded k, Ord k) => Functor (RingMap k) where
-- | map a function over all payload values of a 'RingMap'
fmap f = RingMap . Map.map traversingF . getRingMap
where
traversingF (KeyEntry a) = KeyEntry (f a)
traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry)
traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing
instance (Bounded k, Ord k) => Foldable (RingMap k) where
foldr f initVal = Map.foldr traversingFR initVal . getRingMap
where
traversingFR (KeyEntry a) acc = f a acc
traversingFR (ProxyEntry _ Nothing) acc = acc
traversingFR (ProxyEntry _ (Just entry)) acc = traversingFR entry acc
foldl f initVal = Map.foldl traversingFL initVal . getRingMap
where
traversingFL acc (KeyEntry a) = f acc a
traversingFL acc (ProxyEntry _ Nothing) = acc
traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry
-- | entry of a 'RingMap' that holds a value and can also
-- wrap around the lookup direction at the edges of the name space.
data RingEntry k a = KeyEntry a
@ -247,14 +270,3 @@ takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k)
-> RingMap k a
-> [a]
takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []
-- | map a function over all payload values of a 'RingMap'
mapRMap :: (Bounded k, Ord k, Num k)
=> (a -> b) -> RingMap k a -> RingMap k b
mapRMap f = RingMap . Map.map traversingF . getRingMap
where
--traversingF :: RingEntry k a -> RingEntry k b
traversingF (KeyEntry a) = KeyEntry (f a)
traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry)
traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing