diff --git a/Readme.md b/Readme.md index daf9e38..3c7dbe5 100644 --- a/Readme.md +++ b/Readme.md @@ -14,7 +14,4 @@ The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`. The project and its developent environment are built with [Nix](https://nixos.org/nix/). -The development environment can be entered with `nix-shell shell-minimal.nix`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build" shell-minimal.nix` to do both steps at once. - -While the `shell-minimal.nix` environment contains everything necessary for building and testing this project, the `shell.nix` additionally contains the Haskell IDE engine *hie* and the documentation for all used Haskell packages for more convenient development. -Be aware that these need to be build from source and can take a very long time to build. +The development environment can be entered with `nix-shell`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build"` to do both steps at once. diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index baa2b70..81cf552 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -12,16 +12,16 @@ import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception (Exception (..), try) -import Control.Monad (foldM, forM_, forever, void, when) +import Control.Monad (foldM, forM, forM_, forever, void, + when) import Control.Monad.IO.Class (liftIO) import Data.Bifunctor import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.UTF8 as BSU import qualified Data.HashMap.Strict as HMap import qualified Data.HashSet as HSet -import Data.Maybe (isJust, fromJust) +import Data.Maybe (fromMaybe, isJust) import Data.String (fromString) -import Data.Text.Lazy (Text) import qualified Data.Text.Lazy as Txt import Data.Text.Normalize (NormalizationMode (NFC), normalize) import Data.Time.Clock.POSIX @@ -36,7 +36,6 @@ import Servant import Servant.Client import Hash2Pub.FediChordTypes -import Hash2Pub.PostService.API import Hash2Pub.RingMap @@ -49,20 +48,19 @@ data PostService d = PostService -- ^ for each tag store the subscribers + their queue , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) -- ^ tags subscribed by the own node have an assigned lease time - , ownPosts :: TVar (HSet.HashSet Text) + , ownPosts :: TVar (HSet.HashSet Txt.Text) -- ^ just store the existence of posts for saving memory, , relayInQueue :: TQueue (Hashtag, PostID, PostContent) -- ^ Queue for processing incoming posts of own instance asynchronously , postFetchQueue :: TQueue PostID , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ())) , httpMan :: HTTP.Manager - , statsQueue :: TQueue StatsEvent } deriving (Typeable) -type Hashtag = Text -type PostID = Text -type PostContent = Text +type Hashtag = Txt.Text +type PostID = Txt.Text +type PostContent = Txt.Text -- | For each handled tag, store its subscribers and provide a -- broadcast 'TChan' for enqueuing posts type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag) @@ -85,10 +83,9 @@ instance DHT d => Service PostService d where postFetchQueue' <- newTQueueIO migrationsInProgress' <- newTVarIO HMap.empty httpMan' <- HTTP.newManager HTTP.defaultManagerSettings - statsQueue' <- newTQueueIO let - thisService = PostService - { serviceConf = conf + thisService = PostService { + serviceConf = conf , baseDHT = dht , serviceThread = threadVar , subscribers = subscriberVar @@ -98,8 +95,7 @@ instance DHT d => Service PostService d where , postFetchQueue = postFetchQueue' , migrationsInProgress = migrationsInProgress' , httpMan = httpMan' - , statsQueue = statsQueue' - } + } port' = fromIntegral (confServicePort conf) warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings -- Run 'concurrently_' from another thread to be able to return the @@ -135,13 +131,38 @@ postServiceApplication :: DHT d => PostService d -> Application postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv +-- | needed for guiding type inference +exposedPostServiceAPI :: Proxy PostServiceAPI +exposedPostServiceAPI = Proxy + -- ========= constants =========== -placeholderPost :: Text +placeholderPost :: Txt.Text placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB -- ========= HTTP API and handlers ============= +type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent + -- delivery endpoint at responsible relay for delivering posts of $tag for distribution + :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text + -- endpoint for delivering the subscriptions and outstanding queue + :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text + -- fetch endpoint for posts, full post ID is http://$domain/post/$postid + :<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text + -- endpoint for fetching multiple posts at once + :<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent + -- delivery endpoint of newly published posts of the relay's instance + :<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text + -- delivery endpoint for posts of $tag at subscribing instance + :<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer + -- endpoint for subscribing the instance specified in + -- the Origin header to $hashtag. + -- Returns subscription lease time in seconds. + :<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text + -- endpoint for unsubscribing the instance specified in + -- the Origin header to $hashtag + + postServer :: DHT d => PostService d -> Server PostServiceAPI postServer service = relayInbox service :<|> subscriptionDelivery service @@ -153,7 +174,7 @@ postServer service = relayInbox service :<|> tagUnsubscribe service -relayInbox :: DHT d => PostService d -> Hashtag -> Text -> Handler NoContent +relayInbox :: DHT d => PostService d -> Hashtag -> Txt.Text -> Handler NoContent relayInbox serv tag posts = do let -- skip checking whether the post actually contains the tag, just drop full post @@ -181,7 +202,7 @@ newtype UnhandledTagException = UnhandledTagException String instance Exception UnhandledTagException -subscriptionDelivery :: DHT d => PostService d -> Integer -> Text -> Handler Text +subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text subscriptionDelivery serv senderID subList = do let tagSubs = Txt.lines subList @@ -215,7 +236,7 @@ subscriptionDelivery serv senderID subList = do Right _ -> pure "" -- TODO: check and only accept tags in own (future?) responsibility where - processTag :: TVar RelayTags -> Text -> STM () + processTag :: TVar RelayTags -> Txt.Text -> STM () processTag subscriberSTM tagData = do let tag:subText:lease:posts:_ = Txt.splitOn "," tagData @@ -226,7 +247,7 @@ subscriptionDelivery serv senderID subList = do enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime -postFetch :: PostService d -> Text -> Handler Text +postFetch :: PostService d -> Txt.Text -> Handler Txt.Text postFetch serv postID = do postSet <- liftIO . readTVarIO . ownPosts $ serv if HSet.member postID postSet @@ -235,7 +256,7 @@ postFetch serv postID = do else throwError $ err404 { errBody = "No post found with this ID" } -postMultiFetch :: PostService d -> Text -> Handler Text +postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text postMultiFetch serv postIDs = do let idList = Txt.lines postIDs postSet <- liftIO . readTVarIO . ownPosts $ serv @@ -247,7 +268,7 @@ postMultiFetch serv postIDs = do ) "" idList -postInbox :: PostService d -> Text -> Handler NoContent +postInbox :: PostService d -> Txt.Text -> Handler NoContent postInbox serv post = do -- extract contained hashtags let @@ -257,13 +278,13 @@ postInbox serv post = do -- add ID to own posts liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId) -- enqueue a relay job for each tag - liftIO $ forM_ (containedTags :: [Text]) (\tag -> + liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag -> atomically $ writeTQueue (relayInQueue serv) (tag, postId, post) ) pure NoContent -tagDelivery :: PostService d -> Text -> Text -> Handler Text +tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text tagDelivery serv hashtag posts = do let postIDs = Txt.lines posts subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv @@ -274,7 +295,7 @@ tagDelivery serv hashtag posts = do pure () pure $ "Received a postID for tag " <> hashtag -tagSubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Integer +tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer tagSubscribe serv hashtag origin = do responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) if not responsible @@ -293,7 +314,7 @@ tagSubscribe serv hashtag origin = do pure $ round leaseTime -tagUnsubscribe :: DHT d => PostService d -> Text -> Maybe Text -> Handler Text +tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text tagUnsubscribe serv hashtag origin = do responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) if not responsible @@ -313,15 +334,8 @@ tagUnsubscribe serv hashtag origin = do clientAPI :: Proxy PostServiceAPI clientAPI = Proxy -relayInboxClient :: Text -> Text -> ClientM NoContent -relayInboxClient :<|> subscriptionDeliveryClient - :<|> postFetchClient - :<|> postMultiFetchClient - :<|> postInboxClient - :<|> tagDeliveryClient - :<|> tagSubscribeClient - :<|> tagUnsubscribeClient - = client clientAPI + +relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI -- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag] @@ -529,7 +543,7 @@ lookupTagSubscriptions tag = rMapLookup (hashtagToId tag) -- normalise the unicode representation of a string to NFC -normaliseTag :: Text -> Text +normaliseTag :: Txt.Text -> Txt.Text normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict @@ -587,7 +601,7 @@ fetchTagPosts serv = forever $ do -- TODO: batching, retry -- TODO: process multiple in parallel pIdUri <- atomically . readTQueue $ postFetchQueue serv - fetchReq <- HTTP.parseRequest . Txt.unpack $ pIdUri + fetchReq <- HTTP.parseRequest . Txt.unpack $pIdUri resp <- try $ HTTP.httpLbs fetchReq (httpMan serv) :: IO (Either HTTP.HttpException (HTTP.Response BSUL.ByteString)) case resp of Right response -> @@ -602,98 +616,3 @@ fetchTagPosts serv = forever $ do -- TODO error handling, retry pure () - --- ======= statistics/measurement and logging ======= - -data StatsEventType = PostPublishEvent - -- ^ initial publishing of a post by an instance - | RelayReceiveEvent - -- ^ receiving of posts because of being the responsible relay, for estimation of \tau_t - -- TODO: record for which hashtag - | RelayDeliveryEvent - -- ^ delivering (or at least attempt) a post to a subscriber - | IncomingPostFetchEvent - -- ^ another instance fetches a post from this instance - deriving (Enum, Show, Eq) - --- | Represents measurement event of a 'StatsEventType' with a count relevant for a certain key -data StatsEvent = StatsEvent StatsEventType Int NodeID - --- TODO: make delay configurable -statsAccuDelay = 300000 - --- | periodically flush the stats queue and accumulate all events inside -accumulateStatsThread :: TQueue StatsEvent -> IO () -accumulateStatsThread statsQ = getPOSIXTime >>= flushLoop - where - flushLoop previousRun = do - now <- getPOSIXTime - -- TODO: instead of letting the events accumulate in the queue and allocate linear memory, immediately fold the result - -- but how to achieve the periodicity when blocking on a queue? - -- idea: let another thread periodically exchange the RelayStats, modify it atomically (Konzept "unterm Arsch wegziehen") - threadDelay statsAccuDelay - latestEvents <- atomically $ flushTQueue statsQ - -- accumulate the events - -- and now what? write a log to file, probably as a forkIO - -- persistently store in a TVar so it can be retrieved later by the DHT - flushLoop now - - -accumulateStats :: POSIXTime -> [StatsEvent] -> RelayStats -accumulateStats timeInterval events = - -- first sum all event numbers, then divide through number of seconds passed to - -- get rate per second - RelayStats - { relayReceiveRates = mapRMap (/ intervalSeconds) $ relayReceiveRates summedStats - , relayDeliveryRates = mapRMap (/ intervalSeconds) $ relayDeliveryRates summedStats - , postPublishRate = postPublishRate summedStats / intervalSeconds - , postFetchRate = postFetchRate summedStats / intervalSeconds - } - where - intervalSeconds = fromIntegral (fromEnum timeInterval) / 10^12 - summedStats = foldl (\stats event -> case event of - StatsEvent PostPublishEvent num _ -> - stats {postPublishRate = fromIntegral num + postPublishRate stats} - StatsEvent RelayReceiveEvent num key -> - stats {relayReceiveRates = sumIfEntryExists key (fromIntegral num) (relayReceiveRates stats)} - StatsEvent RelayDeliveryEvent num key -> - stats {relayDeliveryRates = sumIfEntryExists key (fromIntegral num) (relayDeliveryRates stats)} - StatsEvent IncomingPostFetchEvent num _ -> - stats {postFetchRate = fromIntegral num + postFetchRate stats} - ) - emptyStats - events - sumIfEntryExists = addRMapEntryWith (\newVal oldVal -> - let toInsert = fromJust $ extractRingEntry newVal - in - case oldVal of - KeyEntry n -> KeyEntry (n + toInsert) - ProxyEntry pointer (Just (KeyEntry n)) -> ProxyEntry pointer (Just (KeyEntry $ n + toInsert)) - ProxyEntry pointer Nothing -> ProxyEntry pointer (Just newVal) - _ -> error "RingMap nested too deeply" - ) - --- idea: first just sum with foldl, and then map the time division over all values - - - -emptyStats :: RelayStats -emptyStats = RelayStats - { relayReceiveRates = emptyRMap - , relayDeliveryRates = emptyRMap - , postFetchRate = 0 - , postPublishRate = 0 - } - --- | measured rates of relay performance --- TODO: maybe include other metrics in here as well, like number of subscribers? -data RelayStats = RelayStats - { relayReceiveRates :: RingMap NodeID Double - -- ^ rate of incoming posts in the responsibility of this relay - , relayDeliveryRates :: RingMap NodeID Double - -- ^ rate of relayed outgoing posts - , postFetchRate :: Double -- no need to differentiate between tags - -- ^ number of post-fetches delivered - , postPublishRate :: Double - -- ^ rate of initially publishing posts through this instance - } diff --git a/src/Hash2Pub/PostService/API.hs b/src/Hash2Pub/PostService/API.hs deleted file mode 100644 index 1484631..0000000 --- a/src/Hash2Pub/PostService/API.hs +++ /dev/null @@ -1,37 +0,0 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE InstanceSigs #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE RankNTypes #-} -{-# LANGUAGE TypeOperators #-} -module Hash2Pub.PostService.API where - -import Data.Text.Lazy (Text) - -import Servant - -type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent - -- delivery endpoint at responsible relay for delivering posts of $tag for distribution - :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Text :> PostNoContent '[PlainText] Text - -- endpoint for delivering the subscriptions and outstanding queue - :<|> "post" :> Capture "postid" Text :> Get '[PlainText] Text - -- fetch endpoint for posts, full post ID is http://$domain/post/$postid - :<|> "posts" :> ReqBody '[PlainText] Text :> Post '[PlainText] Text - -- endpoint for fetching multiple posts at once - :<|> "posts" :> "inbox" :> ReqBody '[PlainText] Text :> PutCreated '[PlainText] NoContent - -- delivery endpoint of newly published posts of the relay's instance - :<|> "tags" :> Capture "hashtag" Text :> ReqBody '[PlainText] Text :> PostCreated '[PlainText] Text - -- delivery endpoint for posts of $tag at subscribing instance - :<|> "tags" :> Capture "hashtag" Text :> "subscribe" :> Header "Origin" Text :> Get '[PlainText] Integer - -- endpoint for subscribing the instance specified in - -- the Origin header to $hashtag. - -- Returns subscription lease time in seconds. - :<|> "tags" :> Capture "hashtag" Text :> "unsubscribe" :> Header "Origin" Text :> Get '[PlainText] Text - -- endpoint for unsubscribing the instance specified in - -- the Origin header to $hashtag - --- | needed for guiding type inference -exposedPostServiceAPI :: Proxy PostServiceAPI -exposedPostServiceAPI = Proxy - diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs index a5af10c..86825a7 100644 --- a/src/Hash2Pub/ProtocolTypes.hs +++ b/src/Hash2Pub/ProtocolTypes.hs @@ -1,5 +1,7 @@ module Hash2Pub.ProtocolTypes where +import qualified Data.Map as Map +import Data.Maybe (mapMaybe) import qualified Data.Set as Set import Data.Time.Clock.POSIX (POSIXTime) diff --git a/src/Hash2Pub/RingMap.hs b/src/Hash2Pub/RingMap.hs index 8416278..e99f8b2 100644 --- a/src/Hash2Pub/RingMap.hs +++ b/src/Hash2Pub/RingMap.hs @@ -23,7 +23,7 @@ instance (Bounded k, Ord k, Eq a) => Eq (RingMap k a) where a == b = getRingMap a == getRingMap b instance (Bounded k, Ord k, Show k, Show a) => Show (RingMap k a) where - show rmap = shows ("RingMap " :: String) (show $ getRingMap rmap) + show rmap = shows "RingMap " (show $ getRingMap rmap) -- | entry of a 'RingMap' that holds a value and can also -- wrap around the lookup direction at the edges of the name space. @@ -133,7 +133,7 @@ rMapLookupPred :: (Bounded k, Ord k, Num k) rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards addRMapEntryWith :: (Bounded k, Ord k) - => (RingEntry k a -> RingEntry k a -> RingEntry k a) -- ^ f new_value mold_value + => (RingEntry k a -> RingEntry k a -> RingEntry k a) -> k -- ^ key -> a -- ^ value -> RingMap k a @@ -207,7 +207,7 @@ takeEntriesUntil_ :: (Integral i, Bounded k, Ord k) -> Maybe i -- possible number limit -> [a] -> [a] -takeEntriesUntil_ _rmap' _getterFunc' _havingReached _previousEntry (Just remaining) takeAcc +takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry (Just remaining) takeAcc -- length limit reached | remaining <= 0 = takeAcc takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry numLimit takeAcc = @@ -247,14 +247,3 @@ takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k) -> RingMap k a -> [a] takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing [] - - --- | map a function over all payload values of a 'RingMap' -mapRMap :: (Bounded k, Ord k, Num k) - => (a -> b) -> RingMap k a -> RingMap k b -mapRMap f = RingMap . Map.map traversingF . getRingMap - where - --traversingF :: RingEntry k a -> RingEntry k b - traversingF (KeyEntry a) = KeyEntry (f a) - traversingF (ProxyEntry pointer (Just entry)) = ProxyEntry pointer (Just $ traversingF entry) - traversingF (ProxyEntry pointer Nothing) = ProxyEntry pointer Nothing