Skip to content

Commit

Permalink
feat(core): implement if_modified_since and if_unmodified_since for r…
Browse files Browse the repository at this point in the history
…ead_with and reader_with
  • Loading branch information
meteorgan committed Jan 2, 2025
1 parent 0146a12 commit cbe9efb
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 10 deletions.
2 changes: 1 addition & 1 deletion core/src/raw/http_util/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct HttpBody {

/// # Safety
///
/// HttpBody is send on non wasm32 targets.
/// HttpBody is sent on non wasm32 targets.
unsafe impl Send for HttpBody {}

/// # Safety
Expand Down
30 changes: 27 additions & 3 deletions core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
//!
//! By using ops, users can add more context for operation.
use std::collections::HashMap;
use std::time::Duration;

use crate::raw::*;
use crate::*;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::time::Duration;

/// Args for `create` operation.
///
Expand Down Expand Up @@ -304,6 +304,8 @@ pub struct OpRead {
override_content_disposition: Option<String>,
version: Option<String>,
executor: Option<Executor>,
if_modified_since: Option<DateTime<Utc>>,
if_unmodified_since: Option<DateTime<Utc>>,
}

impl OpRead {
Expand Down Expand Up @@ -419,6 +421,28 @@ impl OpRead {
pub fn executor(&self) -> Option<&Executor> {
self.executor.as_ref()
}

/// Set the If-Modified-Since of the option
pub fn with_if_modified_since(mut self, if_modified_since: DateTime<Utc>) -> Self {
self.if_modified_since = Some(if_modified_since);
self
}

/// Get If-Modified-Since from option
pub fn if_modified_since(&self) -> Option<DateTime<Utc>> {
self.if_modified_since
}

/// Set the If-Unmodified-Since of the option
pub fn with_if_unmodified_since(mut self, if_unmodified_since: DateTime<Utc>) -> Self {
self.if_unmodified_since = Some(if_unmodified_since);
self
}

/// Get If-Unmodified-Since from option
pub fn if_unmodified_since(&self) -> Option<DateTime<Utc>> {
self.if_unmodified_since
}
}

/// Args for reader operation.
Expand Down
2 changes: 2 additions & 0 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,8 @@ impl Access for S3Backend {
read_with_override_content_disposition: true,
read_with_override_content_type: true,
read_with_version: self.core.enable_versioning,
read_with_if_modified_since: true,
read_with_if_unmodified_since: true,

write: true,
write_can_empty: true,
Expand Down
11 changes: 10 additions & 1 deletion core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use constants::X_AMZ_META_PREFIX;
use http::header::HeaderName;
use http::header::CACHE_CONTROL;
use http::header::CONTENT_DISPOSITION;
use http::header::CONTENT_ENCODING;
Expand All @@ -37,6 +36,7 @@ use http::header::CONTENT_TYPE;
use http::header::HOST;
use http::header::IF_MATCH;
use http::header::IF_NONE_MATCH;
use http::header::{HeaderName, IF_MODIFIED_SINCE, IF_UNMODIFIED_SINCE};
use http::HeaderValue;
use http::Request;
use http::Response;
Expand Down Expand Up @@ -406,6 +406,15 @@ impl S3Core {
if let Some(if_match) = args.if_match() {
req = req.header(IF_MATCH, if_match);
}

if let Some(if_modified_since) = args.if_modified_since() {
req = req.header(IF_MODIFIED_SINCE, if_modified_since.to_rfc2822());
}

if let Some(if_unmodified_since) = args.if_unmodified_since() {
req = req.header(IF_UNMODIFIED_SINCE, if_unmodified_since.to_rfc2822());
}

// Set SSE headers.
// TODO: how will this work with presign?
req = self.insert_sse_headers(req, false);
Expand Down
4 changes: 4 additions & 0 deletions core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ pub struct Capability {
pub read_with_override_content_type: bool,
/// Indicates if versions read operations are supported.
pub read_with_version: bool,
/// Indicates if conditional read operations using If-Modified-Since are supported.
pub read_with_if_modified_since: bool,
/// Indicates if conditional read operations using If-Unmodified-Since are supported.
pub read_with_if_unmodified_since: bool,

/// Indicates if the operator supports write operations.
pub write: bool,
Expand Down
24 changes: 22 additions & 2 deletions core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
//!
//! By using futures, users can add more options for operation.
use chrono::{DateTime, Utc};
use futures::Future;
use std::collections::HashMap;
use std::future::IntoFuture;
use std::ops::RangeBounds;
use std::time::Duration;

use futures::Future;

use crate::raw::*;
use crate::*;

Expand Down Expand Up @@ -236,6 +236,16 @@ impl<F: Future<Output = Result<Buffer>>> FutureRead<F> {
pub fn chunk(self, chunk_size: usize) -> Self {
self.map(|(args, op_reader)| (args, op_reader.with_chunk(chunk_size)))
}

/// Set the If-Modified-Since for this operation.
pub fn if_modified_since(self, v: DateTime<Utc>) -> Self {
self.map(|(args, op_reader)| (args.with_if_modified_since(v), op_reader))
}

/// Set the If-Unmodified-Since for this operation.
pub fn if_unmodified_since(self, v: DateTime<Utc>) -> Self {
self.map(|(args, op_reader)| (args.with_if_unmodified_since(v), op_reader))
}
}

/// Future that generated by [`Operator::read_with`] or [`Operator::reader_with`].
Expand Down Expand Up @@ -277,6 +287,16 @@ impl<F: Future<Output = Result<Reader>>> FutureReader<F> {
pub fn gap(self, gap_size: usize) -> Self {
self.map(|(op_read, op_reader)| (op_read, op_reader.with_gap(gap_size)))
}

/// Set the If-Modified-Since for this operation.
pub fn if_modified_since(self, v: DateTime<Utc>) -> Self {
self.map(|(op_read, op_reader)| (op_read.with_if_modified_since(v), op_reader))
}

/// Set the If-Unmodified-Since for this operation.
pub fn if_unmodified_since(self, v: DateTime<Utc>) -> Self {
self.map(|(op_read, op_reader)| (op_read.with_if_unmodified_since(v), op_reader))
}
}

/// Future that generated by [`Operator::write_with`].
Expand Down
144 changes: 141 additions & 3 deletions core/tests/behavior/async_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
use std::str::FromStr;
use std::time::Duration;

use crate::*;
use futures::AsyncReadExt;
use futures::TryStreamExt;
use http::StatusCode;
use log::warn;
use reqwest::Url;
use sha2::Digest;
use sha2::Sha256;

use crate::*;
use tokio::time::sleep;

pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
let cap = op.info().full_capability();
Expand All @@ -39,6 +39,8 @@ pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
test_reader,
test_reader_with_if_match,
test_reader_with_if_none_match,
test_reader_with_if_modified_since,
test_reader_with_if_unmodified_since,
test_read_not_exist,
test_read_with_if_match,
test_read_with_if_none_match,
Expand All @@ -48,7 +50,9 @@ pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
test_read_with_override_content_disposition,
test_read_with_override_content_type,
test_read_with_version,
test_read_with_not_existing_version
test_read_with_not_existing_version,
test_read_with_if_modified_since,
test_read_with_if_unmodified_since
))
}

Expand Down Expand Up @@ -270,6 +274,74 @@ pub async fn test_reader_with_if_none_match(op: Operator) -> anyhow::Result<()>
Ok(())
}

/// Reader with if_modified_since should match, otherwise, a ConditionNotMatch error will be returned.
pub async fn test_reader_with_if_modified_since(op: Operator) -> anyhow::Result<()> {
if !op.info().full_capability().read_with_if_modified_since {
return Ok(());
}

let (path, content, _) = TEST_FIXTURE.new_file(op.clone());

op.write(&path, content.clone())
.await
.expect("write must succeed");

let one_hour_ago_check = chrono::Utc::now() - chrono::Duration::seconds(3600);
let reader = op
.reader_with(&path)
.if_modified_since(one_hour_ago_check)
.await?;
let bs = reader.read(..).await?.to_bytes();
assert_eq!(bs, content);

sleep(Duration::from_secs(3)).await;

let one_second_ago_check = chrono::Utc::now() - chrono::Duration::seconds(1);
let reader = op
.reader_with(&path)
.if_modified_since(one_second_ago_check)
.await?;
let res = reader.read(..).await;
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), ErrorKind::ConditionNotMatch);

Ok(())
}

/// Reader with if_unmodified_since should match, otherwise, a ConditionNotMatch error will be returned.
pub async fn test_reader_with_if_unmodified_since(op: Operator) -> anyhow::Result<()> {
if !op.info().full_capability().read_with_if_unmodified_since {
return Ok(());
}

let (path, content, _) = TEST_FIXTURE.new_file(op.clone());

op.write(&path, content.clone())
.await
.expect("write must succeed");

let one_hour_ago_check = chrono::Utc::now() - chrono::Duration::seconds(3600);
let reader = op
.reader_with(&path)
.if_unmodified_since(one_hour_ago_check)
.await?;
let res = reader.read(..).await;
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), ErrorKind::ConditionNotMatch);

sleep(Duration::from_secs(3)).await;

let one_second_ago_check = chrono::Utc::now() - chrono::Duration::seconds(1);
let reader = op
.reader_with(&path)
.if_unmodified_since(one_second_ago_check)
.await?;
let bs = reader.read(..).await?.to_bytes();
assert_eq!(bs, content);

Ok(())
}

/// Read with if_none_match should match, else get a ConditionNotMatch error.
pub async fn test_read_with_if_none_match(op: Operator) -> anyhow::Result<()> {
if !op.info().full_capability().read_with_if_none_match {
Expand Down Expand Up @@ -492,6 +564,72 @@ pub async fn test_read_with_override_content_type(op: Operator) -> anyhow::Resul
Ok(())
}

/// Read with if_modified_since should match, otherwise, a ConditionNotMatch error will be returned.
pub async fn test_read_with_if_modified_since(op: Operator) -> anyhow::Result<()> {
if !op.info().full_capability().read_with_if_modified_since {
return Ok(());
}

let (path, content, _) = TEST_FIXTURE.new_file(op.clone());

op.write(&path, content.clone())
.await
.expect("write must succeed");

let one_hour_ago_check = chrono::Utc::now() - chrono::Duration::seconds(3600);
let bs = op
.read_with(&path)
.if_modified_since(one_hour_ago_check)
.await?
.to_bytes();
assert_eq!(bs, content);

sleep(Duration::from_secs(3)).await;

let one_second_ago_check = chrono::Utc::now() - chrono::Duration::seconds(1);
let res = op
.read_with(&path)
.if_modified_since(one_second_ago_check)
.await;
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), ErrorKind::ConditionNotMatch);

Ok(())
}

/// Read with if_unmodified_since should match, otherwise, a ConditionNotMatch error will be returned.
pub async fn test_read_with_if_unmodified_since(op: Operator) -> anyhow::Result<()> {
if !op.info().full_capability().read_with_if_unmodified_since {
return Ok(());
}

let (path, content, _) = TEST_FIXTURE.new_file(op.clone());

op.write(&path, content.clone())
.await
.expect("write must succeed");

let one_hour_ago_check = chrono::Utc::now() - chrono::Duration::seconds(3600);
let res = op
.read_with(&path)
.if_unmodified_since(one_hour_ago_check)
.await;
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), ErrorKind::ConditionNotMatch);

sleep(Duration::from_secs(3)).await;

let one_second_ago_check = chrono::Utc::now() - chrono::Duration::seconds(1);
let bs = op
.read_with(&path)
.if_unmodified_since(one_second_ago_check)
.await?
.to_bytes();
assert_eq!(bs, content);

Ok(())
}

/// Read full content should match.
pub async fn test_read_only_read_full(op: Operator) -> anyhow::Result<()> {
let bs = op.read("normal_file.txt").await?.to_bytes();
Expand Down

0 comments on commit cbe9efb

Please sign in to comment.