Compare commits
	
		
			2 commits
		
	
	
		
			c536994afe
			...
			2b39648a77
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 2b39648a77 | |||
| df479982fa | 
					 3 changed files with 53 additions and 14 deletions
				
			
		| 
						 | 
					@ -587,7 +587,6 @@ sendQueryIdMessages targetID ns lParam targets = do
 | 
				
			||||||
 | 
					
 | 
				
			||||||
          nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
 | 
					          nodeConf <- nodeConfig <$> readTVarIO (parentRealNode ns)
 | 
				
			||||||
          let srcAddr = confIP nodeConf
 | 
					          let srcAddr = confIP nodeConf
 | 
				
			||||||
          -- ToDo: make attempts and timeout configurable
 | 
					 | 
				
			||||||
          queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
 | 
					          queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
 | 
				
			||||||
              sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
 | 
					              sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (lookupMessage targetID ns Nothing)
 | 
				
			||||||
                                                                                                                                   )) targets
 | 
					                                                                                                                                   )) targets
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -17,6 +17,7 @@ import           Control.Monad.IO.Class    (liftIO)
 | 
				
			||||||
import           Data.Bifunctor
 | 
					import           Data.Bifunctor
 | 
				
			||||||
import qualified Data.ByteString.Lazy.UTF8 as BSUL
 | 
					import qualified Data.ByteString.Lazy.UTF8 as BSUL
 | 
				
			||||||
import qualified Data.ByteString.UTF8      as BSU
 | 
					import qualified Data.ByteString.UTF8      as BSU
 | 
				
			||||||
 | 
					import           Data.Either               (rights)
 | 
				
			||||||
import qualified Data.HashMap.Strict       as HMap
 | 
					import qualified Data.HashMap.Strict       as HMap
 | 
				
			||||||
import qualified Data.HashSet              as HSet
 | 
					import qualified Data.HashSet              as HSet
 | 
				
			||||||
import           Data.Maybe                (fromJust, isJust)
 | 
					import           Data.Maybe                (fromJust, isJust)
 | 
				
			||||||
| 
						 | 
					@ -611,6 +612,33 @@ fetchTagPosts serv = forever $ do
 | 
				
			||||||
             pure ()
 | 
					             pure ()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					-- TODO: paralellelisation
 | 
				
			||||||
 | 
					-- TODO: make sure it doesn't busy-wait
 | 
				
			||||||
 | 
					relayWorker :: PostService d -> IO ()
 | 
				
			||||||
 | 
					relayWorker serv = forever $ do
 | 
				
			||||||
 | 
					    subscriptionMap <- readTVarIO $ subscribers serv
 | 
				
			||||||
 | 
					    -- for each tag, try to deliver some posts to subscriber
 | 
				
			||||||
 | 
					    forM_ subscriptionMap (\(subscriberMapSTM, _, tag) -> do
 | 
				
			||||||
 | 
					        subscriberMap <- readTVarIO subscriberMapSTM
 | 
				
			||||||
 | 
					        forM_ (HMap.toList subscriberMap) (\((subHost, subPort), (postChan, _)) -> do
 | 
				
			||||||
 | 
					            postsToDeliver <- readUpTo 500 postChan
 | 
				
			||||||
 | 
					            response <- runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
 | 
				
			||||||
 | 
					            -- so far just dropping failed attempts, TODO: retry mechanism
 | 
				
			||||||
 | 
					            -- TODO: stats
 | 
				
			||||||
 | 
					            pure ()
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					  where
 | 
				
			||||||
 | 
					      readUpTo :: Int -> TChan a -> IO [a]
 | 
				
			||||||
 | 
					      readUpTo 0 _ = pure []
 | 
				
			||||||
 | 
					      readUpTo n chan = do
 | 
				
			||||||
 | 
					          readFromChan <- atomically (tryReadTChan chan)
 | 
				
			||||||
 | 
					          case readFromChan of
 | 
				
			||||||
 | 
					               Nothing -> pure []
 | 
				
			||||||
 | 
					               Just val -> do
 | 
				
			||||||
 | 
					                   moreReads <- readUpTo (pred n) chan
 | 
				
			||||||
 | 
					                   pure (val:moreReads)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- ======= statistics/measurement and logging =======
 | 
					-- ======= statistics/measurement and logging =======
 | 
				
			||||||
 | 
					
 | 
				
			||||||
data StatsEventType = PostPublishEvent
 | 
					data StatsEventType = PostPublishEvent
 | 
				
			||||||
| 
						 | 
					@ -713,8 +741,8 @@ evaluateStats timeInterval summedStats =
 | 
				
			||||||
    -- first sum all event numbers, then divide through number of seconds passed to
 | 
					    -- first sum all event numbers, then divide through number of seconds passed to
 | 
				
			||||||
    -- get rate per second
 | 
					    -- get rate per second
 | 
				
			||||||
    RelayStats
 | 
					    RelayStats
 | 
				
			||||||
    { relayReceiveRates = mapRMap (/ intervalSeconds) $ relayReceiveRates summedStats
 | 
					    { relayReceiveRates = (/ intervalSeconds) <$> relayReceiveRates summedStats
 | 
				
			||||||
    , relayDeliveryRates = mapRMap (/ intervalSeconds) $ relayDeliveryRates summedStats
 | 
					    , relayDeliveryRates = (/ intervalSeconds) <$> relayDeliveryRates summedStats
 | 
				
			||||||
    , postPublishRate = postPublishRate summedStats / intervalSeconds
 | 
					    , postPublishRate = postPublishRate summedStats / intervalSeconds
 | 
				
			||||||
    , postFetchRate = postFetchRate summedStats / intervalSeconds
 | 
					    , postFetchRate = postFetchRate summedStats / intervalSeconds
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -25,6 +25,29 @@ instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where
 | 
				
			||||||
instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where
 | 
					instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where
 | 
				
			||||||
    show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap)
 | 
					    show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					instance (Bounded k, Ord k) => Functor (RingMap k) where
 | 
				
			||||||
 | 
					    -- | map a function over all payload values of a 'RingMap'
 | 
				
			||||||
 | 
					    fmap f = RingMap . Map.map traversingF . getRingMap
 | 
				
			||||||
 | 
					      where
 | 
				
			||||||
 | 
					          traversingF (KeyEntry a) = KeyEntry (f a)
 | 
				
			||||||
 | 
					          traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry)
 | 
				
			||||||
 | 
					          traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					instance (Bounded k, Ord k) => Foldable (RingMap k) where
 | 
				
			||||||
 | 
					    foldr f initVal = Map.foldr traversingFR initVal . getRingMap
 | 
				
			||||||
 | 
					      where
 | 
				
			||||||
 | 
					          traversingFR (KeyEntry a) acc = f a acc
 | 
				
			||||||
 | 
					          traversingFR (ProxyEntry _ Nothing) acc = acc
 | 
				
			||||||
 | 
					          traversingFR (ProxyEntry _ (Just entry)) acc = traversingFR entry acc
 | 
				
			||||||
 | 
					    foldl f initVal = Map.foldl traversingFL initVal . getRingMap
 | 
				
			||||||
 | 
					      where
 | 
				
			||||||
 | 
					          traversingFL acc (KeyEntry a) = f acc a
 | 
				
			||||||
 | 
					          traversingFL acc (ProxyEntry _ Nothing) = acc
 | 
				
			||||||
 | 
					          traversingFL acc (ProxyEntry _ (Just entry)) = traversingFL acc entry
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | entry of a 'RingMap' that holds a value and can also
 | 
					-- | entry of a 'RingMap' that holds a value and can also
 | 
				
			||||||
-- wrap around the lookup direction at the edges of the name space.
 | 
					-- wrap around the lookup direction at the edges of the name space.
 | 
				
			||||||
data RingEntry k a = KeyEntry a
 | 
					data RingEntry k a = KeyEntry a
 | 
				
			||||||
| 
						 | 
					@ -247,14 +270,3 @@ takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k)
 | 
				
			||||||
                           -> RingMap k a
 | 
					                           -> RingMap k a
 | 
				
			||||||
                           -> [a]
 | 
					                           -> [a]
 | 
				
			||||||
takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []
 | 
					takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []
 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
-- | map a function over all payload values of a 'RingMap'
 | 
					 | 
				
			||||||
mapRMap :: (Bounded k, Ord k, Num k)
 | 
					 | 
				
			||||||
        => (a -> b) -> RingMap k a -> RingMap k b
 | 
					 | 
				
			||||||
mapRMap f = RingMap . Map.map traversingF . getRingMap
 | 
					 | 
				
			||||||
  where
 | 
					 | 
				
			||||||
      --traversingF :: RingEntry k a -> RingEntry k b
 | 
					 | 
				
			||||||
      traversingF (KeyEntry a) = KeyEntry (f a)
 | 
					 | 
				
			||||||
      traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry)
 | 
					 | 
				
			||||||
      traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing
 | 
					 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue