diff --git a/.gitignore b/.gitignore index d889fbf..83dac5d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ **/*.rs.bk Cargo.lock .idea +*.env diff --git a/Cargo.toml b/Cargo.toml index e60073f..a5aa11e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ serde_json = "1.0.105" async-std = { version = "1.12.0", features = ["attributes", "tokio1"] } async-recursion = "1.0.4" os_info = "3.7.0" +tokio-stream = "0.1.14" [dependencies.reqwest] version = "0.11.20" diff --git a/src/s3/args.rs b/src/s3/args.rs index 7173a99..88cfbb5 100644 --- a/src/s3/args.rs +++ b/src/s3/args.rs @@ -1026,22 +1026,22 @@ impl<'a> ListObjectsV1Args<'a> { } /// Argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API -pub struct ListObjectsV2Args<'a> { - pub extra_headers: Option<&'a Multimap>, - pub extra_query_params: Option<&'a Multimap>, - pub region: Option<&'a str>, - pub bucket: &'a str, - pub delimiter: Option<&'a str>, - pub encoding_type: Option<&'a str>, +pub struct ListObjectsV2Args { + pub extra_headers: Option, + pub extra_query_params: Option, + pub region: Option, + pub bucket: String, + pub delimiter: Option, + pub encoding_type: Option, pub max_keys: Option, - pub prefix: Option<&'a str>, + pub prefix: Option, pub start_after: Option, pub continuation_token: Option, pub fetch_owner: bool, pub include_user_metadata: bool, } -impl<'a> ListObjectsV2Args<'a> { +impl ListObjectsV2Args { /// Returns argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API with given bucket name /// /// # Examples @@ -1050,14 +1050,13 @@ impl<'a> ListObjectsV2Args<'a> { /// use minio::s3::args::*; /// let args = ListObjectsV2Args::new("my-bucket").unwrap(); /// ``` - pub fn new(bucket_name: &'a str) -> Result, Error> { + pub fn new(bucket_name: &str) -> Result { check_bucket_name(bucket_name, true)?; - Ok(ListObjectsV2Args { extra_headers: None, extra_query_params: None, region: None, - bucket: bucket_name, + bucket: bucket_name.to_owned(), delimiter: None, encoding_type: None, max_keys: None, diff --git a/src/s3/client.rs b/src/s3/client.rs index ecb2374..09bba46 100644 --- a/src/s3/client.rs +++ b/src/s3/client.rs @@ -27,22 +27,29 @@ use crate::s3::types::{ NotificationRecords, ObjectLockConfig, Part, ReplicationConfig, RetentionMode, SseConfig, }; use crate::s3::utils::{ - from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash, - to_amz_date, to_iso8601utc, urldecode, utc_now, Multimap, + check_bucket_name, from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, + merge, sha256_hash, to_amz_date, to_iso8601utc, urldecode, utc_now, Multimap, }; + use async_recursion::async_recursion; use bytes::{Buf, Bytes}; use dashmap::DashMap; +use futures_core::Stream; use hyper::http::Method; use os_info; use reqwest::header::HeaderMap; +use xmltree::Element; + use std::collections::{HashMap, VecDeque}; use std::fs::File; use std::io::prelude::*; use std::io::Read; use std::path::{Path, PathBuf}; use std::sync::Arc; -use xmltree::Element; + +mod paginated; + +use self::paginated::ListObjectsV2Paginated; fn url_decode( encoding_type: &Option, @@ -614,6 +621,11 @@ impl Client { object_name: Option<&str>, data: Option<&[u8]>, ) -> Result { + // Check bucket name validity. + bucket_name + .map(|b| check_bucket_name(b, true)) + .unwrap_or(Ok(()))?; + let res = self .do_execute( method.clone(), @@ -2589,9 +2601,11 @@ impl Client { /// Executes [ListObjectsV2](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html) S3 API pub async fn list_objects_v2( &self, - args: &ListObjectsV2Args<'_>, + args: &ListObjectsV2Args, ) -> Result { - let region = self.get_region(args.bucket, args.region).await?; + let region = self + .get_region(&args.bucket, args.region.as_deref()) + .await?; let mut headers = Multimap::new(); if let Some(v) = &args.extra_headers { @@ -2605,10 +2619,10 @@ impl Client { query_params.insert(String::from("list-type"), String::from("2")); add_common_list_objects_query_params( &mut query_params, - args.delimiter, - args.encoding_type, + args.delimiter.as_deref(), + args.encoding_type.as_deref(), args.max_keys, - args.prefix, + args.prefix.as_deref(), ); if let Some(v) = &args.continuation_token { query_params.insert(String::from("continuation-token"), v.to_string()); @@ -2629,7 +2643,7 @@ impl Client { ®ion, &mut headers, &query_params, - Some(args.bucket), + Some(&args.bucket), None, None, ) @@ -2750,6 +2764,20 @@ impl Client { }) } + /// Executes + /// [ListObjectsV2](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html) + /// S3 API repeatedly and returns a stream of results. Each result + /// corresponds to the response of a single list request. This function + /// manages pagination internally and stops when there are no more results. + pub async fn list_objects_v2_stream( + &self, + args: ListObjectsV2Args, + ) -> impl Stream> + Unpin { + ListObjectsV2Paginated::new(self.clone(), args) + .stream() + .await + } + /// List objects with version information optionally. `results_fn` callback /// function is repeatedly called with object information and returning /// false from the callback stops further listing. @@ -2772,20 +2800,21 @@ impl Client { lov1_args.marker = args.marker.map(|x| x.to_string()); let mut lov2_args = ListObjectsV2Args::new(args.bucket)?; - lov2_args.extra_headers = args.extra_headers; - lov2_args.extra_query_params = args.extra_query_params; - lov2_args.region = args.region; + lov2_args.extra_headers = args.extra_headers.map(|x| x.clone()); + lov2_args.extra_query_params = args.extra_query_params.map(|x| x.clone()); + lov2_args.region = args.region.map(|x| x.to_string()); if args.recursive { lov2_args.delimiter = None; } else { - lov2_args.delimiter = Some(args.delimiter.unwrap_or("/")); + lov2_args.delimiter = + Some(args.delimiter.map(|x| x.clone()).unwrap_or("/").to_string()); } lov2_args.encoding_type = match args.use_url_encoding_type { - true => Some("url"), + true => Some("url".to_string()), false => None, }; lov2_args.max_keys = args.max_keys; - lov2_args.prefix = args.prefix; + lov2_args.prefix = args.prefix.map(|x| x.to_string()); lov2_args.start_after = args.start_after.map(|x| x.to_string()); lov2_args.continuation_token = args.continuation_token.map(|x| x.to_string()); lov2_args.fetch_owner = args.fetch_owner; diff --git a/src/s3/client/paginated.rs b/src/s3/client/paginated.rs new file mode 100644 index 0000000..de4b888 --- /dev/null +++ b/src/s3/client/paginated.rs @@ -0,0 +1,38 @@ +use crate::s3::{ + args::ListObjectsV2Args, client::Client, error::Error, response::ListObjectsV2Response, +}; + +use futures_core::Stream; +use futures_util::stream as futures_stream; + +pub struct ListObjectsV2Paginated { + client: Client, + args: ListObjectsV2Args, +} + +impl ListObjectsV2Paginated { + pub fn new(client: Client, args: ListObjectsV2Args) -> Self { + ListObjectsV2Paginated { client, args } + } + + pub async fn stream(self) -> impl Stream> + Unpin { + Box::pin(futures_stream::unfold( + (self.client, self.args), + move |(client, mut args)| async move { + // args.continuation_token = continuation_token.clone(); + let resp = client.list_objects_v2(&args).await; + match resp { + Ok(resp) => { + if !resp.is_truncated { + None + } else { + args.continuation_token = resp.next_continuation_token.clone(); + Some((Ok(resp), (client, args))) + } + } + Err(e) => Some((Err(e), (client, args))), + } + }, + )) + } +} diff --git a/tests/tests.rs b/tests/tests.rs index 33aed54..656c2f0 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -24,6 +24,7 @@ use std::io::BufReader; use std::path::{Path, PathBuf}; use std::{fs, io}; use tokio::sync::mpsc; +use tokio_stream::StreamExt; use minio::s3::args::*; use minio::s3::client::Client; @@ -454,6 +455,66 @@ impl ClientTest { .unwrap(); } + async fn list_objects_v2_stream(&self) { + let bucket_name = rand_bucket_name(); + self.client + .make_bucket(&MakeBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + + let mut names: Vec = Vec::new(); + for _ in 1..=3 { + let object_name = rand_object_name(); + let size = 0_usize; + self.client + .put_object( + &mut PutObjectArgs::new( + &self.test_bucket, + &object_name, + &mut RandReader::new(size), + Some(size), + None, + ) + .unwrap(), + ) + .await + .unwrap(); + names.push(object_name); + } + + let mut list_stream = self + .client + .list_objects_v2_stream(ListObjectsV2Args::new(&self.test_bucket).unwrap()) + .await; + + while let Some(list_resp) = list_stream.next().await { + let list_resp = list_resp.unwrap(); + for item in list_resp.contents.iter() { + assert!(names.contains(&item.name)); + } + } + + let mut objects: Vec = Vec::new(); + for name in names.iter() { + objects.push(DeleteObject { + name, + version_id: None, + }); + } + + self.client + .remove_objects( + &mut RemoveObjectsArgs::new(&self.test_bucket, &mut objects.iter()).unwrap(), + ) + .await + .unwrap(); + + self.client + .remove_bucket(&RemoveBucketArgs::new(&bucket_name).unwrap()) + .await + .unwrap(); + } + async fn select_object_content(&self) { let object_name = rand_object_name(); let mut data = String::new(); @@ -1198,6 +1259,9 @@ async fn s3_tests() -> Result<(), Box> { println!("list_objects()"); ctest.list_objects().await; + println!("list_objects_v2_stream()"); + ctest.list_objects_v2_stream().await; + println!("select_object_content()"); ctest.select_object_content().await;