From 405b787f540d0a09a5856c217032d6bc51759905 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 13 Oct 2023 10:48:23 +0100 Subject: [PATCH] Use SelfSignedJwt for Service Accounts --- object_store/src/gcp/credential.rs | 131 +++++++++++------------------ object_store/src/gcp/mod.rs | 22 +++-- 2 files changed, 59 insertions(+), 94 deletions(-) diff --git a/object_store/src/gcp/credential.rs b/object_store/src/gcp/credential.rs index 0cb79ba14268..2e89e917d17f 100644 --- a/object_store/src/gcp/credential.rs +++ b/object_store/src/gcp/credential.rs @@ -36,9 +36,9 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tracing::info; -// TODO: https://cloud.google.com/storage/docs/authentication#oauth-scopes pub const DEFAULT_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.full_control"; -pub const DEFAULT_AUDIENCE: &str = "https://www.googleapis.com/oauth2/v4/token"; + +pub const DEFAULT_GCS_BASE_URL: &str = "https://storage.googleapis.com"; #[derive(Debug, Snafu)] pub enum Error { @@ -92,48 +92,48 @@ pub struct GcpCredential { pub type Result = std::result::Result; #[derive(Debug, Default, serde::Serialize)] -pub struct JwtHeader { +pub struct JwtHeader<'a> { /// The type of JWS: it can only be "JWT" here /// /// Defined in [RFC7515#4.1.9](https://tools.ietf.org/html/rfc7515#section-4.1.9). #[serde(skip_serializing_if = "Option::is_none")] - pub typ: Option, + pub typ: Option<&'a str>, /// The algorithm used /// /// Defined in [RFC7515#4.1.1](https://tools.ietf.org/html/rfc7515#section-4.1.1). - pub alg: String, + pub alg: &'a str, /// Content type /// /// Defined in [RFC7519#5.2](https://tools.ietf.org/html/rfc7519#section-5.2). #[serde(skip_serializing_if = "Option::is_none")] - pub cty: Option, + pub cty: Option<&'a str>, /// JSON Key URL /// /// Defined in [RFC7515#4.1.2](https://tools.ietf.org/html/rfc7515#section-4.1.2). #[serde(skip_serializing_if = "Option::is_none")] - pub jku: Option, + pub jku: Option<&'a str>, /// Key ID /// /// Defined in [RFC7515#4.1.4](https://tools.ietf.org/html/rfc7515#section-4.1.4). #[serde(skip_serializing_if = "Option::is_none")] - pub kid: Option, + pub kid: Option<&'a str>, /// X.509 URL /// /// Defined in [RFC7515#4.1.5](https://tools.ietf.org/html/rfc7515#section-4.1.5). #[serde(skip_serializing_if = "Option::is_none")] - pub x5u: Option, + pub x5u: Option<&'a str>, /// X.509 certificate thumbprint /// /// Defined in [RFC7515#4.1.7](https://tools.ietf.org/html/rfc7515#section-4.1.7). #[serde(skip_serializing_if = "Option::is_none")] - pub x5t: Option, + pub x5t: Option<&'a str>, } #[derive(serde::Serialize)] struct TokenClaims<'a> { iss: &'a str, + sub: &'a str, scope: &'a str, - aud: &'a str, exp: u64, iat: u64, } @@ -144,28 +144,29 @@ struct TokenResponse { expires_in: u64, } -/// Encapsulates the logic to perform an OAuth token challenge +/// #[derive(Debug)] -pub struct OAuthProvider { +pub struct SelfSignedJwt { issuer: String, scope: String, - audience: String, key_pair: RsaKeyPair, jwt_header: String, random: ring::rand::SystemRandom, } -impl OAuthProvider { - /// Create a new [`OAuthProvider`] +impl SelfSignedJwt { + /// Create a new [`SelfSignedJwt`] pub fn new( + key_id: String, issuer: String, private_key_pem: String, scope: String, - audience: String, ) -> Result { let key_pair = decode_first_rsa_key(private_key_pem)?; let jwt_header = b64_encode_obj(&JwtHeader { - alg: "RS256".to_string(), + alg: "RS256", + typ: Some("JWT"), + kid: Some(&key_id), ..Default::default() })?; @@ -173,7 +174,6 @@ impl OAuthProvider { issuer, key_pair, scope, - audience, jwt_header, random: ring::rand::SystemRandom::new(), }) @@ -181,24 +181,24 @@ impl OAuthProvider { } #[async_trait] -impl TokenProvider for OAuthProvider { +impl TokenProvider for SelfSignedJwt { type Credential = GcpCredential; /// Fetch a fresh token async fn fetch_token( &self, - client: &Client, - retry: &RetryConfig, + _client: &Client, + _retry: &RetryConfig, ) -> crate::Result>> { let now = seconds_since_epoch(); let exp = now + 3600; let claims = TokenClaims { iss: &self.issuer, + sub: &self.issuer, scope: &self.scope, - aud: &self.audience, - exp, iat: now, + exp, }; let claim_str = b64_encode_obj(&claims)?; @@ -214,30 +214,11 @@ impl TokenProvider for OAuthProvider { .context(SignSnafu)?; let signature = BASE64_URL_SAFE_NO_PAD.encode(sig_bytes); - let jwt = [message, signature].join("."); - - let body = [ - ("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer"), - ("assertion", &jwt), - ]; - - let response: TokenResponse = client - .request(Method::POST, &self.audience) - .form(&body) - .send_retry(retry) - .await - .context(TokenRequestSnafu)? - .json() - .await - .context(TokenResponseBodySnafu)?; - - println!("{}", response.access_token); + let bearer = [message, signature].join("."); Ok(TemporaryToken { - token: Arc::new(GcpCredential { - bearer: response.access_token, - }), - expiry: Some(Instant::now() + Duration::from_secs(response.expires_in)), + token: Arc::new(GcpCredential { bearer }), + expiry: Some(Instant::now() + Duration::from_secs(3600)), }) } } @@ -261,29 +242,24 @@ pub struct ServiceAccountCredentials { /// The private key in RSA format. pub private_key: String, + /// The private key ID + pub private_key_id: String, + /// The email address associated with the service account. pub client_email: String, /// Base URL for GCS - #[serde(default = "default_gcs_base_url")] - pub gcs_base_url: String, + #[serde(default)] + pub gcs_base_url: Option, /// Disable oauth and use empty tokens. - #[serde(default = "default_disable_oauth")] + #[serde(default)] pub disable_oauth: bool, } -pub fn default_gcs_base_url() -> String { - "https://storage.googleapis.com".to_owned() -} - -pub fn default_disable_oauth() -> bool { - false -} - impl ServiceAccountCredentials { /// Create a new [`ServiceAccountCredentials`] from a file. - pub fn from_file>(path: P) -> Result { + pub fn from_file>(path: P) -> Result { read_credentials_file(path) } @@ -292,13 +268,19 @@ impl ServiceAccountCredentials { serde_json::from_str(key).context(DecodeCredentialsSnafu) } - /// Create an [`OAuthProvider`] from this credentials struct. - pub fn oauth_provider(self) -> crate::Result { - Ok(OAuthProvider::new( + /// Create a [`SelfSignedJwt`] from this credentials struct. + /// + /// We use a scope of [`DEFAULT_SCOPE`] as opposed to an audience + /// as GCS appears to not support audience + /// + /// + /// + pub fn token_provider(self) -> crate::Result { + Ok(SelfSignedJwt::new( + self.private_key_id, self.client_email, self.private_key, DEFAULT_SCOPE.to_string(), - DEFAULT_AUDIENCE.to_string(), )?) } } @@ -335,25 +317,13 @@ fn b64_encode_obj(obj: &T) -> Result { /// /// #[derive(Debug, Default)] -pub struct InstanceCredentialProvider { - audience: String, -} - -impl InstanceCredentialProvider { - /// Create a new [`InstanceCredentialProvider`], we need to control the client in order to enable http access so save the options. - pub fn new>(audience: T) -> Self { - Self { - audience: audience.into(), - } - } -} +pub struct InstanceCredentialProvider {} /// Make a request to the metadata server to fetch a token, using a a given hostname. async fn make_metadata_request( client: &Client, hostname: &str, retry: &RetryConfig, - audience: &str, ) -> crate::Result { let url = format!( "http://{hostname}/computeMetadata/v1/instance/service-accounts/default/token" @@ -361,7 +331,7 @@ async fn make_metadata_request( let response: TokenResponse = client .request(Method::GET, url) .header("Metadata-Flavor", "Google") - .query(&[("audience", audience)]) + .query(&[("audience", "https://www.googleapis.com/oauth2/v4/token")]) .send_retry(retry) .await .context(TokenRequestSnafu)? @@ -386,12 +356,9 @@ impl TokenProvider for InstanceCredentialProvider { const METADATA_HOST: &str = "metadata"; info!("fetching token from metadata server"); - let response = - make_metadata_request(client, METADATA_HOST, retry, &self.audience) - .or_else(|_| { - make_metadata_request(client, METADATA_IP, retry, &self.audience) - }) - .await?; + let response = make_metadata_request(client, METADATA_HOST, retry) + .or_else(|_| make_metadata_request(client, METADATA_IP, retry)) + .await?; let token = TemporaryToken { token: Arc::new(GcpCredential { bearer: response.access_token, diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 6fdad18b68f9..3c583c67039f 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -57,9 +57,7 @@ use crate::{ ObjectStore, Result, RetryConfig, }; -use credential::{ - default_gcs_base_url, InstanceCredentialProvider, ServiceAccountCredentials, -}; +use credential::{InstanceCredentialProvider, ServiceAccountCredentials}; mod credential; @@ -67,7 +65,7 @@ const STORE: &str = "GCS"; /// [`CredentialProvider`] for [`GoogleCloudStorage`] pub type GcpCredentialProvider = Arc>; -use crate::gcp::credential::{ApplicationDefaultCredentials, DEFAULT_AUDIENCE}; +use crate::gcp::credential::{ApplicationDefaultCredentials, DEFAULT_GCS_BASE_URL}; pub use credential::GcpCredential; #[derive(Debug, Snafu)] @@ -1052,10 +1050,10 @@ impl GoogleCloudStorageBuilder { .map(|c| c.disable_oauth) .unwrap_or(false); - let gcs_base_url = service_account_credentials + let gcs_base_url: String = service_account_credentials .as_ref() - .map(|c| c.gcs_base_url.clone()) - .unwrap_or_else(default_gcs_base_url); + .and_then(|c| c.gcs_base_url.clone()) + .unwrap_or_else(|| DEFAULT_GCS_BASE_URL.to_string()); let credentials = if let Some(credentials) = self.credentials { credentials @@ -1065,7 +1063,7 @@ impl GoogleCloudStorageBuilder { })) as _ } else if let Some(credentials) = service_account_credentials { Arc::new(TokenCredentialProvider::new( - credentials.oauth_provider()?, + credentials.token_provider()?, self.client_options.client()?, self.retry_config.clone(), )) as _ @@ -1080,7 +1078,7 @@ impl GoogleCloudStorageBuilder { } ApplicationDefaultCredentials::ServiceAccount(token) => { Arc::new(TokenCredentialProvider::new( - token.oauth_provider()?, + token.token_provider()?, self.client_options.client()?, self.retry_config.clone(), )) as _ @@ -1088,7 +1086,7 @@ impl GoogleCloudStorageBuilder { } } else { Arc::new(TokenCredentialProvider::new( - InstanceCredentialProvider::new(DEFAULT_AUDIENCE), + InstanceCredentialProvider::default(), self.client_options.clone().with_allow_http(true).client()?, self.retry_config.clone(), )) as _ @@ -1123,7 +1121,7 @@ mod test { use super::*; - const FAKE_KEY: &str = r#"{"private_key": "private_key", "client_email":"client_email", "disable_oauth":true}"#; + const FAKE_KEY: &str = r#"{"private_key": "private_key", "private_key_id": "private_key_id", "client_email":"client_email", "disable_oauth":true}"#; const NON_EXISTENT_NAME: &str = "nonexistentname"; #[tokio::test] @@ -1135,7 +1133,7 @@ mod test { list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; rename_and_copy(&integration).await; - if integration.client.base_url == default_gcs_base_url() { + if integration.client.base_url == DEFAULT_GCS_BASE_URL { // Fake GCS server doesn't currently honor ifGenerationMatch // https://github.com/fsouza/fake-gcs-server/issues/994 copy_if_not_exists(&integration).await;