server endpoint for tag-post delivery

This commit is contained in:
Trolli Schmittlauch 2020-07-31 17:46:33 +02:00
parent e3c7faa80b
commit 50044673a6

View file

@ -19,7 +19,7 @@ import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString.Lazy.UTF8 as BSU import qualified Data.ByteString.Lazy.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet import qualified Data.HashSet as HSet
import Data.Maybe (fromMaybe) import Data.Maybe (fromMaybe, isJust)
import Data.String (fromString) import Data.String (fromString)
import qualified Data.Text.Lazy as Txt import qualified Data.Text.Lazy as Txt
import Data.Text.Normalize (NormalizationMode (NFC), import Data.Text.Normalize (NormalizationMode (NFC),
@ -48,6 +48,7 @@ data PostService d = PostService
-- ^ just store the existence of posts for saving memory, -- ^ just store the existence of posts for saving memory,
, relayInQueue :: TQueue (Hashtag, PostID, PostContent) , relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously -- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID
} }
deriving (Typeable) deriving (Typeable)
@ -73,6 +74,7 @@ instance DHT d => Service PostService d where
ownSubsVar <- newTVarIO HMap.empty ownSubsVar <- newTVarIO HMap.empty
ownPostVar <- newTVarIO HSet.empty ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO relayInQueue' <- newTQueueIO
postFetchQueue' <- newTQueueIO
let let
thisService = PostService { thisService = PostService {
serviceConf = conf serviceConf = conf
@ -82,6 +84,7 @@ instance DHT d => Service PostService d where
, ownSubscriptions = ownSubsVar , ownSubscriptions = ownSubsVar
, ownPosts = ownPostVar , ownPosts = ownPostVar
, relayInQueue = relayInQueue' , relayInQueue = relayInQueue'
, postFetchQueue = postFetchQueue'
} }
port' = fromIntegral (confServicePort conf) port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
@ -221,7 +224,15 @@ postInbox serv post = do
tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts tagDelivery serv hashtag posts = do
let postIDs = Txt.lines posts
subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions)
then -- TODO: increase a counter/ statistics for received posts of this tag
liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv)
else -- silently drop posts from unsubscribed tags
pure ()
pure $ "Received a postID for tag " <> hashtag
tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
tagSubscribe serv hashtag origin = pure 42 tagSubscribe serv hashtag origin = pure 42
@ -251,7 +262,7 @@ enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do
setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID) setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID)
setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
tagMap <- readTVar tagMapSTM tagMap <- readTVar tagMapSTM
case lookupRelayTags tag tagMap of case lookupTagSubscriptions tag tagMap of
Nothing -> do Nothing -> do
-- if no collision/ tag doesn't exist yet, just initialize a -- if no collision/ tag doesn't exist yet, just initialize a
-- new subscriber map -- new subscriber map
@ -277,7 +288,7 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID)) getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID))
getTagBroadcastChannel serv tag = do getTagBroadcastChannel serv tag = do
tagMap <- readTVar $ subscribers serv tagMap <- readTVar $ subscribers serv
case lookupRelayTags tag tagMap of case lookupTagSubscriptions tag tagMap of
Nothing -> pure Nothing Nothing -> pure Nothing
Just (subscriberSTM, broadcastChan, _) -> do Just (subscriberSTM, broadcastChan, _) -> do
subscriberMap <- readTVar subscriberSTM subscriberMap <- readTVar subscriberSTM
@ -287,8 +298,8 @@ getTagBroadcastChannel serv tag = do
-- | look up the subscription data of a tag -- | look up the subscription data of a tag
lookupRelayTags :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag) lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
lookupRelayTags tag = rMapLookup (genKeyID . Txt.unpack $ tag) lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag)
-- normalise the unicode representation of a string to NFC -- normalise the unicode representation of a string to NFC