Compare commits

...

5 commits

5 changed files with 86 additions and 26 deletions

View file

@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md extra-source-files: CHANGELOG.md
common deps common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client
ghc-options: -Wall ghc-options: -Wall

View file

@ -57,7 +57,7 @@ readConfig = do
, confMaxLookupCacheAge = 300 , confMaxLookupCacheAge = 300
} }
sConf = ServiceConf { sConf = ServiceConf {
confSubscriptionExpiryTime = 2*3600 `div` read speedup confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` (read speedup :: Integer)
, confServicePort = read servicePortString , confServicePort = read servicePortString
, confServiceHost = confDomainString , confServiceHost = confDomainString
} }

View file

@ -430,7 +430,7 @@ instance Hashable.Hashable NodeID where
hash = Hashable.hash . getNodeID hash = Hashable.hash . getNodeID
data ServiceConf = ServiceConf data ServiceConf = ServiceConf
{ confSubscriptionExpiryTime :: Integer { confSubscriptionExpiryTime :: POSIXTime
-- ^ subscription lease expiration in seconds -- ^ subscription lease expiration in seconds
, confServicePort :: Int , confServicePort :: Int
-- ^ listening port for service -- ^ listening port for service

View file

@ -16,16 +16,18 @@ import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Monad (foldM, forM_, forever) import Control.Monad (foldM, forM_, forever)
import Control.Monad.IO.Class (liftIO) import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString.Lazy.UTF8 as BSU import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet import qualified Data.HashSet as HSet
import Data.Maybe (fromMaybe) import Data.Maybe (fromMaybe, isJust)
import Data.String (fromString) import Data.String (fromString)
import qualified Data.Text.Lazy as Txt import qualified Data.Text.Lazy as Txt
import Data.Text.Normalize (NormalizationMode (NFC), import Data.Text.Normalize (NormalizationMode (NFC),
normalize) normalize)
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Data.Typeable (Typeable) import Data.Typeable (Typeable)
import qualified Network.HTTP.Client as HTTP
import System.Random import System.Random
import qualified Network.Wai.Handler.Warp as Warp import qualified Network.Wai.Handler.Warp as Warp
@ -48,6 +50,7 @@ data PostService d = PostService
-- ^ just store the existence of posts for saving memory, -- ^ just store the existence of posts for saving memory,
, relayInQueue :: TQueue (Hashtag, PostID, PostContent) , relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously -- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID
} }
deriving (Typeable) deriving (Typeable)
@ -73,6 +76,7 @@ instance DHT d => Service PostService d where
ownSubsVar <- newTVarIO HMap.empty ownSubsVar <- newTVarIO HMap.empty
ownPostVar <- newTVarIO HSet.empty ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO relayInQueue' <- newTQueueIO
postFetchQueue' <- newTQueueIO
let let
thisService = PostService { thisService = PostService {
serviceConf = conf serviceConf = conf
@ -82,6 +86,7 @@ instance DHT d => Service PostService d where
, ownSubscriptions = ownSubsVar , ownSubscriptions = ownSubsVar
, ownPosts = ownPostVar , ownPosts = ownPostVar
, relayInQueue = relayInQueue' , relayInQueue = relayInQueue'
, postFetchQueue = postFetchQueue'
} }
port' = fromIntegral (confServicePort conf) port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
@ -117,23 +122,23 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
-- ========= HTTP API and handlers ============= -- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- ^ delivery endpoint of newly published posts of the relay's instance -- delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent
-- ^ endpoint for delivering the subscriptions and outstanding queue -- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
-- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid -- fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text :<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- ^ endpoint for fetching multiple posts at once -- endpoint for fetching multiple posts at once
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent :<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- ^ delivery endpoint of newly published posts of the relay's instance -- delivery endpoint of newly published posts of the relay's instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text :<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
-- ^ delivery endpoint for posts of $tag at subscribing instance -- delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer :<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
-- ^ endpoint for subscribing the instance specified in -- endpoint for subscribing the instance specified in
-- the Origin header to $hashtag. -- the Origin header to $hashtag.
-- Returns subscription lease time in seconds. -- Returns subscription lease time in seconds.
:<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text :<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text
-- ^ endpoint for unsubscribing the instance specified in -- endpoint for unsubscribing the instance specified in
-- the Origin header to $hashtag -- the Origin header to $hashtag
@ -221,14 +226,39 @@ postInbox serv post = do
tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts tagDelivery serv hashtag posts = do
let postIDs = Txt.lines posts
subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions)
then -- TODO: increase a counter/ statistics for received posts of this tag
liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv)
else -- silently drop posts from unsubscribed tags
pure ()
pure $ "Received a postID for tag " <> hashtag
tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
tagSubscribe serv hashtag origin = pure 42 tagSubscribe serv hashtag origin = do
originURL <- maybe
(throwError $ err400 { errBody = "Missing Origin header" })
pure
origin
req <- HTTP.parseUrlThrow (Txt.unpack originURL)
now <- liftIO getPOSIXTime
let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv)
-- setup subscription entry
_ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime
pure $ round leaseTime
tagUnsubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text tagUnsubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag tagUnsubscribe serv hashtag origin = do
originURL <- maybe
(throwError $ err400 { errBody = "Missing Origin header" })
pure
origin
req <- HTTP.parseUrlThrow (Txt.unpack originURL)
liftIO . atomically $ deleteSubscription (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req)
pure "bye bye"
-- ======= data structure manipulations ========= -- ======= data structure manipulations =========
@ -251,7 +281,7 @@ enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do
setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID) setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID)
setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
tagMap <- readTVar tagMapSTM tagMap <- readTVar tagMapSTM
case lookupRelayTags tag tagMap of case lookupTagSubscriptions tag tagMap of
Nothing -> do Nothing -> do
-- if no collision/ tag doesn't exist yet, just initialize a -- if no collision/ tag doesn't exist yet, just initialize a
-- new subscriber map -- new subscriber map
@ -273,11 +303,30 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
Just (tagOutChan, _) -> pure tagOutChan Just (tagOutChan, _) -> pure tagOutChan
-- | deletes a subscription from the passed subscriber map
deleteSubscription :: TVar RelayTags -> Hashtag -> (String, Int) -> STM ()
deleteSubscription tagMapSTM tag subscriber = do
tagMap <- readTVar tagMapSTM
case lookupTagSubscriptions tag tagMap of
-- no subscribers to that tag, just return
Nothing -> pure ()
Just (foundSubMapSTM, _, _) -> do
foundSubMap <- readTVar foundSubMapSTM
let newSubMap = HMap.delete subscriber foundSubMap
-- if there are no subscriptions for the tag anymore, remove its
-- data sttructure altogether
if HMap.null newSubMap
then writeTVar tagMapSTM $ deleteRMapEntry (genKeyID . Txt.unpack $ tag) tagMap
-- otherwise just remove the subscription of that node
else writeTVar foundSubMapSTM newSubMap
-- | returns the broadcast channel of a hashtag if there are any subscribers to it -- | returns the broadcast channel of a hashtag if there are any subscribers to it
getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID)) getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID))
getTagBroadcastChannel serv tag = do getTagBroadcastChannel serv tag = do
tagMap <- readTVar $ subscribers serv tagMap <- readTVar $ subscribers serv
case lookupRelayTags tag tagMap of case lookupTagSubscriptions tag tagMap of
Nothing -> pure Nothing Nothing -> pure Nothing
Just (subscriberSTM, broadcastChan, _) -> do Just (subscriberSTM, broadcastChan, _) -> do
subscriberMap <- readTVar subscriberSTM subscriberMap <- readTVar subscriberSTM
@ -287,8 +336,8 @@ getTagBroadcastChannel serv tag = do
-- | look up the subscription data of a tag -- | look up the subscription data of a tag
lookupRelayTags :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag) lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
lookupRelayTags tag = rMapLookup (genKeyID . Txt.unpack $ tag) lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag)
-- normalise the unicode representation of a string to NFC -- normalise the unicode representation of a string to NFC
@ -299,7 +348,7 @@ normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯ -- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap -- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
instance {-# OVERLAPPABLE #-} Show a => MimeRender PlainText a where instance {-# OVERLAPPABLE #-} Show a => MimeRender PlainText a where
mimeRender _ = BSU.fromString . show mimeRender _ = BSUL.fromString . show
-- ====== worker threads ====== -- ====== worker threads ======

View file

@ -1,4 +1,6 @@
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
module FediChordSpec where module FediChordSpec where
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
@ -292,12 +294,15 @@ exampleNodeState = RemoteNodeState {
, vServerID = 0 , vServerID = 0
} }
exampleLocalNode :: IO (LocalNodeState s) exampleLocalNode :: IO (LocalNodeState MockService)
exampleLocalNode = nodeStateInit =<< (newTVarIO $ RealNode { exampleLocalNode = do
realNode <- newTVarIO $ RealNode {
vservers = [] vservers = []
, nodeConfig = exampleFediConf , nodeConfig = exampleFediConf
, bootstrapNodes = confBootstrapNodes exampleFediConf , bootstrapNodes = confBootstrapNodes exampleFediConf
}) , nodeService = MockService
}
nodeStateInit realNode
exampleFediConf :: FediChordConf exampleFediConf :: FediChordConf
@ -313,3 +318,9 @@ exampleVs :: (Integral i) => i
exampleVs = 4 exampleVs = 4
exampleIp :: HostAddress6 exampleIp :: HostAddress6
exampleIp = tupleToHostAddress6 (0x2001, 0x16b8, 0x755a, 0xb110, 0x7d6a, 0x12ab, 0xf0c5, 0x386e) exampleIp = tupleToHostAddress6 (0x2001, 0x16b8, 0x755a, 0xb110, 0x7d6a, 0x12ab, 0xf0c5, 0x386e)
data MockService d = MockService
instance DHT d => Service MockService d where
runService _ _ = pure MockService
getListeningPortFromService = const 1337