diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 251c60d..54cb29d 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -46,7 +46,7 @@ category: Network extra-source-files: CHANGELOG.md common deps - build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client + build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms ghc-options: -Wall diff --git a/app/Main.hs b/app/Main.hs index 3bdb4d4..98961c0 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -57,7 +57,7 @@ readConfig = do , confMaxLookupCacheAge = 300 } sConf = ServiceConf { - confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` (read speedup :: Integer) + confSubscriptionExpiryTime = 2*3600 `div` read speedup , confServicePort = read servicePortString , confServiceHost = confDomainString } diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 91b3822..e73e7f5 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -430,7 +430,7 @@ instance Hashable.Hashable NodeID where hash = Hashable.hash . getNodeID data ServiceConf = ServiceConf - { confSubscriptionExpiryTime :: POSIXTime + { confSubscriptionExpiryTime :: Integer -- ^ subscription lease expiration in seconds , confServicePort :: Int -- ^ listening port for service diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 838b2c8..ae122e2 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -16,18 +16,16 @@ import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Monad (foldM, forM_, forever) import Control.Monad.IO.Class (liftIO) -import qualified Data.ByteString.Lazy.UTF8 as BSUL -import qualified Data.ByteString.UTF8 as BSU +import qualified Data.ByteString.Lazy.UTF8 as BSU import qualified Data.HashMap.Strict as HMap import qualified Data.HashSet as HSet -import Data.Maybe (fromMaybe, isJust) +import Data.Maybe (fromMaybe) import Data.String (fromString) import qualified Data.Text.Lazy as Txt import Data.Text.Normalize (NormalizationMode (NFC), normalize) import Data.Time.Clock.POSIX import Data.Typeable (Typeable) -import qualified Network.HTTP.Client as HTTP import System.Random import qualified Network.Wai.Handler.Warp as Warp @@ -50,7 +48,6 @@ data PostService d = PostService -- ^ just store the existence of posts for saving memory, , relayInQueue :: TQueue (Hashtag, PostID, PostContent) -- ^ Queue for processing incoming posts of own instance asynchronously - , postFetchQueue :: TQueue PostID } deriving (Typeable) @@ -76,7 +73,6 @@ instance DHT d => Service PostService d where ownSubsVar <- newTVarIO HMap.empty ownPostVar <- newTVarIO HSet.empty relayInQueue' <- newTQueueIO - postFetchQueue' <- newTQueueIO let thisService = PostService { serviceConf = conf @@ -86,7 +82,6 @@ instance DHT d => Service PostService d where , ownSubscriptions = ownSubsVar , ownPosts = ownPostVar , relayInQueue = relayInQueue' - , postFetchQueue = postFetchQueue' } port' = fromIntegral (confServicePort conf) warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings @@ -122,23 +117,23 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB -- ========= HTTP API and handlers ============= type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent - -- delivery endpoint of newly published posts of the relay's instance + -- ^ delivery endpoint of newly published posts of the relay's instance :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent - -- endpoint for delivering the subscriptions and outstanding queue + -- ^ endpoint for delivering the subscriptions and outstanding queue :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text - -- fetch endpoint for posts, full post ID is http://$domain/post/$postid + -- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid :<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text - -- endpoint for fetching multiple posts at once + -- ^ endpoint for fetching multiple posts at once :<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent - -- delivery endpoint of newly published posts of the relay's instance + -- ^ delivery endpoint of newly published posts of the relay's instance :<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text - -- delivery endpoint for posts of $tag at subscribing instance + -- ^ delivery endpoint for posts of $tag at subscribing instance :<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer - -- endpoint for subscribing the instance specified in + -- ^ endpoint for subscribing the instance specified in -- the Origin header to $hashtag. -- Returns subscription lease time in seconds. :<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text - -- endpoint for unsubscribing the instance specified in + -- ^ endpoint for unsubscribing the instance specified in -- the Origin header to $hashtag @@ -226,39 +221,14 @@ postInbox serv post = do tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text -tagDelivery serv hashtag posts = do - let postIDs = Txt.lines posts - subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv - if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions) - then -- TODO: increase a counter/ statistics for received posts of this tag - liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv) - else -- silently drop posts from unsubscribed tags - pure () - pure $ "Received a postID for tag " <> hashtag +tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer -tagSubscribe serv hashtag origin = do - originURL <- maybe - (throwError $ err400 { errBody = "Missing Origin header" }) - pure - origin - req <- HTTP.parseUrlThrow (Txt.unpack originURL) - now <- liftIO getPOSIXTime - let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv) - -- setup subscription entry - _ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime - pure $ round leaseTime - +tagSubscribe serv hashtag origin = pure 42 tagUnsubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text -tagUnsubscribe serv hashtag origin = do - originURL <- maybe - (throwError $ err400 { errBody = "Missing Origin header" }) - pure - origin - req <- HTTP.parseUrlThrow (Txt.unpack originURL) - liftIO . atomically $ deleteSubscription (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) - pure "bye bye" +tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag + -- ======= data structure manipulations ========= @@ -281,7 +251,7 @@ enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID) setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do tagMap <- readTVar tagMapSTM - case lookupTagSubscriptions tag tagMap of + case lookupRelayTags tag tagMap of Nothing -> do -- if no collision/ tag doesn't exist yet, just initialize a -- new subscriber map @@ -303,30 +273,11 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do Just (tagOutChan, _) -> pure tagOutChan --- | deletes a subscription from the passed subscriber map -deleteSubscription :: TVar RelayTags -> Hashtag -> (String, Int) -> STM () -deleteSubscription tagMapSTM tag subscriber = do - tagMap <- readTVar tagMapSTM - case lookupTagSubscriptions tag tagMap of - -- no subscribers to that tag, just return - Nothing -> pure () - Just (foundSubMapSTM, _, _) -> do - foundSubMap <- readTVar foundSubMapSTM - let newSubMap = HMap.delete subscriber foundSubMap - -- if there are no subscriptions for the tag anymore, remove its - -- data sttructure altogether - if HMap.null newSubMap - then writeTVar tagMapSTM $ deleteRMapEntry (genKeyID . Txt.unpack $ tag) tagMap - -- otherwise just remove the subscription of that node - else writeTVar foundSubMapSTM newSubMap - - - -- | returns the broadcast channel of a hashtag if there are any subscribers to it getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID)) getTagBroadcastChannel serv tag = do tagMap <- readTVar $ subscribers serv - case lookupTagSubscriptions tag tagMap of + case lookupRelayTags tag tagMap of Nothing -> pure Nothing Just (subscriberSTM, broadcastChan, _) -> do subscriberMap <- readTVar subscriberSTM @@ -336,8 +287,8 @@ getTagBroadcastChannel serv tag = do -- | look up the subscription data of a tag -lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a -lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag) +lookupRelayTags :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag) +lookupRelayTags tag = rMapLookup (genKeyID . Txt.unpack $ tag) -- normalise the unicode representation of a string to NFC @@ -348,7 +299,7 @@ normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict -- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯ -- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap instance {-# OVERLAPPABLE #-} Show a => MimeRender PlainText a where - mimeRender _ = BSUL.fromString . show + mimeRender _ = BSU.fromString . show -- ====== worker threads ====== diff --git a/test/FediChordSpec.hs b/test/FediChordSpec.hs index ed1f3c8..bcc2eaf 100644 --- a/test/FediChordSpec.hs +++ b/test/FediChordSpec.hs @@ -1,6 +1,4 @@ -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE OverloadedStrings #-} module FediChordSpec where import Control.Concurrent.STM.TVar @@ -294,15 +292,12 @@ exampleNodeState = RemoteNodeState { , vServerID = 0 } -exampleLocalNode :: IO (LocalNodeState MockService) -exampleLocalNode = do - realNode <- newTVarIO $ RealNode { +exampleLocalNode :: IO (LocalNodeState s) +exampleLocalNode = nodeStateInit =<< (newTVarIO $ RealNode { vservers = [] , nodeConfig = exampleFediConf , bootstrapNodes = confBootstrapNodes exampleFediConf - , nodeService = MockService - } - nodeStateInit realNode + }) exampleFediConf :: FediChordConf @@ -318,9 +313,3 @@ exampleVs :: (Integral i) => i exampleVs = 4 exampleIp :: HostAddress6 exampleIp = tupleToHostAddress6 (0x2001, 0x16b8, 0x755a, 0xb110, 0x7d6a, 0x12ab, 0xf0c5, 0x386e) - -data MockService d = MockService - -instance DHT d => Service MockService d where - runService _ _ = pure MockService - getListeningPortFromService = const 1337