Skip to content

Commit

Permalink
rename bucket_versioning_enabled to enable_versioning && support star…
Browse files Browse the repository at this point in the history
…t_after
  • Loading branch information
meteorgan committed Sep 10, 2024
1 parent 081b4a3 commit 4901312
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 21 deletions.
6 changes: 5 additions & 1 deletion .github/services/s3/0_minio_s3/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ runs:
AWS_ACCESS_KEY_ID: "minioadmin"
AWS_SECRET_ACCESS_KEY: "minioadmin"
AWS_EC2_METADATA_DISABLED: "true"
run: aws --endpoint-url http://127.0.0.1:9000/ s3 mb s3://test
run: |
aws --endpoint-url http://127.0.0.1:9000/ s3 mb s3://test
aws --endpoint-url http://127.0.0.1:9000/ s3api put-bucket-versioning --bucket test --versioning-configuration Status=Enabled
- name: Setup
shell: bash
run: |
Expand All @@ -41,4 +44,5 @@ runs:
OPENDAL_S3_ACCESS_KEY_ID=minioadmin
OPENDAL_S3_SECRET_ACCESS_KEY=minioadmin
OPENDAL_S3_REGION=us-east-1
OPENDAL_S3_ENABLE_VERSIONING=true
EOF
28 changes: 18 additions & 10 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,6 @@ impl S3Builder {
self
}

/// Set bucket versioning status of this backend
pub fn bucket_versioning_enabled(mut self, enabled: bool) -> Self {
self.config.bucket_versioning_enabled = enabled;

self
}

/// Set endpoint of this backend.
///
/// Endpoint must be full uri, e.g.
Expand Down Expand Up @@ -465,6 +458,13 @@ impl S3Builder {
self
}

/// Set bucket versioning status for this backend
pub fn enable_versioning(mut self, enabled: bool) -> Self {
self.config.enable_versioning = enabled;

self
}

/// Check if `bucket` is valid
/// `bucket` must be not empty and if `enable_virtual_host_style` is true
/// it couldn't contain dot(.) character
Expand Down Expand Up @@ -864,7 +864,7 @@ impl Builder for S3Builder {
default_storage_class,
allow_anonymous: self.config.allow_anonymous,
disable_stat_with_override: self.config.disable_stat_with_override,
bucket_versioning_enabled: self.config.bucket_versioning_enabled,
enable_versioning: self.config.enable_versioning,
signer,
loader,
credential_loaded: AtomicBool::new(false),
Expand Down Expand Up @@ -938,7 +938,7 @@ impl Access for S3Backend {
list_with_limit: true,
list_with_start_after: true,
list_with_recursive: true,
list_with_version: self.core.bucket_versioning_enabled,
list_with_version: self.core.enable_versioning,

presign: true,
presign_stat: true,
Expand Down Expand Up @@ -1037,12 +1037,20 @@ impl Access for S3Backend {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = if self.core.bucket_versioning_enabled && args.version() {
if args.version() && !self.core.enable_versioning {
return Err(Error::new(
ErrorKind::Unsupported,
"the bucket doesn't enable versioning",
));
}

let l = if args.version() {
TwoWays::Two(PageLister::new(S3ObjectVersionsLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
)))
} else {
TwoWays::One(PageLister::new(S3Lister::new(
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/s3/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct S3Config {
/// required.
pub bucket: String,
/// is bucket versioning enabled for this bucket
pub bucket_versioning_enabled: bool,
pub enable_versioning: bool,
/// endpoint of this backend.
///
/// Endpoint must be full uri, e.g.
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub struct S3Core {
pub default_storage_class: Option<HeaderValue>,
pub allow_anonymous: bool,
pub disable_stat_with_override: bool,
pub bucket_versioning_enabled: bool,
pub enable_versioning: bool,

pub signer: AwsV4Signer,
pub loader: Box<dyn AwsCredentialLoad>,
Expand Down
22 changes: 17 additions & 5 deletions core/src/services/s3/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

use std::sync::Arc;

use bytes::Buf;
use quick_xml::de;

use super::core::S3Core;
use super::core::{ListObjectVersionsOutput, ListObjectsOutput};
use super::error::parse_error;
Expand All @@ -28,6 +25,8 @@ use crate::raw::*;
use crate::EntryMode;
use crate::Metadata;
use crate::Result;
use bytes::Buf;
use quick_xml::de;

pub type S3Listers = TwoWays<oio::PageLister<S3Lister>, oio::PageLister<S3ObjectVersionsLister>>;

Expand Down Expand Up @@ -144,30 +143,43 @@ pub struct S3ObjectVersionsLister {
prefix: String,
delimiter: &'static str,
limit: Option<usize>,
start_after: String,
}

impl S3ObjectVersionsLister {
pub fn new(core: Arc<S3Core>, path: &str, recursive: bool, limit: Option<usize>) -> Self {
pub fn new(
core: Arc<S3Core>,
path: &str,
recursive: bool,
limit: Option<usize>,
start_after: Option<&str>,
) -> Self {
let delimiter = if recursive { "" } else { "/" };

Self {
core,
prefix: path.to_string(),
delimiter,
limit,
start_after: start_after.map_or("".to_owned(), String::from),
}
}
}

impl oio::PageList for S3ObjectVersionsLister {
async fn next_page(&self, ctx: &mut PageContext) -> Result<()> {
let key_marker = if ctx.key_marker.is_empty() && !self.start_after.is_empty() {
build_abs_path(&self.core.root, &self.start_after)
} else {
ctx.key_marker.clone()
};
let resp = self
.core
.s3_list_object_versions(
&self.prefix,
self.delimiter,
self.limit,
&ctx.key_marker,
&key_marker,
&ctx.version_id_marker,
)
.await?;
Expand Down
55 changes: 52 additions & 3 deletions core/tests/behavior/async_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
test_list_root_with_recursive,
test_remove_all,
test_list_files_with_version,
test_list_files_with_version_and_limit
test_list_with_version_and_limit,
test_list_with_version_and_start_after
))
}

Expand Down Expand Up @@ -714,7 +715,7 @@ pub async fn test_list_files_with_version(op: Operator) -> Result<()> {
}

// listing a directory with version, which contains more object versions than a page can take
pub async fn test_list_files_with_version_and_limit(op: Operator) -> Result<()> {
pub async fn test_list_with_version_and_limit(op: Operator) -> Result<()> {
// Gdrive think that this test is an abuse of their service and redirect us
// to an infinite loop. Let's ignore this test for gdrive.
if op.info().scheme() == Scheme::Gdrive {
Expand All @@ -724,7 +725,7 @@ pub async fn test_list_files_with_version_and_limit(op: Operator) -> Result<()>
return Ok(());
}

let parent = "test_list_rich_dir/";
let parent = "test_list_with_version_and_limit/";
op.create_dir(parent).await?;

let expected: Vec<String> = (0..=10).map(|num| format!("{parent}file-{num}")).collect();
Expand Down Expand Up @@ -771,3 +772,51 @@ pub async fn test_list_files_with_version_and_limit(op: Operator) -> Result<()>
op.remove_all(parent).await?;
Ok(())
}

pub async fn test_list_with_version_and_start_after(op: Operator) -> Result<()> {
if !op.info().full_capability().list_with_version {
return Ok(());
}

let dir = &format!("{}/", uuid::Uuid::new_v4());

let given: Vec<String> = ["file-0", "file-1", "file-2", "file-3", "file-4", "file-5"]
.iter()
.map(|name| format!("{dir}{name}-{}", uuid::Uuid::new_v4()))
.collect();

given
.iter()
.map(|name| async {
op.write(name, "1").await.expect("write must succeed");
op.write(name, "2").await.expect("write must succeed");
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await;

let mut objects = op
.lister_with(dir)
.version(true)
.start_after(&given[2])
.await?;
let mut actual = vec![];
while let Some(o) = objects.try_next().await? {
let path = o.path().to_string();
actual.push(path)
}

let expected: Vec<String> = given.into_iter().skip(3).collect();
let mut expected: Vec<String> = expected
.into_iter()
.flat_map(|v| std::iter::repeat(v).take(2))
.collect();

expected.sort_unstable();
actual.sort_unstable();
assert_eq!(expected, actual);

op.remove_all(dir).await?;

Ok(())
}

0 comments on commit 4901312

Please sign in to comment.