diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index f962d58..a071132 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -34,6 +34,7 @@ module Hash2Pub.DHTProtocol , ackRequest , isPossibleSuccessor , isPossiblePredecessor + , isInOwnResponsibilitySlice , isJoined , closestCachePredecessors ) diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 858b38e..7911f3c 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -656,6 +656,25 @@ fediMessageHandler sendQ recvQ nsSTM = do instance DHT (RealNodeSTM s) where lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring forceLookupKey nodeSTM keystring = updateLookupCache nodeSTM $ genKeyID keystring + -- potential better implementation: put all neighbours of all vservers and the vservers on a ringMap, look the key up and see whether it results in a LocalNodeState + isResponsibleFor nodeSTM key = do + node <- readTVarIO nodeSTM + foldM (\responsible vsSTM -> do + vs <- readTVarIO vsSTM + pure $ responsible || isInOwnResponsibilitySlice key vs + ) + False + $ vservers node + isResponsibleForSTM nodeSTM key = do + node <- readTVar nodeSTM + foldM (\responsible vsSTM -> do + vs <- readTVar vsSTM + pure $ responsible || isInOwnResponsibilitySlice key vs + ) + False + $ vservers node + + -- | Returns the hostname and port of the host responsible for a key. -- Information is provided from a cache, only on a cache miss a new DHT lookup diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 91b3822..20d65fe 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -446,3 +446,5 @@ class DHT d where -- but force the DHT to do a fresh lookup instead of returning a cached result. -- Also invalidates old cache entries. forceLookupKey :: d -> String -> IO (Maybe (String, PortNumber)) + isResponsibleFor :: d -> NodeID -> IO Bool + isResponsibleForSTM :: d -> NodeID -> STM Bool diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index e9144df..8f47227 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -12,10 +12,13 @@ import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TChan +import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar -import Control.Monad (foldM, forM_, forever) +import Control.Exception (Exception (..)) +import Control.Monad (foldM, forM, forM_, forever) import Control.Monad.IO.Class (liftIO) +import Control.Monad.STM import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.UTF8 as BSU import qualified Data.HashMap.Strict as HMap @@ -109,7 +112,7 @@ instance DHT d => Service PostService d where -- | return a WAI application -postServiceApplication :: PostService d -> Application +postServiceApplication :: DHT d => PostService d -> Application postServiceApplication serv = serve exposedPostServiceAPI $ postServer serv @@ -126,7 +129,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent -- delivery endpoint of newly published posts of the relay's instance - :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent + :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text -- endpoint for delivering the subscriptions and outstanding queue :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text -- fetch endpoint for posts, full post ID is http://$domain/post/$postid @@ -145,7 +148,7 @@ type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBod -- the Origin header to $hashtag -postServer :: PostService d -> Server PostServiceAPI +postServer :: DHT d => PostService d -> Server PostServiceAPI postServer service = relayInbox service :<|> subscriptionDelivery service :<|> postFetch service @@ -156,15 +159,21 @@ postServer service = relayInbox service :<|> tagUnsubscribe service -relayInbox :: PostService d -> Hashtag -> Txt.Text -> Handler NoContent +relayInbox :: DHT d => PostService d -> Hashtag -> Txt.Text -> Handler NoContent relayInbox serv tag posts = do let -- skip checking whether the post actually contains the tag, just drop full post postIDs = head . Txt.splitOn "," <$> Txt.lines posts - broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag -- if tag is not in own responsibility, return a 410 Gone - maybe + responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ tag) + if responsible + then pure () + else (throwError $ err410 { errBody = "Relay is not responsible for this tag"}) + broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag + maybe + -- if noone subscribed to the tag, nothing needs to be done + (pure ()) -- otherwise enqueue posts into broadcast queue of the tag (\queue -> liftIO $ forM_ postIDs (atomically . writeTChan queue) @@ -172,15 +181,35 @@ relayInbox serv tag posts = do broadcastChan pure NoContent -subscriptionDelivery :: PostService d -> Txt.Text -> Handler NoContent +-- exception to be thrown when a tag is not in the responsibility of a relay +newtype UnhandledTagException = UnhandledTagException String + deriving (Show, Typeable) + +instance Exception UnhandledTagException + +subscriptionDelivery :: DHT d => PostService d -> Txt.Text -> Handler Txt.Text subscriptionDelivery serv subList = do let tagSubs = Txt.lines subList - liftIO $ forM_ tagSubs $ processTag (subscribers serv) - pure NoContent + -- In favor of having the convenience of rolling back the transaction once a + -- not-handled tag occurs, this results in a single large transaction. + -- Hopefully the performance isn't too bad. + res <- liftIO . atomically $ (foldM (\_ tag' -> do + responsible <- isResponsibleForSTM (baseDHT serv) (genKeyID . Txt.unpack $ tag') + if responsible + then processTag (subscribers serv) tag' + else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay") + pure $ Right () + ) (pure ()) tagSubs + `catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException)) + -- TODO: potentially log this + :: STM (Either String ())) + case res of + Left err -> throwError err410 {errBody = BSUL.fromString err} + Right _ -> pure "" -- TODO: check and only accept tags in own (future?) responsibility where - processTag :: TVar RelayTags -> Txt.Text -> IO () + processTag :: TVar RelayTags -> Txt.Text -> STM () processTag subscriberSTM tagData = do let tag:subText:lease:posts:_ = Txt.splitOn "," tagData @@ -292,11 +321,11 @@ enqueueSubscription :: TVar RelayTags -- tag-subscriber map -> (String, Int) -- subscriber's connection information -> [PostID] -- pending posts -> POSIXTime -- lease expiry time - -> IO () + -> STM () enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do -- get the tag output queue and, if necessary, create it - subChan <- atomically $ setupSubscriberChannel tagMapSTM tag subscriber leaseTime - forM_ posts (atomically . writeTChan subChan) + subChan <- setupSubscriberChannel tagMapSTM tag subscriber leaseTime + forM_ posts (writeTChan subChan) -- | STM operation to return the outgoing post queue of a tag to a specified subscriber. @@ -391,7 +420,7 @@ processIncomingPosts serv = forever $ do Nothing -> threadDelay $ 10 * 10^6 Just (responsibleHost, responsiblePort) -> do httpMan <- HTTP.newManager HTTP.defaultManagerSettings - resp <- runClientM (relayInboxClient tag (pID <> "," <> pContent)) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) + resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) case resp of - Left err -> putStrLn $ "Error: " <> show err + Left err -> putStrLn $ "Error: " <> show err Right yay -> putStrLn $ "Yay! " <> show yay diff --git a/src/Hash2Pub/RingMap.hs b/src/Hash2Pub/RingMap.hs index 016f9f1..9b439e9 100644 --- a/src/Hash2Pub/RingMap.hs +++ b/src/Hash2Pub/RingMap.hs @@ -196,29 +196,28 @@ takeRMapEntries_ :: (Integral i, Bounded k, Ord k) takeRMapEntries_ getterFunc startAt num rmap = reverse $ case getterFunc startAt rmap of Nothing -> [] - Just (foundKey, anEntry) -> takeEntriesUntil rmap getterFunc foundKey foundKey (num-1) [anEntry] - where - -- for some reason, just reusing the already-bound @rmap@ and @getterFunc@ - -- variables leads to a type error, these need to be passed explicitly - takeEntriesUntil :: (Integral i, Bounded k, Ord k) - => RingMap k a - -> (k -> RingMap k a -> Maybe (k, a)) -- getter function - -> k - -> k - -> i - -> [a] - -> [a] - takeEntriesUntil rmap' getterFunc' havingReached previousEntry remaining takeAcc - -- length limit reached - | remaining <= 0 = takeAcc - -- - | otherwise = case nextEntry of - Just (fKey, gotEntry) - | fKey == havingReached -> takeAcc - | otherwise -> takeEntriesUntil rmap' getterFunc' havingReached fKey (remaining - 1) (gotEntry:takeAcc) - Nothing -> takeAcc - where - nextEntry = getterFunc' previousEntry rmap' + Just (foundKey, anEntry) -> takeEntriesUntil_ rmap getterFunc foundKey foundKey (Just $ num-1) [anEntry] + + +takeEntriesUntil_ :: (Integral i, Bounded k, Ord k) + => RingMap k a + -> (k -> RingMap k a -> Maybe (k, a)) -- getter function + -> k -- limit value + -> k -- start value + -> Maybe i -- possible number limit + -> [a] + -> [a] +takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry (Just remaining) takeAcc + -- length limit reached + | remaining <= 0 = takeAcc +takeEntriesUntil_ rmap' getterFunc' havingReached previousEntry numLimit takeAcc = + case nextEntry of + Just (fKey, gotEntry) + | fKey == havingReached -> takeAcc + | otherwise -> takeEntriesUntil_ rmap' getterFunc' havingReached fKey (fmap pred numLimit) (gotEntry:takeAcc) + Nothing -> takeAcc + where + nextEntry = getterFunc' previousEntry rmap' takeRMapPredecessors :: (Integral i, Bounded k, Ord k, Num k) @@ -235,3 +234,16 @@ takeRMapSuccessors :: (Integral i, Bounded k, Ord k, Num k) -> [a] takeRMapSuccessors = takeRMapEntries_ rMapLookupSucc +takeRMapPredecessorsFromTo :: (Bounded k, Ord k, Num k) + => k -- start value for taking + -> k -- stop value for taking + -> RingMap k a + -> [a] +takeRMapPredecessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupPred toVal fromVal Nothing [] + +takeRMapSuccesorsFromTo :: (Bounded k, Ord k, Num k) + => k -- start value for taking + -> k -- stop value for taking + -> RingMap k a + -> [a] +takeRMapSuccesorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []