diff --git a/.gitignore b/.gitignore
index d889fbf..83dac5d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,4 @@
 **/*.rs.bk
 Cargo.lock
 .idea
+*.env
diff --git a/Cargo.toml b/Cargo.toml
index e60073f..a5aa11e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -38,6 +38,7 @@ serde_json = "1.0.105"
 async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
 async-recursion = "1.0.4"
 os_info = "3.7.0"
+tokio-stream = "0.1.14"
 
 [dependencies.reqwest]
 version = "0.11.20"
diff --git a/src/s3/args.rs b/src/s3/args.rs
index 7173a99..88cfbb5 100644
--- a/src/s3/args.rs
+++ b/src/s3/args.rs
@@ -1026,22 +1026,22 @@ impl<'a> ListObjectsV1Args<'a> {
 }
 
 /// Argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API
-pub struct ListObjectsV2Args<'a> {
-    pub extra_headers: Option<&'a Multimap>,
-    pub extra_query_params: Option<&'a Multimap>,
-    pub region: Option<&'a str>,
-    pub bucket: &'a str,
-    pub delimiter: Option<&'a str>,
-    pub encoding_type: Option<&'a str>,
+pub struct ListObjectsV2Args {
+    pub extra_headers: Option<Multimap>,
+    pub extra_query_params: Option<Multimap>,
+    pub region: Option<String>,
+    pub bucket: String,
+    pub delimiter: Option<String>,
+    pub encoding_type: Option<String>,
     pub max_keys: Option<u16>,
-    pub prefix: Option<&'a str>,
+    pub prefix: Option<String>,
     pub start_after: Option<String>,
     pub continuation_token: Option<String>,
     pub fetch_owner: bool,
     pub include_user_metadata: bool,
 }
 
-impl<'a> ListObjectsV2Args<'a> {
+impl ListObjectsV2Args {
     /// Returns argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API with given bucket name
     ///
     /// # Examples
@@ -1050,14 +1050,13 @@ impl<'a> ListObjectsV2Args<'a> {
     /// use minio::s3::args::*;
     /// let args = ListObjectsV2Args::new("my-bucket").unwrap();
     /// ```
-    pub fn new(bucket_name: &'a str) -> Result<ListObjectsV2Args<'a>, Error> {
+    pub fn new(bucket_name: &str) -> Result<ListObjectsV2Args, Error> {
         check_bucket_name(bucket_name, true)?;
-
         Ok(ListObjectsV2Args {
             extra_headers: None,
             extra_query_params: None,
             region: None,
-            bucket: bucket_name,
+            bucket: bucket_name.to_owned(),
             delimiter: None,
             encoding_type: None,
             max_keys: None,
diff --git a/src/s3/client.rs b/src/s3/client.rs
index ecb2374..e47ba7b 100644
--- a/src/s3/client.rs
+++ b/src/s3/client.rs
@@ -30,19 +30,25 @@ use crate::s3::utils::{
     from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash,
     to_amz_date, to_iso8601utc, urldecode, utc_now, Multimap,
 };
+
 use async_recursion::async_recursion;
 use bytes::{Buf, Bytes};
 use dashmap::DashMap;
 use hyper::http::Method;
 use os_info;
 use reqwest::header::HeaderMap;
+use xmltree::Element;
+
 use std::collections::{HashMap, VecDeque};
 use std::fs::File;
 use std::io::prelude::*;
 use std::io::Read;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
-use xmltree::Element;
+
+mod list_objects;
+
+pub use list_objects::*;
 
 fn url_decode(
     encoding_type: &Option<String>,
@@ -2589,9 +2595,11 @@ impl Client {
     /// Executes [ListObjectsV2](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html) S3 API
     pub async fn list_objects_v2(
         &self,
-        args: &ListObjectsV2Args<'_>,
+        args: &ListObjectsV2Args,
     ) -> Result<ListObjectsV2Response, Error> {
-        let region = self.get_region(args.bucket, args.region).await?;
+        let region = self
+            .get_region(&args.bucket, args.region.as_deref())
+            .await?;
 
         let mut headers = Multimap::new();
         if let Some(v) = &args.extra_headers {
@@ -2605,10 +2613,10 @@ impl Client {
         query_params.insert(String::from("list-type"), String::from("2"));
         add_common_list_objects_query_params(
             &mut query_params,
-            args.delimiter,
-            args.encoding_type,
+            args.delimiter.as_deref(),
+            args.encoding_type.as_deref(),
             args.max_keys,
-            args.prefix,
+            args.prefix.as_deref(),
         );
         if let Some(v) = &args.continuation_token {
             query_params.insert(String::from("continuation-token"), v.to_string());
@@ -2629,7 +2637,7 @@ impl Client {
                 &region,
                 &mut headers,
                 &query_params,
-                Some(args.bucket),
+                Some(&args.bucket),
                 None,
                 None,
             )
@@ -2772,20 +2780,20 @@ impl Client {
         lov1_args.marker = args.marker.map(|x| x.to_string());
 
         let mut lov2_args = ListObjectsV2Args::new(args.bucket)?;
-        lov2_args.extra_headers = args.extra_headers;
-        lov2_args.extra_query_params = args.extra_query_params;
-        lov2_args.region = args.region;
+        lov2_args.extra_headers = args.extra_headers.cloned();
+        lov2_args.extra_query_params = args.extra_query_params.cloned();
+        lov2_args.region = args.region.map(|x| x.to_string());
         if args.recursive {
             lov2_args.delimiter = None;
         } else {
-            lov2_args.delimiter = Some(args.delimiter.unwrap_or("/"));
+            lov2_args.delimiter = Some(args.delimiter.unwrap_or("/").to_string());
         }
         lov2_args.encoding_type = match args.use_url_encoding_type {
-            true => Some("url"),
+            true => Some("url".to_string()),
             false => None,
         };
         lov2_args.max_keys = args.max_keys;
-        lov2_args.prefix = args.prefix;
+        lov2_args.prefix = args.prefix.map(|x| x.to_string());
         lov2_args.start_after = args.start_after.map(|x| x.to_string());
         lov2_args.continuation_token = args.continuation_token.map(|x| x.to_string());
         lov2_args.fetch_owner = args.fetch_owner;
diff --git a/src/s3/client/list_objects.rs b/src/s3/client/list_objects.rs
new file mode 100644
index 0000000..3f75839
--- /dev/null
+++ b/src/s3/client/list_objects.rs
@@ -0,0 +1,43 @@
+use super::Client;
+use crate::s3::{args::ListObjectsV2Args, error::Error, response::ListObjectsV2Response};
+
+use futures_core::Stream;
+use futures_util::stream as futures_stream;
+
+impl Client {
+    /// Executes
+    /// [ListObjectsV2](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html)
+    /// S3 API repeatedly and returns a stream of results. Each result
+    /// corresponds to the response of a single list request. This function
+    /// manages pagination internally: every page, including the final
+    /// (non-truncated) one, is yielded. The stream ends after the final page
+    /// or after the first error, which is yielded as the last item.
+    pub async fn list_objects_v2_stream(
+        &self,
+        args: ListObjectsV2Args,
+    ) -> impl Stream<Item = Result<ListObjectsV2Response, Error>> + Unpin {
+        // State: the client plus `Some(args)` for the next request, or `None`
+        // once the listing has completed or failed.
+        Box::pin(futures_stream::unfold(
+            (self.clone(), Some(args)),
+            |(client, pending)| async move {
+                let mut args = pending?;
+                match client.list_objects_v2(&args).await {
+                    Ok(resp) => {
+                        // Request another page only when the server reports
+                        // truncation and supplies a token to resume from.
+                        let next = match &resp.next_continuation_token {
+                            Some(token) if resp.is_truncated => {
+                                args.continuation_token = Some(token.clone());
+                                Some(args)
+                            }
+                            _ => None,
+                        };
+                        Some((Ok(resp), (client, next)))
+                    }
+                    Err(e) => Some((Err(e), (client, None))),
+                }
+            },
+        ))
+    }
+}
diff --git a/tests/tests.rs b/tests/tests.rs
index 33aed54..656c2f0 100644
--- a/tests/tests.rs
+++ b/tests/tests.rs
@@ -24,6 +24,7 @@ use std::io::BufReader;
 use std::path::{Path, PathBuf};
 use std::{fs, io};
 use tokio::sync::mpsc;
+use tokio_stream::StreamExt;
 
 use minio::s3::args::*;
 use minio::s3::client::Client;
@@ -454,6 +455,70 @@ impl ClientTest {
             .unwrap();
     }
 
+    async fn list_objects_v2_stream(&self) {
+        let bucket_name = rand_bucket_name();
+        self.client
+            .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap())
+            .await
+            .unwrap();
+
+        let mut names: Vec<String> = Vec::new();
+        for _ in 1..=3 {
+            let object_name = rand_object_name();
+            let size = 0_usize;
+            self.client
+                .put_object(
+                    &mut PutObjectArgs::new(
+                        &bucket_name,
+                        &object_name,
+                        &mut RandReader::new(size),
+                        Some(size),
+                        None,
+                    )
+                    .unwrap(),
+                )
+                .await
+                .unwrap();
+            names.push(object_name);
+        }
+
+        let mut list_stream = self
+            .client
+            .list_objects_v2_stream(ListObjectsV2Args::new(&bucket_name).unwrap())
+            .await;
+
+        let mut listed = 0;
+        while let Some(list_resp) = list_stream.next().await {
+            let list_resp = list_resp.unwrap();
+            for item in list_resp.contents.iter() {
+                assert!(names.contains(&item.name));
+                listed += 1;
+            }
+        }
+        // Every uploaded object must be listed; an empty stream must not pass.
+        assert_eq!(listed, names.len());
+
+        let mut objects: Vec<DeleteObject> = Vec::new();
+        for name in names.iter() {
+            objects.push(DeleteObject {
+                name,
+                version_id: None,
+            });
+        }
+
+        self.client
+            .remove_objects(
+                &mut RemoveObjectsArgs::new(&bucket_name, &mut objects.iter()).unwrap(),
+            )
+            .await
+            .unwrap();
+
+        self.client
+            .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap())
+            .await
+            .unwrap();
+    }
+
     async fn select_object_content(&self) {
         let object_name = rand_object_name();
         let mut data = String::new();
@@ -1198,6 +1263,9 @@ async fn s3_tests() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     println!("list_objects()");
     ctest.list_objects().await;
 
+    println!("list_objects_v2_stream()");
+    ctest.list_objects_v2_stream().await;
+
     println!("select_object_content()");
     ctest.select_object_content().await;
 