From 50044673a65d0ba0afcc4abc104fa19a70b67757 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Fri, 31 Jul 2020 17:46:33 +0200 Subject: [PATCH 1/5] server endpoint for tag-post delivery --- src/Hash2Pub/PostService.hs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index ae122e2..cab4350 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -19,7 +19,7 @@ import Control.Monad.IO.Class (liftIO) import qualified Data.ByteString.Lazy.UTF8 as BSU import qualified Data.HashMap.Strict as HMap import qualified Data.HashSet as HSet -import Data.Maybe (fromMaybe) +import Data.Maybe (fromMaybe, isJust) import Data.String (fromString) import qualified Data.Text.Lazy as Txt import Data.Text.Normalize (NormalizationMode (NFC), @@ -48,6 +48,7 @@ data PostService d = PostService -- ^ just store the existence of posts for saving memory, , relayInQueue :: TQueue (Hashtag, PostID, PostContent) -- ^ Queue for processing incoming posts of own instance asynchronously + , postFetchQueue :: TQueue PostID } deriving (Typeable) @@ -73,6 +74,7 @@ instance DHT d => Service PostService d where ownSubsVar <- newTVarIO HMap.empty ownPostVar <- newTVarIO HSet.empty relayInQueue' <- newTQueueIO + postFetchQueue' <- newTQueueIO let thisService = PostService { serviceConf = conf @@ -82,6 +84,7 @@ instance DHT d => Service PostService d where , ownSubscriptions = ownSubsVar , ownPosts = ownPostVar , relayInQueue = relayInQueue' + , postFetchQueue = postFetchQueue' } port' = fromIntegral (confServicePort conf) warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings @@ -221,7 +224,15 @@ postInbox serv post = do tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text -tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts +tagDelivery serv hashtag posts = do + let postIDs = Txt.lines posts + subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv + if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions) + then -- TODO: increase a counter/ statistics for received posts of this tag + liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv) + else -- silently drop posts from unsubscribed tags + pure () + pure $ "Received a postID for tag " <> hashtag tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer tagSubscribe serv hashtag origin = pure 42 @@ -251,7 +262,7 @@ enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID) setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do tagMap <- readTVar tagMapSTM - case lookupRelayTags tag tagMap of + case lookupTagSubscriptions tag tagMap of Nothing -> do -- if no collision/ tag doesn't exist yet, just initialize a -- new subscriber map @@ -277,7 +288,7 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID)) getTagBroadcastChannel serv tag = do tagMap <- readTVar $ subscribers serv - case lookupRelayTags tag tagMap of + case lookupTagSubscriptions tag tagMap of Nothing -> pure Nothing Just (subscriberSTM, broadcastChan, _) -> do subscriberMap <- readTVar subscriberSTM @@ -287,8 +298,8 @@ getTagBroadcastChannel serv tag = do -- | look up the subscription data of a tag -lookupRelayTags :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag) -lookupRelayTags tag = rMapLookup (genKeyID . Txt.unpack $ tag) +lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a +lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag) -- normalise the unicode representation of a string to NFC From 7d7fa3b52a745d57cefc4fb7f24aac798e39bce4 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Fri, 31 Jul 2020 17:49:52 +0200 Subject: [PATCH 2/5] fix haddock parsing --- src/Hash2Pub/PostService.hs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index cab4350..dc2164a 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -120,23 +120,23 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB -- ========= HTTP API and handlers ============= type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent - -- ^ delivery endpoint of newly published posts of the relay's instance + -- delivery endpoint of newly published posts of the relay's instance :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent - -- ^ endpoint for delivering the subscriptions and outstanding queue + -- endpoint for delivering the subscriptions and outstanding queue :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text - -- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid + -- fetch endpoint for posts, full post ID is http://$domain/post/$postid :<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text - -- ^ endpoint for fetching multiple posts at once + -- endpoint for fetching multiple posts at once :<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent - -- ^ delivery endpoint of newly published posts of the relay's instance + -- delivery endpoint of newly published posts of the relay's instance :<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text - -- ^ delivery endpoint for posts of $tag at subscribing instance + -- delivery endpoint for posts of $tag at subscribing instance :<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer - -- ^ endpoint for subscribing the instance specified in + -- endpoint for subscribing the instance specified in -- the Origin header to $hashtag. -- Returns subscription lease time in seconds. :<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text - -- ^ endpoint for unsubscribing the instance specified in + -- endpoint for unsubscribing the instance specified in -- the Origin header to $hashtag From 7280f251b5015fed7e76e1c01dcf19145f77cf83 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sat, 1 Aug 2020 11:00:29 +0200 Subject: [PATCH 3/5] server endpoint for tag subscription --- Hash2Pub.cabal | 2 +- app/Main.hs | 2 +- src/Hash2Pub/FediChordTypes.hs | 2 +- src/Hash2Pub/PostService.hs | 19 ++++++++++++++++--- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 54cb29d..251c60d 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -46,7 +46,7 @@ category: Network extra-source-files: CHANGELOG.md common deps - build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms + build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client ghc-options: -Wall diff --git a/app/Main.hs b/app/Main.hs index 98961c0..3bdb4d4 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -57,7 +57,7 @@ readConfig = do , confMaxLookupCacheAge = 300 } sConf = ServiceConf { - confSubscriptionExpiryTime = 2*3600 `div` read speedup + confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` (read speedup :: Integer) , confServicePort = read servicePortString , confServiceHost = confDomainString } diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index e73e7f5..91b3822 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -430,7 +430,7 @@ instance Hashable.Hashable NodeID where hash = Hashable.hash . getNodeID data ServiceConf = ServiceConf - { confSubscriptionExpiryTime :: Integer + { confSubscriptionExpiryTime :: POSIXTime -- ^ subscription lease expiration in seconds , confServicePort :: Int -- ^ listening port for service diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index dc2164a..d56eb4c 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -16,7 +16,8 @@ import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Monad (foldM, forM_, forever) import Control.Monad.IO.Class (liftIO) -import qualified Data.ByteString.Lazy.UTF8 as BSU +import qualified Data.ByteString.Lazy.UTF8 as BSUL +import qualified Data.ByteString.UTF8 as BSU import qualified Data.HashMap.Strict as HMap import qualified Data.HashSet as HSet import Data.Maybe (fromMaybe, isJust) @@ -26,6 +27,7 @@ import Data.Text.Normalize (NormalizationMode (NFC), normalize) import Data.Time.Clock.POSIX import Data.Typeable (Typeable) +import qualified Network.HTTP.Client as HTTP import System.Random import qualified Network.Wai.Handler.Warp as Warp @@ -235,7 +237,18 @@ tagDelivery serv hashtag posts = do pure $ "Received a postID for tag " <> hashtag tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer -tagSubscribe serv hashtag origin = pure 42 +tagSubscribe serv hashtag origin = do + originURL <- maybe + (throwError $ err400 { errBody = "Missing Origin header" }) + pure + origin + req <- HTTP.parseUrlThrow (Txt.unpack originURL) + now <- liftIO getPOSIXTime + let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv) + -- setup subscription entry + _ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime + pure $ round leaseTime + tagUnsubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag @@ -310,7 +323,7 @@ normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict -- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯ -- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap instance {-# OVERLAPPABLE #-} Show a => MimeRender PlainText a where - mimeRender _ = BSU.fromString . show + mimeRender _ = BSUL.fromString . show -- ====== worker threads ====== From 89706f688a332f2966d8c48a12c9a2e983424310 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sat, 1 Aug 2020 11:18:16 +0200 Subject: [PATCH 4/5] server endpoint for tag unsubscription --- src/Hash2Pub/PostService.hs | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index d56eb4c..838b2c8 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -251,8 +251,14 @@ tagSubscribe serv hashtag origin = do tagUnsubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text -tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag - +tagUnsubscribe serv hashtag origin = do + originURL <- maybe + (throwError $ err400 { errBody = "Missing Origin header" }) + pure + origin + req <- HTTP.parseUrlThrow (Txt.unpack originURL) + liftIO . atomically $ deleteSubscription (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) + pure "bye bye" -- ======= data structure manipulations ========= @@ -297,6 +303,25 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do Just (tagOutChan, _) -> pure tagOutChan +-- | deletes a subscription from the passed subscriber map +deleteSubscription :: TVar RelayTags -> Hashtag -> (String, Int) -> STM () +deleteSubscription tagMapSTM tag subscriber = do + tagMap <- readTVar tagMapSTM + case lookupTagSubscriptions tag tagMap of + -- no subscribers to that tag, just return + Nothing -> pure () + Just (foundSubMapSTM, _, _) -> do + foundSubMap <- readTVar foundSubMapSTM + let newSubMap = HMap.delete subscriber foundSubMap + -- if there are no subscriptions for the tag anymore, remove its + -- data sttructure altogether + if HMap.null newSubMap + then writeTVar tagMapSTM $ deleteRMapEntry (genKeyID . Txt.unpack $ tag) tagMap + -- otherwise just remove the subscription of that node + else writeTVar foundSubMapSTM newSubMap + + + -- | returns the broadcast channel of a hashtag if there are any subscribers to it getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID)) getTagBroadcastChannel serv tag = do From 8faa9dc0164162d1f2b5ba558faf36a7defee250 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sat, 1 Aug 2020 18:58:30 +0200 Subject: [PATCH 5/5] fix test by providing a MockService --- test/FediChordSpec.hs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/test/FediChordSpec.hs b/test/FediChordSpec.hs index bcc2eaf..ed1f3c8 100644 --- a/test/FediChordSpec.hs +++ b/test/FediChordSpec.hs @@ -1,4 +1,6 @@ -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE OverloadedStrings #-} module FediChordSpec where import Control.Concurrent.STM.TVar @@ -292,12 +294,15 @@ exampleNodeState = RemoteNodeState { , vServerID = 0 } -exampleLocalNode :: IO (LocalNodeState s) -exampleLocalNode = nodeStateInit =<< (newTVarIO $ RealNode { +exampleLocalNode :: IO (LocalNodeState MockService) +exampleLocalNode = do + realNode <- newTVarIO $ RealNode { vservers = [] , nodeConfig = exampleFediConf , bootstrapNodes = confBootstrapNodes exampleFediConf - }) + , nodeService = MockService + } + nodeStateInit realNode exampleFediConf :: FediChordConf @@ -313,3 +318,9 @@ exampleVs :: (Integral i) => i exampleVs = 4 exampleIp :: HostAddress6 exampleIp = tupleToHostAddress6 (0x2001, 0x16b8, 0x755a, 0xb110, 0x7d6a, 0x12ab, 0xf0c5, 0x386e) + +data MockService d = MockService + +instance DHT d => Service MockService d where + runService _ _ = pure MockService + getListeningPortFromService = const 1337