abstract away the hashtag -> NodeID conversion
This commit is contained in:
parent
75c1932ef6
commit
c3b1aad1c7
|
@ -187,7 +187,7 @@ relayInbox serv tag posts = do
|
||||||
-- skip checking whether the post actually contains the tag, just drop full post
|
-- skip checking whether the post actually contains the tag, just drop full post
|
||||||
postIDs = head . Txt.splitOn "," <$> Txt.lines posts
|
postIDs = head . Txt.splitOn "," <$> Txt.lines posts
|
||||||
-- if tag is not in own responsibility, return a 410 Gone
|
-- if tag is not in own responsibility, return a 410 Gone
|
||||||
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ tag)
|
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId tag)
|
||||||
if responsible
|
if responsible
|
||||||
then pure ()
|
then pure ()
|
||||||
else
|
else
|
||||||
|
@ -221,7 +221,7 @@ subscriptionDelivery serv senderID subList = do
|
||||||
-- not-handled tag occurs, this results in a single large transaction.
|
-- not-handled tag occurs, this results in a single large transaction.
|
||||||
-- Hopefully the performance isn't too bad.
|
-- Hopefully the performance isn't too bad.
|
||||||
res <- liftIO . atomically $ (foldM (\_ tag' -> do
|
res <- liftIO . atomically $ (foldM (\_ tag' -> do
|
||||||
responsible <- isResponsibleForSTM (baseDHT serv) (genKeyID . Txt.unpack $ tag')
|
responsible <- isResponsibleForSTM (baseDHT serv) (hashtagToId tag')
|
||||||
if responsible
|
if responsible
|
||||||
then processTag (subscribers serv) tag'
|
then processTag (subscribers serv) tag'
|
||||||
else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay")
|
else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay")
|
||||||
|
@ -295,7 +295,7 @@ tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
|
||||||
tagDelivery serv hashtag posts = do
|
tagDelivery serv hashtag posts = do
|
||||||
let postIDs = Txt.lines posts
|
let postIDs = Txt.lines posts
|
||||||
subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
|
subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
|
||||||
if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions)
|
if isJust (HMap.lookup (hashtagToId hashtag) subscriptions)
|
||||||
then -- TODO: increase a counter/ statistics for received posts of this tag
|
then -- TODO: increase a counter/ statistics for received posts of this tag
|
||||||
liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv)
|
liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv)
|
||||||
else -- silently drop posts from unsubscribed tags
|
else -- silently drop posts from unsubscribed tags
|
||||||
|
@ -304,7 +304,7 @@ tagDelivery serv hashtag posts = do
|
||||||
|
|
||||||
tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
|
tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
|
||||||
tagSubscribe serv hashtag origin = do
|
tagSubscribe serv hashtag origin = do
|
||||||
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag)
|
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
|
||||||
if not responsible
|
if not responsible
|
||||||
-- GONE if not responsible
|
-- GONE if not responsible
|
||||||
then throwError err410 { errBody = "not responsible for this tag" }
|
then throwError err410 { errBody = "not responsible for this tag" }
|
||||||
|
@ -323,7 +323,7 @@ tagSubscribe serv hashtag origin = do
|
||||||
|
|
||||||
tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
|
tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
|
||||||
tagUnsubscribe serv hashtag origin = do
|
tagUnsubscribe serv hashtag origin = do
|
||||||
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag)
|
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
|
||||||
if not responsible
|
if not responsible
|
||||||
-- GONE if not responsible
|
-- GONE if not responsible
|
||||||
then throwError err410 { errBody = "not responsible for this tag" }
|
then throwError err410 { errBody = "not responsible for this tag" }
|
||||||
|
@ -385,7 +385,7 @@ clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
|
||||||
Right _ -> do
|
Right _ -> do
|
||||||
atomically $
|
atomically $
|
||||||
modifyTVar' (subscribers serv) $ \tagMap ->
|
modifyTVar' (subscribers serv) $ \tagMap ->
|
||||||
foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags)
|
foldr deleteRMapEntry tagMap ((\(_, _, t) -> hashtagToId t) <$> intervalTags)
|
||||||
pure . Right $ ()
|
pure . Right $ ()
|
||||||
where
|
where
|
||||||
channelGetAll :: TChan a -> STM [a]
|
channelGetAll :: TChan a -> STM [a]
|
||||||
|
@ -415,7 +415,7 @@ clientSubscribeTo serv tag = do
|
||||||
doSubscribe newRes False
|
doSubscribe newRes False
|
||||||
Left err -> pure . Left . show $ err
|
Left err -> pure . Left . show $ err
|
||||||
Right lease -> do
|
Right lease -> do
|
||||||
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (genKeyID . Txt.unpack $ tag) (fromInteger lease)
|
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease)
|
||||||
pure . Right $ lease
|
pure . Right $ lease
|
||||||
)
|
)
|
||||||
lookupResponse
|
lookupResponse
|
||||||
|
@ -439,7 +439,7 @@ clientUnsubscribeFrom serv tag = do
|
||||||
doUnsubscribe newRes False
|
doUnsubscribe newRes False
|
||||||
Left err -> pure . Left . show $ err
|
Left err -> pure . Left . show $ err
|
||||||
Right _ -> do
|
Right _ -> do
|
||||||
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.delete (genKeyID . Txt.unpack $ tag)
|
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.delete (hashtagToId tag)
|
||||||
pure . Right $ ()
|
pure . Right $ ()
|
||||||
)
|
)
|
||||||
lookupResponse
|
lookupResponse
|
||||||
|
@ -497,7 +497,7 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
|
||||||
broadcastChan <- newBroadcastTChan
|
broadcastChan <- newBroadcastTChan
|
||||||
tagOutChan <- dupTChan broadcastChan
|
tagOutChan <- dupTChan broadcastChan
|
||||||
newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime)
|
newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime)
|
||||||
writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap
|
writeTVar tagMapSTM $ addRMapEntry (hashtagToId tag) (newSubMapSTM, broadcastChan, tag) tagMap
|
||||||
pure tagOutChan
|
pure tagOutChan
|
||||||
Just (foundSubMapSTM, broadcastChan, _) -> do
|
Just (foundSubMapSTM, broadcastChan, _) -> do
|
||||||
-- otherwise use the existing subscriber map
|
-- otherwise use the existing subscriber map
|
||||||
|
@ -525,7 +525,7 @@ deleteSubscription tagMapSTM tag subscriber = do
|
||||||
-- if there are no subscriptions for the tag anymore, remove its
|
-- if there are no subscriptions for the tag anymore, remove its
|
||||||
-- data sttructure altogether
|
-- data sttructure altogether
|
||||||
if HMap.null newSubMap
|
if HMap.null newSubMap
|
||||||
then writeTVar tagMapSTM $ deleteRMapEntry (genKeyID . Txt.unpack $ tag) tagMap
|
then writeTVar tagMapSTM $ deleteRMapEntry (hashtagToId tag) tagMap
|
||||||
-- otherwise just remove the subscription of that node
|
-- otherwise just remove the subscription of that node
|
||||||
else writeTVar foundSubMapSTM newSubMap
|
else writeTVar foundSubMapSTM newSubMap
|
||||||
|
|
||||||
|
@ -546,13 +546,18 @@ getTagBroadcastChannel serv tag = do
|
||||||
|
|
||||||
-- | look up the subscription data of a tag
|
-- | look up the subscription data of a tag
|
||||||
lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
|
lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
|
||||||
lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag)
|
lookupTagSubscriptions tag = rMapLookup (hashtagToId tag)
|
||||||
|
|
||||||
|
|
||||||
-- normalise the unicode representation of a string to NFC
|
-- normalise the unicode representation of a string to NFC
|
||||||
normaliseTag :: Txt.Text -> Txt.Text
|
normaliseTag :: Txt.Text -> Txt.Text
|
||||||
normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict
|
normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict
|
||||||
|
|
||||||
|
|
||||||
|
-- | convert a hashtag to its representation on the DHT
|
||||||
|
hashtagToId :: Hashtag -> NodeID
|
||||||
|
hashtagToId = genKeyID . Txt.unpack
|
||||||
|
|
||||||
-- | define how to convert all showable types to PlainText
|
-- | define how to convert all showable types to PlainText
|
||||||
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
|
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
|
||||||
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
|
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
|
||||||
|
@ -590,7 +595,7 @@ processIncomingPosts serv = forever $ do
|
||||||
-- TODO: stats
|
-- TODO: stats
|
||||||
-- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
|
-- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
subscriptionStatus <- HMap.lookup (genKeyID . Txt.unpack $ tag) <$> readTVarIO (ownSubscriptions serv)
|
subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
|
||||||
-- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag
|
-- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag
|
||||||
when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $
|
when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $
|
||||||
void $ clientSubscribeTo serv tag
|
void $ clientSubscribeTo serv tag
|
||||||
|
|
Loading…
Reference in a new issue