Skip to content

Commit

Permalink
feat: Add presign_stat_with support (#3778)
Browse files Browse the repository at this point in the history
* feat: Add presign_stat_with support

Signed-off-by: Xuanwo <[email protected]>

* Fix r2

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Dec 19, 2023
1 parent 535e120 commit f98937f
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 23 deletions.
1 change: 1 addition & 0 deletions .github/services/s3/r2/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ runs:
cat << EOF >> $GITHUB_ENV
OPENDAL_S3_REGION=auto
OPENDAL_S3_BATCH_MAX_OPERATIONS=700
OPENDAL_S3_DISABLE_STAT_WITH_OVERRIDE=true
EOF
42 changes: 42 additions & 0 deletions core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ impl OpPresign {
pub fn expire(&self) -> Duration {
self.expire
}

/// Consume OpPresign into (Duration, PresignOperation)
pub fn into_parts(self) -> (Duration, PresignOperation) {
(self.expire, self.op)
}
}

/// Presign operation used for presign.
Expand Down Expand Up @@ -411,6 +416,9 @@ impl OpRead {
pub struct OpStat {
if_match: Option<String>,
if_none_match: Option<String>,
override_content_type: Option<String>,
override_cache_control: Option<String>,
override_content_disposition: Option<String>,
version: Option<String>,
}

Expand Down Expand Up @@ -442,6 +450,40 @@ impl OpStat {
self.if_none_match.as_deref()
}

/// Sets the content-disposition header that should be send back by the remote read operation.
pub fn with_override_content_disposition(mut self, content_disposition: &str) -> Self {
self.override_content_disposition = Some(content_disposition.into());
self
}

/// Returns the content-disposition header that should be send back by the remote read
/// operation.
pub fn override_content_disposition(&self) -> Option<&str> {
self.override_content_disposition.as_deref()
}

/// Sets the cache-control header that should be send back by the remote read operation.
pub fn with_override_cache_control(mut self, cache_control: &str) -> Self {
self.override_cache_control = Some(cache_control.into());
self
}

/// Returns the cache-control header that should be send back by the remote read operation.
pub fn override_cache_control(&self) -> Option<&str> {
self.override_cache_control.as_deref()
}

/// Sets the content-type header that should be send back by the remote read operation.
pub fn with_override_content_type(mut self, content_type: &str) -> Self {
self.override_content_type = Some(content_type.into());
self
}

/// Returns the content-type header that should be send back by the remote read operation.
pub fn override_content_type(&self) -> Option<&str> {
self.override_content_type.as_deref()
}

/// Set the version of the option
pub fn with_version(mut self, version: &str) -> Self {
self.version = Some(version.to_string());
Expand Down
34 changes: 23 additions & 11 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ pub struct S3Config {
///
/// Please tune this value based on services' document.
pub batch_max_operations: Option<usize>,
/// Disable stat with override so that opendal will not send stat request with override queries.
///
/// For example, R2 doesn't support stat with `response_content_type` query.
pub disable_stat_with_override: bool,
}

impl Debug for S3Config {
Expand Down Expand Up @@ -556,6 +560,14 @@ impl S3Builder {
self
}

/// Disable stat with override so that opendal will not send stat request with override queries.
///
/// For example, R2 doesn't support stat with `response_content_type` query.
pub fn disable_stat_with_override(&mut self) -> &mut Self {
self.config.disable_stat_with_override = true;
self
}

/// Adding a customed credential load for service.
///
/// If customed_credential_load has been set, we will ignore all other
Expand Down Expand Up @@ -948,6 +960,7 @@ impl Builder for S3Builder {
server_side_encryption_customer_key_md5,
default_storage_class,
allow_anonymous: self.config.allow_anonymous,
disable_stat_with_override: self.config.disable_stat_with_override,
signer,
loader,
client,
Expand Down Expand Up @@ -981,6 +994,9 @@ impl Accessor for S3Backend {
stat: true,
stat_with_if_match: true,
stat_with_if_none_match: true,
stat_with_override_cache_control: !self.core.disable_stat_with_override,
stat_with_override_content_disposition: !self.core.disable_stat_with_override,
stat_with_override_content_type: !self.core.disable_stat_with_override,

read: true,
read_can_next: true,
Expand Down Expand Up @@ -1079,10 +1095,7 @@ impl Accessor for S3Backend {
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let resp = self
.core
.s3_head_object(path, args.if_none_match(), args.if_match())
.await?;
let resp = self.core.s3_head_object(path, args).await?;

let status = resp.status();

Expand Down Expand Up @@ -1119,13 +1132,12 @@ impl Accessor for S3Backend {
}

async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
let (expire, op) = args.into_parts();

// We will not send this request out, just for signing.
let mut req = match args.operation() {
PresignOperation::Stat(v) => {
self.core
.s3_head_object_request(path, v.if_none_match(), v.if_match())?
}
PresignOperation::Read(v) => self.core.s3_get_object_request(path, v.clone())?,
let mut req = match op {
PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v)?,
PresignOperation::Read(v) => self.core.s3_get_object_request(path, v)?,
PresignOperation::Write(_) => self.core.s3_put_object_request(
path,
None,
Expand All @@ -1134,7 +1146,7 @@ impl Accessor for S3Backend {
)?,
};

self.core.sign_query(&mut req, args.expire()).await?;
self.core.sign_query(&mut req, expire).await?;

// We don't need this request anymore, consume it directly.
let (parts, _) = req.into_parts();
Expand Down
46 changes: 34 additions & 12 deletions core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub struct S3Core {
pub server_side_encryption_customer_key_md5: Option<HeaderValue>,
pub default_storage_class: Option<HeaderValue>,
pub allow_anonymous: bool,
pub disable_stat_with_override: bool,

pub signer: AwsV4Signer,
pub loader: Box<dyn AwsCredentialLoad>,
Expand Down Expand Up @@ -232,25 +233,47 @@ impl S3Core {
}

impl S3Core {
pub fn s3_head_object_request(
&self,
path: &str,
if_none_match: Option<&str>,
if_match: Option<&str>,
) -> Result<Request<AsyncBody>> {
pub fn s3_head_object_request(&self, path: &str, args: OpStat) -> Result<Request<AsyncBody>> {
let p = build_abs_path(&self.root, path);

let url = format!("{}/{}", self.endpoint, percent_encode_path(&p));
let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p));

// Add query arguments to the URL based on response overrides
let mut query_args = Vec::new();
if let Some(override_content_disposition) = args.override_content_disposition() {
query_args.push(format!(
"{}={}",
constants::RESPONSE_CONTENT_DISPOSITION,
percent_encode_path(override_content_disposition)
))
}
if let Some(override_content_type) = args.override_content_type() {
query_args.push(format!(
"{}={}",
constants::RESPONSE_CONTENT_TYPE,
percent_encode_path(override_content_type)
))
}
if let Some(override_cache_control) = args.override_cache_control() {
query_args.push(format!(
"{}={}",
constants::RESPONSE_CACHE_CONTROL,
percent_encode_path(override_cache_control)
))
}
if !query_args.is_empty() {
url.push_str(&format!("?{}", query_args.join("&")));
}

let mut req = Request::head(&url);

req = self.insert_sse_headers(req, false);

if let Some(if_none_match) = if_none_match {
if let Some(if_none_match) = args.if_none_match() {
req = req.header(IF_NONE_MATCH, if_none_match);
}

if let Some(if_match) = if_match {
if let Some(if_match) = args.if_match() {
req = req.header(IF_MATCH, if_match);
}

Expand Down Expand Up @@ -377,10 +400,9 @@ impl S3Core {
pub async fn s3_head_object(
&self,
path: &str,
if_none_match: Option<&str>,
if_match: Option<&str>,
args: OpStat,
) -> Result<Response<IncomingAsyncBody>> {
let mut req = self.s3_head_object_request(path, if_none_match, if_match)?;
let mut req = self.s3_head_object_request(path, args)?;

self.sign(&mut req).await?;

Expand Down
6 changes: 6 additions & 0 deletions core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ pub struct Capability {
pub stat_with_if_match: bool,
/// If operator supports stat with if none match.
pub stat_with_if_none_match: bool,
/// if operator supports read with override cache control.
pub stat_with_override_cache_control: bool,
/// if operator supports read with override content disposition.
pub stat_with_override_content_disposition: bool,
/// if operator supports read with override content type.
pub stat_with_override_content_type: bool,

/// If operator supports read.
pub read: bool,
Expand Down
35 changes: 35 additions & 0 deletions core/src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1359,6 +1359,41 @@ impl Operator {
Ok(rp.into_presigned_request())
}

/// Presign an operation for stat(head).
///
/// # Example
///
/// ```no_run
/// use anyhow::Result;
/// use futures::io;
/// use opendal::Operator;
/// use std::time::Duration;
///
/// #[tokio::main]
/// async fn test(op: Operator) -> Result<()> {
/// let signed_req = op.presign_stat_with("test",Duration::from_secs(3600)).override_content_disposition("attachment; filename=\"othertext.txt\"").await?;
/// # Ok(())
/// # }
/// ```
pub fn presign_stat_with(&self, path: &str, expire: Duration) -> FuturePresignStat {
let path = normalize_path(path);

let fut = FuturePresignStat(OperatorFuture::new(
self.inner().clone(),
path,
(OpStat::default(), expire),
|inner, path, (args, dur)| {
let fut = async move {
let op = OpPresign::new(args, dur);
let rp = inner.presign(&path, op).await?;
Ok(rp.into_presigned_request())
};
Box::pin(fut)
},
));
fut
}

/// Presign an operation for read.
///
/// # Example
Expand Down
53 changes: 53 additions & 0 deletions core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,59 @@ impl Future for FutureStat {
}
}

/// Future that generated by [`Operator::presign_stat_with`].
///
/// Users can add more options by public functions provided by this struct.
pub struct FuturePresignStat(pub(crate) OperatorFuture<(OpStat, Duration), PresignedRequest>);

impl FuturePresignStat {
/// Sets the content-disposition header that should be send back by the remote read operation.
pub fn override_content_disposition(mut self, v: &str) -> Self {
self.0 = self
.0
.map_args(|(args, dur)| (args.with_override_content_disposition(v), dur));
self
}

/// Sets the cache-control header that should be send back by the remote read operation.
pub fn override_cache_control(mut self, v: &str) -> Self {
self.0 = self
.0
.map_args(|(args, dur)| (args.with_override_cache_control(v), dur));
self
}

/// Sets the content-type header that should be send back by the remote read operation.
pub fn override_content_type(mut self, v: &str) -> Self {
self.0 = self
.0
.map_args(|(args, dur)| (args.with_override_content_type(v), dur));
self
}

/// Set the If-Match of the option
pub fn if_match(mut self, v: &str) -> Self {
self.0 = self.0.map_args(|(args, dur)| (args.with_if_match(v), dur));
self
}

/// Set the If-None-Match of the option
pub fn if_none_match(mut self, v: &str) -> Self {
self.0 = self
.0
.map_args(|(args, dur)| (args.with_if_none_match(v), dur));
self
}
}

impl Future for FuturePresignStat {
type Output = Result<PresignedRequest>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_unpin(cx)
}
}

/// Future that generated by [`Operator::presign_read_with`].
///
/// Users can add more options by public functions provided by this struct.
Expand Down
Loading

0 comments on commit f98937f

Please sign in to comment.