Compare commits

..

No commits in common. "eee40ce4fb7bd74fd0ebd8b9455a5cab22bcd5e5" and "f5de7601bbef988d82e756d42e3f2197f2646325" have entirely different histories.

2 changed files with 8 additions and 13 deletions

View file

@ -69,7 +69,7 @@ readConfig = do
, confRequestRetries = 3 , confRequestRetries = 3
} }
sConf = ServiceConf sConf = ServiceConf
{ confSubscriptionExpiryTime = 24*3600 / fromIntegral speedup { confSubscriptionExpiryTime = 12*3600 / fromIntegral speedup
, confServicePort = read servicePortString , confServicePort = read servicePortString
, confServiceHost = confDomainString , confServiceHost = confDomainString
, confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log" , confLogfilePath = "../simulationData/logs/" <> confDomainString <> ".log"

View file

@ -12,14 +12,14 @@ import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Exception (Exception (..), try) import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever, unless, import Control.Monad (foldM, forM, forM_, forever, void,
void, when) when)
import Control.Monad.IO.Class (liftIO) import Control.Monad.IO.Class (liftIO)
import Data.Bifunctor import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU import qualified Data.ByteString.UTF8 as BSU
import qualified Data.DList as D import qualified Data.DList as D
import Data.Either (lefts, rights) import Data.Either (rights)
import qualified Data.HashMap.Strict as HMap import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet import qualified Data.HashSet as HSet
import Data.Maybe (fromJust, isJust) import Data.Maybe (fromJust, isJust)
@ -57,6 +57,8 @@ data PostService d = PostService
-- ^ for each tag store the subscribers + their queue -- ^ for each tag store the subscribers + their queue
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
-- ^ tags subscribed by the own node have an assigned lease time -- ^ tags subscribed by the own node have an assigned lease time
--, ownPosts :: TVar (HSet.HashSet Text)
-- ^ just store the existence of posts for saving memory,
, relayInQueue :: TQueue (Hashtag, PostID, PostContent) , relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously -- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID , postFetchQueue :: TQueue PostID
@ -323,7 +325,6 @@ tagSubscribe serv hashtag origin = do
let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv) let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv)
-- setup subscription entry -- setup subscription entry
_ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime _ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime
--liftIO . putStrLn $ "just got a subscription to " <> Txt.unpack hashtag
pure $ round leaseTime pure $ round leaseTime
@ -426,12 +427,10 @@ clientSubscribeTo serv tag = do
Left (FailureResponse _ fresp) Left (FailureResponse _ fresp)
|(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup |(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
--putStrLn $ "failed subscribing to " <> Txt.unpack tag <> " on " <> foundHost
doSubscribe newRes False doSubscribe newRes False
Left err -> pure . Left . show $ err Left err -> pure . Left . show $ err
Right lease -> do Right lease -> do
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease) atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease)
--putStrLn $ "just subscribed to " <> Txt.unpack tag <> " on " <> foundHost
pure . Right $ lease pure . Right $ lease
) )
lookupResponse lookupResponse
@ -736,11 +735,7 @@ relayWorker serv = forever $ do
forM_ (chunksOf numParallelDeliveries jobsToProcess) $ \jobset -> do forM_ (chunksOf numParallelDeliveries jobsToProcess) $ \jobset -> do
runningJobs <- mapM async jobset runningJobs <- mapM async jobset
-- so far just dropping failed attempts, TODO: retry mechanism -- so far just dropping failed attempts, TODO: retry mechanism
results <- mapM waitCatch runningJobs successfulResults <- rights <$> mapM waitCatch runningJobs
let
successfulResults = rights results
unsuccessfulResults = lefts results
unless (null unsuccessfulResults) $ putStrLn ("ERR: " <> show (length unsuccessfulResults) <> " failed deliveries!")
putStrLn $ "successfully relayed " <> show (length successfulResults) putStrLn $ "successfully relayed " <> show (length successfulResults)
pure () pure ()
@ -834,7 +829,7 @@ evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
-- evaluate stats rate and replace server stats -- evaluate stats rate and replace server stats
-- persistently store in a TVar so it can be retrieved later by the DHT -- persistently store in a TVar so it can be retrieved later by the DHT
let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv) let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
rateStats = evaluateStats timePassed summedStats let rateStats = evaluateStats timePassed summedStats
atomically $ writeTVar (loadStats serv) rateStats atomically $ writeTVar (loadStats serv) rateStats
-- and now what? write a log to file -- and now what? write a log to file
-- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum -- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum