From 402378a78bd94c351c1bab77fd6b8d1bc985ed18 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Thu, 13 Aug 2020 23:44:24 +0200 Subject: [PATCH] signal and handle non-responsibility to subscriptions --- src/Hash2Pub/PostService.hs | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 1de7302..26c473b 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -272,8 +272,13 @@ tagDelivery serv hashtag posts = do pure () pure $ "Received a postID for tag " <> hashtag -tagSubscribe :: PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer +tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer tagSubscribe serv hashtag origin = do + responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag) + if not responsible + -- GONE if not responsible + then throwError err410 { errBody = "not responsible for this tag" } + else pure () originURL <- maybe (throwError $ err400 { errBody = "Missing Origin header" }) pure @@ -359,16 +364,23 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer) clientSubscribeTo serv tag = do lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) - let origin = "http://" <> Txt.pack (confServiceHost $ serviceConf serv) <> ":" <> Txt.pack (show (getListeningPortFromService serv :: Integer)) - maybe - (pure . Left $ "No node found") - (\(foundHost, foundPort) -> do - resp <- runClientM (tagSubscribeClient tag (Just origin)) (mkClientEnv (httpMan serv) (BaseUrl Http foundHost (fromIntegral foundPort) "")) - case resp of - Left err -> pure . Left . show $ err - Right lease -> pure . Right $ lease - ) - lookupRes + doSubscribe lookupRes True + where + doSubscribe lookupResponse allowRetry = maybe + (pure . Left $ "No node found") + (\(foundHost, foundPort) -> do + let origin = "http://" <> Txt.pack (confServiceHost $ serviceConf serv) <> ":" <> Txt.pack (show (getListeningPortFromService serv :: Integer)) + resp <- runClientM (tagSubscribeClient tag (Just origin)) (mkClientEnv (httpMan serv) (BaseUrl Http foundHost (fromIntegral foundPort) "")) + case resp of + Left (FailureResponse _ fresp) + |(HTTPT.statusCode . responseStatusCode $ fresp) == 410 && allowRetry -> do -- responsibility gone, force new lookup + newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) + doSubscribe newRes False + Left err -> pure . Left . show $ err + Right lease -> pure . Right $ lease + ) + lookupResponse + -- currently this is unused code getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI