Compare commits

...

5 commits

5 changed files with 86 additions and 26 deletions

View file

@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md
common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client
ghc-options: -Wall

View file

@ -57,7 +57,7 @@ readConfig = do
, confMaxLookupCacheAge = 300
}
sConf = ServiceConf {
confSubscriptionExpiryTime = 2*3600 `div` read speedup
confSubscriptionExpiryTime = fromIntegral $ 2*3600 `div` (read speedup :: Integer)
, confServicePort = read servicePortString
, confServiceHost = confDomainString
}

View file

@ -430,7 +430,7 @@ instance Hashable.Hashable NodeID where
hash = Hashable.hash . getNodeID
data ServiceConf = ServiceConf
{ confSubscriptionExpiryTime :: Integer
{ confSubscriptionExpiryTime :: POSIXTime
-- ^ subscription lease expiration in seconds
, confServicePort :: Int
-- ^ listening port for service

View file

@ -16,16 +16,18 @@ import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Monad (foldM, forM_, forever)
import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString.Lazy.UTF8 as BSU
import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (fromMaybe)
import Data.Maybe (fromMaybe, isJust)
import Data.String (fromString)
import qualified Data.Text.Lazy as Txt
import Data.Text.Normalize (NormalizationMode (NFC),
normalize)
import Data.Time.Clock.POSIX
import Data.Typeable (Typeable)
import qualified Network.HTTP.Client as HTTP
import System.Random
import qualified Network.Wai.Handler.Warp as Warp
@ -48,6 +50,7 @@ data PostService d = PostService
-- ^ just store the existence of posts for saving memory,
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID
}
deriving (Typeable)
@ -73,6 +76,7 @@ instance DHT d => Service PostService d where
ownSubsVar <- newTVarIO HMap.empty
ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO
postFetchQueue' <- newTQueueIO
let
thisService = PostService {
serviceConf = conf
@ -82,6 +86,7 @@ instance DHT d => Service PostService d where
, ownSubscriptions = ownSubsVar
, ownPosts = ownPostVar
, relayInQueue = relayInQueue'
, postFetchQueue = postFetchQueue'
}
port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
@ -117,23 +122,23 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
-- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- ^ delivery endpoint of newly published posts of the relay's instance
-- delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent
-- ^ endpoint for delivering the subscriptions and outstanding queue
-- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
-- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- ^ endpoint for fetching multiple posts at once
-- endpoint for fetching multiple posts at once
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- ^ delivery endpoint of newly published posts of the relay's instance
-- delivery endpoint of newly published posts of the relay's instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
-- ^ delivery endpoint for posts of $tag at subscribing instance
-- delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
-- ^ endpoint for subscribing the instance specified in
-- endpoint for subscribing the instance specified in
-- the Origin header to $hashtag.
-- Returns subscription lease time in seconds.
:<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text
-- ^ endpoint for unsubscribing the instance specified in
-- endpoint for unsubscribing the instance specified in
-- the Origin header to $hashtag
@ -221,14 +226,39 @@ postInbox serv post = do
tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts
tagDelivery serv hashtag posts = do
let postIDs = Txt.lines posts
subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions)
then -- TODO: increase a counter/ statistics for received posts of this tag
liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv)
else -- silently drop posts from unsubscribed tags
pure ()
pure $ "Received a postID for tag " <> hashtag
tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
tagSubscribe serv hashtag origin = pure 42
tagSubscribe serv hashtag origin = do
originURL <- maybe
(throwError $ err400 { errBody = "Missing Origin header" })
pure
origin
req <- HTTP.parseUrlThrow (Txt.unpack originURL)
now <- liftIO getPOSIXTime
let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv)
-- setup subscription entry
_ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime
pure $ round leaseTime
tagUnsubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag
tagUnsubscribe serv hashtag origin = do
originURL <- maybe
(throwError $ err400 { errBody = "Missing Origin header" })
pure
origin
req <- HTTP.parseUrlThrow (Txt.unpack originURL)
liftIO . atomically $ deleteSubscription (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req)
pure "bye bye"
-- ======= data structure manipulations =========
@ -251,7 +281,7 @@ enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do
setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID)
setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
tagMap <- readTVar tagMapSTM
case lookupRelayTags tag tagMap of
case lookupTagSubscriptions tag tagMap of
Nothing -> do
-- if no collision/ tag doesn't exist yet, just initialize a
-- new subscriber map
@ -273,11 +303,30 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
Just (tagOutChan, _) -> pure tagOutChan
-- | deletes a subscription from the passed subscriber map
deleteSubscription :: TVar RelayTags -> Hashtag -> (String, Int) -> STM ()
deleteSubscription tagMapSTM tag subscriber = do
tagMap <- readTVar tagMapSTM
case lookupTagSubscriptions tag tagMap of
-- no subscribers to that tag, just return
Nothing -> pure ()
Just (foundSubMapSTM, _, _) -> do
foundSubMap <- readTVar foundSubMapSTM
let newSubMap = HMap.delete subscriber foundSubMap
-- if there are no subscriptions for the tag anymore, remove its
-- data sttructure altogether
if HMap.null newSubMap
then writeTVar tagMapSTM $ deleteRMapEntry (genKeyID . Txt.unpack $ tag) tagMap
-- otherwise just remove the subscription of that node
else writeTVar foundSubMapSTM newSubMap
-- | returns the broadcast channel of a hashtag if there are any subscribers to it
getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID))
getTagBroadcastChannel serv tag = do
tagMap <- readTVar $ subscribers serv
case lookupRelayTags tag tagMap of
case lookupTagSubscriptions tag tagMap of
Nothing -> pure Nothing
Just (subscriberSTM, broadcastChan, _) -> do
subscriberMap <- readTVar subscriberSTM
@ -287,8 +336,8 @@ getTagBroadcastChannel serv tag = do
-- | look up the subscription data of a tag
lookupRelayTags :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag)
lookupRelayTags tag = rMapLookup (genKeyID . Txt.unpack $ tag)
lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag)
-- normalise the unicode representation of a string to NFC
@ -299,7 +348,7 @@ normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
instance {-# OVERLAPPABLE #-} Show a => MimeRender PlainText a where
mimeRender _ = BSU.fromString . show
mimeRender _ = BSUL.fromString . show
-- ====== worker threads ======

View file

@ -1,4 +1,6 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
module FediChordSpec where
import Control.Concurrent.STM.TVar
@ -292,12 +294,15 @@ exampleNodeState = RemoteNodeState {
, vServerID = 0
}
exampleLocalNode :: IO (LocalNodeState s)
exampleLocalNode = nodeStateInit =<< (newTVarIO $ RealNode {
exampleLocalNode :: IO (LocalNodeState MockService)
exampleLocalNode = do
realNode <- newTVarIO $ RealNode {
vservers = []
, nodeConfig = exampleFediConf
, bootstrapNodes = confBootstrapNodes exampleFediConf
})
, nodeService = MockService
}
nodeStateInit realNode
exampleFediConf :: FediChordConf
@ -313,3 +318,9 @@ exampleVs :: (Integral i) => i
exampleVs = 4
exampleIp :: HostAddress6
exampleIp = tupleToHostAddress6 (0x2001, 0x16b8, 0x755a, 0xb110, 0x7d6a, 0x12ab, 0xf0c5, 0x386e)
data MockService d = MockService
instance DHT d => Service MockService d where
runService _ _ = pure MockService
getListeningPortFromService = const 1337