From 8fb211ae0ed2b4762f13e2814d4cd8f3e35be460 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Wed, 4 Oct 2023 21:16:44 -0700 Subject: [PATCH] Add streaming APIs for list objects (#54) --- .gitignore | 1 + .rustfmt.toml | 1 + src/s3/args.rs | 200 +++++++----- src/s3/client.rs | 492 +---------------------------- src/s3/client/list_objects.rs | 561 ++++++++++++++++++++++++++++++++++ src/s3/response.rs | 10 +- src/s3/types.rs | 2 +- tests/tests.rs | 25 +- 8 files changed, 717 insertions(+), 575 deletions(-) create mode 100644 .rustfmt.toml create mode 100644 src/s3/client/list_objects.rs diff --git a/.gitignore b/.gitignore index d889fbf..83dac5d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ **/*.rs.bk Cargo.lock .idea +*.env diff --git a/.rustfmt.toml b/.rustfmt.toml new file mode 100644 index 0000000..3a26366 --- /dev/null +++ b/.rustfmt.toml @@ -0,0 +1 @@ +edition = "2021" diff --git a/src/s3/args.rs b/src/s3/args.rs index cb5414a..f9640af 100644 --- a/src/s3/args.rs +++ b/src/s3/args.rs @@ -19,7 +19,7 @@ use crate::s3::error::Error; use crate::s3::signer::post_presign_v4; use crate::s3::sse::{Sse, SseCustomerKey}; use crate::s3::types::{ - DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part, + DeleteObject, Directive, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part, ReplicationConfig, Retention, RetentionMode, SelectRequest, SseConfig, }; use crate::s3::utils::{ @@ -987,19 +987,48 @@ impl<'a> RemoveObjectsArgs<'a> { } /// Argument for [list_objects_v1()](crate::s3::client::Client::list_objects_v1) S3 API -pub struct ListObjectsV1Args<'a> { - pub extra_headers: Option<&'a Multimap>, - pub extra_query_params: Option<&'a Multimap>, - pub region: Option<&'a str>, - pub bucket: &'a str, - pub delimiter: Option<&'a str>, - pub encoding_type: Option<&'a str>, +#[derive(Clone, Debug)] +pub struct ListObjectsV1Args { + pub extra_headers: Option, + pub extra_query_params: Option, + pub region: Option, + pub bucket: String, + pub delimiter: Option, + pub use_url_encoding_type: bool, pub max_keys: Option, - pub prefix: Option<&'a str>, + pub prefix: Option, pub marker: Option, } -impl<'a> ListObjectsV1Args<'a> { +// Helper function delimiter based on recursive flag when delimiter is not +// provided. +fn delim_helper(delim: Option, recursive: bool) -> Option { + if delim.is_some() { + return delim; + } + match recursive { + true => None, + false => Some(String::from("/")), + } +} + +impl From for ListObjectsV1Args { + fn from(value: ListObjectsArgs) -> Self { + ListObjectsV1Args { + extra_headers: value.extra_headers, + extra_query_params: value.extra_query_params, + region: value.region, + bucket: value.bucket, + delimiter: delim_helper(value.delimiter, value.recursive), + use_url_encoding_type: value.use_url_encoding_type, + max_keys: value.max_keys, + prefix: value.prefix, + marker: value.marker, + } + } +} + +impl ListObjectsV1Args { /// Returns argument for [list_objects_v1()](crate::s3::client::Client::list_objects_v1) S3 API with given bucket name /// /// # Examples @@ -1008,16 +1037,16 @@ impl<'a> ListObjectsV1Args<'a> { /// use minio::s3::args::*; /// let args = ListObjectsV1Args::new("my-bucket").unwrap(); /// ``` - pub fn new(bucket_name: &'a str) -> Result, Error> { + pub fn new(bucket_name: &str) -> Result { check_bucket_name(bucket_name, true)?; Ok(ListObjectsV1Args { extra_headers: None, extra_query_params: None, region: None, - bucket: bucket_name, + bucket: bucket_name.to_owned(), delimiter: None, - encoding_type: None, + use_url_encoding_type: true, max_keys: None, prefix: None, marker: None, @@ -1026,22 +1055,42 @@ impl<'a> ListObjectsV1Args<'a> { } /// Argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API -pub struct ListObjectsV2Args<'a> { - pub extra_headers: Option<&'a Multimap>, - pub extra_query_params: Option<&'a Multimap>, - pub region: Option<&'a str>, - pub bucket: &'a str, - pub delimiter: Option<&'a str>, - pub encoding_type: Option<&'a str>, +#[derive(Clone, Debug)] +pub struct ListObjectsV2Args { + pub extra_headers: Option, + pub extra_query_params: Option, + pub region: Option, + pub bucket: String, + pub delimiter: Option, + pub use_url_encoding_type: bool, pub max_keys: Option, - pub prefix: Option<&'a str>, + pub prefix: Option, pub start_after: Option, pub continuation_token: Option, pub fetch_owner: bool, pub include_user_metadata: bool, } -impl<'a> ListObjectsV2Args<'a> { +impl From for ListObjectsV2Args { + fn from(value: ListObjectsArgs) -> Self { + ListObjectsV2Args { + extra_headers: value.extra_headers, + extra_query_params: value.extra_query_params, + region: value.region, + bucket: value.bucket, + delimiter: delim_helper(value.delimiter, value.recursive), + use_url_encoding_type: value.use_url_encoding_type, + max_keys: value.max_keys, + prefix: value.prefix, + start_after: value.start_after, + continuation_token: value.continuation_token, + fetch_owner: value.fetch_owner, + include_user_metadata: value.include_user_metadata, + } + } +} + +impl ListObjectsV2Args { /// Returns argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API with given bucket name /// /// # Examples @@ -1050,16 +1099,15 @@ impl<'a> ListObjectsV2Args<'a> { /// use minio::s3::args::*; /// let args = ListObjectsV2Args::new("my-bucket").unwrap(); /// ``` - pub fn new(bucket_name: &'a str) -> Result, Error> { + pub fn new(bucket_name: &str) -> Result { check_bucket_name(bucket_name, true)?; - Ok(ListObjectsV2Args { extra_headers: None, extra_query_params: None, region: None, - bucket: bucket_name, + bucket: bucket_name.to_owned(), delimiter: None, - encoding_type: None, + use_url_encoding_type: true, max_keys: None, prefix: None, start_after: None, @@ -1071,20 +1119,37 @@ impl<'a> ListObjectsV2Args<'a> { } /// Argument for [list_object_versions()](crate::s3::client::Client::list_object_versions) S3 API -pub struct ListObjectVersionsArgs<'a> { - pub extra_headers: Option<&'a Multimap>, - pub extra_query_params: Option<&'a Multimap>, - pub region: Option<&'a str>, - pub bucket: &'a str, - pub delimiter: Option<&'a str>, - pub encoding_type: Option<&'a str>, +pub struct ListObjectVersionsArgs { + pub extra_headers: Option, + pub extra_query_params: Option, + pub region: Option, + pub bucket: String, + pub delimiter: Option, + pub use_url_encoding_type: bool, pub max_keys: Option, - pub prefix: Option<&'a str>, + pub prefix: Option, pub key_marker: Option, pub version_id_marker: Option, } -impl<'a> ListObjectVersionsArgs<'a> { +impl From for ListObjectVersionsArgs { + fn from(value: ListObjectsArgs) -> Self { + ListObjectVersionsArgs { + extra_headers: value.extra_headers, + extra_query_params: value.extra_query_params, + region: value.region, + bucket: value.bucket, + delimiter: delim_helper(value.delimiter, value.recursive), + use_url_encoding_type: value.use_url_encoding_type, + max_keys: value.max_keys, + prefix: value.prefix, + key_marker: value.key_marker, + version_id_marker: value.version_id_marker, + } + } +} + +impl ListObjectVersionsArgs { /// Returns argument for [list_object_versions()](crate::s3::client::Client::list_object_versions) S3 API with given bucket name /// /// # Examples @@ -1093,16 +1158,16 @@ impl<'a> ListObjectVersionsArgs<'a> { /// use minio::s3::args::*; /// let args = ListObjectVersionsArgs::new("my-bucket").unwrap(); /// ``` - pub fn new(bucket_name: &'a str) -> Result, Error> { + pub fn new(bucket_name: &str) -> Result { check_bucket_name(bucket_name, true)?; Ok(ListObjectVersionsArgs { extra_headers: None, extra_query_params: None, region: None, - bucket: bucket_name, + bucket: bucket_name.to_owned(), delimiter: None, - encoding_type: None, + use_url_encoding_type: true, max_keys: None, prefix: None, key_marker: None, @@ -1112,68 +1177,62 @@ impl<'a> ListObjectVersionsArgs<'a> { } /// Argument for [list_objects()](crate::s3::client::Client::list_objects) API -pub struct ListObjectsArgs<'a> { - pub extra_headers: Option<&'a Multimap>, - pub extra_query_params: Option<&'a Multimap>, - pub region: Option<&'a str>, +#[derive(Clone, Debug)] +pub struct ListObjectsArgs { + pub extra_headers: Option, + pub extra_query_params: Option, + pub region: Option, /// Specifies the bucket name on which listing is to be performed. - pub bucket: &'a str, + pub bucket: String, /// Delimiter to roll up common prefixes on. - pub delimiter: Option<&'a str>, + pub delimiter: Option, pub use_url_encoding_type: bool, + pub max_keys: Option, + pub prefix: Option, + /// Used only with ListObjectsV1. - pub marker: Option<&'a str>, + pub marker: Option, + /// Used only with ListObjectsV2 - pub start_after: Option<&'a str>, - /// Used only with GetObjectVersions. - pub key_marker: Option<&'a str>, - pub max_keys: Option, - pub prefix: Option<&'a str>, + pub start_after: Option, /// Used only with ListObjectsV2. - pub continuation_token: Option<&'a str>, + pub continuation_token: Option, /// Used only with ListObjectsV2. pub fetch_owner: bool, - /// Used only with GetObjectVersions. - pub version_id_marker: Option<&'a str>, /// MinIO extension for ListObjectsV2. pub include_user_metadata: bool, + + /// Used only with GetObjectVersions. + pub key_marker: Option, + /// Used only with GetObjectVersions. + pub version_id_marker: Option, + + /// This parameter takes effect only when delimiter is None. Enables + /// recursive traversal for listing of the bucket and prefix. pub recursive: bool, /// Set this to use ListObjectsV1. Defaults to false. pub use_api_v1: bool, /// Set this to include versions. pub include_versions: bool, - /// A callback function to process results of object listing. - pub result_fn: &'a dyn Fn(Vec) -> bool, } -impl<'a> ListObjectsArgs<'a> { +impl ListObjectsArgs { /// Returns argument for [list_objects()](crate::s3::client::Client::list_objects) API with given bucket name and callback function for results. /// /// # Examples /// /// ``` /// use minio::s3::args::*; - /// let args = ListObjectsArgs::new( - /// "my-bucket", - /// &|items| { - /// for item in items.iter() { - /// println!("{:?}", item.name); - /// } - /// true - /// }, - /// ).unwrap(); + /// let args = ListObjectsArgs::new("my-bucket").unwrap(); /// ``` - pub fn new( - bucket_name: &'a str, - result_fn: &'a dyn Fn(Vec) -> bool, - ) -> Result, Error> { + pub fn new(bucket_name: &str) -> Result { check_bucket_name(bucket_name, true)?; Ok(ListObjectsArgs { extra_headers: None, extra_query_params: None, region: None, - bucket: bucket_name, + bucket: bucket_name.to_owned(), delimiter: None, use_url_encoding_type: true, marker: None, @@ -1188,7 +1247,6 @@ impl<'a> ListObjectsArgs<'a> { recursive: false, use_api_v1: false, include_versions: false, - result_fn, }) } } diff --git a/src/s3/client.rs b/src/s3/client.rs index e83ae6f..46d5c30 100644 --- a/src/s3/client.rs +++ b/src/s3/client.rs @@ -23,13 +23,14 @@ use crate::s3::response::*; use crate::s3::signer::{presign_v4, sign_v4_s3}; use crate::s3::sse::SseCustomerKey; use crate::s3::types::{ - Bucket, DeleteObject, Directive, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig, - Part, ReplicationConfig, RetentionMode, SseConfig, + Bucket, DeleteObject, Directive, LifecycleConfig, NotificationConfig, ObjectLockConfig, Part, + ReplicationConfig, RetentionMode, SseConfig, }; use crate::s3::utils::{ from_iso8601utc, get_default_text, get_option_text, get_text, md5sum_hash, merge, sha256_hash, - to_amz_date, to_iso8601utc, urldecode, utc_now, Multimap, + to_amz_date, to_iso8601utc, utc_now, Multimap, }; + use async_recursion::async_recursion; use bytes::{Buf, Bytes}; use dashmap::DashMap; @@ -44,170 +45,12 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use xmltree::Element; +mod list_objects; mod listen_bucket_notification; +pub use list_objects::*; pub use listen_bucket_notification::*; -fn url_decode( - encoding_type: &Option, - prefix: Option, -) -> Result, Error> { - if let Some(v) = encoding_type.as_ref() { - if v == "url" { - if let Some(v) = prefix { - return Ok(Some(urldecode(&v)?.to_string())); - } - } - } - - if let Some(v) = prefix.as_ref() { - return Ok(Some(v.to_string())); - } - - Ok(None) -} - -fn add_common_list_objects_query_params( - query_params: &mut Multimap, - delimiter: Option<&str>, - encoding_type: Option<&str>, - max_keys: Option, - prefix: Option<&str>, -) { - query_params.insert( - String::from("delimiter"), - delimiter.unwrap_or("").to_string(), - ); - query_params.insert( - String::from("max-keys"), - max_keys.unwrap_or(1000).to_string(), - ); - query_params.insert(String::from("prefix"), prefix.unwrap_or("").to_string()); - if let Some(v) = encoding_type { - query_params.insert(String::from("encoding-type"), v.to_string()); - } -} - -fn parse_common_list_objects_response( - root: &Element, -) -> Result< - ( - String, - Option, - Option, - Option, - bool, - Option, - ), - Error, -> { - let encoding_type = get_option_text(root, "EncodingType"); - let prefix = url_decode(&encoding_type, Some(get_default_text(root, "Prefix")))?; - Ok(( - get_text(root, "Name")?, - encoding_type, - prefix, - get_option_text(root, "Delimiter"), - match get_option_text(root, "IsTruncated") { - Some(v) => v.to_lowercase() == "true", - None => false, - }, - match get_option_text(root, "MaxKeys") { - Some(v) => Some(v.parse::()?), - None => None, - }, - )) -} - -fn parse_list_objects_contents( - contents: &mut Vec, - root: &mut xmltree::Element, - tag: &str, - encoding_type: &Option, - is_delete_marker: bool, -) -> Result<(), Error> { - while let Some(v) = root.take_child(tag) { - let content = v; - let etype = encoding_type.as_ref().cloned(); - let key = url_decode(&etype, Some(get_text(&content, "Key")?))?.unwrap(); - let last_modified = Some(from_iso8601utc(&get_text(&content, "LastModified")?)?); - let etag = get_option_text(&content, "ETag"); - let v = get_default_text(&content, "Size"); - let size = match v.is_empty() { - true => None, - false => Some(v.parse::()?), - }; - let storage_class = get_option_text(&content, "StorageClass"); - let is_latest = get_default_text(&content, "IsLatest").to_lowercase() == "true"; - let version_id = get_option_text(&content, "VersionId"); - let (owner_id, owner_name) = match content.get_child("Owner") { - Some(v) => (get_option_text(v, "ID"), get_option_text(v, "DisplayName")), - None => (None, None), - }; - let user_metadata = match content.get_child("UserMetadata") { - Some(v) => { - let mut map: HashMap = HashMap::new(); - for xml_node in &v.children { - let u = xml_node - .as_element() - .ok_or(Error::XmlError("unable to convert to element".to_string()))?; - map.insert( - u.name.to_string(), - u.get_text().unwrap_or_default().to_string(), - ); - } - Some(map) - } - None => None, - }; - - contents.push(Item { - name: key, - last_modified, - etag, - owner_id, - owner_name, - size, - storage_class, - is_latest, - version_id, - user_metadata, - is_prefix: false, - is_delete_marker, - encoding_type: etype, - }); - } - - Ok(()) -} - -fn parse_list_objects_common_prefixes( - contents: &mut Vec, - root: &mut Element, - encoding_type: &Option, -) -> Result<(), Error> { - while let Some(v) = root.take_child("CommonPrefixes") { - let common_prefix = v; - contents.push(Item { - name: url_decode(encoding_type, Some(get_text(&common_prefix, "Prefix")?))?.unwrap(), - last_modified: None, - etag: None, - owner_id: None, - owner_name: None, - size: None, - storage_class: None, - is_latest: false, - version_id: None, - user_metadata: None, - is_prefix: true, - is_delete_marker: false, - encoding_type: encoding_type.as_ref().cloned(), - }); - } - - Ok(()) -} - /// Client Builder manufactures a Client using given parameters. #[derive(Debug, Default)] pub struct ClientBuilder { @@ -315,7 +158,7 @@ impl Client { /// use minio::s3::client::Client; /// use minio::s3::creds::StaticProvider; /// use minio::s3::http::BaseUrl; - /// let mut base_url: BaseUrl = "play.min.io".parse().unwrap(); + /// let base_url: BaseUrl = "play.min.io".parse().unwrap(); /// let static_provider = StaticProvider::new( /// "Q3AM3UQ867SPQQA43P2F", /// "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG", @@ -2433,327 +2276,6 @@ impl Client { }) } - /// Executes [ListObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html) S3 API - pub async fn list_objects_v1( - &self, - args: &ListObjectsV1Args<'_>, - ) -> Result { - let region = self.get_region(args.bucket, args.region).await?; - - let mut headers = Multimap::new(); - if let Some(v) = &args.extra_headers { - merge(&mut headers, v); - } - - let mut query_params = Multimap::new(); - if let Some(v) = &args.extra_query_params { - merge(&mut query_params, v); - } - add_common_list_objects_query_params( - &mut query_params, - args.delimiter, - args.encoding_type, - args.max_keys, - args.prefix, - ); - if let Some(v) = &args.marker { - query_params.insert(String::from("marker"), v.to_string()); - } - - let resp = self - .execute( - Method::GET, - ®ion, - &mut headers, - &query_params, - Some(args.bucket), - None, - None, - ) - .await?; - let header_map = resp.headers().clone(); - let body = resp.bytes().await?; - let mut root = Element::parse(body.reader())?; - - let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = - parse_common_list_objects_response(&root)?; - let marker = url_decode(&encoding_type, get_option_text(&root, "Marker"))?; - let mut next_marker = url_decode(&encoding_type, get_option_text(&root, "NextMarker"))?; - let mut contents: Vec = Vec::new(); - parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?; - if is_truncated && next_marker.is_none() { - next_marker = contents.last().map(|v| v.name.clone()) - } - parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; - - Ok(ListObjectsV1Response { - headers: header_map, - name, - encoding_type, - prefix, - delimiter, - is_truncated, - max_keys, - contents, - marker, - next_marker, - }) - } - - /// Executes [ListObjectsV2](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html) S3 API - pub async fn list_objects_v2( - &self, - args: &ListObjectsV2Args<'_>, - ) -> Result { - let region = self.get_region(args.bucket, args.region).await?; - - let mut headers = Multimap::new(); - if let Some(v) = &args.extra_headers { - merge(&mut headers, v); - } - - let mut query_params = Multimap::new(); - if let Some(v) = &args.extra_query_params { - merge(&mut query_params, v); - } - query_params.insert(String::from("list-type"), String::from("2")); - add_common_list_objects_query_params( - &mut query_params, - args.delimiter, - args.encoding_type, - args.max_keys, - args.prefix, - ); - if let Some(v) = &args.continuation_token { - query_params.insert(String::from("continuation-token"), v.to_string()); - } - if args.fetch_owner { - query_params.insert(String::from("fetch-owner"), String::from("true")); - } - if let Some(v) = &args.start_after { - query_params.insert(String::from("start-after"), v.to_string()); - } - if args.include_user_metadata { - query_params.insert(String::from("metadata"), String::from("true")); - } - - let resp = self - .execute( - Method::GET, - ®ion, - &mut headers, - &query_params, - Some(args.bucket), - None, - None, - ) - .await?; - let header_map = resp.headers().clone(); - let body = resp.bytes().await?; - let mut root = Element::parse(body.reader())?; - - let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = - parse_common_list_objects_response(&root)?; - let text = get_option_text(&root, "KeyCount"); - let key_count = match text { - Some(v) => match v.is_empty() { - true => None, - false => Some(v.parse::()?), - }, - None => None, - }; - let start_after = url_decode(&encoding_type, get_option_text(&root, "StartAfter"))?; - let continuation_token = get_option_text(&root, "ContinuationToken"); - let next_continuation_token = get_option_text(&root, "NextContinuationToken"); - let mut contents: Vec = Vec::new(); - parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?; - parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; - - Ok(ListObjectsV2Response { - headers: header_map, - name, - encoding_type, - prefix, - delimiter, - is_truncated, - max_keys, - contents, - key_count, - start_after, - continuation_token, - next_continuation_token, - }) - } - - /// Executes [ListObjectVersions](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html) S3 API - pub async fn list_object_versions( - &self, - args: &ListObjectVersionsArgs<'_>, - ) -> Result { - let region = self.get_region(args.bucket, args.region).await?; - - let mut headers = Multimap::new(); - if let Some(v) = &args.extra_headers { - merge(&mut headers, v); - } - - let mut query_params = Multimap::new(); - if let Some(v) = &args.extra_query_params { - merge(&mut query_params, v); - } - query_params.insert(String::from("versions"), String::new()); - add_common_list_objects_query_params( - &mut query_params, - args.delimiter, - args.encoding_type, - args.max_keys, - args.prefix, - ); - if let Some(v) = &args.key_marker { - query_params.insert(String::from("key-marker"), v.to_string()); - } - if let Some(v) = &args.version_id_marker { - query_params.insert(String::from("version-id-marker"), v.to_string()); - } - - let resp = self - .execute( - Method::GET, - ®ion, - &mut headers, - &query_params, - Some(args.bucket), - None, - None, - ) - .await?; - let header_map = resp.headers().clone(); - let body = resp.bytes().await?; - let mut root = Element::parse(body.reader())?; - - let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = - parse_common_list_objects_response(&root)?; - let key_marker = url_decode(&encoding_type, get_option_text(&root, "KeyMarker"))?; - let next_key_marker = url_decode(&encoding_type, get_option_text(&root, "NextKeyMarker"))?; - let version_id_marker = get_option_text(&root, "VersionIdMarker"); - let next_version_id_marker = get_option_text(&root, "NextVersionIdMarker"); - let mut contents: Vec = Vec::new(); - parse_list_objects_contents(&mut contents, &mut root, "Version", &encoding_type, false)?; - parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; - parse_list_objects_contents( - &mut contents, - &mut root, - "DeleteMarker", - &encoding_type, - true, - )?; - - Ok(ListObjectVersionsResponse { - headers: header_map, - name, - encoding_type, - prefix, - delimiter, - is_truncated, - max_keys, - contents, - key_marker, - next_key_marker, - version_id_marker, - next_version_id_marker, - }) - } - - /// List objects with version information optionally. `results_fn` callback - /// function is repeatedly called with object information and returning - /// false from the callback stops further listing. - pub async fn list_objects(&self, args: &ListObjectsArgs<'_>) -> Result<(), Error> { - let mut lov1_args = ListObjectsV1Args::new(args.bucket)?; - lov1_args.extra_headers = args.extra_headers; - lov1_args.extra_query_params = args.extra_query_params; - lov1_args.region = args.region; - if args.recursive { - lov1_args.delimiter = None; - } else { - lov1_args.delimiter = Some(args.delimiter.unwrap_or("/")); - } - lov1_args.encoding_type = match args.use_url_encoding_type { - true => Some("url"), - false => None, - }; - lov1_args.max_keys = args.max_keys; - lov1_args.prefix = args.prefix; - lov1_args.marker = args.marker.map(|x| x.to_string()); - - let mut lov2_args = ListObjectsV2Args::new(args.bucket)?; - lov2_args.extra_headers = args.extra_headers; - lov2_args.extra_query_params = args.extra_query_params; - lov2_args.region = args.region; - if args.recursive { - lov2_args.delimiter = None; - } else { - lov2_args.delimiter = Some(args.delimiter.unwrap_or("/")); - } - lov2_args.encoding_type = match args.use_url_encoding_type { - true => Some("url"), - false => None, - }; - lov2_args.max_keys = args.max_keys; - lov2_args.prefix = args.prefix; - lov2_args.start_after = args.start_after.map(|x| x.to_string()); - lov2_args.continuation_token = args.continuation_token.map(|x| x.to_string()); - lov2_args.fetch_owner = args.fetch_owner; - lov2_args.include_user_metadata = args.include_user_metadata; - - let mut lov_args = ListObjectVersionsArgs::new(args.bucket)?; - lov_args.extra_headers = args.extra_headers; - lov_args.extra_query_params = args.extra_query_params; - lov_args.region = args.region; - if args.recursive { - lov_args.delimiter = None; - } else { - lov_args.delimiter = Some(args.delimiter.unwrap_or("/")); - } - lov_args.encoding_type = match args.use_url_encoding_type { - true => Some("url"), - false => None, - }; - lov_args.max_keys = args.max_keys; - lov_args.prefix = args.prefix; - lov_args.key_marker = args.key_marker.map(|x| x.to_string()); - lov_args.version_id_marker = args.version_id_marker.map(|x| x.to_string()); - - let mut stop = false; - while !stop { - if args.include_versions { - let resp = self.list_object_versions(&lov_args).await?; - stop = !resp.is_truncated; - if resp.is_truncated { - lov_args.key_marker = resp.next_key_marker; - lov_args.version_id_marker = resp.next_version_id_marker; - } - stop = stop || !(args.result_fn)(resp.contents); - } else if args.use_api_v1 { - let resp = self.list_objects_v1(&lov1_args).await?; - stop = !resp.is_truncated; - if resp.is_truncated { - lov1_args.marker = resp.next_marker; - } - stop = stop || !(args.result_fn)(resp.contents); - } else { - let resp = self.list_objects_v2(&lov2_args).await?; - stop = !resp.is_truncated; - if resp.is_truncated { - lov2_args.start_after = resp.start_after; - lov2_args.continuation_token = resp.next_continuation_token; - } - stop = stop || !(args.result_fn)(resp.contents); - } - } - - Ok(()) - } - pub async fn make_bucket( &self, args: &MakeBucketArgs<'_>, diff --git a/src/s3/client/list_objects.rs b/src/s3/client/list_objects.rs new file mode 100644 index 0000000..67e6ea6 --- /dev/null +++ b/src/s3/client/list_objects.rs @@ -0,0 +1,561 @@ +// MinIO Rust Library for Amazon S3 Compatible Cloud Storage +// Copyright 2023 MinIO, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! S3 API: ListObjectsV2, ListObjectsV1, ListObjectVersions and streaming helper. + +use std::collections::HashMap; + +use super::Client; +use crate::s3::{ + args::{ListObjectVersionsArgs, ListObjectsArgs, ListObjectsV1Args, ListObjectsV2Args}, + error::Error, + response::{ListObjectVersionsResponse, ListObjectsV1Response, ListObjectsV2Response}, + types::ListEntry, + utils::{ + from_iso8601utc, get_default_text, get_option_text, get_text, merge, urldecode, Multimap, + }, +}; + +use bytes::Buf; +use futures_util::{stream as futures_stream, Stream, StreamExt}; +use http::Method; +use xmltree::Element; + +fn url_decode( + encoding_type: &Option, + prefix: Option, +) -> Result, Error> { + if let Some(v) = encoding_type.as_ref() { + if v == "url" { + if let Some(v) = prefix { + return Ok(Some(urldecode(&v)?.to_string())); + } + } + } + + if let Some(v) = prefix.as_ref() { + return Ok(Some(v.to_string())); + } + + Ok(None) +} + +fn add_common_list_objects_query_params( + query_params: &mut Multimap, + delimiter: Option<&str>, + use_url_encoding_type: bool, + max_keys: Option, + prefix: Option<&str>, +) { + query_params.insert( + String::from("delimiter"), + delimiter.unwrap_or("").to_string(), + ); + query_params.insert( + String::from("max-keys"), + max_keys.unwrap_or(1000).to_string(), + ); + query_params.insert(String::from("prefix"), prefix.unwrap_or("").to_string()); + if use_url_encoding_type { + query_params.insert(String::from("encoding-type"), String::from("url")); + } +} + +fn parse_common_list_objects_response( + root: &Element, +) -> Result< + ( + String, + Option, + Option, + Option, + bool, + Option, + ), + Error, +> { + let encoding_type = get_option_text(root, "EncodingType"); + let prefix = url_decode(&encoding_type, Some(get_default_text(root, "Prefix")))?; + Ok(( + get_text(root, "Name")?, + encoding_type, + prefix, + get_option_text(root, "Delimiter"), + match get_option_text(root, "IsTruncated") { + Some(v) => v.to_lowercase() == "true", + None => false, + }, + match get_option_text(root, "MaxKeys") { + Some(v) => Some(v.parse::()?), + None => None, + }, + )) +} + +fn parse_list_objects_contents( + contents: &mut Vec, + root: &mut xmltree::Element, + tag: &str, + encoding_type: &Option, + is_delete_marker: bool, +) -> Result<(), Error> { + while let Some(v) = root.take_child(tag) { + let content = v; + let etype = encoding_type.as_ref().cloned(); + let key = url_decode(&etype, Some(get_text(&content, "Key")?))?.unwrap(); + let last_modified = Some(from_iso8601utc(&get_text(&content, "LastModified")?)?); + let etag = get_option_text(&content, "ETag"); + let v = get_default_text(&content, "Size"); + let size = match v.is_empty() { + true => None, + false => Some(v.parse::()?), + }; + let storage_class = get_option_text(&content, "StorageClass"); + let is_latest = get_default_text(&content, "IsLatest").to_lowercase() == "true"; + let version_id = get_option_text(&content, "VersionId"); + let (owner_id, owner_name) = match content.get_child("Owner") { + Some(v) => (get_option_text(v, "ID"), get_option_text(v, "DisplayName")), + None => (None, None), + }; + let user_metadata = match content.get_child("UserMetadata") { + Some(v) => { + let mut map: HashMap = HashMap::new(); + for xml_node in &v.children { + let u = xml_node + .as_element() + .ok_or(Error::XmlError("unable to convert to element".to_string()))?; + map.insert( + u.name.to_string(), + u.get_text().unwrap_or_default().to_string(), + ); + } + Some(map) + } + None => None, + }; + + contents.push(ListEntry { + name: key, + last_modified, + etag, + owner_id, + owner_name, + size, + storage_class, + is_latest, + version_id, + user_metadata, + is_prefix: false, + is_delete_marker, + encoding_type: etype, + }); + } + + Ok(()) +} + +fn parse_list_objects_common_prefixes( + contents: &mut Vec, + root: &mut Element, + encoding_type: &Option, +) -> Result<(), Error> { + while let Some(v) = root.take_child("CommonPrefixes") { + let common_prefix = v; + contents.push(ListEntry { + name: url_decode(encoding_type, Some(get_text(&common_prefix, "Prefix")?))?.unwrap(), + last_modified: None, + etag: None, + owner_id: None, + owner_name: None, + size: None, + storage_class: None, + is_latest: false, + version_id: None, + user_metadata: None, + is_prefix: true, + is_delete_marker: false, + encoding_type: encoding_type.as_ref().cloned(), + }); + } + + Ok(()) +} + +impl Client { + /// Executes [ListObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html) S3 API + pub async fn list_objects_v1( + &self, + args: &ListObjectsV1Args, + ) -> Result { + let region = self + .get_region(&args.bucket, args.region.as_deref()) + .await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + add_common_list_objects_query_params( + &mut query_params, + args.delimiter.as_deref(), + args.use_url_encoding_type, + args.max_keys, + args.prefix.as_deref(), + ); + if let Some(v) = &args.marker { + query_params.insert(String::from("marker"), v.to_string()); + } + + let resp = self + .execute( + Method::GET, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + None, + ) + .await?; + let header_map = resp.headers().clone(); + let body = resp.bytes().await?; + let mut root = Element::parse(body.reader())?; + + let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = + parse_common_list_objects_response(&root)?; + let marker = url_decode(&encoding_type, get_option_text(&root, "Marker"))?; + let mut next_marker = url_decode(&encoding_type, get_option_text(&root, "NextMarker"))?; + let mut contents: Vec = Vec::new(); + parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?; + if is_truncated && next_marker.is_none() { + next_marker = contents.last().map(|v| v.name.clone()) + } + parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; + + Ok(ListObjectsV1Response { + headers: header_map, + name, + encoding_type, + prefix, + delimiter, + is_truncated, + max_keys, + contents, + marker, + next_marker, + }) + } + + /// Executes [ListObjectsV2](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html) S3 API + pub async fn list_objects_v2( + &self, + args: &ListObjectsV2Args, + ) -> Result { + let region = self + .get_region(&args.bucket, args.region.as_deref()) + .await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("list-type"), String::from("2")); + add_common_list_objects_query_params( + &mut query_params, + args.delimiter.as_deref(), + args.use_url_encoding_type, + args.max_keys, + args.prefix.as_deref(), + ); + if let Some(v) = &args.continuation_token { + query_params.insert(String::from("continuation-token"), v.to_string()); + } + if args.fetch_owner { + query_params.insert(String::from("fetch-owner"), String::from("true")); + } + if let Some(v) = &args.start_after { + query_params.insert(String::from("start-after"), v.to_string()); + } + if args.include_user_metadata { + query_params.insert(String::from("metadata"), String::from("true")); + } + + let resp = self + .execute( + Method::GET, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + None, + ) + .await?; + let header_map = resp.headers().clone(); + let body = resp.bytes().await?; + let mut root = Element::parse(body.reader())?; + + let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = + parse_common_list_objects_response(&root)?; + let text = get_option_text(&root, "KeyCount"); + let key_count = match text { + Some(v) => match v.is_empty() { + true => None, + false => Some(v.parse::()?), + }, + None => None, + }; + let start_after = url_decode(&encoding_type, get_option_text(&root, "StartAfter"))?; + let continuation_token = get_option_text(&root, "ContinuationToken"); + let next_continuation_token = get_option_text(&root, "NextContinuationToken"); + let mut contents: Vec = Vec::new(); + parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?; + parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; + + Ok(ListObjectsV2Response { + headers: header_map, + name, + encoding_type, + prefix, + delimiter, + is_truncated, + max_keys, + contents, + key_count, + start_after, + continuation_token, + next_continuation_token, + }) + } + + /// Executes [ListObjectVersions](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html) S3 API + pub async fn list_object_versions( + &self, + args: &ListObjectVersionsArgs, + ) -> Result { + let region = self + .get_region(&args.bucket, args.region.as_deref()) + .await?; + + let mut headers = Multimap::new(); + if let Some(v) = &args.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &args.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("versions"), String::new()); + add_common_list_objects_query_params( + &mut query_params, + args.delimiter.as_deref(), + args.use_url_encoding_type, + args.max_keys, + args.prefix.as_deref(), + ); + if let Some(v) = &args.key_marker { + query_params.insert(String::from("key-marker"), v.to_string()); + } + if let Some(v) = &args.version_id_marker { + query_params.insert(String::from("version-id-marker"), v.to_string()); + } + + let resp = self + .execute( + Method::GET, + ®ion, + &mut headers, + &query_params, + Some(&args.bucket), + None, + None, + ) + .await?; + let header_map = resp.headers().clone(); + let body = resp.bytes().await?; + let mut root = Element::parse(body.reader())?; + + let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = + parse_common_list_objects_response(&root)?; + let key_marker = url_decode(&encoding_type, get_option_text(&root, "KeyMarker"))?; + let next_key_marker = url_decode(&encoding_type, get_option_text(&root, "NextKeyMarker"))?; + let version_id_marker = get_option_text(&root, "VersionIdMarker"); + let next_version_id_marker = get_option_text(&root, "NextVersionIdMarker"); + let mut contents: Vec = Vec::new(); + parse_list_objects_contents(&mut contents, &mut root, "Version", &encoding_type, false)?; + parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; + parse_list_objects_contents( + &mut contents, + &mut root, + "DeleteMarker", + &encoding_type, + true, + )?; + + Ok(ListObjectVersionsResponse { + headers: header_map, + name, + encoding_type, + prefix, + delimiter, + is_truncated, + max_keys, + contents, + key_marker, + next_key_marker, + version_id_marker, + next_version_id_marker, + }) + } + + async fn list_objects_v1_stream( + &self, + args: ListObjectsV1Args, + ) -> impl Stream> + Unpin { + Box::pin(futures_stream::unfold( + (self.clone(), args), + move |(client, mut args)| async move { + let resp = client.list_objects_v1(&args).await; + match resp { + Ok(resp) => { + if !resp.is_truncated { + None + } else { + args.marker = resp.next_marker.clone(); + Some((Ok(resp), (client, args))) + } + } + Err(e) => Some((Err(e), (client, args))), + } + }, + )) + } + + async fn list_objects_v2_stream( + &self, + args: ListObjectsV2Args, + ) -> impl Stream> + Unpin { + Box::pin(futures_stream::unfold( + (self.clone(), args), + move |(client, mut args)| async move { + let resp = client.list_objects_v2(&args).await; + match resp { + Ok(resp) => { + if !resp.is_truncated { + None + } else { + args.continuation_token = resp.next_continuation_token.clone(); + Some((Ok(resp), (client, args))) + } + } + Err(e) => Some((Err(e), (client, args))), + } + }, + )) + } + + async fn list_object_versions_stream( + &self, + args: ListObjectVersionsArgs, + ) -> impl Stream> + Unpin { + Box::pin(futures_stream::unfold( + (self.clone(), args), + move |(client, mut args)| async move { + let resp = client.list_object_versions(&args).await; + match resp { + Ok(resp) => { + if !resp.is_truncated { + None + } else { + args.key_marker = resp.next_key_marker.clone(); + args.version_id_marker = resp.next_version_id_marker.clone(); + Some((Ok(resp), (client, args))) + } + } + Err(e) => Some((Err(e), (client, args))), + } + }, + )) + } + + /// List objects with version information optionally. This function handles + /// pagination and returns a stream of results. Each result corresponds to + /// the response of a single listing API call. + /// + /// # Example + /// + /// ```rust,no_run + /// use minio::s3::client::{Client, ClientBuilder}; + /// use minio::s3::creds::StaticProvider; + /// use minio::s3::http::BaseUrl; + /// use minio::s3::args::ListObjectsArgs; + /// use futures_util::StreamExt; + /// + /// #[tokio::main] + /// async fn main() { + /// let base_url: BaseUrl = "play.min.io".parse().unwrap(); + /// let static_provider = StaticProvider::new( + /// "Q3AM3UQ867SPQQA43P2F", + /// "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG", + /// None, + /// ); + /// + /// let client = ClientBuilder::new(base_url) + /// .provider(Some(Box::new(static_provider))) + /// .build() + /// .unwrap(); + /// + /// // List all objects in a directory. + /// let mut list_objects_arg = ListObjectsArgs::new("my-bucket").unwrap(); + /// list_objects_arg.recursive = true; + /// let mut stream = client.list_objects(list_objects_arg).await; + /// while let Some(result) = stream.next().await { + /// match result { + /// Ok(items) => { + /// for item in items { + /// println!("{:?}", item); + /// } + /// } + /// Err(e) => println!("Error: {:?}", e), + /// } + /// } + /// } + pub async fn list_objects( + &self, + args: ListObjectsArgs, + ) -> Box, Error>> + Unpin> { + if args.include_versions { + let stream = self.list_object_versions_stream(args.into()).await; + Box::new(stream.map(|v| v.map(|v| v.contents))) + } else if args.use_api_v1 { + let stream = self.list_objects_v1_stream(args.into()).await; + Box::new(stream.map(|v| v.map(|v| v.contents))) + } else { + let stream = self.list_objects_v2_stream(args.into()).await; + Box::new(stream.map(|v| v.map(|v| v.contents))) + } + } +} diff --git a/src/s3/response.rs b/src/s3/response.rs index 124ffda..27c8604 100644 --- a/src/s3/response.rs +++ b/src/s3/response.rs @@ -17,7 +17,7 @@ use crate::s3::error::Error; use crate::s3::types::{ - parse_legal_hold, Bucket, Item, LifecycleConfig, NotificationConfig, ObjectLockConfig, + parse_legal_hold, Bucket, LifecycleConfig, ListEntry, NotificationConfig, ObjectLockConfig, ReplicationConfig, RetentionMode, SelectProgress, SseConfig, }; use crate::s3::utils::{ @@ -245,7 +245,7 @@ pub struct ListObjectsV1Response { pub delimiter: Option, pub is_truncated: bool, pub max_keys: Option, - pub contents: Vec, + pub contents: Vec, pub marker: Option, pub next_marker: Option, } @@ -260,7 +260,7 @@ pub struct ListObjectsV2Response { pub delimiter: Option, pub is_truncated: bool, pub max_keys: Option, - pub contents: Vec, + pub contents: Vec, pub key_count: Option, pub start_after: Option, pub continuation_token: Option, @@ -277,7 +277,7 @@ pub struct ListObjectVersionsResponse { pub delimiter: Option, pub is_truncated: bool, pub max_keys: Option, - pub contents: Vec, + pub contents: Vec, pub key_marker: Option, pub next_key_marker: Option, pub version_id_marker: Option, @@ -294,7 +294,7 @@ pub struct ListObjectsResponse { pub delimiter: Option, pub is_truncated: bool, pub max_keys: Option, - pub contents: Vec, + pub contents: Vec, // ListObjectsV1 pub marker: String, diff --git a/src/s3/types.rs b/src/s3/types.rs index 0c142ba..7b5e2f5 100644 --- a/src/s3/types.rs +++ b/src/s3/types.rs @@ -26,7 +26,7 @@ use xmltree::Element; #[derive(Clone, Debug, Default)] /// Contains information of an item of [list_objects()](crate::s3::client::Client::list_objects) API -pub struct Item { +pub struct ListEntry { pub name: String, pub last_modified: Option, pub etag: Option, // except DeleteMarker diff --git a/tests/tests.rs b/tests/tests.rs index f482090..01f1d13 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -15,7 +15,6 @@ use async_std::task; use chrono::Duration; -use futures_util::stream::StreamExt; use hyper::http::Method; use minio::s3::types::NotificationRecords; use rand::distributions::{Alphanumeric, DistString}; @@ -25,6 +24,7 @@ use std::io::BufReader; use std::path::{Path, PathBuf}; use std::{fs, io}; use tokio::sync::mpsc; +use tokio_stream::StreamExt; use minio::s3::args::*; use minio::s3::client::Client; @@ -421,18 +421,17 @@ impl ClientTest { names.push(object_name); } - self.client - .list_objects( - &ListObjectsArgs::new(&self.test_bucket, &|items| { - for item in items.iter() { - assert!(names.contains(&item.name)); - } - true - }) - .unwrap(), - ) - .await - .unwrap(); + let mut stream = self + .client + .list_objects(ListObjectsArgs::new(&self.test_bucket).unwrap()) + .await; + + while let Some(items) = stream.next().await { + let items = items.unwrap(); + for item in items.iter() { + assert!(names.contains(&item.name)); + } + } let mut objects: Vec = Vec::new(); for name in names.iter() {