Skip to content

Commit

Permalink
object_store: Add support for requester pays buckets (apache#6768)
Browse files Browse the repository at this point in the history
* Add support for requester pays buckets

* Add tests

* fix rustdoc
  • Loading branch information
kylebarron authored Nov 22, 2024
1 parent 7ef302d commit def94a8
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 2 deletions.
24 changes: 24 additions & 0 deletions object_store/src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ pub struct AmazonS3Builder {
encryption_bucket_key_enabled: Option<ConfigValue<bool>>,
/// base64-encoded 256-bit customer encryption key for SSE-C.
encryption_customer_key_base64: Option<String>,
/// When set to true, charge requester for bucket operations
request_payer: ConfigValue<bool>,
}

/// Configuration keys for [`AmazonS3Builder`]
Expand Down Expand Up @@ -330,6 +332,13 @@ pub enum AmazonS3ConfigKey {
/// - `s3_express`
S3Express,

/// Enable Support for S3 Requester Pays
///
/// Supported keys:
/// - `aws_request_payer`
/// - `request_payer`
RequestPayer,

/// Client options
Client(ClientConfigKey),

Expand Down Expand Up @@ -358,6 +367,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::CopyIfNotExists => "aws_copy_if_not_exists",
Self::ConditionalPut => "aws_conditional_put",
Self::DisableTagging => "aws_disable_tagging",
Self::RequestPayer => "aws_request_payer",
Self::Client(opt) => opt.as_ref(),
Self::Encryption(opt) => opt.as_ref(),
}
Expand Down Expand Up @@ -389,6 +399,7 @@ impl FromStr for AmazonS3ConfigKey {
"aws_copy_if_not_exists" | "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
"aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
"aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
"aws_request_payer" | "request_payer" => Ok(Self::RequestPayer),
// Backwards compatibility
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
"aws_server_side_encryption" => Ok(Self::Encryption(
Expand Down Expand Up @@ -510,6 +521,9 @@ impl AmazonS3Builder {
AmazonS3ConfigKey::ConditionalPut => {
self.conditional_put = Some(ConfigValue::Deferred(value.into()))
}
AmazonS3ConfigKey::RequestPayer => {
self.request_payer = ConfigValue::Deferred(value.into())
}
AmazonS3ConfigKey::Encryption(key) => match key {
S3EncryptionConfigKey::ServerSideEncryption => {
self.encryption_type = Some(ConfigValue::Deferred(value.into()))
Expand Down Expand Up @@ -567,6 +581,7 @@ impl AmazonS3Builder {
self.conditional_put.as_ref().map(ToString::to_string)
}
AmazonS3ConfigKey::DisableTagging => Some(self.disable_tagging.to_string()),
AmazonS3ConfigKey::RequestPayer => Some(self.request_payer.to_string()),
AmazonS3ConfigKey::Encryption(key) => match key {
S3EncryptionConfigKey::ServerSideEncryption => {
self.encryption_type.as_ref().map(ToString::to_string)
Expand Down Expand Up @@ -845,6 +860,14 @@ impl AmazonS3Builder {
self
}

/// Set whether to charge requester for bucket operations.
///
/// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html>
pub fn with_request_payer(mut self, enabled: bool) -> Self {
self.request_payer = ConfigValue::Parsed(enabled);
self
}

/// Create a [`AmazonS3`] instance from the provided values,
/// consuming `self`.
pub fn build(mut self) -> Result<AmazonS3> {
Expand Down Expand Up @@ -996,6 +1019,7 @@ impl AmazonS3Builder {
copy_if_not_exists,
conditional_put: put_precondition,
encryption_headers,
request_payer: self.request_payer.get()?,
};

let client = Arc::new(S3Client::new(config)?);
Expand Down
4 changes: 3 additions & 1 deletion object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ pub(crate) struct S3Config {
pub checksum: Option<Checksum>,
pub copy_if_not_exists: Option<S3CopyIfNotExists>,
pub conditional_put: Option<S3ConditionalPut>,
pub request_payer: bool,
pub(super) encryption_headers: S3EncryptionHeaders,
}

Expand Down Expand Up @@ -249,7 +250,8 @@ impl<'a> SessionCredential<'a> {
fn authorizer(&self) -> Option<AwsAuthorizer<'_>> {
let mut authorizer =
AwsAuthorizer::new(self.credential.as_deref()?, "s3", &self.config.region)
.with_sign_payload(self.config.sign_payload);
.with_sign_payload(self.config.sign_payload)
.with_request_payer(self.config.request_payer);

if self.session_token {
let token = HeaderName::from_static("x-amz-s3session-token");
Expand Down
114 changes: 114 additions & 0 deletions object_store/src/aws/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,14 @@ pub struct AwsAuthorizer<'a> {
region: &'a str,
token_header: Option<HeaderName>,
sign_payload: bool,
request_payer: bool,
}

static DATE_HEADER: HeaderName = HeaderName::from_static("x-amz-date");
static HASH_HEADER: HeaderName = HeaderName::from_static("x-amz-content-sha256");
static TOKEN_HEADER: HeaderName = HeaderName::from_static("x-amz-security-token");
static REQUEST_PAYER_HEADER: HeaderName = HeaderName::from_static("x-amz-request-payer");
static REQUEST_PAYER_HEADER_VALUE: HeaderValue = HeaderValue::from_static("requester");
const ALGORITHM: &str = "AWS4-HMAC-SHA256";

impl<'a> AwsAuthorizer<'a> {
Expand All @@ -118,6 +121,7 @@ impl<'a> AwsAuthorizer<'a> {
date: None,
sign_payload: true,
token_header: None,
request_payer: false,
}
}

Expand All @@ -134,6 +138,14 @@ impl<'a> AwsAuthorizer<'a> {
self
}

/// Set whether to include requester pays headers
///
/// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html>
pub fn with_request_payer(mut self, request_payer: bool) -> Self {
self.request_payer = request_payer;
self
}

/// Authorize `request` with an optional pre-calculated SHA256 digest by attaching
/// the relevant [AWS SigV4] headers
///
Expand Down Expand Up @@ -180,6 +192,15 @@ impl<'a> AwsAuthorizer<'a> {
let header_digest = HeaderValue::from_str(&digest).unwrap();
request.headers_mut().insert(&HASH_HEADER, header_digest);

if self.request_payer {
// For DELETE, GET, HEAD, POST, and PUT requests, include x-amz-request-payer :
// requester in the header
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html
request
.headers_mut()
.insert(&REQUEST_PAYER_HEADER, REQUEST_PAYER_HEADER_VALUE.clone());
}

let (signed_headers, canonical_headers) = canonicalize_headers(request.headers());

let scope = self.scope(date);
Expand Down Expand Up @@ -226,6 +247,13 @@ impl<'a> AwsAuthorizer<'a> {
.append_pair("X-Amz-Expires", &expires_in.as_secs().to_string())
.append_pair("X-Amz-SignedHeaders", "host");

if self.request_payer {
// For signed URLs, include x-amz-request-payer=requester in the request
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html
url.query_pairs_mut()
.append_pair("x-amz-request-payer", "requester");
}

// For S3, you must include the X-Amz-Security-Token query parameter in the URL if
// using credentials sourced from the STS service.
if let Some(ref token) = self.credential.token {
Expand Down Expand Up @@ -763,12 +791,53 @@ mod tests {
region: "us-east-1",
sign_payload: true,
token_header: None,
request_payer: false,
};

signer.authorize(&mut request, None);
assert_eq!(request.headers().get(&AUTHORIZATION).unwrap(), "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=a3c787a7ed37f7fdfbfd2d7056a3d7c9d85e6d52a2bfbec73793c0be6e7862d4")
}

#[test]
fn test_sign_with_signed_payload_request_payer() {
let client = Client::new();

// Test credentials from https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html
let credential = AwsCredential {
key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
secret_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
token: None,
};

// method = 'GET'
// service = 'ec2'
// host = 'ec2.amazonaws.com'
// region = 'us-east-1'
// endpoint = 'https://ec2.amazonaws.com'
// request_parameters = ''
let date = DateTime::parse_from_rfc3339("2022-08-06T18:01:34Z")
.unwrap()
.with_timezone(&Utc);

let mut request = client
.request(Method::GET, "https://ec2.amazon.com/")
.build()
.unwrap();

let signer = AwsAuthorizer {
date: Some(date),
credential: &credential,
service: "ec2",
region: "us-east-1",
sign_payload: true,
token_header: None,
request_payer: true,
};

signer.authorize(&mut request, None);
assert_eq!(request.headers().get(&AUTHORIZATION).unwrap(), "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-request-payer, Signature=7030625a9e9b57ed2a40e63d749f4a4b7714b6e15004cab026152f870dd8565d")
}

#[test]
fn test_sign_with_unsigned_payload() {
let client = Client::new();
Expand Down Expand Up @@ -802,6 +871,7 @@ mod tests {
region: "us-east-1",
token_header: None,
sign_payload: false,
request_payer: false,
};

authorizer.authorize(&mut request, None);
Expand All @@ -828,6 +898,7 @@ mod tests {
region: "us-east-1",
token_header: None,
sign_payload: false,
request_payer: false,
};

let mut url = Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap();
Expand All @@ -848,6 +919,48 @@ mod tests {
);
}

#[test]
fn signed_get_url_request_payer() {
// Values from https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
let credential = AwsCredential {
key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
secret_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
token: None,
};

let date = DateTime::parse_from_rfc3339("2013-05-24T00:00:00Z")
.unwrap()
.with_timezone(&Utc);

let authorizer = AwsAuthorizer {
date: Some(date),
credential: &credential,
service: "s3",
region: "us-east-1",
token_header: None,
sign_payload: false,
request_payer: true,
};

let mut url = Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap();
authorizer.sign(Method::GET, &mut url, Duration::from_secs(86400));

assert_eq!(
url,
Url::parse(
"https://examplebucket.s3.amazonaws.com/test.txt?\
X-Amz-Algorithm=AWS4-HMAC-SHA256&\
X-Amz-Credential=AKIAIOSFODNN7EXAMPLE%2F20130524%2Fus-east-1%2Fs3%2Faws4_request&\
X-Amz-Date=20130524T000000Z&\
X-Amz-Expires=86400&\
X-Amz-SignedHeaders=host&\
x-amz-request-payer=requester&\
X-Amz-Signature=9ad7c781cc30121f199b47d35ed3528473e4375b63c5d91cd87c927803e4e00a"
)
.unwrap()
);
}

#[test]
fn test_sign_port() {
let client = Client::new();
Expand Down Expand Up @@ -880,6 +993,7 @@ mod tests {
region: "us-east-1",
token_header: None,
sign_payload: true,
request_payer: false,
};

authorizer.authorize(&mut request, None);
Expand Down
3 changes: 2 additions & 1 deletion object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ impl Signer for AmazonS3 {
/// ```
async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
let credential = self.credentials().get_credential().await?;
let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config.region);
let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config.region)
.with_request_payer(self.client.config.request_payer);

let path_url = self.path_url(path);
let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {
Expand Down

0 comments on commit def94a8

Please sign in to comment.