Skip to content

Commit

Permalink
Add streaming API for list objects v2
Browse files Browse the repository at this point in the history
  • Loading branch information
donatello committed Sep 29, 2023
1 parent 6f51637 commit 4122be8
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 27 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
**/*.rs.bk
Cargo.lock
.idea
*.env
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ serde_json = "1.0.105"
async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
async-recursion = "1.0.4"
os_info = "3.7.0"
tokio-stream = "0.1.14"

[dependencies.reqwest]
version = "0.11.20"
Expand Down
23 changes: 11 additions & 12 deletions src/s3/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,22 +1026,22 @@ impl<'a> ListObjectsV1Args<'a> {
}

/// Argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API
pub struct ListObjectsV2Args<'a> {
pub extra_headers: Option<&'a Multimap>,
pub extra_query_params: Option<&'a Multimap>,
pub region: Option<&'a str>,
pub bucket: &'a str,
pub delimiter: Option<&'a str>,
pub encoding_type: Option<&'a str>,
pub struct ListObjectsV2Args {
pub extra_headers: Option<Multimap>,
pub extra_query_params: Option<Multimap>,
pub region: Option<String>,
pub bucket: String,
pub delimiter: Option<String>,
pub encoding_type: Option<String>,
pub max_keys: Option<u16>,
pub prefix: Option<&'a str>,
pub prefix: Option<String>,
pub start_after: Option<String>,
pub continuation_token: Option<String>,
pub fetch_owner: bool,
pub include_user_metadata: bool,
}

impl<'a> ListObjectsV2Args<'a> {
impl ListObjectsV2Args {
/// Returns argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API with given bucket name
///
/// # Examples
Expand All @@ -1050,14 +1050,13 @@ impl<'a> ListObjectsV2Args<'a> {
/// use minio::s3::args::*;
/// let args = ListObjectsV2Args::new("my-bucket").unwrap();
/// ```
pub fn new(bucket_name: &'a str) -> Result<ListObjectsV2Args<'a>, Error> {
pub fn new(bucket_name: &str) -> Result<ListObjectsV2Args, Error> {
check_bucket_name(bucket_name, true)?;

Ok(ListObjectsV2Args {
extra_headers: None,
extra_query_params: None,
region: None,
bucket: bucket_name,
bucket: bucket_name.to_owned(),
delimiter: None,
encoding_type: None,
max_keys: None,
Expand Down
59 changes: 44 additions & 15 deletions src/s3/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,29 @@ use crate::s3::types::{
NotificationRecords, ObjectLockConfig, Part, ReplicationConfig, RetentionMode, SseConfig,
};
use crate::s3::utils::{
from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash,
to_amz_date, to_iso8601utc, urldecode, utc_now, Multimap,
check_bucket_name, from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash,
merge, sha256_hash, to_amz_date, to_iso8601utc, urldecode, utc_now, Multimap,
};

use async_recursion::async_recursion;
use bytes::{Buf, Bytes};
use dashmap::DashMap;
use futures_core::Stream;
use hyper::http::Method;
use os_info;
use reqwest::header::HeaderMap;
use xmltree::Element;

use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::prelude::*;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use xmltree::Element;

mod paginated;

use self::paginated::ListObjectsV2Paginated;

fn url_decode(
encoding_type: &Option<String>,
Expand Down Expand Up @@ -614,6 +621,11 @@ impl Client {
object_name: Option<&str>,
data: Option<&[u8]>,
) -> Result<reqwest::Response, Error> {
// Check bucket name validity.
bucket_name
.map(|b| check_bucket_name(b, true))
.unwrap_or(Ok(()))?;

let res = self
.do_execute(
method.clone(),
Expand Down Expand Up @@ -2589,9 +2601,11 @@ impl Client {
/// Executes [ListObjectsV2](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html) S3 API
pub async fn list_objects_v2(
&self,
args: &ListObjectsV2Args<'_>,
args: &ListObjectsV2Args,
) -> Result<ListObjectsV2Response, Error> {
let region = self.get_region(args.bucket, args.region).await?;
let region = self
.get_region(&args.bucket, args.region.as_deref())
.await?;

let mut headers = Multimap::new();
if let Some(v) = &args.extra_headers {
Expand All @@ -2605,10 +2619,10 @@ impl Client {
query_params.insert(String::from("list-type"), String::from("2"));
add_common_list_objects_query_params(
&mut query_params,
args.delimiter,
args.encoding_type,
args.delimiter.as_deref(),
args.encoding_type.as_deref(),
args.max_keys,
args.prefix,
args.prefix.as_deref(),
);
if let Some(v) = &args.continuation_token {
query_params.insert(String::from("continuation-token"), v.to_string());
Expand All @@ -2629,7 +2643,7 @@ impl Client {
&region,
&mut headers,
&query_params,
Some(args.bucket),
Some(&args.bucket),
None,
None,
)
Expand Down Expand Up @@ -2750,6 +2764,20 @@ impl Client {
})
}

/// Executes
/// [ListObjectsV2](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html)
/// S3 API repeatedly and returns a stream of results. Each result
/// corresponds to the response of a single list request. This function
/// manages pagination internally and stops when there are no more results.
pub async fn list_objects_v2_stream(
&self,
args: ListObjectsV2Args,
) -> impl Stream<Item = Result<ListObjectsV2Response, Error>> + Unpin {
ListObjectsV2Paginated::new(self.clone(), args)
.stream()
.await
}

/// List objects with version information optionally. `results_fn` callback
/// function is repeatedly called with object information and returning
/// false from the callback stops further listing.
Expand All @@ -2772,20 +2800,21 @@ impl Client {
lov1_args.marker = args.marker.map(|x| x.to_string());

let mut lov2_args = ListObjectsV2Args::new(args.bucket)?;
lov2_args.extra_headers = args.extra_headers;
lov2_args.extra_query_params = args.extra_query_params;
lov2_args.region = args.region;
lov2_args.extra_headers = args.extra_headers.map(|x| x.clone());
lov2_args.extra_query_params = args.extra_query_params.map(|x| x.clone());
lov2_args.region = args.region.map(|x| x.to_string());
if args.recursive {
lov2_args.delimiter = None;
} else {
lov2_args.delimiter = Some(args.delimiter.unwrap_or("/"));
lov2_args.delimiter =
Some(args.delimiter.map(|x| x.clone()).unwrap_or("/").to_string());
}
lov2_args.encoding_type = match args.use_url_encoding_type {
true => Some("url"),
true => Some("url".to_string()),
false => None,
};
lov2_args.max_keys = args.max_keys;
lov2_args.prefix = args.prefix;
lov2_args.prefix = args.prefix.map(|x| x.to_string());
lov2_args.start_after = args.start_after.map(|x| x.to_string());
lov2_args.continuation_token = args.continuation_token.map(|x| x.to_string());
lov2_args.fetch_owner = args.fetch_owner;
Expand Down
38 changes: 38 additions & 0 deletions src/s3/client/paginated.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use crate::s3::{
args::ListObjectsV2Args, client::Client, error::Error, response::ListObjectsV2Response,
};

use futures_core::Stream;
use futures_util::stream as futures_stream;

pub struct ListObjectsV2Paginated {
client: Client,
args: ListObjectsV2Args,
}

impl ListObjectsV2Paginated {
pub fn new(client: Client, args: ListObjectsV2Args) -> Self {
ListObjectsV2Paginated { client, args }
}

pub async fn stream(self) -> impl Stream<Item = Result<ListObjectsV2Response, Error>> + Unpin {
Box::pin(futures_stream::unfold(
(self.client, self.args),
move |(client, mut args)| async move {
// args.continuation_token = continuation_token.clone();
let resp = client.list_objects_v2(&args).await;
match resp {
Ok(resp) => {
if !resp.is_truncated {
None
} else {
args.continuation_token = resp.next_continuation_token.clone();
Some((Ok(resp), (client, args)))
}
}
Err(e) => Some((Err(e), (client, args))),
}
},
))
}
}
64 changes: 64 additions & 0 deletions tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::{fs, io};
use tokio::sync::mpsc;
use tokio_stream::StreamExt;

use minio::s3::args::*;
use minio::s3::client::Client;
Expand Down Expand Up @@ -454,6 +455,66 @@ impl ClientTest {
.unwrap();
}

async fn list_objects_v2_stream(&self) {
let bucket_name = rand_bucket_name();
self.client
.make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap())
.await
.unwrap();

let mut names: Vec<String> = Vec::new();
for _ in 1..=3 {
let object_name = rand_object_name();
let size = 0_usize;
self.client
.put_object(
&mut PutObjectArgs::new(
&self.test_bucket,
&object_name,
&mut RandReader::new(size),
Some(size),
None,
)
.unwrap(),
)
.await
.unwrap();
names.push(object_name);
}

let mut list_stream = self
.client
.list_objects_v2_stream(ListObjectsV2Args::new(&self.test_bucket).unwrap())
.await;

while let Some(list_resp) = list_stream.next().await {
let list_resp = list_resp.unwrap();
for item in list_resp.contents.iter() {
assert!(names.contains(&item.name));
}
}

let mut objects: Vec<DeleteObject> = Vec::new();
for name in names.iter() {
objects.push(DeleteObject {
name,
version_id: None,
});
}

self.client
.remove_objects(
&mut RemoveObjectsArgs::new(&self.test_bucket, &mut objects.iter()).unwrap(),
)
.await
.unwrap();

self.client
.remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap())
.await
.unwrap();
}

async fn select_object_content(&self) {
let object_name = rand_object_name();
let mut data = String::new();
Expand Down Expand Up @@ -1198,6 +1259,9 @@ async fn s3_tests() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("list_objects()");
ctest.list_objects().await;

println!("list_objects_v2_stream()");
ctest.list_objects_v2_stream().await;

println!("select_object_content()");
ctest.select_object_content().await;

Expand Down

0 comments on commit 4122be8

Please sign in to comment.