From 5a0baf1aeeead4c461f1c8d14bfc65c85cfff78e Mon Sep 17 00:00:00 2001 From: Yu Zeng Date: Thu, 4 Apr 2024 23:23:18 +0800 Subject: [PATCH] Add GCS signed URL support (#5300) * add util function for gcp sign url * add string to sign and other sign functions * add GoogleCloudStorageConfig::new and config and move functions to client * add more code and rearrange struct * add client_email for credential and return the signed url * clean some code * add client email for AuthorizedUserCredentials * tidy some code * format doc * Add GcpSigningCredentialProvider for getting email * add test * Move some functions which shared by aws and gcp to utils. * fix some bug and make it can get proper result * remoe useless code * tidy some code * do not export host * add sign_by_key * Cleanup * Add ServiceAccountKey * Further tweaks * add more scope for signing. * tidy * Tweak and add test * Retry and handle errors for signBlob --------- Co-authored-by: Raphael Taylor-Davies --- object_store/src/aws/credential.rs | 19 +- object_store/src/aws/mod.rs | 11 +- object_store/src/gcp/builder.rs | 55 +++- object_store/src/gcp/client.rs | 105 ++++++- object_store/src/gcp/credential.rs | 465 ++++++++++++++++++++++++++--- object_store/src/gcp/mod.rs | 74 ++++- object_store/src/util.rs | 29 ++ 7 files changed, 677 insertions(+), 81 deletions(-) diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs index dd7fa5b41da3..478e56dd09c2 100644 --- a/object_store/src/aws/credential.rs +++ b/object_store/src/aws/credential.rs @@ -19,7 +19,7 @@ use crate::aws::{AwsCredentialProvider, STORE, STRICT_ENCODE_SET, STRICT_PATH_EN use crate::client::retry::RetryExt; use crate::client::token::{TemporaryToken, TokenCache}; use crate::client::TokenProvider; -use crate::util::hmac_sha256; +use crate::util::{hex_digest, hex_encode, hmac_sha256}; use crate::{CredentialProvider, Result, RetryConfig}; use async_trait::async_trait; use bytes::Buf; @@ -342,23 +342,6 @@ impl CredentialExt for RequestBuilder { } } -/// Computes the SHA256 digest of `body` returned as a hex encoded string -fn hex_digest(bytes: &[u8]) -> String { - let digest = ring::digest::digest(&ring::digest::SHA256, bytes); - hex_encode(digest.as_ref()) -} - -/// Returns `bytes` as a lower-case hex encoded string -fn hex_encode(bytes: &[u8]) -> String { - use std::fmt::Write; - let mut out = String::with_capacity(bytes.len() * 2); - for byte in bytes { - // String writing is infallible - let _ = write!(out, "{byte:02x}"); - } - out -} - /// Canonicalizes query parameters into the AWS canonical form /// /// diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index b33771de9a86..76d01d597042 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -43,6 +43,7 @@ use crate::client::list::ListClientExt; use crate::client::CredentialProvider; use crate::multipart::{MultipartStore, PartId}; use crate::signer::Signer; +use crate::util::STRICT_ENCODE_SET; use crate::{ Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, Path, PutMode, PutOptions, PutResult, Result, UploadPart, @@ -64,16 +65,6 @@ pub use dynamo::DynamoCommit; pub use precondition::{S3ConditionalPut, S3CopyIfNotExists}; pub use resolve::resolve_bucket_region; -// http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html -// -// Do not URI-encode any of the unreserved characters that RFC 3986 defines: -// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ ). -pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC - .remove(b'-') - .remove(b'.') - .remove(b'_') - .remove(b'~'); - /// This struct is used to maintain the URI path encoding const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/'); diff --git a/object_store/src/gcp/builder.rs b/object_store/src/gcp/builder.rs index 2cf75040b858..4fa91677aa7a 100644 --- a/object_store/src/gcp/builder.rs +++ b/object_store/src/gcp/builder.rs @@ -21,7 +21,10 @@ use crate::gcp::credential::{ ApplicationDefaultCredentials, InstanceCredentialProvider, ServiceAccountCredentials, DEFAULT_GCS_BASE_URL, }; -use crate::gcp::{credential, GcpCredential, GcpCredentialProvider, GoogleCloudStorage, STORE}; +use crate::gcp::{ + credential, GcpCredential, GcpCredentialProvider, GcpSigningCredential, + GcpSigningCredentialProvider, GoogleCloudStorage, STORE, +}; use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; @@ -29,6 +32,8 @@ use std::str::FromStr; use std::sync::Arc; use url::Url; +use super::credential::{AuthorizedUserSigningCredentials, InstanceSigningCredentialProvider}; + #[derive(Debug, Snafu)] enum Error { #[snafu(display("Missing bucket name"))] @@ -107,6 +112,8 @@ pub struct GoogleCloudStorageBuilder { client_options: ClientOptions, /// Credentials credentials: Option, + /// Credentials for sign url + signing_cedentials: Option, } /// Configuration keys for [`GoogleCloudStorageBuilder`] @@ -202,6 +209,7 @@ impl Default for GoogleCloudStorageBuilder { client_options: ClientOptions::new().with_allow_http(true), url: None, credentials: None, + signing_cedentials: None, } } } @@ -452,13 +460,13 @@ impl GoogleCloudStorageBuilder { Arc::new(StaticCredentialProvider::new(GcpCredential { bearer: "".to_string(), })) as _ - } else if let Some(credentials) = service_account_credentials { + } else if let Some(credentials) = service_account_credentials.clone() { Arc::new(TokenCredentialProvider::new( credentials.token_provider()?, self.client_options.client()?, self.retry_config.clone(), )) as _ - } else if let Some(credentials) = application_default_credentials { + } else if let Some(credentials) = application_default_credentials.clone() { match credentials { ApplicationDefaultCredentials::AuthorizedUser(token) => { Arc::new(TokenCredentialProvider::new( @@ -483,13 +491,44 @@ impl GoogleCloudStorageBuilder { )) as _ }; - let config = GoogleCloudStorageConfig { - base_url: gcs_base_url, + let signing_credentials = if let Some(signing_credentials) = self.signing_cedentials { + signing_credentials + } else if disable_oauth { + Arc::new(StaticCredentialProvider::new(GcpSigningCredential { + email: "".to_string(), + private_key: None, + })) as _ + } else if let Some(credentials) = service_account_credentials.clone() { + credentials.signing_credentials()? + } else if let Some(credentials) = application_default_credentials.clone() { + match credentials { + ApplicationDefaultCredentials::AuthorizedUser(token) => { + Arc::new(TokenCredentialProvider::new( + AuthorizedUserSigningCredentials::from(token)?, + self.client_options.client()?, + self.retry_config.clone(), + )) as _ + } + ApplicationDefaultCredentials::ServiceAccount(token) => { + token.signing_credentials()? + } + } + } else { + Arc::new(TokenCredentialProvider::new( + InstanceSigningCredentialProvider::default(), + self.client_options.metadata_client()?, + self.retry_config.clone(), + )) as _ + }; + + let config = GoogleCloudStorageConfig::new( + gcs_base_url, credentials, + signing_credentials, bucket_name, - retry_config: self.retry_config, - client_options: self.client_options, - }; + self.retry_config, + self.client_options, + ); Ok(GoogleCloudStorage { client: Arc::new(GoogleCloudStorageClient::new(config)?), diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index def53beefe78..901257f917cf 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -24,19 +24,22 @@ use crate::client::s3::{ ListResponse, }; use crate::client::GetOptionsExt; -use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE}; +use crate::gcp::{GcpCredential, GcpCredentialProvider, GcpSigningCredentialProvider, STORE}; use crate::multipart::PartId; use crate::path::{Path, DELIMITER}; +use crate::util::hex_encode; use crate::{ ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, PutResult, Result, RetryConfig, }; use async_trait::async_trait; +use base64::prelude::BASE64_STANDARD; +use base64::Engine; use bytes::{Buf, Bytes}; use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC}; use reqwest::header::HeaderName; use reqwest::{header, Client, Method, RequestBuilder, Response, StatusCode}; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; use std::sync::Arc; @@ -101,6 +104,15 @@ enum Error { #[snafu(display("Got invalid multipart response: {}", source))] InvalidMultipartResponse { source: quick_xml::de::DeError }, + + #[snafu(display("Error signing blob: {}", source))] + SignBlobRequest { source: crate::client::retry::Error }, + + #[snafu(display("Got invalid signing blob repsonse: {}", source))] + InvalidSignBlobResponse { source: reqwest::Error }, + + #[snafu(display("Got invalid signing blob signature: {}", source))] + InvalidSignBlobSignature { source: base64::DecodeError }, } impl From for crate::Error { @@ -123,6 +135,8 @@ pub struct GoogleCloudStorageConfig { pub credentials: GcpCredentialProvider, + pub signing_credentials: GcpSigningCredentialProvider, + pub bucket_name: String, pub retry_config: RetryConfig, @@ -130,6 +144,30 @@ pub struct GoogleCloudStorageConfig { pub client_options: ClientOptions, } +impl GoogleCloudStorageConfig { + pub fn new( + base_url: String, + credentials: GcpCredentialProvider, + signing_credentials: GcpSigningCredentialProvider, + bucket_name: String, + retry_config: RetryConfig, + client_options: ClientOptions, + ) -> Self { + Self { + base_url, + credentials, + signing_credentials, + bucket_name, + retry_config, + client_options, + } + } + + pub fn path_url(&self, path: &Path) -> String { + format!("{}/{}/{}", self.base_url, self.bucket_name, path) + } +} + /// A builder for a put request allowing customisation of the headers and query string pub struct PutRequest<'a> { path: &'a Path, @@ -163,6 +201,21 @@ impl<'a> PutRequest<'a> { } } +/// Sign Blob Request Body +#[derive(Debug, Serialize)] +struct SignBlobBody { + /// The payload to sign + payload: String, +} + +/// Sign Blob Response +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct SignBlobResponse { + /// The signature for the payload + signed_blob: String, +} + #[derive(Debug)] pub struct GoogleCloudStorageClient { config: GoogleCloudStorageConfig, @@ -197,6 +250,54 @@ impl GoogleCloudStorageClient { self.config.credentials.get_credential().await } + /// Create a signature from a string-to-sign using Google Cloud signBlob method. + /// form like: + /// ```plaintext + /// curl -X POST --data-binary @JSON_FILE_NAME \ + /// -H "Authorization: Bearer OAUTH2_TOKEN" \ + /// -H "Content-Type: application/json" \ + /// "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/SERVICE_ACCOUNT_EMAIL:signBlob" + /// ``` + /// + /// 'JSON_FILE_NAME' is a file containing the following JSON object: + /// ```plaintext + /// { + /// "payload": "REQUEST_INFORMATION" + /// } + /// ``` + pub async fn sign_blob(&self, string_to_sign: &str, client_email: &str) -> Result { + let credential = self.get_credential().await?; + let body = SignBlobBody { + payload: BASE64_STANDARD.encode(string_to_sign), + }; + + let url = format!( + "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:signBlob", + client_email + ); + + let response = self + .client + .post(&url) + .bearer_auth(&credential.bearer) + .json(&body) + .send_retry(&self.config.retry_config) + .await + .context(SignBlobRequestSnafu)?; + + //If successful, the signature is returned in the signedBlob field in the response. + let response = response + .json::() + .await + .context(InvalidSignBlobResponseSnafu)?; + + let signed_blob = BASE64_STANDARD + .decode(response.signed_blob) + .context(InvalidSignBlobSignatureSnafu)?; + + Ok(hex_encode(&signed_blob)) + } + pub fn object_url(&self, path: &Path) -> String { let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC); format!( diff --git a/object_store/src/gcp/credential.rs b/object_store/src/gcp/credential.rs index 34cd6eeb6ea4..fcd516a1bf1a 100644 --- a/object_store/src/gcp/credential.rs +++ b/object_store/src/gcp/credential.rs @@ -15,19 +15,26 @@ // specific language governing permissions and limitations // under the License. +use super::client::GoogleCloudStorageClient; use crate::client::retry::RetryExt; use crate::client::token::TemporaryToken; use crate::client::TokenProvider; -use crate::gcp::STORE; -use crate::RetryConfig; +use crate::gcp::{GcpSigningCredentialProvider, STORE}; +use crate::util::{hex_digest, hex_encode, STRICT_ENCODE_SET}; +use crate::{RetryConfig, StaticCredentialProvider}; use async_trait::async_trait; use base64::prelude::BASE64_URL_SAFE_NO_PAD; use base64::Engine; +use chrono::{DateTime, Utc}; use futures::TryFutureExt; +use hyper::HeaderMap; +use itertools::Itertools; +use percent_encoding::utf8_percent_encode; use reqwest::{Client, Method}; use ring::signature::RsaKeyPair; use serde::Deserialize; use snafu::{ResultExt, Snafu}; +use std::collections::BTreeMap; use std::env; use std::fs::File; use std::io::BufReader; @@ -35,11 +42,15 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{Duration, Instant}; use tracing::info; +use url::Url; -pub const DEFAULT_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.full_control"; +pub const DEFAULT_SCOPE: &str = "https://www.googleapis.com/auth/cloud-platform"; pub const DEFAULT_GCS_BASE_URL: &str = "https://storage.googleapis.com"; +const DEFAULT_GCS_PLAYLOAD_STRING: &str = "UNSIGNED-PAYLOAD"; +const DEFAULT_GCS_SIGN_BLOB_HOST: &str = "storage.googleapis.com"; + #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Unable to open service account file from {}: {}", path.display(), source))] @@ -57,7 +68,7 @@ pub enum Error { #[snafu(display("Invalid RSA key: {}", source), context(false))] InvalidKey { source: ring::error::KeyRejected }, - #[snafu(display("Error signing jwt: {}", source))] + #[snafu(display("Error signing: {}", source))] Sign { source: ring::error::Unspecified }, #[snafu(display("Error encoding jwt payload: {}", source))] @@ -82,6 +93,69 @@ impl From for crate::Error { } } +/// A Google Cloud Storage Credential for signing +#[derive(Debug)] +pub struct GcpSigningCredential { + /// The email of the service account + pub email: String, + + /// An optional RSA private key + /// + /// If provided this will be used to sign the URL, otherwise a call will be made to + /// [`iam.serviceAccounts.signBlob`]. This allows supporting credential sources + /// that don't expose the service account private key, e.g. [IMDS]. + /// + /// [IMDS]: https://cloud.google.com/docs/authentication/get-id-token#metadata-server + /// [`iam.serviceAccounts.signBlob`]: https://cloud.google.com/storage/docs/authentication/creating-signatures + pub private_key: Option, +} + +/// A private RSA key for a service account +#[derive(Debug)] +pub struct ServiceAccountKey(RsaKeyPair); + +impl ServiceAccountKey { + /// Parses a pem-encoded RSA key + pub fn from_pem(encoded: &[u8]) -> Result { + use rustls_pemfile::Item; + use std::io::Cursor; + + let mut cursor = Cursor::new(encoded); + let mut reader = BufReader::new(&mut cursor); + + // Reading from string is infallible + match rustls_pemfile::read_one(&mut reader).unwrap() { + Some(Item::Pkcs8Key(key)) => Self::from_pkcs8(key.secret_pkcs8_der()), + Some(Item::Pkcs1Key(key)) => Self::from_der(key.secret_pkcs1_der()), + _ => Err(Error::MissingKey), + } + } + + /// Parses an unencrypted PKCS#8-encoded RSA private key. + pub fn from_pkcs8(key: &[u8]) -> Result { + Ok(Self(RsaKeyPair::from_pkcs8(key)?)) + } + + /// Parses an unencrypted PKCS#8-encoded RSA private key. + pub fn from_der(key: &[u8]) -> Result { + Ok(Self(RsaKeyPair::from_der(key)?)) + } + + fn sign(&self, string_to_sign: &str) -> Result { + let mut signature = vec![0; self.0.public().modulus_len()]; + self.0 + .sign( + &ring::signature::RSA_PKCS1_SHA256, + &ring::rand::SystemRandom::new(), + string_to_sign.as_bytes(), + &mut signature, + ) + .context(SignSnafu)?; + + Ok(hex_encode(&signature)) + } +} + /// A Google Cloud Storage Credential #[derive(Debug, Eq, PartialEq)] pub struct GcpCredential { @@ -152,9 +226,8 @@ struct TokenResponse { pub struct SelfSignedJwt { issuer: String, scope: String, - key_pair: RsaKeyPair, - jwt_header: String, - random: ring::rand::SystemRandom, + private_key: ServiceAccountKey, + key_id: String, } impl SelfSignedJwt { @@ -162,23 +235,14 @@ impl SelfSignedJwt { pub fn new( key_id: String, issuer: String, - private_key_pem: String, + private_key: ServiceAccountKey, scope: String, ) -> Result { - let key_pair = decode_first_rsa_key(private_key_pem)?; - let jwt_header = b64_encode_obj(&JwtHeader { - alg: "RS256", - typ: Some("JWT"), - kid: Some(&key_id), - ..Default::default() - })?; - Ok(Self { issuer, - key_pair, scope, - jwt_header, - random: ring::rand::SystemRandom::new(), + private_key, + key_id, }) } } @@ -204,13 +268,21 @@ impl TokenProvider for SelfSignedJwt { exp, }; + let jwt_header = b64_encode_obj(&JwtHeader { + alg: "RS256", + typ: Some("JWT"), + kid: Some(&self.key_id), + ..Default::default() + })?; + let claim_str = b64_encode_obj(&claims)?; - let message = [self.jwt_header.as_ref(), claim_str.as_ref()].join("."); - let mut sig_bytes = vec![0; self.key_pair.public().modulus_len()]; - self.key_pair + let message = [jwt_header.as_ref(), claim_str.as_ref()].join("."); + let mut sig_bytes = vec![0; self.private_key.0.public().modulus_len()]; + self.private_key + .0 .sign( &ring::signature::RSA_PKCS1_SHA256, - &self.random, + &ring::rand::SystemRandom::new(), message.as_bytes(), &mut sig_bytes, ) @@ -238,7 +310,7 @@ where } /// A deserialized `service-account-********.json`-file. -#[derive(serde::Deserialize, Debug)] +#[derive(serde::Deserialize, Debug, Clone)] pub struct ServiceAccountCredentials { /// The private key in RSA format. pub private_key: String, @@ -281,10 +353,19 @@ impl ServiceAccountCredentials { Ok(SelfSignedJwt::new( self.private_key_id, self.client_email, - self.private_key, + ServiceAccountKey::from_pem(self.private_key.as_bytes())?, DEFAULT_SCOPE.to_string(), )?) } + + pub fn signing_credentials(self) -> crate::Result { + Ok(Arc::new(StaticCredentialProvider::new( + GcpSigningCredential { + email: self.client_email, + private_key: Some(ServiceAccountKey::from_pem(self.private_key.as_bytes())?), + }, + ))) + } } /// Returns the number of seconds since unix epoch @@ -295,21 +376,6 @@ fn seconds_since_epoch() -> u64 { .as_secs() } -fn decode_first_rsa_key(private_key_pem: String) -> Result { - use rustls_pemfile::Item; - use std::io::Cursor; - - let mut cursor = Cursor::new(private_key_pem); - let mut reader = BufReader::new(&mut cursor); - - // Reading from string is infallible - match rustls_pemfile::read_one(&mut reader).unwrap() { - Some(Item::Pkcs8Key(key)) => Ok(RsaKeyPair::from_pkcs8(key.secret_pkcs8_der())?), - Some(Item::Pkcs1Key(key)) => Ok(RsaKeyPair::from_der(key.secret_pkcs1_der())?), - _ => Err(Error::MissingKey), - } -} - fn b64_encode_obj(obj: &T) -> Result { let string = serde_json::to_string(obj).context(EncodeSnafu)?; Ok(BASE64_URL_SAFE_NO_PAD.encode(string)) @@ -360,6 +426,7 @@ impl TokenProvider for InstanceCredentialProvider { let response = make_metadata_request(client, METADATA_HOST, retry) .or_else(|_| make_metadata_request(client, METADATA_IP, retry)) .await?; + let token = TemporaryToken { token: Arc::new(GcpCredential { bearer: response.access_token, @@ -370,12 +437,69 @@ impl TokenProvider for InstanceCredentialProvider { } } +/// Make a request to the metadata server to fetch the client email, using a given hostname. +async fn make_metadata_request_for_email( + client: &Client, + hostname: &str, + retry: &RetryConfig, +) -> crate::Result { + let url = + format!("http://{hostname}/computeMetadata/v1/instance/service-accounts/default/email",); + let response = client + .request(Method::GET, url) + .header("Metadata-Flavor", "Google") + .send_retry(retry) + .await + .context(TokenRequestSnafu)? + .text() + .await + .context(TokenResponseBodySnafu)?; + Ok(response) +} + +/// A provider that uses the Google Cloud Platform metadata server to fetch a email for signing. +/// +/// +#[derive(Debug, Default)] +pub struct InstanceSigningCredentialProvider {} + +#[async_trait] +impl TokenProvider for InstanceSigningCredentialProvider { + type Credential = GcpSigningCredential; + + /// Fetch a token from the metadata server. + /// Since the connection is local we need to enable http access and don't actually use the client object passed in. + async fn fetch_token( + &self, + client: &Client, + retry: &RetryConfig, + ) -> crate::Result>> { + const METADATA_IP: &str = "169.254.169.254"; + const METADATA_HOST: &str = "metadata"; + + info!("fetching token from metadata server"); + + let email = make_metadata_request_for_email(client, METADATA_HOST, retry) + .or_else(|_| make_metadata_request_for_email(client, METADATA_IP, retry)) + .await?; + + let token = TemporaryToken { + token: Arc::new(GcpSigningCredential { + email, + private_key: None, + }), + expiry: None, + }; + Ok(token) + } +} + /// A deserialized `application_default_credentials.json`-file. /// /// # References /// - /// - -#[derive(serde::Deserialize)] +#[derive(serde::Deserialize, Clone)] #[serde(tag = "type")] pub enum ApplicationDefaultCredentials { /// Service Account. @@ -423,13 +547,65 @@ impl ApplicationDefaultCredentials { const DEFAULT_TOKEN_GCP_URI: &str = "https://accounts.google.com/o/oauth2/token"; /// -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] pub struct AuthorizedUserCredentials { client_id: String, client_secret: String, refresh_token: String, } +#[derive(Debug, Deserialize)] +pub struct AuthorizedUserSigningCredentials { + credential: AuthorizedUserCredentials, +} + +/// +#[derive(Debug, Deserialize)] +struct EmailResponse { + email: String, +} + +impl AuthorizedUserSigningCredentials { + pub fn from(credential: AuthorizedUserCredentials) -> crate::Result { + Ok(Self { credential }) + } + + async fn client_email(&self, client: &Client, retry: &RetryConfig) -> crate::Result { + let response = client + .request(Method::GET, "https://oauth2.googleapis.com/tokeninfo") + .query(&[("access_token", &self.credential.refresh_token)]) + .send_retry(retry) + .await + .context(TokenRequestSnafu)? + .json::() + .await + .context(TokenResponseBodySnafu)?; + + Ok(response.email) + } +} + +#[async_trait] +impl TokenProvider for AuthorizedUserSigningCredentials { + type Credential = GcpSigningCredential; + + async fn fetch_token( + &self, + client: &Client, + retry: &RetryConfig, + ) -> crate::Result>> { + let email = self.client_email(client, retry).await?; + + Ok(TemporaryToken { + token: Arc::new(GcpSigningCredential { + email, + private_key: None, + }), + expiry: None, + }) + } +} + #[async_trait] impl TokenProvider for AuthorizedUserCredentials { type Credential = GcpCredential; @@ -462,3 +638,208 @@ impl TokenProvider for AuthorizedUserCredentials { }) } } + +/// Trim whitespace from header values +fn trim_header_value(value: &str) -> String { + let mut ret = value.to_string(); + ret.retain(|c| !c.is_whitespace()); + ret +} + +/// A Google Cloud Storage Authorizer for generating signed URL using [Google SigV4] +/// +/// [Google SigV4]: https://cloud.google.com/storage/docs/access-control/signed-urls +#[derive(Debug)] +pub struct GCSAuthorizer { + date: Option>, + credential: Arc, +} + +impl GCSAuthorizer { + /// Create a new [`GCSAuthorizer`] + pub fn new(credential: Arc) -> Self { + Self { + date: None, + credential, + } + } + + pub(crate) async fn sign( + &self, + method: Method, + url: &mut Url, + expires_in: Duration, + client: &GoogleCloudStorageClient, + ) -> crate::Result<()> { + let email = &self.credential.email; + let date = self.date.unwrap_or_else(Utc::now); + let scope = self.scope(date); + let credential_with_scope = format!("{}/{}", email, scope); + + let mut headers = HeaderMap::new(); + headers.insert("host", DEFAULT_GCS_SIGN_BLOB_HOST.parse().unwrap()); + + let (_, signed_headers) = Self::canonicalize_headers(&headers); + + url.query_pairs_mut() + .append_pair("X-Goog-Algorithm", "GOOG4-RSA-SHA256") + .append_pair("X-Goog-Credential", &credential_with_scope) + .append_pair("X-Goog-Date", &date.format("%Y%m%dT%H%M%SZ").to_string()) + .append_pair("X-Goog-Expires", &expires_in.as_secs().to_string()) + .append_pair("X-Goog-SignedHeaders", &signed_headers); + + let string_to_sign = self.string_to_sign(date, &method, url, &headers); + let signature = match &self.credential.private_key { + Some(key) => key.sign(&string_to_sign)?, + None => client.sign_blob(&string_to_sign, email).await?, + }; + + url.query_pairs_mut() + .append_pair("X-Goog-Signature", &signature); + Ok(()) + } + + /// Get scope for the request + /// + /// + fn scope(&self, date: DateTime) -> String { + format!("{}/auto/storage/goog4_request", date.format("%Y%m%d"),) + } + + /// Canonicalizes query parameters into the GCP canonical form + /// form like: + ///```plaintext + ///HTTP_VERB + ///PATH_TO_RESOURCE + ///CANONICAL_QUERY_STRING + ///CANONICAL_HEADERS + /// + ///SIGNED_HEADERS + ///PAYLOAD + ///``` + /// + /// + fn canonicalize_request(url: &Url, methond: &Method, headers: &HeaderMap) -> String { + let verb = methond.as_str(); + let path = url.path(); + let query = Self::canonicalize_query(url); + let (canaonical_headers, signed_headers) = Self::canonicalize_headers(headers); + + format!( + "{}\n{}\n{}\n{}\n\n{}\n{}", + verb, path, query, canaonical_headers, signed_headers, DEFAULT_GCS_PLAYLOAD_STRING + ) + } + + /// Canonicalizes query parameters into the GCP canonical form + /// form like `max-keys=2&prefix=object` + /// + /// + fn canonicalize_query(url: &Url) -> String { + url.query_pairs() + .sorted_unstable_by(|a, b| a.0.cmp(&b.0)) + .map(|(k, v)| { + format!( + "{}={}", + utf8_percent_encode(k.as_ref(), &STRICT_ENCODE_SET), + utf8_percent_encode(v.as_ref(), &STRICT_ENCODE_SET) + ) + }) + .join("&") + } + + /// Canonicalizes header into the GCP canonical form + /// + /// + fn canonicalize_headers(header_map: &HeaderMap) -> (String, String) { + //FIXME add error handling for invalid header values + let mut headers = BTreeMap::>::new(); + for (k, v) in header_map { + headers + .entry(k.as_str().to_lowercase()) + .or_default() + .push(std::str::from_utf8(v.as_bytes()).unwrap()); + } + + let canonicalize_headers = headers + .iter() + .map(|(k, v)| { + format!( + "{}:{}", + k.trim(), + v.iter().map(|v| trim_header_value(v)).join(",") + ) + }) + .join("\n"); + + let signed_headers = headers.keys().join(";"); + + (canonicalize_headers, signed_headers) + } + + ///construct the string to sign + ///form like: + ///```plaintext + ///SIGNING_ALGORITHM + ///ACTIVE_DATETIME + ///CREDENTIAL_SCOPE + ///HASHED_CANONICAL_REQUEST + ///``` + ///`ACTIVE_DATETIME` format:`YYYYMMDD'T'HHMMSS'Z'` + /// + pub fn string_to_sign( + &self, + date: DateTime, + request_method: &Method, + url: &Url, + headers: &HeaderMap, + ) -> String { + let caninical_request = Self::canonicalize_request(url, request_method, headers); + let hashed_canonical_req = hex_digest(caninical_request.as_bytes()); + let scope = self.scope(date); + + format!( + "{}\n{}\n{}\n{}", + "GOOG4-RSA-SHA256", + date.format("%Y%m%dT%H%M%SZ"), + scope, + hashed_canonical_req + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_canonicalize_headers() { + let mut input_header = HeaderMap::new(); + input_header.insert("content-type", "text/plain".parse().unwrap()); + input_header.insert("host", "storage.googleapis.com".parse().unwrap()); + input_header.insert("x-goog-meta-reviewer", "jane".parse().unwrap()); + input_header.append("x-goog-meta-reviewer", "john".parse().unwrap()); + assert_eq!( + GCSAuthorizer::canonicalize_headers(&input_header), + ( + "content-type:text/plain +host:storage.googleapis.com +x-goog-meta-reviewer:jane,john" + .into(), + "content-type;host;x-goog-meta-reviewer".to_string() + ) + ); + } + + #[test] + fn test_canonicalize_query() { + let mut url = Url::parse("https://storage.googleapis.com/bucket/object").unwrap(); + url.query_pairs_mut() + .append_pair("max-keys", "2") + .append_pair("prefix", "object"); + assert_eq!( + GCSAuthorizer::canonicalize_query(&url), + "max-keys=2&prefix=object".to_string() + ); + } +} diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 2058d1f8055b..96afa45f2b61 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -35,8 +35,11 @@ //! //! [lifecycle rule]: https://cloud.google.com/storage/docs/lifecycle#abort-mpu use std::sync::Arc; +use std::time::Duration; use crate::client::CredentialProvider; +use crate::gcp::credential::GCSAuthorizer; +use crate::signer::Signer; use crate::{ multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, PutOptions, PutResult, Result, UploadPart, @@ -45,13 +48,15 @@ use async_trait::async_trait; use bytes::Bytes; use client::GoogleCloudStorageClient; use futures::stream::BoxStream; +use hyper::Method; +use url::Url; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::parts::Parts; use crate::multipart::MultipartStore; pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey}; -pub use credential::GcpCredential; +pub use credential::{GcpCredential, GcpSigningCredential, ServiceAccountKey}; mod builder; mod client; @@ -62,6 +67,10 @@ const STORE: &str = "GCS"; /// [`CredentialProvider`] for [`GoogleCloudStorage`] pub type GcpCredentialProvider = Arc>; +/// [`GcpSigningCredential`] for [`GoogleCloudStorage`] +pub type GcpSigningCredentialProvider = + Arc>; + /// Interface for [Google Cloud Storage](https://cloud.google.com/storage/). #[derive(Debug)] pub struct GoogleCloudStorage { @@ -83,6 +92,11 @@ impl GoogleCloudStorage { pub fn credentials(&self) -> &GcpCredentialProvider { &self.client.config().credentials } + + /// Returns the [`GcpSigningCredentialProvider`] used by [`GoogleCloudStorage`] + pub fn signing_credentials(&self) -> &GcpSigningCredentialProvider { + &self.client.config().signing_credentials + } } #[derive(Debug)] @@ -215,6 +229,34 @@ impl MultipartStore for GoogleCloudStorage { } } +#[async_trait] +impl Signer for GoogleCloudStorage { + async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result { + if expires_in.as_secs() > 604800 { + return Err(crate::Error::Generic { + store: STORE, + source: "Expiration Time can't be longer than 604800 seconds (7 days).".into(), + }); + } + + let config = self.client.config(); + let path_url = config.path_url(path); + let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic { + store: STORE, + source: format!("Unable to parse url {path_url}: {e}").into(), + })?; + + let signing_credentials = self.signing_credentials().get_credential().await?; + let authorizer = GCSAuthorizer::new(signing_credentials); + + authorizer + .sign(method, &mut url, expires_in, &self.client) + .await?; + + Ok(url) + } +} + #[cfg(test)] mod test { @@ -250,6 +292,36 @@ mod test { } } + #[tokio::test] + #[ignore] + async fn gcs_test_sign() { + crate::test_util::maybe_skip_integration!(); + let integration = GoogleCloudStorageBuilder::from_env().build().unwrap(); + + let client = reqwest::Client::new(); + + let path = Path::from("test_sign"); + let url = integration + .signed_url(Method::PUT, &path, Duration::from_secs(3600)) + .await + .unwrap(); + println!("PUT {url}"); + + let resp = client.put(url).body("data").send().await.unwrap(); + resp.error_for_status().unwrap(); + + let url = integration + .signed_url(Method::GET, &path, Duration::from_secs(3600)) + .await + .unwrap(); + println!("GET {url}"); + + let resp = client.get(url).send().await.unwrap(); + let resp = resp.error_for_status().unwrap(); + let data = resp.bytes().await.unwrap(); + assert_eq!(data.as_ref(), b"data"); + } + #[tokio::test] async fn gcs_test_get_nonexistent_location() { crate::test_util::maybe_skip_integration!(); diff --git a/object_store/src/util.rs b/object_store/src/util.rs index a19d5aab4b5b..161d2d138e08 100644 --- a/object_store/src/util.rs +++ b/object_store/src/util.rs @@ -285,6 +285,35 @@ impl> From for GetRange { } } } +// http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html +// +// Do not URI-encode any of the unreserved characters that RFC 3986 defines: +// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ ). +#[cfg(any(feature = "aws", feature = "gcp"))] +pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC + .remove(b'-') + .remove(b'.') + .remove(b'_') + .remove(b'~'); + +/// Computes the SHA256 digest of `body` returned as a hex encoded string +#[cfg(any(feature = "aws", feature = "gcp"))] +pub(crate) fn hex_digest(bytes: &[u8]) -> String { + let digest = ring::digest::digest(&ring::digest::SHA256, bytes); + hex_encode(digest.as_ref()) +} + +/// Returns `bytes` as a lower-case hex encoded string +#[cfg(any(feature = "aws", feature = "gcp"))] +pub(crate) fn hex_encode(bytes: &[u8]) -> String { + use std::fmt::Write; + let mut out = String::with_capacity(bytes.len() * 2); + for byte in bytes { + // String writing is infallible + let _ = write!(out, "{byte:02x}"); + } + out +} #[cfg(test)] mod tests {