diff --git a/.github/services/s3/r2/action.yml b/.github/services/s3/r2/action.yml index 38b85141f6c6..892da1fac8f9 100644 --- a/.github/services/s3/r2/action.yml +++ b/.github/services/s3/r2/action.yml @@ -39,4 +39,5 @@ runs: cat << EOF >> $GITHUB_ENV OPENDAL_S3_REGION=auto OPENDAL_S3_BATCH_MAX_OPERATIONS=700 + OPENDAL_S3_DISABLE_STAT_WITH_OVERRIDE=true EOF diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index caa894d623cb..5e870efab895 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -213,6 +213,11 @@ impl OpPresign { pub fn expire(&self) -> Duration { self.expire } + + /// Consume OpPresign into (Duration, PresignOperation) + pub fn into_parts(self) -> (Duration, PresignOperation) { + (self.expire, self.op) + } } /// Presign operation used for presign. @@ -411,6 +416,9 @@ impl OpRead { pub struct OpStat { if_match: Option, if_none_match: Option, + override_content_type: Option, + override_cache_control: Option, + override_content_disposition: Option, version: Option, } @@ -442,6 +450,40 @@ impl OpStat { self.if_none_match.as_deref() } + /// Sets the content-disposition header that should be sent back by the remote stat operation. + pub fn with_override_content_disposition(mut self, content_disposition: &str) -> Self { + self.override_content_disposition = Some(content_disposition.into()); + self + } + + /// Returns the content-disposition header that should be sent back by the remote stat + /// operation. + pub fn override_content_disposition(&self) -> Option<&str> { + self.override_content_disposition.as_deref() + } + + /// Sets the cache-control header that should be sent back by the remote stat operation. + pub fn with_override_cache_control(mut self, cache_control: &str) -> Self { + self.override_cache_control = Some(cache_control.into()); + self + } + + /// Returns the cache-control header that should be sent back by the remote stat operation. 
+ pub fn override_cache_control(&self) -> Option<&str> { + self.override_cache_control.as_deref() + } + + /// Sets the content-type header that should be sent back by the remote stat operation. + pub fn with_override_content_type(mut self, content_type: &str) -> Self { + self.override_content_type = Some(content_type.into()); + self + } + + /// Returns the content-type header that should be sent back by the remote stat operation. + pub fn override_content_type(&self) -> Option<&str> { + self.override_content_type.as_deref() + } + /// Set the version of the option pub fn with_version(mut self, version: &str) -> Self { self.version = Some(version.to_string()); diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 6bde0532c757..47260c506bc5 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -196,6 +196,10 @@ pub struct S3Config { /// /// Please tune this value based on services' document. pub batch_max_operations: Option, + /// Disable stat with override so that opendal will not send stat request with override queries. + /// + /// For example, R2 doesn't support stat with `response_content_type` query. + pub disable_stat_with_override: bool, } impl Debug for S3Config { @@ -556,6 +560,14 @@ impl S3Builder { self } + /// Disable stat with override so that opendal will not send stat request with override queries. + /// + /// For example, R2 doesn't support stat with `response_content_type` query. + pub fn disable_stat_with_override(&mut self) -> &mut Self { + self.config.disable_stat_with_override = true; + self + } + /// Adding a customed credential load for service. 
/// /// If customed_credential_load has been set, we will ignore all other @@ -948,6 +960,7 @@ impl Builder for S3Builder { server_side_encryption_customer_key_md5, default_storage_class, allow_anonymous: self.config.allow_anonymous, + disable_stat_with_override: self.config.disable_stat_with_override, signer, loader, client, @@ -981,6 +994,9 @@ impl Accessor for S3Backend { stat: true, stat_with_if_match: true, stat_with_if_none_match: true, + stat_with_override_cache_control: !self.core.disable_stat_with_override, + stat_with_override_content_disposition: !self.core.disable_stat_with_override, + stat_with_override_content_type: !self.core.disable_stat_with_override, read: true, read_can_next: true, @@ -1079,10 +1095,7 @@ impl Accessor for S3Backend { } async fn stat(&self, path: &str, args: OpStat) -> Result { - let resp = self - .core - .s3_head_object(path, args.if_none_match(), args.if_match()) - .await?; + let resp = self.core.s3_head_object(path, args).await?; let status = resp.status(); @@ -1119,13 +1132,12 @@ impl Accessor for S3Backend { } async fn presign(&self, path: &str, args: OpPresign) -> Result { + let (expire, op) = args.into_parts(); + // We will not send this request out, just for signing. - let mut req = match args.operation() { - PresignOperation::Stat(v) => { - self.core - .s3_head_object_request(path, v.if_none_match(), v.if_match())? - } - PresignOperation::Read(v) => self.core.s3_get_object_request(path, v.clone())?, + let mut req = match op { + PresignOperation::Stat(v) => self.core.s3_head_object_request(path, v)?, + PresignOperation::Read(v) => self.core.s3_get_object_request(path, v)?, PresignOperation::Write(_) => self.core.s3_put_object_request( path, None, @@ -1134,7 +1146,7 @@ impl Accessor for S3Backend { )?, }; - self.core.sign_query(&mut req, args.expire()).await?; + self.core.sign_query(&mut req, expire).await?; // We don't need this request anymore, consume it directly. 
let (parts, _) = req.into_parts(); diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index f20fc9a86c3f..35342d1479c0 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -79,6 +79,7 @@ pub struct S3Core { pub server_side_encryption_customer_key_md5: Option, pub default_storage_class: Option, pub allow_anonymous: bool, + pub disable_stat_with_override: bool, pub signer: AwsV4Signer, pub loader: Box, @@ -232,25 +233,47 @@ impl S3Core { } impl S3Core { - pub fn s3_head_object_request( - &self, - path: &str, - if_none_match: Option<&str>, - if_match: Option<&str>, - ) -> Result> { + pub fn s3_head_object_request(&self, path: &str, args: OpStat) -> Result> { let p = build_abs_path(&self.root, path); - let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); + let mut url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); + + // Append one percent-encoded query argument to the URL for each response header override that is set + let mut query_args = Vec::new(); + if let Some(override_content_disposition) = args.override_content_disposition() { + query_args.push(format!( + "{}={}", + constants::RESPONSE_CONTENT_DISPOSITION, + percent_encode_path(override_content_disposition) + )) + } + if let Some(override_content_type) = args.override_content_type() { + query_args.push(format!( + "{}={}", + constants::RESPONSE_CONTENT_TYPE, + percent_encode_path(override_content_type) + )) + } + if let Some(override_cache_control) = args.override_cache_control() { + query_args.push(format!( + "{}={}", + constants::RESPONSE_CACHE_CONTROL, + percent_encode_path(override_cache_control) + )) + } + if !query_args.is_empty() { + url.push_str(&format!("?{}", query_args.join("&"))); + } let mut req = Request::head(&url); req = self.insert_sse_headers(req, false); - if let Some(if_none_match) = if_none_match { + if let Some(if_none_match) = args.if_none_match() { req = req.header(IF_NONE_MATCH, if_none_match); } - if let Some(if_match) = if_match { + if let 
Some(if_match) = args.if_match() { req = req.header(IF_MATCH, if_match); } @@ -377,10 +400,9 @@ impl S3Core { pub async fn s3_head_object( &self, path: &str, - if_none_match: Option<&str>, - if_match: Option<&str>, + args: OpStat, ) -> Result> { - let mut req = self.s3_head_object_request(path, if_none_match, if_match)?; + let mut req = self.s3_head_object_request(path, args)?; self.sign(&mut req).await?; diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs index 0847afa5ec23..ca0936b09521 100644 --- a/core/src/types/capability.rs +++ b/core/src/types/capability.rs @@ -61,6 +61,12 @@ pub struct Capability { pub stat_with_if_match: bool, /// If operator supports stat with if none match. pub stat_with_if_none_match: bool, + /// If operator supports stat with override cache control. + pub stat_with_override_cache_control: bool, + /// If operator supports stat with override content disposition. + pub stat_with_override_content_disposition: bool, + /// If operator supports stat with override content type. + pub stat_with_override_content_type: bool, /// If operator supports read. pub read: bool, diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 69bec0bca29d..9475606c93c4 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -1359,6 +1359,41 @@ impl Operator { Ok(rp.into_presigned_request()) } + /// Presign an operation for stat(head). 
+ /// + /// # Example + /// + /// ```no_run + /// use anyhow::Result; + /// use futures::io; + /// use opendal::Operator; + /// use std::time::Duration; + /// + /// #[tokio::main] + /// async fn test(op: Operator) -> Result<()> { + /// let signed_req = op.presign_stat_with("test",Duration::from_secs(3600)).override_content_disposition("attachment; filename=\"othertext.txt\"").await?; + /// # Ok(()) + /// # } + /// ``` + pub fn presign_stat_with(&self, path: &str, expire: Duration) -> FuturePresignStat { + let path = normalize_path(path); + + let fut = FuturePresignStat(OperatorFuture::new( + self.inner().clone(), + path, + (OpStat::default(), expire), + |inner, path, (args, dur)| { + let fut = async move { + let op = OpPresign::new(args, dur); + let rp = inner.presign(&path, op).await?; + Ok(rp.into_presigned_request()) + }; + Box::pin(fut) + }, + )); + fut + } + /// Presign an operation for read. /// /// # Example diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index 58dd9e74cb82..bcf6b720f6ad 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -148,6 +148,59 @@ impl Future for FutureStat { } } +/// Future that is generated by [`Operator::presign_stat_with`]. +/// +/// Users can add more options by public functions provided by this struct. +pub struct FuturePresignStat(pub(crate) OperatorFuture<(OpStat, Duration), PresignedRequest>); + +impl FuturePresignStat { + /// Sets the content-disposition header that should be sent back by the remote stat operation. + pub fn override_content_disposition(mut self, v: &str) -> Self { + self.0 = self + .0 + .map_args(|(args, dur)| (args.with_override_content_disposition(v), dur)); + self + } + + /// Sets the cache-control header that should be sent back by the remote stat operation. 
+ pub fn override_cache_control(mut self, v: &str) -> Self { + self.0 = self + .0 + .map_args(|(args, dur)| (args.with_override_cache_control(v), dur)); + self + } + + /// Sets the content-type header that should be sent back by the remote stat operation. + pub fn override_content_type(mut self, v: &str) -> Self { + self.0 = self + .0 + .map_args(|(args, dur)| (args.with_override_content_type(v), dur)); + self + } + + /// Set the If-Match of the option + pub fn if_match(mut self, v: &str) -> Self { + self.0 = self.0.map_args(|(args, dur)| (args.with_if_match(v), dur)); + self + } + + /// Set the If-None-Match of the option + pub fn if_none_match(mut self, v: &str) -> Self { + self.0 = self + .0 + .map_args(|(args, dur)| (args.with_if_none_match(v), dur)); + self + } +} + +impl Future for FuturePresignStat { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.0.poll_unpin(cx) + } +} + /// Future that generated by [`Operator::presign_read_with`]. /// /// Users can add more options by public functions provided by this struct. diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs index f76765945e57..5c3a81458ca7 100644 --- a/core/tests/behavior/write.rs +++ b/core/tests/behavior/write.rs @@ -62,6 +62,9 @@ pub fn behavior_write_tests(op: &Operator) -> Vec { test_stat_not_exist, test_stat_with_if_match, test_stat_with_if_none_match, + test_stat_with_override_cache_control, + test_stat_with_override_content_disposition, + test_stat_with_override_content_type, test_stat_root, test_read_full, test_read_range, @@ -478,6 +481,154 @@ pub async fn test_stat_with_if_none_match(op: Operator) -> Result<()> { Ok(()) } +/// Presigned stat with override_cache_control should return the overridden cache-control header. 
+pub async fn test_stat_with_override_cache_control(op: Operator) -> Result<()> { + if !(op.info().full_capability().stat_with_override_cache_control + && op.info().full_capability().presign) + { + return Ok(()); + } + + let path = uuid::Uuid::new_v4().to_string(); + let (content, _) = gen_bytes(op.info().full_capability()); + + op.write(&path, content.clone()) + .await + .expect("write must succeed"); + + let target_cache_control = "no-cache, no-store, must-revalidate"; + let signed_req = op + .presign_stat_with(&path, Duration::from_secs(60)) + .override_cache_control(target_cache_control) + .await + .expect("sign must succeed"); + + let client = reqwest::Client::new(); + let mut req = client.request( + signed_req.method().clone(), + Url::from_str(&signed_req.uri().to_string()).expect("must be valid url"), + ); + for (k, v) in signed_req.header() { + req = req.header(k, v); + } + + let resp = req.send().await.expect("send must succeed"); + + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + resp.headers() + .get("cache-control") + .expect("cache-control header must exist") + .to_str() + .expect("cache-control header must be string"), + target_cache_control + ); + + op.delete(&path).await.expect("delete must succeed"); + Ok(()) +} + +/// Presigned stat with override_content_disposition should return the overridden content-disposition header. 
+pub async fn test_stat_with_override_content_disposition(op: Operator) -> Result<()> { + if !(op + .info() + .full_capability() + .stat_with_override_content_disposition + && op.info().full_capability().presign) + { + return Ok(()); + } + + let path = uuid::Uuid::new_v4().to_string(); + let (content, _) = gen_bytes(op.info().full_capability()); + + op.write(&path, content.clone()) + .await + .expect("write must succeed"); + + let target_content_disposition = "attachment; filename=foo.txt"; + + let signed_req = op + .presign_stat_with(&path, Duration::from_secs(60)) + .override_content_disposition(target_content_disposition) + .await + .expect("presign must succeed"); + + let client = reqwest::Client::new(); + let mut req = client.request( + signed_req.method().clone(), + Url::from_str(&signed_req.uri().to_string()).expect("must be valid url"), + ); + for (k, v) in signed_req.header() { + req = req.header(k, v); + } + + let resp = req.send().await.expect("send must succeed"); + + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + resp.headers() + .get(http::header::CONTENT_DISPOSITION) + .expect("content-disposition header must exist") + .to_str() + .expect("content-disposition header must be string"), + target_content_disposition + ); + + op.delete(&path).await.expect("delete must succeed"); + + Ok(()) +} + +/// Presigned stat with override_content_type should return the overridden content-type header. 
+pub async fn test_stat_with_override_content_type(op: Operator) -> Result<()> { + if !(op.info().full_capability().stat_with_override_content_type + && op.info().full_capability().presign) + { + return Ok(()); + } + + let path = uuid::Uuid::new_v4().to_string(); + let (content, _) = gen_bytes(op.info().full_capability()); + + op.write(&path, content.clone()) + .await + .expect("write must succeed"); + + let target_content_type = "application/opendal"; + + let signed_req = op + .presign_stat_with(&path, Duration::from_secs(60)) + .override_content_type(target_content_type) + .await + .expect("presign must succeed"); + + let client = reqwest::Client::new(); + let mut req = client.request( + signed_req.method().clone(), + Url::from_str(&signed_req.uri().to_string()).expect("must be valid url"), + ); + for (k, v) in signed_req.header() { + req = req.header(k, v); + } + + let resp = req.send().await.expect("send must succeed"); + + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + resp.headers() + .get(http::header::CONTENT_TYPE) + .expect("content-type header must exist") + .to_str() + .expect("content-type header must be string"), + target_content_type + ); + + op.delete(&path).await.expect("delete must succeed"); + + Ok(()) +} + /// Root should be able to stat and returns DIR. pub async fn test_stat_root(op: Operator) -> Result<()> { let meta = op.stat("").await?;