diff --git a/.github/workflows/object_store.yml b/.github/workflows/object_store.yml index c28f8037a307..1b991e33c097 100644 --- a/.github/workflows/object_store.yml +++ b/.github/workflows/object_store.yml @@ -126,7 +126,7 @@ jobs: # Give the container a moment to start up prior to configuring it sleep 1 curl -v -X POST --data-binary '{"name":"test-bucket"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b" - echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}' > "$GOOGLE_SERVICE_ACCOUNT" + echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": "", "private_key_id": ""}' > "$GOOGLE_SERVICE_ACCOUNT" - name: Setup WebDav run: docker run -d -p 8080:80 rclone/rclone serve webdav /data --addr :80 diff --git a/object_store/src/gcp/credential.rs b/object_store/src/gcp/credential.rs index ad21c33b8b9d..87f8e244f21c 100644 --- a/object_store/src/gcp/credential.rs +++ b/object_store/src/gcp/credential.rs @@ -17,10 +17,8 @@ use crate::client::retry::RetryExt; use crate::client::token::TemporaryToken; -use crate::client::{TokenCredentialProvider, TokenProvider}; -use crate::gcp::credential::Error::UnsupportedCredentialsType; -use crate::gcp::{GcpCredentialProvider, STORE}; -use crate::ClientOptions; +use crate::client::TokenProvider; +use crate::gcp::STORE; use crate::RetryConfig; use async_trait::async_trait; use base64::prelude::BASE64_URL_SAFE_NO_PAD; @@ -28,6 +26,7 @@ use base64::Engine; use futures::TryFutureExt; use reqwest::{Client, Method}; use ring::signature::RsaKeyPair; +use serde::Deserialize; use snafu::{ResultExt, Snafu}; use std::env; use std::fs::File; @@ -37,6 +36,10 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tracing::info; +pub const DEFAULT_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.full_control"; + +pub const DEFAULT_GCS_BASE_URL: &str = "https://storage.googleapis.com"; + #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Unable to open service account file from {}: {}", path.display(), source))] @@ -68,9 +71,6 @@ pub enum Error { #[snafu(display("Error getting token response body: {}", source))] TokenResponseBody { source: reqwest::Error }, - - #[snafu(display("Unsupported ApplicationCredentials type: {}", type_))] - UnsupportedCredentialsType { type_: String }, } impl From for crate::Error { @@ -92,48 +92,48 @@ pub struct GcpCredential { pub type Result = std::result::Result; #[derive(Debug, Default, serde::Serialize)] -pub struct JwtHeader { +pub struct JwtHeader<'a> { /// The type of JWS: it can only be "JWT" here /// /// Defined in [RFC7515#4.1.9](https://tools.ietf.org/html/rfc7515#section-4.1.9). #[serde(skip_serializing_if = "Option::is_none")] - pub typ: Option, + pub typ: Option<&'a str>, /// The algorithm used /// /// Defined in [RFC7515#4.1.1](https://tools.ietf.org/html/rfc7515#section-4.1.1). - pub alg: String, + pub alg: &'a str, /// Content type /// /// Defined in [RFC7519#5.2](https://tools.ietf.org/html/rfc7519#section-5.2). #[serde(skip_serializing_if = "Option::is_none")] - pub cty: Option, + pub cty: Option<&'a str>, /// JSON Key URL /// /// Defined in [RFC7515#4.1.2](https://tools.ietf.org/html/rfc7515#section-4.1.2). #[serde(skip_serializing_if = "Option::is_none")] - pub jku: Option, + pub jku: Option<&'a str>, /// Key ID /// /// Defined in [RFC7515#4.1.4](https://tools.ietf.org/html/rfc7515#section-4.1.4). #[serde(skip_serializing_if = "Option::is_none")] - pub kid: Option, + pub kid: Option<&'a str>, /// X.509 URL /// /// Defined in [RFC7515#4.1.5](https://tools.ietf.org/html/rfc7515#section-4.1.5). #[serde(skip_serializing_if = "Option::is_none")] - pub x5u: Option, + pub x5u: Option<&'a str>, /// X.509 certificate thumbprint /// /// Defined in [RFC7515#4.1.7](https://tools.ietf.org/html/rfc7515#section-4.1.7). #[serde(skip_serializing_if = "Option::is_none")] - pub x5t: Option, + pub x5t: Option<&'a str>, } #[derive(serde::Serialize)] struct TokenClaims<'a> { iss: &'a str, + sub: &'a str, scope: &'a str, - aud: &'a str, exp: u64, iat: u64, } @@ -144,28 +144,32 @@ struct TokenResponse { expires_in: u64, } -/// Encapsulates the logic to perform an OAuth token challenge +/// Self-signed JWT (JSON Web Token). +/// +/// # References +/// - #[derive(Debug)] -pub struct OAuthProvider { +pub struct SelfSignedJwt { issuer: String, scope: String, - audience: String, key_pair: RsaKeyPair, jwt_header: String, random: ring::rand::SystemRandom, } -impl OAuthProvider { - /// Create a new [`OAuthProvider`] +impl SelfSignedJwt { + /// Create a new [`SelfSignedJwt`] pub fn new( + key_id: String, issuer: String, private_key_pem: String, scope: String, - audience: String, ) -> Result { let key_pair = decode_first_rsa_key(private_key_pem)?; let jwt_header = b64_encode_obj(&JwtHeader { - alg: "RS256".to_string(), + alg: "RS256", + typ: Some("JWT"), + kid: Some(&key_id), ..Default::default() })?; @@ -173,7 +177,6 @@ impl OAuthProvider { issuer, key_pair, scope, - audience, jwt_header, random: ring::rand::SystemRandom::new(), }) @@ -181,24 +184,24 @@ impl OAuthProvider { } #[async_trait] -impl TokenProvider for OAuthProvider { +impl TokenProvider for SelfSignedJwt { type Credential = GcpCredential; /// Fetch a fresh token async fn fetch_token( &self, - client: &Client, - retry: &RetryConfig, + _client: &Client, + _retry: &RetryConfig, ) -> crate::Result>> { let now = seconds_since_epoch(); let exp = now + 3600; let claims = TokenClaims { iss: &self.issuer, + sub: &self.issuer, scope: &self.scope, - aud: &self.audience, - exp, iat: now, + exp, }; let claim_str = b64_encode_obj(&claims)?; @@ -214,28 +217,11 @@ impl TokenProvider for OAuthProvider { .context(SignSnafu)?; let signature = BASE64_URL_SAFE_NO_PAD.encode(sig_bytes); - let jwt = [message, signature].join("."); - - let body = [ - ("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer"), - ("assertion", &jwt), - ]; - - let response: TokenResponse = client - .request(Method::POST, &self.audience) - .form(&body) - .send_retry(retry) - .await - .context(TokenRequestSnafu)? - .json() - .await - .context(TokenResponseBodySnafu)?; + let bearer = [message, signature].join("."); Ok(TemporaryToken { - token: Arc::new(GcpCredential { - bearer: response.access_token, - }), - expiry: Some(Instant::now() + Duration::from_secs(response.expires_in)), + token: Arc::new(GcpCredential { bearer }), + expiry: Some(Instant::now() + Duration::from_secs(3600)), }) } } @@ -259,29 +245,24 @@ pub struct ServiceAccountCredentials { /// The private key in RSA format. pub private_key: String, + /// The private key ID + pub private_key_id: String, + /// The email address associated with the service account. pub client_email: String, /// Base URL for GCS - #[serde(default = "default_gcs_base_url")] - pub gcs_base_url: String, + #[serde(default)] + pub gcs_base_url: Option, /// Disable oauth and use empty tokens. - #[serde(default = "default_disable_oauth")] + #[serde(default)] pub disable_oauth: bool, } -pub fn default_gcs_base_url() -> String { - "https://storage.googleapis.com".to_owned() -} - -pub fn default_disable_oauth() -> bool { - false -} - impl ServiceAccountCredentials { /// Create a new [`ServiceAccountCredentials`] from a file. - pub fn from_file>(path: P) -> Result { + pub fn from_file>(path: P) -> Result { read_credentials_file(path) } @@ -290,17 +271,20 @@ impl ServiceAccountCredentials { serde_json::from_str(key).context(DecodeCredentialsSnafu) } - /// Create an [`OAuthProvider`] from this credentials struct. - pub fn oauth_provider( - self, - scope: &str, - audience: &str, - ) -> crate::Result { - Ok(OAuthProvider::new( + /// Create a [`SelfSignedJwt`] from this credentials struct. + /// + /// We use a scope of [`DEFAULT_SCOPE`] as opposed to an audience + /// as GCS appears to not support audience + /// + /// # References + /// - + /// - + pub fn token_provider(self) -> crate::Result { + Ok(SelfSignedJwt::new( + self.private_key_id, self.client_email, self.private_key, - scope.to_string(), - audience.to_string(), + DEFAULT_SCOPE.to_string(), )?) } } @@ -337,25 +321,13 @@ fn b64_encode_obj(obj: &T) -> Result { /// /// #[derive(Debug, Default)] -pub struct InstanceCredentialProvider { - audience: String, -} - -impl InstanceCredentialProvider { - /// Create a new [`InstanceCredentialProvider`], we need to control the client in order to enable http access so save the options. - pub fn new>(audience: T) -> Self { - Self { - audience: audience.into(), - } - } -} +pub struct InstanceCredentialProvider {} /// Make a request to the metadata server to fetch a token, using a a given hostname. async fn make_metadata_request( client: &Client, hostname: &str, retry: &RetryConfig, - audience: &str, ) -> crate::Result { let url = format!( "http://{hostname}/computeMetadata/v1/instance/service-accounts/default/token" @@ -363,7 +335,7 @@ async fn make_metadata_request( let response: TokenResponse = client .request(Method::GET, url) .header("Metadata-Flavor", "Google") - .query(&[("audience", audience)]) + .query(&[("audience", "https://www.googleapis.com/oauth2/v4/token")]) .send_retry(retry) .await .context(TokenRequestSnafu)? @@ -388,12 +360,9 @@ impl TokenProvider for InstanceCredentialProvider { const METADATA_HOST: &str = "metadata"; info!("fetching token from metadata server"); - let response = - make_metadata_request(client, METADATA_HOST, retry, &self.audience) - .or_else(|_| { - make_metadata_request(client, METADATA_IP, retry, &self.audience) - }) - .await?; + let response = make_metadata_request(client, METADATA_HOST, retry) + .or_else(|_| make_metadata_request(client, METADATA_IP, retry)) + .await?; let token = TemporaryToken { token: Arc::new(GcpCredential { bearer: response.access_token, @@ -404,62 +373,36 @@ impl TokenProvider for InstanceCredentialProvider { } } -/// ApplicationDefaultCredentials -/// -pub fn application_default_credentials( - path: Option<&str>, - client: &ClientOptions, - retry: &RetryConfig, -) -> crate::Result> { - let file = match ApplicationDefaultCredentialsFile::read(path)? { - Some(x) => x, - None => return Ok(None), - }; - - match file.type_.as_str() { - // - "authorized_user" => { - let token = AuthorizedUserCredentials { - client_id: file.client_id, - client_secret: file.client_secret, - refresh_token: file.refresh_token, - }; - - Ok(Some(Arc::new(TokenCredentialProvider::new( - token, - client.client()?, - retry.clone(), - )))) - } - type_ => Err(UnsupportedCredentialsType { - type_: type_.to_string(), - } - .into()), - } -} - /// A deserialized `application_default_credentials.json`-file. -/// +/// +/// # References +/// - +/// - #[derive(serde::Deserialize)] -struct ApplicationDefaultCredentialsFile { - #[serde(default)] - client_id: String, - #[serde(default)] - client_secret: String, - #[serde(default)] - refresh_token: String, - #[serde(rename = "type")] - type_: String, +#[serde(tag = "type")] +pub enum ApplicationDefaultCredentials { + /// Service Account. + /// + /// # References + /// - + #[serde(rename = "service_account")] + ServiceAccount(ServiceAccountCredentials), + /// Authorized user via "gcloud CLI Integration". + /// + /// # References + /// - + #[serde(rename = "authorized_user")] + AuthorizedUser(AuthorizedUserCredentials), } -impl ApplicationDefaultCredentialsFile { +impl ApplicationDefaultCredentials { const CREDENTIALS_PATH: &'static str = ".config/gcloud/application_default_credentials.json"; // Create a new application default credential in the following situations: // 1. a file is passed in and the type matches. // 2. without argument if the well-known configuration file is present. - fn read(path: Option<&str>) -> Result, Error> { + pub fn read(path: Option<&str>) -> Result, Error> { if let Some(path) = path { return read_credentials_file::(path).map(Some); } @@ -478,8 +421,8 @@ impl ApplicationDefaultCredentialsFile { const DEFAULT_TOKEN_GCP_URI: &str = "https://accounts.google.com/o/oauth2/token"; /// -#[derive(Debug)] -struct AuthorizedUserCredentials { +#[derive(Debug, Deserialize)] +pub struct AuthorizedUserCredentials { client_id: String, client_secret: String, refresh_token: String, diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index f8a16310dd1e..a75527fe7b9f 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -57,10 +57,7 @@ use crate::{ ObjectStore, Result, RetryConfig, }; -use credential::{ - application_default_credentials, default_gcs_base_url, InstanceCredentialProvider, - ServiceAccountCredentials, -}; +use credential::{InstanceCredentialProvider, ServiceAccountCredentials}; mod credential; @@ -68,6 +65,7 @@ const STORE: &str = "GCS"; /// [`CredentialProvider`] for [`GoogleCloudStorage`] pub type GcpCredentialProvider = Arc>; +use crate::gcp::credential::{ApplicationDefaultCredentials, DEFAULT_GCS_BASE_URL}; pub use credential::GcpCredential; #[derive(Debug, Snafu)] @@ -1034,10 +1032,8 @@ impl GoogleCloudStorageBuilder { }; // Then try to initialize from the application credentials file, or the environment. - let application_default_credentials = application_default_credentials( + let application_default_credentials = ApplicationDefaultCredentials::read( self.application_credentials_path.as_deref(), - &self.client_options, - &self.retry_config, )?; let disable_oauth = service_account_credentials @@ -1045,14 +1041,10 @@ impl GoogleCloudStorageBuilder { .map(|c| c.disable_oauth) .unwrap_or(false); - let gcs_base_url = service_account_credentials + let gcs_base_url: String = service_account_credentials .as_ref() - .map(|c| c.gcs_base_url.clone()) - .unwrap_or_else(default_gcs_base_url); - - // TODO: https://cloud.google.com/storage/docs/authentication#oauth-scopes - let scope = "https://www.googleapis.com/auth/devstorage.full_control"; - let audience = "https://www.googleapis.com/oauth2/v4/token"; + .and_then(|c| c.gcs_base_url.clone()) + .unwrap_or_else(|| DEFAULT_GCS_BASE_URL.to_string()); let credentials = if let Some(credentials) = self.credentials { credentials @@ -1062,15 +1054,30 @@ impl GoogleCloudStorageBuilder { })) as _ } else if let Some(credentials) = service_account_credentials { Arc::new(TokenCredentialProvider::new( - credentials.oauth_provider(scope, audience)?, + credentials.token_provider()?, self.client_options.client()?, self.retry_config.clone(), )) as _ } else if let Some(credentials) = application_default_credentials { - credentials + match credentials { + ApplicationDefaultCredentials::AuthorizedUser(token) => { + Arc::new(TokenCredentialProvider::new( + token, + self.client_options.client()?, + self.retry_config.clone(), + )) as _ + } + ApplicationDefaultCredentials::ServiceAccount(token) => { + Arc::new(TokenCredentialProvider::new( + token.token_provider()?, + self.client_options.client()?, + self.retry_config.clone(), + )) as _ + } + } } else { Arc::new(TokenCredentialProvider::new( - InstanceCredentialProvider::new(audience), + InstanceCredentialProvider::default(), self.client_options.metadata_client()?, self.retry_config.clone(), )) as _ @@ -1105,7 +1112,7 @@ mod test { use super::*; - const FAKE_KEY: &str = r#"{"private_key": "private_key", "client_email":"client_email", "disable_oauth":true}"#; + const FAKE_KEY: &str = r#"{"private_key": "private_key", "private_key_id": "private_key_id", "client_email":"client_email", "disable_oauth":true}"#; const NON_EXISTENT_NAME: &str = "nonexistentname"; #[tokio::test] @@ -1117,7 +1124,7 @@ mod test { list_uses_directories_correctly(&integration).await; list_with_delimiter(&integration).await; rename_and_copy(&integration).await; - if integration.client.base_url == default_gcs_base_url() { + if integration.client.base_url == DEFAULT_GCS_BASE_URL { // Fake GCS server doesn't currently honor ifGenerationMatch // https://github.com/fsouza/fake-gcs-server/issues/994 copy_if_not_exists(&integration).await;