|
|
|
@ -16,16 +16,18 @@ import Control.Concurrent.STM.TQueue
|
|
|
|
|
import Control.Concurrent.STM.TVar
|
|
|
|
|
import Control.Monad (foldM, forM_, forever)
|
|
|
|
|
import Control.Monad.IO.Class (liftIO)
|
|
|
|
|
import qualified Data.ByteString.Lazy.UTF8 as BSU
|
|
|
|
|
import qualified Data.ByteString.Lazy.UTF8 as BSUL
|
|
|
|
|
import qualified Data.ByteString.UTF8 as BSU
|
|
|
|
|
import qualified Data.HashMap.Strict as HMap
|
|
|
|
|
import qualified Data.HashSet as HSet
|
|
|
|
|
import Data.Maybe (fromMaybe)
|
|
|
|
|
import Data.Maybe (fromMaybe, isJust)
|
|
|
|
|
import Data.String (fromString)
|
|
|
|
|
import qualified Data.Text.Lazy as Txt
|
|
|
|
|
import Data.Text.Normalize (NormalizationMode (NFC),
|
|
|
|
|
normalize)
|
|
|
|
|
import Data.Time.Clock.POSIX
|
|
|
|
|
import Data.Typeable (Typeable)
|
|
|
|
|
import qualified Network.HTTP.Client as HTTP
|
|
|
|
|
import System.Random
|
|
|
|
|
|
|
|
|
|
import qualified Network.Wai.Handler.Warp as Warp
|
|
|
|
@ -48,6 +50,7 @@ data PostService d = PostService
|
|
|
|
|
-- ^ just store the existence of posts for saving memory,
|
|
|
|
|
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
|
|
|
|
|
-- ^ Queue for processing incoming posts of own instance asynchronously
|
|
|
|
|
, postFetchQueue :: TQueue PostID
|
|
|
|
|
}
|
|
|
|
|
deriving (Typeable)
|
|
|
|
|
|
|
|
|
@ -73,6 +76,7 @@ instance DHT d => Service PostService d where
|
|
|
|
|
ownSubsVar <- newTVarIO HMap.empty
|
|
|
|
|
ownPostVar <- newTVarIO HSet.empty
|
|
|
|
|
relayInQueue' <- newTQueueIO
|
|
|
|
|
postFetchQueue' <- newTQueueIO
|
|
|
|
|
let
|
|
|
|
|
thisService = PostService {
|
|
|
|
|
serviceConf = conf
|
|
|
|
@ -82,6 +86,7 @@ instance DHT d => Service PostService d where
|
|
|
|
|
, ownSubscriptions = ownSubsVar
|
|
|
|
|
, ownPosts = ownPostVar
|
|
|
|
|
, relayInQueue = relayInQueue'
|
|
|
|
|
, postFetchQueue = postFetchQueue'
|
|
|
|
|
}
|
|
|
|
|
port' = fromIntegral (confServicePort conf)
|
|
|
|
|
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
|
|
|
|
@ -117,23 +122,23 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
|
|
|
|
|
-- ========= HTTP API and handlers =============
|
|
|
|
|
|
|
|
|
|
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
|
|
|
|
|
-- ^ delivery endpoint of newly published posts of the relay's instance
|
|
|
|
|
-- delivery endpoint of newly published posts of the relay's instance
|
|
|
|
|
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent
|
|
|
|
|
-- ^ endpoint for delivering the subscriptions and outstanding queue
|
|
|
|
|
-- endpoint for delivering the subscriptions and outstanding queue
|
|
|
|
|
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
|
|
|
|
|
-- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid
|
|
|
|
|
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
|
|
|
|
|
:<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
|
|
|
|
|
-- ^ endpoint for fetching multiple posts at once
|
|
|
|
|
-- endpoint for fetching multiple posts at once
|
|
|
|
|
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
|
|
|
|
|
-- ^ delivery endpoint of newly published posts of the relay's instance
|
|
|
|
|
-- delivery endpoint of newly published posts of the relay's instance
|
|
|
|
|
:<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
|
|
|
|
|
-- ^ delivery endpoint for posts of $tag at subscribing instance
|
|
|
|
|
-- delivery endpoint for posts of $tag at subscribing instance
|
|
|
|
|
:<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
|
|
|
|
|
-- ^ endpoint for subscribing the instance specified in
|
|
|
|
|
-- endpoint for subscribing the instance specified in
|
|
|
|
|
-- the Origin header to $hashtag.
|
|
|
|
|
-- Returns subscription lease time in seconds.
|
|
|
|
|
:<|> "tags" :> Capture "hashtag" Txt.Text :> "unsubscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Txt.Text
|
|
|
|
|
-- ^ endpoint for unsubscribing the instance specified in
|
|
|
|
|
-- endpoint for unsubscribing the instance specified in
|
|
|
|
|
-- the Origin header to $hashtag
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -221,14 +226,39 @@ postInbox serv post = do
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
|
|
|
|
|
tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts
|
|
|
|
|
tagDelivery serv hashtag posts = do
|
|
|
|
|
let postIDs = Txt.lines posts
|
|
|
|
|
subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
|
|
|
|
|
if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions)
|
|
|
|
|
then -- TODO: increase a counter/ statistics for received posts of this tag
|
|
|
|
|
liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv)
|
|
|
|
|
else -- silently drop posts from unsubscribed tags
|
|
|
|
|
pure ()
|
|
|
|
|
pure $ "Received a postID for tag " <> hashtag
|
|
|
|
|
|
|
|
|
|
tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
|
|
|
|
|
tagSubscribe serv hashtag origin = pure 42
|
|
|
|
|
tagSubscribe serv hashtag origin = do
|
|
|
|
|
originURL <- maybe
|
|
|
|
|
(throwError $ err400 { errBody = "Missing Origin header" })
|
|
|
|
|
pure
|
|
|
|
|
origin
|
|
|
|
|
req <- HTTP.parseUrlThrow (Txt.unpack originURL)
|
|
|
|
|
now <- liftIO getPOSIXTime
|
|
|
|
|
let leaseTime = now + confSubscriptionExpiryTime (serviceConf serv)
|
|
|
|
|
-- setup subscription entry
|
|
|
|
|
_ <- liftIO . atomically $ setupSubscriberChannel (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req) leaseTime
|
|
|
|
|
pure $ round leaseTime
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tagUnsubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
|
|
|
|
|
tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription from " <> fromMaybe "Nothing" origin <> " to " <> hashtag
|
|
|
|
|
|
|
|
|
|
tagUnsubscribe serv hashtag origin = do
|
|
|
|
|
originURL <- maybe
|
|
|
|
|
(throwError $ err400 { errBody = "Missing Origin header" })
|
|
|
|
|
pure
|
|
|
|
|
origin
|
|
|
|
|
req <- HTTP.parseUrlThrow (Txt.unpack originURL)
|
|
|
|
|
liftIO . atomically $ deleteSubscription (subscribers serv) hashtag (BSU.toString $ HTTP.host req, HTTP.port req)
|
|
|
|
|
pure "bye bye"
|
|
|
|
|
|
|
|
|
|
-- ======= data structure manipulations =========
|
|
|
|
|
|
|
|
|
@ -251,7 +281,7 @@ enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do
|
|
|
|
|
setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID)
|
|
|
|
|
setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
|
|
|
|
|
tagMap <- readTVar tagMapSTM
|
|
|
|
|
case lookupRelayTags tag tagMap of
|
|
|
|
|
case lookupTagSubscriptions tag tagMap of
|
|
|
|
|
Nothing -> do
|
|
|
|
|
-- if no collision/ tag doesn't exist yet, just initialize a
|
|
|
|
|
-- new subscriber map
|
|
|
|
@ -273,11 +303,30 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
|
|
|
|
|
Just (tagOutChan, _) -> pure tagOutChan
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- | deletes a subscription from the passed subscriber map
|
|
|
|
|
deleteSubscription :: TVar RelayTags -> Hashtag -> (String, Int) -> STM ()
|
|
|
|
|
deleteSubscription tagMapSTM tag subscriber = do
|
|
|
|
|
tagMap <- readTVar tagMapSTM
|
|
|
|
|
case lookupTagSubscriptions tag tagMap of
|
|
|
|
|
-- no subscribers to that tag, just return
|
|
|
|
|
Nothing -> pure ()
|
|
|
|
|
Just (foundSubMapSTM, _, _) -> do
|
|
|
|
|
foundSubMap <- readTVar foundSubMapSTM
|
|
|
|
|
let newSubMap = HMap.delete subscriber foundSubMap
|
|
|
|
|
-- if there are no subscriptions for the tag anymore, remove its
|
|
|
|
|
-- data sttructure altogether
|
|
|
|
|
if HMap.null newSubMap
|
|
|
|
|
then writeTVar tagMapSTM $ deleteRMapEntry (genKeyID . Txt.unpack $ tag) tagMap
|
|
|
|
|
-- otherwise just remove the subscription of that node
|
|
|
|
|
else writeTVar foundSubMapSTM newSubMap
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- | returns the broadcast channel of a hashtag if there are any subscribers to it
|
|
|
|
|
getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID))
|
|
|
|
|
getTagBroadcastChannel serv tag = do
|
|
|
|
|
tagMap <- readTVar $ subscribers serv
|
|
|
|
|
case lookupRelayTags tag tagMap of
|
|
|
|
|
case lookupTagSubscriptions tag tagMap of
|
|
|
|
|
Nothing -> pure Nothing
|
|
|
|
|
Just (subscriberSTM, broadcastChan, _) -> do
|
|
|
|
|
subscriberMap <- readTVar subscriberSTM
|
|
|
|
@ -287,8 +336,8 @@ getTagBroadcastChannel serv tag = do
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- | look up the subscription data of a tag
|
|
|
|
|
lookupRelayTags :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag)
|
|
|
|
|
lookupRelayTags tag = rMapLookup (genKeyID . Txt.unpack $ tag)
|
|
|
|
|
lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
|
|
|
|
|
lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- normalise the unicode representation of a string to NFC
|
|
|
|
@ -299,7 +348,7 @@ normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict
|
|
|
|
|
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
|
|
|
|
|
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
|
|
|
|
|
instance {-# OVERLAPPABLE #-} Show a => MimeRender PlainText a where
|
|
|
|
|
mimeRender _ = BSU.fromString . show
|
|
|
|
|
mimeRender _ = BSUL.fromString . show
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- ====== worker threads ======
|
|
|
|
|