From def94a839236f3b04727a07c378668c9ada807f0 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Fri, 22 Nov 2024 14:55:12 +0000 Subject: [PATCH] object_store: Add support for requester pays buckets (#6768) * Add support for requester pays buckets * Add tests * fix rustdoc --- object_store/src/aws/builder.rs | 24 ++++++ object_store/src/aws/client.rs | 4 +- object_store/src/aws/credential.rs | 114 +++++++++++++++++++++++++++++ object_store/src/aws/mod.rs | 3 +- 4 files changed, 143 insertions(+), 2 deletions(-) diff --git a/object_store/src/aws/builder.rs b/object_store/src/aws/builder.rs index eb79f5e6dc28..840245a7b5d4 100644 --- a/object_store/src/aws/builder.rs +++ b/object_store/src/aws/builder.rs @@ -170,6 +170,8 @@ pub struct AmazonS3Builder { encryption_bucket_key_enabled: Option>, /// base64-encoded 256-bit customer encryption key for SSE-C. encryption_customer_key_base64: Option, + /// When set to true, charge requester for bucket operations + request_payer: ConfigValue, } /// Configuration keys for [`AmazonS3Builder`] @@ -330,6 +332,13 @@ pub enum AmazonS3ConfigKey { /// - `s3_express` S3Express, + /// Enable Support for S3 Requester Pays + /// + /// Supported keys: + /// - `aws_request_payer` + /// - `request_payer` + RequestPayer, + /// Client options Client(ClientConfigKey), @@ -358,6 +367,7 @@ impl AsRef for AmazonS3ConfigKey { Self::CopyIfNotExists => "aws_copy_if_not_exists", Self::ConditionalPut => "aws_conditional_put", Self::DisableTagging => "aws_disable_tagging", + Self::RequestPayer => "aws_request_payer", Self::Client(opt) => opt.as_ref(), Self::Encryption(opt) => opt.as_ref(), } @@ -389,6 +399,7 @@ impl FromStr for AmazonS3ConfigKey { "aws_copy_if_not_exists" | "copy_if_not_exists" => Ok(Self::CopyIfNotExists), "aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut), "aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging), + "aws_request_payer" | "request_payer" => Ok(Self::RequestPayer), // Backwards compatibility "aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)), "aws_server_side_encryption" => Ok(Self::Encryption( @@ -510,6 +521,9 @@ impl AmazonS3Builder { AmazonS3ConfigKey::ConditionalPut => { self.conditional_put = Some(ConfigValue::Deferred(value.into())) } + AmazonS3ConfigKey::RequestPayer => { + self.request_payer = ConfigValue::Deferred(value.into()) + } AmazonS3ConfigKey::Encryption(key) => match key { S3EncryptionConfigKey::ServerSideEncryption => { self.encryption_type = Some(ConfigValue::Deferred(value.into())) @@ -567,6 +581,7 @@ impl AmazonS3Builder { self.conditional_put.as_ref().map(ToString::to_string) } AmazonS3ConfigKey::DisableTagging => Some(self.disable_tagging.to_string()), + AmazonS3ConfigKey::RequestPayer => Some(self.request_payer.to_string()), AmazonS3ConfigKey::Encryption(key) => match key { S3EncryptionConfigKey::ServerSideEncryption => { self.encryption_type.as_ref().map(ToString::to_string) @@ -845,6 +860,14 @@ impl AmazonS3Builder { self } + /// Set whether to charge requester for bucket operations. + /// + /// + pub fn with_request_payer(mut self, enabled: bool) -> Self { + self.request_payer = ConfigValue::Parsed(enabled); + self + } + /// Create a [`AmazonS3`] instance from the provided values, /// consuming `self`. pub fn build(mut self) -> Result { @@ -996,6 +1019,7 @@ impl AmazonS3Builder { copy_if_not_exists, conditional_put: put_precondition, encryption_headers, + request_payer: self.request_payer.get()?, }; let client = Arc::new(S3Client::new(config)?); diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 895308f5880e..b19e0e2ab7fe 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -202,6 +202,7 @@ pub(crate) struct S3Config { pub checksum: Option, pub copy_if_not_exists: Option, pub conditional_put: Option, + pub request_payer: bool, pub(super) encryption_headers: S3EncryptionHeaders, } @@ -249,7 +250,8 @@ impl<'a> SessionCredential<'a> { fn authorizer(&self) -> Option> { let mut authorizer = AwsAuthorizer::new(self.credential.as_deref()?, "s3", &self.config.region) - .with_sign_payload(self.config.sign_payload); + .with_sign_payload(self.config.sign_payload) + .with_request_payer(self.config.request_payer); if self.session_token { let token = HeaderName::from_static("x-amz-s3session-token"); diff --git a/object_store/src/aws/credential.rs b/object_store/src/aws/credential.rs index 33972c6fa14a..ee2f8e2ec953 100644 --- a/object_store/src/aws/credential.rs +++ b/object_store/src/aws/credential.rs @@ -101,11 +101,14 @@ pub struct AwsAuthorizer<'a> { region: &'a str, token_header: Option, sign_payload: bool, + request_payer: bool, } static DATE_HEADER: HeaderName = HeaderName::from_static("x-amz-date"); static HASH_HEADER: HeaderName = HeaderName::from_static("x-amz-content-sha256"); static TOKEN_HEADER: HeaderName = HeaderName::from_static("x-amz-security-token"); +static REQUEST_PAYER_HEADER: HeaderName = HeaderName::from_static("x-amz-request-payer"); +static REQUEST_PAYER_HEADER_VALUE: HeaderValue = HeaderValue::from_static("requester"); const ALGORITHM: &str = "AWS4-HMAC-SHA256"; impl<'a> AwsAuthorizer<'a> { @@ -118,6 +121,7 @@ impl<'a> AwsAuthorizer<'a> { date: None, sign_payload: true, token_header: None, + request_payer: false, } } @@ -134,6 +138,14 @@ impl<'a> AwsAuthorizer<'a> { self } + /// Set whether to include requester pays headers + /// + /// + pub fn with_request_payer(mut self, request_payer: bool) -> Self { + self.request_payer = request_payer; + self + } + /// Authorize `request` with an optional pre-calculated SHA256 digest by attaching /// the relevant [AWS SigV4] headers /// @@ -180,6 +192,15 @@ impl<'a> AwsAuthorizer<'a> { let header_digest = HeaderValue::from_str(&digest).unwrap(); request.headers_mut().insert(&HASH_HEADER, header_digest); + if self.request_payer { + // For DELETE, GET, HEAD, POST, and PUT requests, include x-amz-request-payer : + // requester in the header + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html + request + .headers_mut() + .insert(&REQUEST_PAYER_HEADER, REQUEST_PAYER_HEADER_VALUE.clone()); + } + let (signed_headers, canonical_headers) = canonicalize_headers(request.headers()); let scope = self.scope(date); @@ -226,6 +247,13 @@ impl<'a> AwsAuthorizer<'a> { .append_pair("X-Amz-Expires", &expires_in.as_secs().to_string()) .append_pair("X-Amz-SignedHeaders", "host"); + if self.request_payer { + // For signed URLs, include x-amz-request-payer=requester in the request + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html + url.query_pairs_mut() + .append_pair("x-amz-request-payer", "requester"); + } + // For S3, you must include the X-Amz-Security-Token query parameter in the URL if // using credentials sourced from the STS service. if let Some(ref token) = self.credential.token { @@ -763,12 +791,53 @@ mod tests { region: "us-east-1", sign_payload: true, token_header: None, + request_payer: false, }; signer.authorize(&mut request, None); assert_eq!(request.headers().get(&AUTHORIZATION).unwrap(), "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=a3c787a7ed37f7fdfbfd2d7056a3d7c9d85e6d52a2bfbec73793c0be6e7862d4") } + #[test] + fn test_sign_with_signed_payload_request_payer() { + let client = Client::new(); + + // Test credentials from https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html + let credential = AwsCredential { + key_id: "AKIAIOSFODNN7EXAMPLE".to_string(), + secret_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(), + token: None, + }; + + // method = 'GET' + // service = 'ec2' + // host = 'ec2.amazonaws.com' + // region = 'us-east-1' + // endpoint = 'https://ec2.amazonaws.com' + // request_parameters = '' + let date = DateTime::parse_from_rfc3339("2022-08-06T18:01:34Z") + .unwrap() + .with_timezone(&Utc); + + let mut request = client + .request(Method::GET, "https://ec2.amazon.com/") + .build() + .unwrap(); + + let signer = AwsAuthorizer { + date: Some(date), + credential: &credential, + service: "ec2", + region: "us-east-1", + sign_payload: true, + token_header: None, + request_payer: true, + }; + + signer.authorize(&mut request, None); + assert_eq!(request.headers().get(&AUTHORIZATION).unwrap(), "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-request-payer, Signature=7030625a9e9b57ed2a40e63d749f4a4b7714b6e15004cab026152f870dd8565d") + } + #[test] fn test_sign_with_unsigned_payload() { let client = Client::new(); @@ -802,6 +871,7 @@ mod tests { region: "us-east-1", token_header: None, sign_payload: false, + request_payer: false, }; authorizer.authorize(&mut request, None); @@ -828,6 +898,7 @@ mod tests { region: "us-east-1", token_header: None, sign_payload: false, + request_payer: false, }; let mut url = Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap(); @@ -848,6 +919,48 @@ mod tests { ); } + #[test] + fn signed_get_url_request_payer() { + // Values from https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html + let credential = AwsCredential { + key_id: "AKIAIOSFODNN7EXAMPLE".to_string(), + secret_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(), + token: None, + }; + + let date = DateTime::parse_from_rfc3339("2013-05-24T00:00:00Z") + .unwrap() + .with_timezone(&Utc); + + let authorizer = AwsAuthorizer { + date: Some(date), + credential: &credential, + service: "s3", + region: "us-east-1", + token_header: None, + sign_payload: false, + request_payer: true, + }; + + let mut url = Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap(); + authorizer.sign(Method::GET, &mut url, Duration::from_secs(86400)); + + assert_eq!( + url, + Url::parse( + "https://examplebucket.s3.amazonaws.com/test.txt?\ + X-Amz-Algorithm=AWS4-HMAC-SHA256&\ + X-Amz-Credential=AKIAIOSFODNN7EXAMPLE%2F20130524%2Fus-east-1%2Fs3%2Faws4_request&\ + X-Amz-Date=20130524T000000Z&\ + X-Amz-Expires=86400&\ + X-Amz-SignedHeaders=host&\ + x-amz-request-payer=requester&\ + X-Amz-Signature=9ad7c781cc30121f199b47d35ed3528473e4375b63c5d91cd87c927803e4e00a" + ) + .unwrap() + ); + } + #[test] fn test_sign_port() { let client = Client::new(); @@ -880,6 +993,7 @@ mod tests { region: "us-east-1", token_header: None, sign_payload: true, + request_payer: false, }; authorizer.authorize(&mut request, None); diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index b238d90eb6d7..81511bad7b08 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -136,7 +136,8 @@ impl Signer for AmazonS3 { /// ``` async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result { let credential = self.credentials().get_credential().await?; - let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config.region); + let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config.region) + .with_request_payer(self.client.config.request_payer); let path_url = self.path_url(path); let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {