diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index fce5fe2be..96da5dc95 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -256,9 +256,10 @@ impl RestCatalog { async fn context(&self) -> Result<&RestContext> { self.ctx .get_or_try_init(|| async { - let catalog_config = RestCatalog::load_config(&self.user_config).await?; + let client = HttpClient::new(&self.user_config)?; + let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?; let config = self.user_config.clone().merge_with_config(catalog_config); - let client = HttpClient::new(&config)?; + let client = client.update_with(&config)?; Ok(RestContext { config, client }) }) @@ -268,9 +269,10 @@ impl RestCatalog { /// Load the runtime config from the server by user_config. /// /// It's required for a rest catalog to update it's config after creation. - async fn load_config(user_config: &RestCatalogConfig) -> Result { - let client = HttpClient::new(user_config)?; - + async fn load_config( + client: &HttpClient, + user_config: &RestCatalogConfig, + ) -> Result { let mut request = client.request(Method::GET, user_config.config_endpoint()); if let Some(warehouse_location) = &user_config.warehouse { @@ -280,6 +282,7 @@ impl RestCatalog { let config = client .query::(request.build()?) .await?; + Ok(config) } @@ -777,7 +780,7 @@ mod tests { "expires_in": 86400 }"#, ) - .expect(2) + .expect(1) .create_async() .await } @@ -831,7 +834,7 @@ mod tests { "expires_in": 86400 }"#, ) - .expect(2) + .expect(1) .create_async() .await; diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs index 7027edef8..e06090134 100644 --- a/crates/catalog/rest/src/client.rs +++ b/crates/catalog/rest/src/client.rs @@ -54,6 +54,7 @@ impl Debug for HttpClient { } impl HttpClient { + /// Create a new http client. pub fn new(cfg: &RestCatalogConfig) -> Result { Ok(HttpClient { client: Client::new(), @@ -66,6 +67,32 @@ impl HttpClient { }) } + /// Update the http client with new configuration. + /// + /// If cfg carries new value, we will use cfg instead. + /// Otherwise, we will keep the old value. + pub fn update_with(self, cfg: &RestCatalogConfig) -> Result { + Ok(HttpClient { + client: self.client, + + token: Mutex::new( + cfg.token() + .or_else(|| self.token.into_inner().ok().flatten()), + ), + token_endpoint: (!cfg.get_token_endpoint().is_empty()) + .then(|| cfg.get_token_endpoint()) + .unwrap_or(self.token_endpoint), + credential: cfg.credential().or(self.credential), + extra_headers: (!cfg.extra_headers()?.is_empty()) + .then(|| cfg.extra_headers()) + .transpose()? + .unwrap_or(self.extra_headers), + extra_oauth_params: (!cfg.extra_oauth_params().is_empty()) + .then(|| cfg.extra_oauth_params()) + .unwrap_or(self.extra_oauth_params), + }) + } + /// This API is testing only to assert the token. #[cfg(test)] pub(crate) async fn token(&self) -> Option {