Hash2Pub/src/Hash2Pub/PostService.hs

885 lines
40 KiB
Haskell

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeOperators #-}
module Hash2Pub.PostService where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever, unless,
void, when)
import Control.Monad.IO.Class (liftIO)
import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU
import qualified Data.DList as D
import Data.Either (lefts, rights)
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (fromJust, isJust)
import Data.String (fromString)
import Data.Text.Lazy (Text)
import qualified Data.Text.Lazy as Txt
import qualified Data.Text.Lazy.IO as TxtI
import Data.Text.Normalize (NormalizationMode (NFC), normalize)
import Data.Time.Clock.POSIX
import Data.Typeable (Typeable)
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTPT
import System.IO
import System.Random
import Text.Read (readEither)
import Formatting (fixed, format, int, (%))
import qualified Network.Wai.Handler.Warp as Warp
import Servant
import Servant.Client
import Hash2Pub.FediChordTypes
import Hash2Pub.PostService.API
import Hash2Pub.RingMap
import Hash2Pub.Utils
import Debug.Trace
data PostService d = PostService
{ serviceConf :: ServiceConf
-- queues, other data structures
, baseDHT :: (DHT d) => d
, serviceThread :: TVar ThreadId
, subscribers :: TVar RelayTags
-- ^ for each tag store the subscribers + their queue
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
-- ^ tags subscribed by the own node have an assigned lease time
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID
-- ^ queue of posts to be fetched
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
, httpMan :: HTTP.Manager
, statsQueue :: TQueue StatsEvent
, loadStats :: TVar RelayStats
-- ^ current load stats, replaced periodically
, logFileHandle :: Handle
}
deriving (Typeable)
type Hashtag = Text
type PostID = Text
type PostContent = Text
-- | For each handled tag, store its subscribers and provide a
-- broadcast 'TChan' for enqueuing posts
type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag)
type TagSubscribersSTM = TVar TagSubscribers
-- | each subscriber is identified by its contact data "hostname" "port"
-- and holds a TChan duplicated from the broadcast TChan of the tag
-- + an expiration timestamp
type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID, POSIXTime))
instance DHT d => Service PostService d where
-- | initialise 'PostService' data structures and run server
runService conf dht = do
-- create necessary TVars
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
subscriberVar <- newTVarIO emptyRMap
ownSubsVar <- newTVarIO HMap.empty
--ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO
postFetchQueue' <- newTQueueIO
migrationsInProgress' <- newTVarIO HMap.empty
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
statsQueue' <- newTQueueIO
loadStats' <- newTVarIO emptyStats
loggingFile <- openFile (confLogfilePath conf) WriteMode
hSetBuffering loggingFile LineBuffering
let
thisService = PostService
{ serviceConf = conf
, baseDHT = dht
, serviceThread = threadVar
, subscribers = subscriberVar
, ownSubscriptions = ownSubsVar
--, ownPosts = ownPostVar
, relayInQueue = relayInQueue'
, postFetchQueue = postFetchQueue'
, migrationsInProgress = migrationsInProgress'
, httpMan = httpMan'
, statsQueue = statsQueue'
, loadStats = loadStats'
, logFileHandle = loggingFile
}
port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
-- log a start message, this also truncates existing files
TxtI.hPutStrLn loggingFile $ Txt.unlines
[ "# Starting mock relay implementation"
, "#time stamp ; relay receive rate ;relay delivery rate ;instance publish rate ;instance fetch rate ;total subscriptions"
]
-- Run 'concurrently_' from another thread to be able to return the
-- 'PostService'.
-- Terminating that parent thread will make all child threads terminate as well.
servThreadID <- forkIO $
concurrently_
-- web server
(Warp.runSettings warpSettings $ postServiceApplication thisService)
$ concurrently
-- background processing workers
(launchWorkerThreads thisService)
-- statistics/ measurements
(launchStatsThreads thisService)
-- update thread ID after fork
atomically $ writeTVar threadVar servThreadID
pure thisService
getListeningPortFromService = fromIntegral . confServicePort . serviceConf
migrateData = clientDeliverSubscriptions
waitForMigrationFrom serv fromID = do
migrationSynchroniser <- atomically $ do
syncPoint <- HMap.lookup fromID <$> readTVar (migrationsInProgress serv)
maybe
-- decision: this function blocks until it gets an incoming migration from given ID
retry
pure
syncPoint
-- block until migration finished
takeMVar migrationSynchroniser
-- | return a WAI application
postServiceApplication :: DHT d => PostService d -> Application
postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv
-- ========= constants ===========
placeholderPost :: Text
placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
-- ========= HTTP API and handlers =============
postServer :: DHT d => PostService d -> Server PostServiceAPI
postServer service = relayInbox service
:<|> subscriptionDelivery service
:<|> postFetch service
:<|> postMultiFetch service
:<|> postInbox service
:<|> tagDelivery service
:<|> tagSubscribe service
:<|> tagUnsubscribe service
-- | delivery endpoint: receive posts of a handled tag and enqueue them for relaying
relayInbox :: DHT d => PostService d -> Hashtag -> Text -> Handler NoContent
relayInbox serv tag posts = do
let
-- skip checking whether the post actually contains the tag, just drop full post
postIDs = head . Txt.splitOn "," <$> Txt.lines posts
-- if tag is not in own responsibility, return a 410 Gone
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId tag)
if responsible
then pure ()
else
throwError $ err410 { errBody = "Relay is not responsible for this tag"}
broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag
maybe
-- if noone subscribed to the tag, nothing needs to be done
(pure ())
-- otherwise enqueue posts into broadcast queue of the tag
(\queue -> do
liftIO $ forM_ postIDs (atomically . writeTChan queue)
-- report the received post for statistic purposes
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayReceiveEvent (length postIDs) (hashtagToId tag)
)
broadcastChan
pure NoContent
-- exception to be thrown when a tag is not in the responsibility of a relay
newtype UnhandledTagException = UnhandledTagException String
deriving (Show, Typeable)
instance Exception UnhandledTagException
-- | delivery endpoint: receives a list of subscribers of tags and their outstanding queues for migration
subscriptionDelivery :: DHT d => PostService d -> Integer -> Text -> Handler Text
subscriptionDelivery serv senderID subList = do
let
tagSubs = Txt.lines subList
-- signal that the migration is in progress
syncMVar <- liftIO newEmptyMVar
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
HMap.insert (fromInteger senderID) syncMVar
-- In favor of having the convenience of rolling back the transaction once a
-- not-handled tag occurs, this results in a single large transaction.
-- Hopefully the performance isn't too bad.
res <- liftIO . atomically $ (foldM (\_ tag' -> do
responsible <- isResponsibleForSTM (baseDHT serv) (hashtagToId tag')
if responsible
then processTag (subscribers serv) tag'
else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay")
pure $ Right ()
) (pure ()) tagSubs
`catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException))
-- TODO: potentially log this
:: STM (Either String ()))
-- TODO: should this always signal migration finished to avoid deadlocksP
liftIO $ putMVar syncMVar () -- wakes up waiting thread
-- allow response to be completed independently from waiting thread
_ <- liftIO . forkIO $ do
putMVar syncMVar () -- blocks until waiting thread has resumed
-- delete this migration from ongoing ones
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
HMap.delete (fromInteger senderID)
case res of
Left err -> throwError err410 {errBody = BSUL.fromString err}
Right _ -> pure ""
-- TODO: check and only accept tags in own (future?) responsibility
where
processTag :: TVar RelayTags -> Text -> STM ()
processTag subscriberSTM tagData = do
let
tag:subText:lease:posts:_ = Txt.splitOn "," tagData
-- ignore checking of lease time
leaseTime = fromIntegral (read . Txt.unpack $ lease :: Integer)
sub = read . Txt.unpack $ subText :: (String, Int)
postList = Txt.words posts
enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime
-- | endpoint for fetching a post by its ID
postFetch :: PostService d -> Text -> Handler Text
postFetch serv _ = do
-- decision: for saving memory do not store published posts, just
-- pretend there is a post for each requested ID
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent 1 0 -- tag fetched for is irrelevant
pure placeholderPost
-- | endpoint for fetching multiple posts of this instance by their IDs
postMultiFetch :: PostService d -> Text -> Handler Text
postMultiFetch serv postIDs = do
let
idList = Txt.lines postIDs
-- decision: for saving memory do not store published posts, just
-- pretend there is a post for each requested ID
response = foldl (\response' _ ->
placeholderPost <> "\n" <> response'
) "" idList
liftIO . atomically . writeTQueue (statsQueue serv) $ StatsEvent IncomingPostFetchEvent (length idList) 0 -- tag fetched for is irrelevant
pure response
-- | delivery endpoint: inbox for initially publishing a post at an instance
postInbox :: PostService d -> Text -> Handler NoContent
postInbox serv post = do
-- extract contained hashtags
let
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
-- generate post ID
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer)
-- decision: for saving memory do not store published post IDs, just deliver a post for any requested ID
-- enqueue a relay job for each tag
liftIO $ forM_ (containedTags :: [Text]) (\tag ->
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
)
pure NoContent
-- | delivery endpoint: receive postIDs of a certain subscribed hashtag
tagDelivery :: PostService d -> Text -> Text -> Handler Text
tagDelivery serv hashtag posts = do
let postIDs = Txt.lines posts
subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
if isJust (HMap.lookup (hashtagToId hashtag) subscriptions)
then -- TODO: increase a counter/ statistics for received posts of this tag
liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv)
else -- silently drop posts from unsubscribed tags
pure ()
pure $ "Received a postID for tag " <> hashtag
-- | receive subscription requests to a handled hashtag
tagSubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Integer
tagSubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
if not responsible
-- GONE if not responsible
then throwError err410 { errBody = "not responsible for this tag" }
else pure ()
originURL <- maybe
(throwError $ err400 { errBody = "Missing Origin header" })
pure
origin
req <- HTTP.parseUrlThrow (Txt.unpack originURL)
now <- liftIO getPOSIXTime
let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv)
-- setup subscription entry
_ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime
--liftIO . putStrLn $ "just got a subscription to " <> Txt.unpack hashtag
pure $ round leaseTime
-- | receive and handle unsubscription requests regarding a handled tag
tagUnsubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Text
tagUnsubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
if not responsible
-- GONE if not responsible
then throwError err410 { errBody = "not responsible for this tag" }
else pure ()
originURL <- maybe
(throwError $ err400 { errBody = "Missing Origin header" })
pure
origin
req <- HTTP.parseUrlThrow (Txt.unpack originURL)
liftIO . atomically $ deleteSubscription (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req)
pure "bye bye"
-- client/ request functions
clientAPI :: Proxy PostServiceAPI
clientAPI = Proxy
relayInboxClient
:<|> subscriptionDeliveryClient
:<|> postFetchClient
:<|> postMultiFetchClient
:<|> postInboxClient
:<|> tagDeliveryClient
:<|> tagSubscribeClient
:<|> tagUnsubscribeClient
= client clientAPI
-- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag]
-- and their outstanding delivery queue to another instance.
-- If the transfer succeeds, the transfered subscribers are removed from the local list.
clientDeliverSubscriptions :: PostService d
-> NodeID -- ^ sender node ID
-> NodeID -- ^ fromTag
-> NodeID -- ^ toTag
-> (String, Int) -- ^ hostname and port of instance to deliver to
-> IO (Either String ()) -- Either signals success or failure
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
-- collect tag interval
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
-- extract subscribers and posts
-- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT
subscriberData <- foldM (\response (subSTM, _, tag) -> do
subMap <- readTVarIO subSTM
thisTagsData <- foldM (\tagResponse (subscriber, (subChan, lease)) -> do
-- duplicate the pending queue to work on a copy, in case of a delivery error
pending <- atomically $ do
queueCopy <- cloneTChan subChan
channelGetAll queueCopy
if null pending
then pure tagResponse
else pure $ tag <> "," <> Txt.pack (show subscriber) <> "," <> Txt.pack (show lease) <> "," <> Txt.unwords pending <> "\n"
)
""
(HMap.toList subMap)
pure $ thisTagsData <> response
)
""
intervalTags
-- send subscribers
resp <- runClientM (subscriptionDeliveryClient (getNodeID fromNode) subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
-- on failure return a Left, otherwise delete subscription entry
case resp of
Left err -> pure . Left . show $ err
Right _ -> do
atomically $
modifyTVar' (subscribers serv) $ \tagMap ->
foldr deleteRMapEntry tagMap ((\(_, _, t) -> hashtagToId t) <$> intervalTags)
pure . Right $ ()
where
channelGetAll :: TChan a -> STM [a]
channelGetAll chan = channelGetAll' chan []
channelGetAll' :: TChan a -> [a] -> STM [a]
channelGetAll' chan acc = do
haveRead <- tryReadTChan chan
maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead
-- | Subscribe the client to the given hashtag. On success it returns the given lease time,
-- but also records the subscription in its own data structure.
clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer)
clientSubscribeTo serv tag = do
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
doSubscribe lookupRes True
where
doSubscribe lookupResponse allowRetry = maybe
(pure . Left $ "No node found")
(\(foundHost, foundPort) -> do
let origin = "http://" <> Txt.pack (confServiceHost $ serviceConf serv) <> ":" <> Txt.pack (show (getListeningPortFromService serv :: Integer))
resp <- runClientM (tagSubscribeClient tag (Just origin)) (mkClientEnv (httpMan serv) (BaseUrl Http foundHost (fromIntegral foundPort) ""))
case resp of
Left (FailureResponse _ fresp)
|(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
--putStrLn $ "failed subscribing to " <> Txt.unpack tag <> " on " <> foundHost
doSubscribe newRes False
Left err -> pure . Left . show $ err
Right lease -> do
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease)
--putStrLn $ "just subscribed to " <> Txt.unpack tag <> " on " <> foundHost
pure . Right $ lease
)
lookupResponse
-- | Unsubscribe the client from the given hashtag.
clientUnsubscribeFrom :: DHT d => PostService d -> Hashtag -> IO (Either String ())
clientUnsubscribeFrom serv tag = do
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
doUnsubscribe lookupRes True
where
doUnsubscribe lookupResponse allowRetry = maybe
(pure . Left $ "No node found")
(\(foundHost, foundPort) -> do
let origin = "http://" <> Txt.pack (confServiceHost $ serviceConf serv) <> ":" <> Txt.pack (show (getListeningPortFromService serv :: Integer))
resp <- runClientM (tagUnsubscribeClient tag (Just origin)) (mkClientEnv (httpMan serv) (BaseUrl Http foundHost (fromIntegral foundPort) ""))
case resp of
Left (FailureResponse _ fresp)
|(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
doUnsubscribe newRes False
Left err -> pure . Left . show $ err
Right _ -> do
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.delete (hashtagToId tag)
pure . Right $ ()
)
lookupResponse
-- | publish a new post to the inbox of a specified relay instance. This
-- instance will then be the originating instance of the post and will forward
-- the post to the responsible relays.
-- As the initial publishing isn't done by a specific relay (but *to* a specific relay
-- instead), the function does *not* take a PostService as argument.
clientPublishPost :: HTTP.Manager -- ^ for better performance, a shared HTTP manager has to be provided
-> String -- ^ hostname
-> Int -- ^ port
-> PostContent -- ^ post content
-> IO (Either String ()) -- ^ error or success
clientPublishPost httpman hostname port postC = do
resp <- runClientM (postInboxClient postC) (mkClientEnv httpman (BaseUrl Http hostname port ""))
pure . bimap show (const ()) $ resp
-- currently this is unused code
getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI
getClients hostname' port' httpMan = hoistClient clientAPI
(fmap (either (error . show) id)
. flip runClientM clientEnv
)
(client clientAPI)
where
clientEnv = mkClientEnv httpMan (BaseUrl Http hostname' port' "")
-- ======= data structure manipulations =========
-- | Write all pending posts of a subscriber-tag-combination to its queue.
-- Sets up all necessary data structures if they are still missing.
enqueueSubscription :: TVar RelayTags -- tag-subscriber map
-> Hashtag -- hashtag of pending posts
-> (String, Int) -- subscriber's connection information
-> [PostID] -- pending posts
-> POSIXTime -- lease expiry time
-> STM ()
enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do
-- get the tag output queue and, if necessary, create it
subChan <- setupSubscriberChannel tagMapSTM tag subscriber leaseTime
forM_ posts (writeTChan subChan)
-- | STM operation to return the outgoing post queue of a tag to a specified subscriber.
-- If the queue doesn't exist yet, all necessary data structures are set up accordingly.
setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID)
setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
tagMap <- readTVar tagMapSTM
case lookupTagSubscriptions tag tagMap of
Nothing -> do
-- if no collision/ tag doesn't exist yet, just initialize a
-- new subscriber map
broadcastChan <- newBroadcastTChan
tagOutChan <- dupTChan broadcastChan
newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime)
writeTVar tagMapSTM $ addRMapEntry (hashtagToId tag) (newSubMapSTM, broadcastChan, tag) tagMap
pure tagOutChan
Just (foundSubMapSTM, broadcastChan, _) -> do
-- otherwise use the existing subscriber map
foundSubMap <- readTVar foundSubMapSTM
case HMap.lookup subscriber foundSubMap of
Nothing -> do
-- for new subscribers, create new output channel
tagOutChan <- dupTChan broadcastChan
writeTVar foundSubMapSTM $ HMap.insert subscriber (tagOutChan, leaseTime) foundSubMap
pure tagOutChan
-- existing subscriber's channels are just returned
Just (tagOutChan, _) -> pure tagOutChan
-- | deletes a subscription from the passed subscriber map
deleteSubscription :: TVar RelayTags -> Hashtag -> (String, Int) -> STM ()
deleteSubscription tagMapSTM tag subscriber = do
tagMap <- readTVar tagMapSTM
case lookupTagSubscriptions tag tagMap of
-- no subscribers to that tag, just return
Nothing -> pure ()
Just (foundSubMapSTM, _, _) -> do
foundSubMap <- readTVar foundSubMapSTM
let newSubMap = HMap.delete subscriber foundSubMap
-- if there are no subscriptions for the tag anymore, remove its
-- data sttructure altogether
if HMap.null newSubMap
then writeTVar tagMapSTM $ deleteRMapEntry (hashtagToId tag) tagMap
-- otherwise just remove the subscription of that node
else writeTVar foundSubMapSTM newSubMap
-- | returns the broadcast channel of a hashtag if there are any subscribers to it
getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID))
getTagBroadcastChannel serv tag = do
tagMap <- readTVar $ subscribers serv
case lookupTagSubscriptions tag tagMap of
Nothing -> pure Nothing
Just (subscriberSTM, broadcastChan, _) -> do
subscriberMap <- readTVar subscriberSTM
if HMap.null subscriberMap
then pure Nothing
else pure (Just broadcastChan)
-- | look up the subscription data of a tag
lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
lookupTagSubscriptions tag = rMapLookup (hashtagToId tag)
-- normalise the unicode representation of a string to NFC and convert to lower case
normaliseTag :: Text -> Text
normaliseTag = Txt.toLower . Txt.fromStrict . normalize NFC . Txt.toStrict
-- | convert a hashtag to its representation on the DHT
hashtagToId :: Hashtag -> NodeID
hashtagToId = genKeyID . Txt.unpack
readUpToTChan :: Int -> TChan a -> STM [a]
readUpToTChan 0 _ = pure []
readUpToTChan n chan = do
readFromChan <- tryReadTChan chan
case readFromChan of
Nothing -> pure []
Just val -> do
moreReads <- readUpToTChan (pred n) chan
pure (val:moreReads)
readUpToTQueue :: Int -> TQueue a -> STM [a]
readUpToTQueue 0 _ = pure []
readUpToTQueue n q = do
readFromQueue <- tryReadTQueue q
case readFromQueue of
Nothing -> pure []
Just val -> do
moreReads <- readUpToTQueue (pred n) q
pure (val:moreReads)
-- | define how to convert all showable types to PlainText
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
instance {-# OVERLAPPABLE #-} Show a => MimeRender PlainText a where
mimeRender _ = BSUL.fromString . show
instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where
mimeUnrender _ = readEither . BSUL.toString
-- ====== worker threads ======
-- TODO: make configurable
numParallelDeliveries = 10
launchWorkerThreads :: DHT d => PostService d -> IO ()
launchWorkerThreads serv = concurrently_
(processIncomingPosts serv)
$ concurrently_
(purgeSubscriptionsThread serv)
$ concurrently_
(fetchTagPosts serv)
(relayWorker serv)
-- | periodically remove expired subscription entries from relay subscribers
purgeSubscriptionsThread :: PostService d -> IO ()
purgeSubscriptionsThread serv = forever $ do
-- read config
now <- getPOSIXTime
let
purgeInterval = confSubscriptionExpiryTime (serviceConf serv) / 10
-- no need to atomically lock this, as newly incoming subscriptions do not
-- need to be purged
tagMap <- readTVarIO $ subscribers serv
forM_ tagMap $ \(subscriberMapSTM, _, _) ->
-- but each subscriberMap needs to be modified atomically
atomically . modifyTVar' subscriberMapSTM $ HMap.filter (\(_, ts) -> ts > now)
threadDelay $ fromEnum purgeInterval `div` 10^6
-- | process the pending relay inbox of incoming posts from the internal queue:
-- Look up responsible relay node for given hashtag and forward post to it
processIncomingPosts :: DHT d => PostService d -> IO ()
processIncomingPosts serv = forever $ do
-- blocks until available
deliveriesToProcess <- atomically $ do
readResult <- readUpToTQueue numParallelDeliveries $ relayInQueue serv
if null readResult
then retry
else pure readResult
runningJobs <- forM deliveriesToProcess $ \(tag, pID, pContent) -> async $ do
let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
case lookupRes of
-- no vserver active => wait and retry
Nothing -> threadDelay (10 * 10^6) >> pure (Left "no vserver active")
Just (responsibleHost, responsiblePort) -> do
resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
case resp of
Left err -> do
-- 410 error indicates outdated responsibility mapping
-- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post
-- TODO: keep track of maximum retries
_ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
pure . Left $ "Error: " <> show err
Right _ -> do
-- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
now <- getPOSIXTime
subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
-- if not yet subscribed or subscription expires within 5 minutes, (re)subscribe to tag
when (maybe True (\subLease -> now - subLease < 300) subscriptionStatus) $
void $ clientSubscribeTo serv tag
-- for evaluation, return the tag of the successfully forwarded post
pure $ Right tag
-- collect async results
results <- mapM waitCatch runningJobs
-- report the count of published posts for statistics
atomically . writeTQueue (statsQueue serv) $ StatsEvent PostPublishEvent (length . rights $ results) 0 -- hashtag published to doesn't matter
pure ()
-- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID
fetchTagPosts :: DHT d => PostService d -> IO ()
fetchTagPosts serv = forever $ do
-- blocks until available
-- TODO: batching, retry
-- TODO: process multiple in parallel
pIdUri <- atomically . readTQueue $ postFetchQueue serv
fetchReq <- HTTP.parseRequest . Txt.unpack $ pIdUri
resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString))
case resp of
Right response ->
-- TODO error handling, retry
--if HTTPT.statusCode (HTTP.responseStatus response) == 200
-- then
-- -- success, TODO: statistics
-- else
pure ()
Left _ ->
-- TODO error handling, retry
pure ()
relayWorker :: PostService d -> IO ()
relayWorker serv = forever $ do
-- atomically (to be able to retry) fold a list of due delivery actions
jobsToProcess <- atomically $ do
subscriptionMap <- readTVar $ subscribers serv
jobList <- D.toList <$> foldM (\jobAcc (subscriberMapSTM, _, tag) -> do
subscriberMap <- readTVar subscriberMapSTM
foldM (\jobAcc' ((subHost, subPort), (postChan, _)) -> do
postsToDeliver <- readUpToTChan 500 postChan
let postDeliveryAction = runClientM (tagDeliveryClient tag (Txt.unlines postsToDeliver)) (mkClientEnv (httpMan serv) (BaseUrl Http subHost (fromIntegral subPort) ""))
-- append relay push job to job list
pure $ if not (null postsToDeliver)
then jobAcc' `D.snoc` (do
deliveryResult <- postDeliveryAction
either
(const $ pure ())
-- on successful push, record that event for statistics
(const . atomically . writeTQueue (statsQueue serv) $ StatsEvent RelayDeliveryEvent (length postsToDeliver) (hashtagToId tag))
deliveryResult
pure deliveryResult
)
else jobAcc'
) jobAcc $ HMap.toList subscriberMap
) D.empty subscriptionMap
-- if no relay jobs, then retry
if null jobList
then retry
else pure jobList
-- when processing the list, send several deliveries in parallel
forM_ (chunksOf numParallelDeliveries jobsToProcess) $ \jobset -> do
runningJobs <- mapM async jobset
-- so far just dropping failed attempts, TODO: retry mechanism
results <- mapM waitCatch runningJobs
let
successfulResults = rights results
unsuccessfulResults = lefts results
unless (null unsuccessfulResults) $ putStrLn ("ERR: " <> show (length unsuccessfulResults) <> " failed deliveries!")
putStrLn $ "successfully relayed " <> show (length successfulResults)
pure ()
-- ======= statistics/measurement and logging =======
data StatsEventType = PostPublishEvent
| RelayReceiveEvent
| RelayDeliveryEvent
| IncomingPostFetchEvent
deriving (Enum, Show, Eq)
-- | Represents measurement event of a 'StatsEventType' with a count relevant for a certain key
data StatsEvent = StatsEvent StatsEventType Int NodeID
deriving (Show, Eq)
-- | measured rates of relay performance
-- TODO: maybe include other metrics in here as well, like number of subscribers?
data RelayStats = RelayStats
{ relayReceiveRates :: RingMap NodeID Double
-- ^ rate of incoming posts in the responsibility of this relay
, relayDeliveryRates :: RingMap NodeID Double
-- ^ rate of relayed outgoing posts
, postFetchRate :: Double -- no need to differentiate between tags
-- ^ number of post-fetches delivered
, postPublishRate :: Double
-- ^ rate of initially publishing posts through this instance
}
deriving (Show, Eq)
launchStatsThreads :: PostService d -> IO ()
launchStatsThreads serv = do
-- create shared accumulator
sharedAccum <- newTVarIO emptyStats
concurrently_
(accumulateStatsThread sharedAccum $ statsQueue serv)
(evaluateStatsThread serv sharedAccum)
-- | Read stats events from queue and add them to a shared accumulator.
-- Instead of letting the events accumulate in the queue and allocate linear memory, immediately fold the result.
accumulateStatsThread :: TVar RelayStats -> TQueue StatsEvent -> IO ()
accumulateStatsThread statsAccumulator statsQ = forever $ do
-- blocks until stats event arrives
event <- atomically $ readTQueue statsQ
-- add the event number to current accumulator
atomically $ modifyTVar' statsAccumulator $ statsAdder event
-- | add incoming stats events to accumulator value
statsAdder :: StatsEvent -> RelayStats -> RelayStats
statsAdder event stats = case event of
StatsEvent PostPublishEvent num _ ->
stats {postPublishRate = fromIntegral num + postPublishRate stats}
StatsEvent RelayReceiveEvent num key ->
stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)}
StatsEvent RelayDeliveryEvent num key ->
stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)}
StatsEvent IncomingPostFetchEvent num _ ->
stats {postFetchRate = fromIntegral num + postFetchRate stats}
where
sumIfEntryExists = addRMapEntryWith (\newVal oldVal ->
let toInsert = fromJust $ extractRingEntry newVal
in
case oldVal of
KeyEntry n -> KeyEntry (n + toInsert)
ProxyEntry pointer (Just (KeyEntry n)) -> ProxyEntry pointer (Just (KeyEntry $ n + toInsert))
ProxyEntry pointer Nothing -> ProxyEntry pointer (Just newVal)
_ -> error "RingMap nested too deeply"
)
-- Periodically exchange the accumulated statistics with empty ones, evaluate them
-- and make them the current statistics of the service.
evaluateStatsThread :: PostService d -> TVar RelayStats -> IO ()
evaluateStatsThread serv statsAcc = getPOSIXTime >>= loop
where
loop previousTs = do
threadDelay $ confStatsEvalDelay (serviceConf serv)
-- get and reset the stats accumulator
summedStats <- atomically $ do
stats <- readTVar statsAcc
writeTVar statsAcc emptyStats
pure stats
-- as the transaction might retry several times, current time needs to
-- be read afterwards
now <- getPOSIXTime
-- evaluate stats rate and replace server stats
-- persistently store in a TVar so it can be retrieved later by the DHT
let timePassed = (now - previousTs) * fromIntegral (confSpeedupFactor $ serviceConf serv)
rateStats = evaluateStats timePassed summedStats
atomically $ writeTVar (loadStats serv) rateStats
-- and now what? write a log to file
-- format: total relayReceiveRates;total relayDeliveryRates;postFetchRate;postPublishRate; subscriberSum
-- later: current (reported) load, target load
subscriberSum <- sumSubscribers
TxtI.hPutStrLn (logFileHandle serv) $
format (fixed 9 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % fixed 20 % ";" % int )
(realToFrac now :: Double)
(sum . relayReceiveRates $ rateStats)
(sum . relayDeliveryRates $ rateStats)
(postPublishRate rateStats)
(postFetchRate rateStats)
subscriberSum
loop now
sumSubscribers = do
tagMap <- readTVarIO $ subscribers serv
foldM (\subscriberSum (subscriberMapSTM, _, _) -> do
subscriberMap <- readTVarIO subscriberMapSTM
pure $ subscriberSum + HMap.size subscriberMap
)
0 tagMap
-- | Evaluate the accumulated statistic events: Currently mostly calculates the event
-- rates by dividing through the collection time frame
evaluateStats :: POSIXTime -> RelayStats -> RelayStats
evaluateStats timeInterval summedStats =
-- first sum all event numbers, then divide through number of seconds passed to
-- get rate per second
RelayStats
{ relayReceiveRates = (/ intervalSeconds) <$> relayReceiveRates summedStats
, relayDeliveryRates = (/ intervalSeconds) <$> relayDeliveryRates summedStats
, postPublishRate = postPublishRate summedStats / intervalSeconds
, postFetchRate = postFetchRate summedStats / intervalSeconds
}
where
intervalSeconds = realToFrac timeInterval
emptyStats :: RelayStats
emptyStats = RelayStats
{ relayReceiveRates = emptyRMap
, relayDeliveryRates = emptyRMap
, postFetchRate = 0
, postPublishRate = 0
}