actually check own responsibility for tags before accepting posts

This commit is contained in:
Trolli Schmittlauch 2020-08-11 00:07:45 +02:00
parent 7036867ae0
commit 96c1963a4f
5 changed files with 102 additions and 39 deletions

View file

@ -34,6 +34,7 @@ module Hash2Pub.DHTProtocol
, ackRequest , ackRequest
, isPossibleSuccessor , isPossibleSuccessor
, isPossiblePredecessor , isPossiblePredecessor
, isInOwnResponsibilitySlice
, isJoined , isJoined
, closestCachePredecessors , closestCachePredecessors
) )

View file

@ -656,6 +656,25 @@ fediMessageHandler sendQ recvQ nsSTM = do
instance DHT (RealNodeSTM s) where instance DHT (RealNodeSTM s) where
lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring
forceLookupKey nodeSTM keystring = updateLookupCache nodeSTM $ genKeyID keystring forceLookupKey nodeSTM keystring = updateLookupCache nodeSTM $ genKeyID keystring
-- potential better implementation: put all neighbours of all vservers and the vservers on a ringMap, look the key up and see whether it results in a LocalNodeState
isResponsibleFor nodeSTM key = do
node <- readTVarIO nodeSTM
foldM (\responsible vsSTM -> do
vs <- readTVarIO vsSTM
pure $ responsible || isInOwnResponsibilitySlice key vs
)
False
$ vservers node
isResponsibleForSTM nodeSTM key = do
node <- readTVar nodeSTM
foldM (\responsible vsSTM -> do
vs <- readTVar vsSTM
pure $ responsible || isInOwnResponsibilitySlice key vs
)
False
$ vservers node
-- | Returns the hostname and port of the host responsible for a key. -- | Returns the hostname and port of the host responsible for a key.
-- Information is provided from a cache, only on a cache miss a new DHT lookup -- Information is provided from a cache, only on a cache miss a new DHT lookup

View file

@ -446,3 +446,5 @@ class DHT d where
-- but force the DHT to do a fresh lookup instead of returning a cached result. -- but force the DHT to do a fresh lookup instead of returning a cached result.
-- Also invalidates old cache entries. -- Also invalidates old cache entries.
forceLookupKey :: d -> String -> IO (Maybe (String, PortNumber)) forceLookupKey :: d -> String -> IO (Maybe (String, PortNumber))
isResponsibleFor :: d -> NodeID -> IO Bool
isResponsibleForSTM :: d -> NodeID -> STM Bool

View file

@ -12,10 +12,13 @@ import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Monad (foldM, forM_, forever) import Control.Exception (Exception (..))
import Control.Monad (foldM, forM, forM_, forever)
import Control.Monad.IO.Class (liftIO) import Control.Monad.IO.Class (liftIO)
import Control.Monad.STM
import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU import qualified Data.ByteString.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap import qualified Data.HashMap.Strict as HMap
@ -109,7 +112,7 @@ instance DHT d => Service PostService d where
-- | return a WAI application -- | return a WAI application
postServiceApplication :: PostService d -> Application postServiceApplication :: DHT d => PostService d -> Application
postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv
@ -126,7 +129,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance -- delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
-- endpoint for delivering the subscriptions and outstanding queue -- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid -- fetch endpoint for posts, full post ID is http://$domain/post/$postid
@ -145,7 +148,7 @@ type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBod
-- the Origin header to $hashtag -- the Origin header to $hashtag
postServer :: PostService d -> Server PostServiceAPI postServer :: DHT d => PostService d -> Server PostServiceAPI
postServer service = relayInbox service postServer service = relayInbox service
:<|> subscriptionDelivery service :<|> subscriptionDelivery service
:<|> postFetch service :<|> postFetch service
@ -156,15 +159,21 @@ postServer service = relayInbox service
:<|> tagUnsubscribe service :<|> tagUnsubscribe service
relayInbox :: PostService d -> Hashtag -> Txt.Text -> Handler NoContent relayInbox :: DHT d => PostService d -> Hashtag -> Txt.Text -> Handler NoContent
relayInbox serv tag posts = do relayInbox serv tag posts = do
let let
-- skip checking whether the post actually contains the tag, just drop full post -- skip checking whether the post actually contains the tag, just drop full post
postIDs = head . Txt.splitOn "," <$> Txt.lines posts postIDs = head . Txt.splitOn "," <$> Txt.lines posts
broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag
-- if tag is not in own responsibility, return a 410 Gone -- if tag is not in own responsibility, return a 410 Gone
maybe responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ tag)
if responsible
then pure ()
else
(throwError $ err410 { errBody = "Relay is not responsible for this tag"}) (throwError $ err410 { errBody = "Relay is not responsible for this tag"})
broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag
maybe
-- if noone subscribed to the tag, nothing needs to be done
(pure ())
-- otherwise enqueue posts into broadcast queue of the tag -- otherwise enqueue posts into broadcast queue of the tag
(\queue -> (\queue ->
liftIO $ forM_ postIDs (atomically . writeTChan queue) liftIO $ forM_ postIDs (atomically . writeTChan queue)
@ -172,15 +181,35 @@ relayInbox serv tag posts = do
broadcastChan broadcastChan
pure NoContent pure NoContent
subscriptionDelivery :: PostService d -> Txt.Text -> Handler NoContent -- exception to be thrown when a tag is not in the responsibility of a relay
newtype UnhandledTagException = UnhandledTagException String
deriving (Show, Typeable)
instance Exception UnhandledTagException
subscriptionDelivery :: DHT d => PostService d -> Txt.Text -> Handler Txt.Text
subscriptionDelivery serv subList = do subscriptionDelivery serv subList = do
let let
tagSubs = Txt.lines subList tagSubs = Txt.lines subList
liftIO $ forM_ tagSubs $ processTag (subscribers serv) -- In favor of having the convenience of rolling back the transaction once a
pure NoContent -- not-handled tag occurs, this results in a single large transaction.
-- Hopefully the performance isn't too bad.
res <- liftIO . atomically $ (foldM (\_ tag' -> do
responsible <- isResponsibleForSTM (baseDHT serv) (genKeyID . Txt.unpack $ tag')
if responsible
then processTag (subscribers serv) tag'
else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay")
pure $ Right ()
) (pure ()) tagSubs
`catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException))
-- TODO: potentially log this
:: STM (Either String ()))
case res of
Left err -> throwError err410 {errBody = BSUL.fromString err}
Right _ -> pure ""
-- TODO: check and only accept tags in own (future?) responsibility -- TODO: check and only accept tags in own (future?) responsibility
where where
processTag :: TVar RelayTags -> Txt.Text -> IO () processTag :: TVar RelayTags -> Txt.Text -> STM ()
processTag subscriberSTM tagData = do processTag subscriberSTM tagData = do
let let
tag:subText:lease:posts:_ = Txt.splitOn "," tagData tag:subText:lease:posts:_ = Txt.splitOn "," tagData
@ -292,11 +321,11 @@ enqueueSubscription :: TVar RelayTags -- tag-subscriber map
-> (String, Int) -- subscriber's connection information -> (String, Int) -- subscriber's connection information
-> [PostID] -- pending posts -> [PostID] -- pending posts
-> POSIXTime -- lease expiry time -> POSIXTime -- lease expiry time
-> IO () -> STM ()
enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do
-- get the tag output queue and, if necessary, create it -- get the tag output queue and, if necessary, create it
subChan <- atomically $ setupSubscriberChannel tagMapSTM tag subscriber leaseTime subChan <- setupSubscriberChannel tagMapSTM tag subscriber leaseTime
forM_ posts (atomically . writeTChan subChan) forM_ posts (writeTChan subChan)
-- | STM operation to return the outgoing post queue of a tag to a specified subscriber. -- | STM operation to return the outgoing post queue of a tag to a specified subscriber.
@ -391,7 +420,7 @@ processIncomingPosts serv = forever $ do
Nothing -> threadDelay $ 10 * 10^6 Nothing -> threadDelay $ 10 * 10^6
Just (responsibleHost, responsiblePort) -> do Just (responsibleHost, responsiblePort) -> do
httpMan <- HTTP.newManager HTTP.defaultManagerSettings httpMan <- HTTP.newManager HTTP.defaultManagerSettings
resp <- runClientM (relayInboxClient tag (pID <> "," <> pContent)) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
case resp of case resp of
Left err -> putStrLn $ "Error: " <> show err Left err -> putStrLn $ "Error: " <> show err
Right yay -> putStrLn $ "Yay! " <> show yay Right yay -> putStrLn $ "Yay! " <> show yay

View file

@ -196,26 +196,25 @@ takeRMapEntries_ :: (Integral i, Bounded k, Ord k)
takeRMapEntries_ getterFunc startAt num rmap = reverse $ takeRMapEntries_ getterFunc startAt num rmap = reverse $
case getterFunc startAt rmap of case getterFunc startAt rmap of
Nothing -> [] Nothing -> []
Just (foundKey, anEntry) -> takeEntriesUntil rmap getterFunc foundKey foundKey (num-1) [anEntry] Just (foundKey, anEntry) -> takeEntriesUntil_ rmap getterFunc foundKey foundKey (Just $ num-1) [anEntry]
where
-- for some reason, just reusing the already-bound @rmap@ and @getterFunc@
-- variables leads to a type error, these need to be passed explicitly takeEntriesUntil_ :: (Integral i, Bounded k, Ord k)
takeEntriesUntil :: (Integral i, Bounded k, Ord k)
=> RingMap k a => RingMap k a
-> (k -> RingMap k a -> Maybe (k, a)) -- getter function -> (k -> RingMap k a -> Maybe (k, a)) -- getter function
-> k -> k -- limit value
-> k -> k -- start value
-> i -> Maybe i -- possible number limit
-> [a] -> [a]
-> [a] -> [a]
takeEntriesUntil rmap' getterFunc' havingReached previousEntry remaining takeAcc takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry (Just remaining) takeAcc
-- length limit reached -- length limit reached
| remaining <= 0 = takeAcc | remaining <= 0 = takeAcc
-- takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry numLimit takeAcc =
| otherwise = case nextEntry of case nextEntry of
Just (fKey, gotEntry) Just (fKey, gotEntry)
| fKey == havingReached -> takeAcc | fKey == havingReached -> takeAcc
| otherwise -> takeEntriesUntil rmap' getterFunc' havingReached fKey (remaining - 1) (gotEntry:takeAcc) | otherwise -> takeEntriesUntil_ rmap' getterFunc' havingReached fKey (fmap pred numLimit) (gotEntry:takeAcc)
Nothing -> takeAcc Nothing -> takeAcc
where where
nextEntry = getterFunc' previousEntry rmap' nextEntry = getterFunc' previousEntry rmap'
@ -235,3 +234,16 @@ takeRMapSuccessors :: (Integral i, Bounded k, Ord k, Num k)
-> [a] -> [a]
takeRMapSuccessors = takeRMapEntries_ rMapLookupSucc takeRMapSuccessors = takeRMapEntries_ rMapLookupSucc
takeRMapPredecessorsFromTo :: (Bounded k, Ord k, Num k)
=> k -- start value for taking
-> k -- stop value for taking
-> RingMap k a
-> [a]
takeRMapPredecessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupPred toVal fromVal Nothing []
takeRMapSuccesorsFromTo :: (Bounded k, Ord k, Num k)
=> k -- start value for taking
-> k -- stop value for taking
-> RingMap k a
-> [a]
takeRMapSuccesorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []