From fc20535f1d98d45f3ce465ab77b5ace25aa8fa20 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Tue, 2 Apr 2024 17:39:40 -0700 Subject: [PATCH] Add builder style for list_objects (#74) - Also add `include_user_metadata` option for list object versions --- Cargo.toml | 1 + src/s3/args.rs | 265 ------------ src/s3/builders.rs | 17 + src/s3/builders/list_objects.rs | 746 ++++++++++++++++++++++++++++++++ src/s3/client.rs | 12 +- src/s3/client/list_objects.rs | 518 +--------------------- src/s3/error.rs | 140 ++++-- src/s3/mod.rs | 1 + src/s3/response.rs | 98 +---- src/s3/response/list_objects.rs | 425 ++++++++++++++++++ src/s3/types.rs | 122 +++++- tests/tests.rs | 12 +- 12 files changed, 1450 insertions(+), 907 deletions(-) create mode 100644 src/s3/builders.rs create mode 100644 src/s3/builders/list_objects.rs create mode 100644 src/s3/response/list_objects.rs diff --git a/Cargo.toml b/Cargo.toml index 0930f666..a11279e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ categories = ["api-bindings", "web-programming::http-client"] [dependencies] async-recursion = "1.0.4" +async-trait = "0.1.73" base64 = "0.21.3" byteorder = "1.4.3" bytes = "1.4.0" diff --git a/src/s3/args.rs b/src/s3/args.rs index c786d998..5cfb52f6 100644 --- a/src/s3/args.rs +++ b/src/s3/args.rs @@ -986,271 +986,6 @@ impl<'a> RemoveObjectsArgs<'a> { } } -/// Argument for [list_objects_v1()](crate::s3::client::Client::list_objects_v1) S3 API -#[derive(Clone, Debug)] -pub struct ListObjectsV1Args { - pub extra_headers: Option, - pub extra_query_params: Option, - pub region: Option, - pub bucket: String, - pub delimiter: Option, - pub use_url_encoding_type: bool, - pub max_keys: Option, - pub prefix: Option, - pub marker: Option, -} - -// Helper function delimiter based on recursive flag when delimiter is not -// provided. -fn delim_helper(delim: Option, recursive: bool) -> Option { - if delim.is_some() { - return delim; - } - match recursive { - true => None, - false => Some(String::from("/")), - } -} - -impl From for ListObjectsV1Args { - fn from(value: ListObjectsArgs) -> Self { - ListObjectsV1Args { - extra_headers: value.extra_headers, - extra_query_params: value.extra_query_params, - region: value.region, - bucket: value.bucket, - delimiter: delim_helper(value.delimiter, value.recursive), - use_url_encoding_type: value.use_url_encoding_type, - max_keys: value.max_keys, - prefix: value.prefix, - marker: value.marker, - } - } -} - -impl ListObjectsV1Args { - /// Returns argument for [list_objects_v1()](crate::s3::client::Client::list_objects_v1) S3 API with given bucket name - /// - /// # Examples - /// - /// ``` - /// use minio::s3::args::*; - /// let args = ListObjectsV1Args::new("my-bucket").unwrap(); - /// ``` - pub fn new(bucket_name: &str) -> Result { - check_bucket_name(bucket_name, true)?; - - Ok(ListObjectsV1Args { - extra_headers: None, - extra_query_params: None, - region: None, - bucket: bucket_name.to_owned(), - delimiter: None, - use_url_encoding_type: true, - max_keys: None, - prefix: None, - marker: None, - }) - } -} - -/// Argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API -#[derive(Clone, Debug)] -pub struct ListObjectsV2Args { - pub extra_headers: Option, - pub extra_query_params: Option, - pub region: Option, - pub bucket: String, - pub delimiter: Option, - pub use_url_encoding_type: bool, - pub max_keys: Option, - pub prefix: Option, - pub start_after: Option, - pub continuation_token: Option, - pub fetch_owner: bool, - pub include_user_metadata: bool, -} - -impl From for ListObjectsV2Args { - fn from(value: ListObjectsArgs) -> Self { - ListObjectsV2Args { - extra_headers: value.extra_headers, - extra_query_params: value.extra_query_params, - region: value.region, - bucket: value.bucket, - delimiter: delim_helper(value.delimiter, value.recursive), - use_url_encoding_type: value.use_url_encoding_type, - max_keys: value.max_keys, - prefix: value.prefix, - start_after: value.start_after, - continuation_token: value.continuation_token, - fetch_owner: value.fetch_owner, - include_user_metadata: value.include_user_metadata, - } - } -} - -impl ListObjectsV2Args { - /// Returns argument for [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API with given bucket name - /// - /// # Examples - /// - /// ``` - /// use minio::s3::args::*; - /// let args = ListObjectsV2Args::new("my-bucket").unwrap(); - /// ``` - pub fn new(bucket_name: &str) -> Result { - check_bucket_name(bucket_name, true)?; - Ok(ListObjectsV2Args { - extra_headers: None, - extra_query_params: None, - region: None, - bucket: bucket_name.to_owned(), - delimiter: None, - use_url_encoding_type: true, - max_keys: None, - prefix: None, - start_after: None, - continuation_token: None, - fetch_owner: false, - include_user_metadata: false, - }) - } -} - -/// Argument for [list_object_versions()](crate::s3::client::Client::list_object_versions) S3 API -pub struct ListObjectVersionsArgs { - pub extra_headers: Option, - pub extra_query_params: Option, - pub region: Option, - pub bucket: String, - pub delimiter: Option, - pub use_url_encoding_type: bool, - pub max_keys: Option, - pub prefix: Option, - pub key_marker: Option, - pub version_id_marker: Option, -} - -impl From for ListObjectVersionsArgs { - fn from(value: ListObjectsArgs) -> Self { - ListObjectVersionsArgs { - extra_headers: value.extra_headers, - extra_query_params: value.extra_query_params, - region: value.region, - bucket: value.bucket, - delimiter: delim_helper(value.delimiter, value.recursive), - use_url_encoding_type: value.use_url_encoding_type, - max_keys: value.max_keys, - prefix: value.prefix, - key_marker: value.key_marker, - version_id_marker: value.version_id_marker, - } - } -} - -impl ListObjectVersionsArgs { - /// Returns argument for [list_object_versions()](crate::s3::client::Client::list_object_versions) S3 API with given bucket name - /// - /// # Examples - /// - /// ``` - /// use minio::s3::args::*; - /// let args = ListObjectVersionsArgs::new("my-bucket").unwrap(); - /// ``` - pub fn new(bucket_name: &str) -> Result { - check_bucket_name(bucket_name, true)?; - - Ok(ListObjectVersionsArgs { - extra_headers: None, - extra_query_params: None, - region: None, - bucket: bucket_name.to_owned(), - delimiter: None, - use_url_encoding_type: true, - max_keys: None, - prefix: None, - key_marker: None, - version_id_marker: None, - }) - } -} - -/// Argument for [list_objects()](crate::s3::client::Client::list_objects) API -#[derive(Clone, Debug)] -pub struct ListObjectsArgs { - pub extra_headers: Option, - pub extra_query_params: Option, - pub region: Option, - /// Specifies the bucket name on which listing is to be performed. - pub bucket: String, - /// Delimiter to roll up common prefixes on. - pub delimiter: Option, - pub use_url_encoding_type: bool, - pub max_keys: Option, - pub prefix: Option, - - /// Used only with ListObjectsV1. - pub marker: Option, - - /// Used only with ListObjectsV2 - pub start_after: Option, - /// Used only with ListObjectsV2. - pub continuation_token: Option, - /// Used only with ListObjectsV2. - pub fetch_owner: bool, - /// MinIO extension for ListObjectsV2. - pub include_user_metadata: bool, - - /// Used only with GetObjectVersions. - pub key_marker: Option, - /// Used only with GetObjectVersions. - pub version_id_marker: Option, - - /// This parameter takes effect only when delimiter is None. Enables - /// recursive traversal for listing of the bucket and prefix. - pub recursive: bool, - /// Set this to use ListObjectsV1. Defaults to false. - pub use_api_v1: bool, - /// Set this to include versions. - pub include_versions: bool, -} - -impl ListObjectsArgs { - /// Returns argument for [list_objects()](crate::s3::client::Client::list_objects) API with given bucket name and callback function for results. - /// - /// # Examples - /// - /// ``` - /// use minio::s3::args::*; - /// let args = ListObjectsArgs::new("my-bucket").unwrap(); - /// ``` - pub fn new(bucket_name: &str) -> Result { - check_bucket_name(bucket_name, true)?; - - Ok(ListObjectsArgs { - extra_headers: None, - extra_query_params: None, - region: None, - bucket: bucket_name.to_owned(), - delimiter: None, - use_url_encoding_type: true, - marker: None, - start_after: None, - key_marker: None, - max_keys: None, - prefix: None, - continuation_token: None, - fetch_owner: false, - version_id_marker: None, - include_user_metadata: false, - recursive: false, - use_api_v1: false, - include_versions: false, - }) - } -} - /// Argument for [select_object_content()](crate::s3::client::Client::select_object_content) API pub struct SelectObjectContentArgs<'a> { pub extra_headers: Option<&'a Multimap>, diff --git a/src/s3/builders.rs b/src/s3/builders.rs new file mode 100644 index 00000000..079e7ae1 --- /dev/null +++ b/src/s3/builders.rs @@ -0,0 +1,17 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Argument builders for [minio::s3::client::Client](crate::s3::client::Client) APIs + +mod list_objects; + +pub use list_objects::*; diff --git a/src/s3/builders/list_objects.rs b/src/s3/builders/list_objects.rs new file mode 100644 index 00000000..535f2e7c --- /dev/null +++ b/src/s3/builders/list_objects.rs @@ -0,0 +1,746 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Argument builders for ListObject APIs. + +use async_trait::async_trait; +use futures_util::{stream as futures_stream, Stream, StreamExt}; +use http::Method; + +use crate::s3::{ + client::Client, + error::Error, + response::{ + ListObjectVersionsResponse, ListObjectsResponse, ListObjectsV1Response, + ListObjectsV2Response, + }, + types::{S3Api, S3Request, ToS3Request, ToStream}, + utils::{check_bucket_name, merge, Multimap}, +}; + +fn add_common_list_objects_query_params( + query_params: &mut Multimap, + delimiter: Option<&str>, + disable_url_encoding: bool, + max_keys: Option, + prefix: Option<&str>, +) { + query_params.insert( + String::from("delimiter"), + delimiter.unwrap_or("").to_string(), + ); + query_params.insert( + String::from("max-keys"), + max_keys.unwrap_or(1000).to_string(), + ); + query_params.insert(String::from("prefix"), prefix.unwrap_or("").to_string()); + if !disable_url_encoding { + query_params.insert(String::from("encoding-type"), String::from("url")); + } +} + +/// Argument builder for ListObjectsV1 S3 API, created by +/// [list_objects_v1()](crate::s3::client::Client::list_objects_v1). +#[derive(Clone, Debug, Default)] +pub struct ListObjectsV1 { + client: Option, + + extra_headers: Option, + extra_query_params: Option, + region: Option, + bucket: String, + delimiter: Option, + disable_url_encoding: bool, + max_keys: Option, + prefix: Option, + marker: Option, +} + +#[async_trait] +impl ToStream for ListObjectsV1 { + type Item = ListObjectsV1Response; + + async fn to_stream(self) -> Box> + Unpin + Send> { + Box::new(Box::pin(futures_stream::unfold( + (self.clone(), false), + move |(mut args, mut is_done)| async move { + if is_done { + return None; + } + let resp = args.send().await; + match resp { + Ok(resp) => { + args.marker = resp.next_marker.clone(); + is_done = !resp.is_truncated; + Some((Ok(resp), (args, is_done))) + } + Err(e) => Some((Err(e), (args, true))), + } + }, + ))) + } +} + +impl ToS3Request for ListObjectsV1 { + fn to_s3request<'a>(&'a self) -> Result, Error> { + check_bucket_name(&self.bucket, true)?; + + let mut headers = Multimap::new(); + if let Some(v) = &self.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &self.extra_query_params { + merge(&mut query_params, v); + } + + add_common_list_objects_query_params( + &mut query_params, + self.delimiter.as_deref(), + self.disable_url_encoding, + self.max_keys, + self.prefix.as_deref(), + ); + if let Some(v) = &self.marker { + query_params.insert(String::from("marker"), v.to_string()); + } + + let req = S3Request::new( + self.client.as_ref().ok_or(Error::NoClientProvided)?, + Method::GET, + ) + .region(self.region.as_deref()) + .bucket(Some(&self.bucket)) + .query_params(query_params) + .headers(headers); + + Ok(req) + } +} + +impl S3Api for ListObjectsV1 { + type S3Response = ListObjectsV1Response; +} + +// Helper function delimiter based on recursive flag when delimiter is not +// provided. +fn delim_helper(delim: Option, recursive: bool) -> Option { + if delim.is_some() { + return delim; + } + match recursive { + true => None, + false => Some(String::from("/")), + } +} + +impl From for ListObjectsV1 { + fn from(value: ListObjects) -> Self { + ListObjectsV1 { + client: value.client, + extra_headers: value.extra_headers, + extra_query_params: value.extra_query_params, + region: value.region, + bucket: value.bucket, + delimiter: delim_helper(value.delimiter, value.recursive), + disable_url_encoding: value.disable_url_encoding, + max_keys: value.max_keys, + prefix: value.prefix, + marker: value.marker, + } + } +} + +impl ListObjectsV1 { + pub fn new(bucket: &str) -> Self { + Self { + bucket: bucket.to_owned(), + ..Default::default() + } + } + + pub fn client(mut self, client: &Client) -> Self { + self.client = Some(client.clone()); + self + } + + pub fn extra_headers(mut self, extra_headers: Option) -> Self { + self.extra_headers = extra_headers; + self + } + + pub fn extra_query_params(mut self, extra_query_params: Option) -> Self { + self.extra_query_params = extra_query_params; + self + } + + pub fn region(mut self, region: Option) -> Self { + self.region = region; + self + } + + pub fn delimiter(mut self, delimiter: Option) -> Self { + self.delimiter = delimiter; + self + } + + pub fn disable_url_encoding(mut self, disable_url_encoding: bool) -> Self { + self.disable_url_encoding = disable_url_encoding; + self + } + + pub fn max_keys(mut self, max_keys: Option) -> Self { + self.max_keys = max_keys; + self + } + + pub fn prefix(mut self, prefix: Option) -> Self { + self.prefix = prefix; + self + } + + pub fn marker(mut self, marker: Option) -> Self { + self.marker = marker; + self + } +} + +/// Argument builder for ListObjectsV2 S3 API, created by +/// [list_objects_v2()](crate::s3::client::Client::list_objects_v2). +#[derive(Clone, Debug, Default)] +pub struct ListObjectsV2 { + client: Option, + + extra_headers: Option, + extra_query_params: Option, + region: Option, + bucket: String, + delimiter: Option, + disable_url_encoding: bool, + max_keys: Option, + prefix: Option, + start_after: Option, + continuation_token: Option, + fetch_owner: bool, + include_user_metadata: bool, +} + +#[async_trait] +impl ToStream for ListObjectsV2 { + type Item = ListObjectsV2Response; + + async fn to_stream(self) -> Box> + Unpin + Send> { + Box::new(Box::pin(futures_stream::unfold( + (self.clone(), false), + move |(mut args, mut is_done)| async move { + if is_done { + return None; + } + let resp = args.send().await; + match resp { + Ok(resp) => { + args.continuation_token = resp.next_continuation_token.clone(); + is_done = !resp.is_truncated; + Some((Ok(resp), (args, is_done))) + } + Err(e) => Some((Err(e), (args, true))), + } + }, + ))) + } +} + +impl S3Api for ListObjectsV2 { + type S3Response = ListObjectsV2Response; +} + +impl ToS3Request for ListObjectsV2 { + fn to_s3request(&self) -> Result { + check_bucket_name(&self.bucket, true)?; + + let mut headers = Multimap::new(); + if let Some(v) = &self.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &self.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("list-type"), String::from("2")); + add_common_list_objects_query_params( + &mut query_params, + self.delimiter.as_deref(), + self.disable_url_encoding, + self.max_keys, + self.prefix.as_deref(), + ); + if let Some(v) = &self.continuation_token { + query_params.insert(String::from("continuation-token"), v.to_string()); + } + if self.fetch_owner { + query_params.insert(String::from("fetch-owner"), String::from("true")); + } + if let Some(v) = &self.start_after { + query_params.insert(String::from("start-after"), v.to_string()); + } + if self.include_user_metadata { + query_params.insert(String::from("metadata"), String::from("true")); + } + + let req = S3Request::new( + self.client.as_ref().ok_or(Error::NoClientProvided)?, + Method::GET, + ) + .region(self.region.as_deref()) + .bucket(Some(&self.bucket)) + .query_params(query_params) + .headers(headers); + Ok(req) + } +} + +impl From for ListObjectsV2 { + fn from(value: ListObjects) -> Self { + ListObjectsV2 { + client: value.client, + extra_headers: value.extra_headers, + extra_query_params: value.extra_query_params, + region: value.region, + bucket: value.bucket, + delimiter: delim_helper(value.delimiter, value.recursive), + disable_url_encoding: value.disable_url_encoding, + max_keys: value.max_keys, + prefix: value.prefix, + start_after: value.start_after, + continuation_token: value.continuation_token, + fetch_owner: value.fetch_owner, + include_user_metadata: value.include_user_metadata, + } + } +} + +impl ListObjectsV2 { + pub fn new(bucket: &str) -> Self { + Self { + bucket: bucket.to_owned(), + ..Default::default() + } + } + + pub fn client(mut self, client: &Client) -> Self { + self.client = Some(client.clone()); + self + } + + pub fn extra_headers(mut self, extra_headers: Option) -> Self { + self.extra_headers = extra_headers; + self + } + + pub fn extra_query_params(mut self, extra_query_params: Option) -> Self { + self.extra_query_params = extra_query_params; + self + } + + pub fn region(mut self, region: Option) -> Self { + self.region = region; + self + } + + pub fn delimiter(mut self, delimiter: Option) -> Self { + self.delimiter = delimiter; + self + } + + pub fn disable_url_encoding(mut self, disable_url_encoding: bool) -> Self { + self.disable_url_encoding = disable_url_encoding; + self + } + + pub fn max_keys(mut self, max_keys: Option) -> Self { + self.max_keys = max_keys; + self + } + + pub fn prefix(mut self, prefix: Option) -> Self { + self.prefix = prefix; + self + } + + pub fn start_after(mut self, start_after: Option) -> Self { + self.start_after = start_after; + self + } + + pub fn continuation_token(mut self, continuation_token: Option) -> Self { + self.continuation_token = continuation_token; + self + } + + pub fn fetch_owner(mut self, fetch_owner: bool) -> Self { + self.fetch_owner = fetch_owner; + self + } + + pub fn include_user_metadata(mut self, include_user_metadata: bool) -> Self { + self.include_user_metadata = include_user_metadata; + self + } +} + +/// Argument builder for ListObjectVerions S3 API created by +/// [list_object_versions()](crate::s3::client::Client::list_object_versions). +#[derive(Clone, Debug, Default)] +pub struct ListObjectVersions { + client: Option, + + extra_headers: Option, + extra_query_params: Option, + region: Option, + bucket: String, + delimiter: Option, + disable_url_encoding: bool, + max_keys: Option, + prefix: Option, + key_marker: Option, + version_id_marker: Option, + include_user_metadata: bool, +} + +#[async_trait] +impl ToStream for ListObjectVersions { + type Item = ListObjectVersionsResponse; + + async fn to_stream(self) -> Box> + Unpin + Send> { + Box::new(Box::pin(futures_stream::unfold( + (self.clone(), false), + move |(mut args, mut is_done)| async move { + if is_done { + return None; + } + let resp = args.send().await; + match resp { + Ok(resp) => { + args.key_marker = resp.next_key_marker.clone(); + args.version_id_marker = resp.next_version_id_marker.clone(); + + is_done = !resp.is_truncated; + Some((Ok(resp), (args, is_done))) + } + Err(e) => Some((Err(e), (args, true))), + } + }, + ))) + } +} + +impl S3Api for ListObjectVersions { + type S3Response = ListObjectVersionsResponse; +} + +impl ToS3Request for ListObjectVersions { + fn to_s3request(&self) -> Result { + check_bucket_name(&self.bucket, true)?; + + let mut headers = Multimap::new(); + if let Some(v) = &self.extra_headers { + merge(&mut headers, v); + } + + let mut query_params = Multimap::new(); + if let Some(v) = &self.extra_query_params { + merge(&mut query_params, v); + } + query_params.insert(String::from("versions"), String::new()); + add_common_list_objects_query_params( + &mut query_params, + self.delimiter.as_deref(), + self.disable_url_encoding, + self.max_keys, + self.prefix.as_deref(), + ); + if let Some(v) = &self.key_marker { + query_params.insert(String::from("key-marker"), v.to_string()); + } + if let Some(v) = &self.version_id_marker { + query_params.insert(String::from("version-id-marker"), v.to_string()); + } + if self.include_user_metadata { + query_params.insert(String::from("metadata"), String::from("true")); + } + + let req = S3Request::new( + self.client.as_ref().ok_or(Error::NoClientProvided)?, + Method::GET, + ) + .region(self.region.as_deref()) + .bucket(Some(&self.bucket)) + .query_params(query_params) + .headers(headers); + Ok(req) + } +} + +impl From for ListObjectVersions { + fn from(value: ListObjects) -> Self { + ListObjectVersions { + client: value.client, + extra_headers: value.extra_headers, + extra_query_params: value.extra_query_params, + region: value.region, + bucket: value.bucket, + delimiter: delim_helper(value.delimiter, value.recursive), + disable_url_encoding: value.disable_url_encoding, + max_keys: value.max_keys, + prefix: value.prefix, + key_marker: value.key_marker, + version_id_marker: value.version_id_marker, + include_user_metadata: value.include_user_metadata, + } + } +} + +impl ListObjectVersions { + pub fn new(bucket: &str) -> Self { + Self { + bucket: bucket.to_owned(), + ..Default::default() + } + } + + pub fn client(mut self, client: &Client) -> Self { + self.client = Some(client.clone()); + self + } + + pub fn extra_headers(mut self, extra_headers: Option) -> Self { + self.extra_headers = extra_headers; + self + } + + pub fn extra_query_params(mut self, extra_query_params: Option) -> Self { + self.extra_query_params = extra_query_params; + self + } + + pub fn region(mut self, region: Option) -> Self { + self.region = region; + self + } + + pub fn delimiter(mut self, delimiter: Option) -> Self { + self.delimiter = delimiter; + self + } + + pub fn disable_url_encoding(mut self, disable_url_encoding: bool) -> Self { + self.disable_url_encoding = disable_url_encoding; + self + } + + pub fn max_keys(mut self, max_keys: Option) -> Self { + self.max_keys = max_keys; + self + } + + pub fn prefix(mut self, prefix: Option) -> Self { + self.prefix = prefix; + self + } + + pub fn key_marker(mut self, key_marker: Option) -> Self { + self.key_marker = key_marker; + self + } + + pub fn version_id_marker(mut self, version_id_marker: Option) -> Self { + self.version_id_marker = version_id_marker; + self + } + + pub fn include_user_metadata(mut self, include_user_metadata: bool) -> Self { + self.include_user_metadata = include_user_metadata; + self + } +} + +/// Argument builder for +/// [list_objects()](crate::s3::client::Client::list_objects) API. +/// +/// Use the various builder methods to set parameters on the request. Finally to +/// send the request and consume the results use the `ToStream` instance to get +/// a stream of results. Pagination is automatically performed. +#[derive(Clone, Debug, Default)] +pub struct ListObjects { + client: Option, + + // Parameters common to all ListObjects APIs. + extra_headers: Option, + extra_query_params: Option, + region: Option, + bucket: String, + delimiter: Option, + disable_url_encoding: bool, + max_keys: Option, + prefix: Option, + + // Options specific to ListObjectsV1. + marker: Option, + + // Options specific to ListObjectsV2. + start_after: Option, + continuation_token: Option, + fetch_owner: bool, + include_user_metadata: bool, + + // Options specific to ListObjectVersions. + key_marker: Option, + version_id_marker: Option, + + // Higher level options. + recursive: bool, + use_api_v1: bool, + include_versions: bool, +} + +#[async_trait] +impl ToStream for ListObjects { + type Item = ListObjectsResponse; + + async fn to_stream(self) -> Box> + Unpin + Send> { + if self.use_api_v1 { + let stream = ListObjectsV1::from(self).to_stream().await; + Box::new(stream.map(|v| v.map(|v| v.into()))) + } else if self.include_versions { + let stream = ListObjectVersions::from(self).to_stream().await; + Box::new(stream.map(|v| v.map(|v| v.into()))) + } else { + let stream = ListObjectsV2::from(self).to_stream().await; + Box::new(stream.map(|v| v.map(|v| v.into()))) + } + } +} + +impl ListObjects { + pub fn new(bucket: &str) -> Self { + Self { + bucket: bucket.to_owned(), + ..Default::default() + } + } + + pub fn client(mut self, client: &Client) -> Self { + self.client = Some(client.clone()); + self + } + + pub fn extra_headers(mut self, extra_headers: Option) -> Self { + self.extra_headers = extra_headers; + self + } + + pub fn extra_query_params(mut self, extra_query_params: Option) -> Self { + self.extra_query_params = extra_query_params; + self + } + + pub fn region(mut self, region: Option) -> Self { + self.region = region; + self + } + + /// Delimiter to roll up common prefixes on. + pub fn delimiter(mut self, delimiter: Option) -> Self { + self.delimiter = delimiter; + self + } + + /// Disable setting the `EncodingType` parameter in the ListObjects request. + /// By default it is set to `url`. + pub fn disable_url_encoding(mut self, disable_url_encoding: bool) -> Self { + self.disable_url_encoding = disable_url_encoding; + self + } + + pub fn max_keys(mut self, max_keys: Option) -> Self { + self.max_keys = max_keys; + self + } + + pub fn prefix(mut self, prefix: Option) -> Self { + self.prefix = prefix; + self + } + + /// Used only with ListObjectsV1. + pub fn marker(mut self, marker: Option) -> Self { + self.marker = marker; + self + } + + /// Used only with ListObjectsV2 + pub fn start_after(mut self, start_after: Option) -> Self { + self.start_after = start_after; + self + } + + /// Used only with ListObjectsV2 + pub fn continuation_token(mut self, continuation_token: Option) -> Self { + self.continuation_token = continuation_token; + self + } + + /// Used only with ListObjectsV2 + pub fn fetch_owner(mut self, fetch_owner: bool) -> Self { + self.fetch_owner = fetch_owner; + self + } + + /// Used only with ListObjectsV2. MinIO extension. + pub fn include_user_metadata(mut self, include_user_metadata: bool) -> Self { + self.include_user_metadata = include_user_metadata; + self + } + + /// Used only with GetObjectVersions. + pub fn key_marker(mut self, key_marker: Option) -> Self { + self.key_marker = key_marker; + self + } + + /// Used only with GetObjectVersions. + pub fn version_id_marker(mut self, version_id_marker: Option) -> Self { + self.version_id_marker = version_id_marker; + self + } + + /// This parameter takes effect only when delimiter is None. Enables + /// recursive traversal for listing of the bucket and prefix. + pub fn recursive(mut self, recursive: bool) -> Self { + self.recursive = recursive; + self + } + + /// Set this to use ListObjectsV1. Defaults to false. + pub fn use_api_v1(mut self, use_api_v1: bool) -> Self { + self.use_api_v1 = use_api_v1; + self + } + + /// Set this to include versions. + pub fn include_versions(mut self, include_versions: bool) -> Self { + self.include_versions = include_versions; + self + } +} diff --git a/src/s3/client.rs b/src/s3/client.rs index 8b87ac3b..85bc162b 100644 --- a/src/s3/client.rs +++ b/src/s3/client.rs @@ -386,7 +386,7 @@ impl Client { pub async fn do_execute( &self, - method: Method, + method: &Method, region: &String, headers: &mut Multimap, query_params: &Multimap, @@ -398,7 +398,7 @@ impl Client { let body = data.unwrap_or_default(); let url = self.base_url - .build_url(&method, region, query_params, bucket_name, object_name)?; + .build_url(method, region, query_params, bucket_name, object_name)?; self.build_headers(headers, query_params, region, &url, &method, body); let mut req = self.client.request(method.clone(), url.to_string()); @@ -409,7 +409,7 @@ impl Client { } } - if method == Method::PUT || method == Method::POST { + if *method == Method::PUT || *method == Method::POST { req = req.body(body.to_vec()); } @@ -425,7 +425,7 @@ impl Client { &mut body, status_code, &header_map, - &method, + method, &url.path, bucket_name, object_name, @@ -458,7 +458,7 @@ impl Client { ) -> Result { let res = self .do_execute( - method.clone(), + &method, region, headers, query_params, @@ -482,7 +482,7 @@ impl Client { // Retry only once on RetryHead error. self.do_execute( - method.clone(), + &method, region, headers, query_params, diff --git a/src/s3/client/list_objects.rs b/src/s3/client/list_objects.rs index 67e6ea66..cf6f498f 100644 --- a/src/s3/client/list_objects.rs +++ b/src/s3/client/list_objects.rs @@ -13,492 +13,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! S3 API: ListObjectsV2, ListObjectsV1, ListObjectVersions and streaming helper. - -use std::collections::HashMap; +//! S3 APIs for listing objects. use super::Client; -use crate::s3::{ - args::{ListObjectVersionsArgs, ListObjectsArgs, ListObjectsV1Args, ListObjectsV2Args}, - error::Error, - response::{ListObjectVersionsResponse, ListObjectsV1Response, ListObjectsV2Response}, - types::ListEntry, - utils::{ - from_iso8601utc, get_default_text, get_option_text, get_text, merge, urldecode, Multimap, - }, -}; - -use bytes::Buf; -use futures_util::{stream as futures_stream, Stream, StreamExt}; -use http::Method; -use xmltree::Element; - -fn url_decode( - encoding_type: &Option, - prefix: Option, -) -> Result, Error> { - if let Some(v) = encoding_type.as_ref() { - if v == "url" { - if let Some(v) = prefix { - return Ok(Some(urldecode(&v)?.to_string())); - } - } - } - - if let Some(v) = prefix.as_ref() { - return Ok(Some(v.to_string())); - } - - Ok(None) -} - -fn add_common_list_objects_query_params( - query_params: &mut Multimap, - delimiter: Option<&str>, - use_url_encoding_type: bool, - max_keys: Option, - prefix: Option<&str>, -) { - query_params.insert( - String::from("delimiter"), - delimiter.unwrap_or("").to_string(), - ); - query_params.insert( - String::from("max-keys"), - max_keys.unwrap_or(1000).to_string(), - ); - query_params.insert(String::from("prefix"), prefix.unwrap_or("").to_string()); - if use_url_encoding_type { - query_params.insert(String::from("encoding-type"), String::from("url")); - } -} - -fn parse_common_list_objects_response( - root: &Element, -) -> Result< - ( - String, - Option, - Option, - Option, - bool, - Option, - ), - Error, -> { - let encoding_type = get_option_text(root, "EncodingType"); - let prefix = url_decode(&encoding_type, Some(get_default_text(root, "Prefix")))?; - Ok(( - get_text(root, "Name")?, - encoding_type, - prefix, - get_option_text(root, "Delimiter"), - match get_option_text(root, "IsTruncated") { - Some(v) => v.to_lowercase() == "true", - None => false, - }, - match get_option_text(root, "MaxKeys") { - Some(v) => Some(v.parse::()?), - None => None, - }, - )) -} - -fn parse_list_objects_contents( - contents: &mut Vec, - root: &mut xmltree::Element, - tag: &str, - encoding_type: &Option, - is_delete_marker: bool, -) -> Result<(), Error> { - while let Some(v) = root.take_child(tag) { - let content = v; - let etype = encoding_type.as_ref().cloned(); - let key = url_decode(&etype, Some(get_text(&content, "Key")?))?.unwrap(); - let last_modified = Some(from_iso8601utc(&get_text(&content, "LastModified")?)?); - let etag = get_option_text(&content, "ETag"); - let v = get_default_text(&content, "Size"); - let size = match v.is_empty() { - true => None, - false => Some(v.parse::()?), - }; - let storage_class = get_option_text(&content, "StorageClass"); - let is_latest = get_default_text(&content, "IsLatest").to_lowercase() == "true"; - let version_id = get_option_text(&content, "VersionId"); - let (owner_id, owner_name) = match content.get_child("Owner") { - Some(v) => (get_option_text(v, "ID"), get_option_text(v, "DisplayName")), - None => (None, None), - }; - let user_metadata = match content.get_child("UserMetadata") { - Some(v) => { - let mut map: HashMap = HashMap::new(); - for xml_node in &v.children { - let u = xml_node - .as_element() - .ok_or(Error::XmlError("unable to convert to element".to_string()))?; - map.insert( - u.name.to_string(), - u.get_text().unwrap_or_default().to_string(), - ); - } - Some(map) - } - None => None, - }; - - contents.push(ListEntry { - name: key, - last_modified, - etag, - owner_id, - owner_name, - size, - storage_class, - is_latest, - version_id, - user_metadata, - is_prefix: false, - is_delete_marker, - encoding_type: etype, - }); - } - - Ok(()) -} - -fn parse_list_objects_common_prefixes( - contents: &mut Vec, - root: &mut Element, - encoding_type: &Option, -) -> Result<(), Error> { - while let Some(v) = root.take_child("CommonPrefixes") { - let common_prefix = v; - contents.push(ListEntry { - name: url_decode(encoding_type, Some(get_text(&common_prefix, "Prefix")?))?.unwrap(), - last_modified: None, - etag: None, - owner_id: None, - owner_name: None, - size: None, - storage_class: None, - is_latest: false, - version_id: None, - user_metadata: None, - is_prefix: true, - is_delete_marker: false, - encoding_type: encoding_type.as_ref().cloned(), - }); - } - - Ok(()) -} +use crate::s3::builders::{ListObjectVersions, ListObjects, ListObjectsV1, ListObjectsV2}; impl Client { - /// Executes [ListObjects](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html) S3 API - pub async fn list_objects_v1( - &self, - args: &ListObjectsV1Args, - ) -> Result { - let region = self - .get_region(&args.bucket, args.region.as_deref()) - .await?; - - let mut headers = Multimap::new(); - if let Some(v) = &args.extra_headers { - merge(&mut headers, v); - } - - let mut query_params = Multimap::new(); - if let Some(v) = &args.extra_query_params { - merge(&mut query_params, v); - } - add_common_list_objects_query_params( - &mut query_params, - args.delimiter.as_deref(), - args.use_url_encoding_type, - args.max_keys, - args.prefix.as_deref(), - ); - if let Some(v) = &args.marker { - query_params.insert(String::from("marker"), v.to_string()); - } - - let resp = self - .execute( - Method::GET, - ®ion, - &mut headers, - &query_params, - Some(&args.bucket), - None, - None, - ) - .await?; - let header_map = resp.headers().clone(); - let body = resp.bytes().await?; - let mut root = Element::parse(body.reader())?; - - let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = - parse_common_list_objects_response(&root)?; - let marker = url_decode(&encoding_type, get_option_text(&root, "Marker"))?; - let mut next_marker = url_decode(&encoding_type, get_option_text(&root, "NextMarker"))?; - let mut contents: Vec = Vec::new(); - parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?; - if is_truncated && next_marker.is_none() { - next_marker = contents.last().map(|v| v.name.clone()) - } - parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; - - Ok(ListObjectsV1Response { - headers: header_map, - name, - encoding_type, - prefix, - delimiter, - is_truncated, - max_keys, - contents, - marker, - next_marker, - }) - } - - /// Executes [ListObjectsV2](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html) S3 API - pub async fn list_objects_v2( - &self, - args: &ListObjectsV2Args, - ) -> Result { - let region = self - .get_region(&args.bucket, args.region.as_deref()) - .await?; - - let mut headers = Multimap::new(); - if let Some(v) = &args.extra_headers { - merge(&mut headers, v); - } - - let mut query_params = Multimap::new(); - if let Some(v) = &args.extra_query_params { - merge(&mut query_params, v); - } - query_params.insert(String::from("list-type"), String::from("2")); - add_common_list_objects_query_params( - &mut query_params, - args.delimiter.as_deref(), - args.use_url_encoding_type, - args.max_keys, - args.prefix.as_deref(), - ); - if let Some(v) = &args.continuation_token { - query_params.insert(String::from("continuation-token"), v.to_string()); - } - if args.fetch_owner { - query_params.insert(String::from("fetch-owner"), String::from("true")); - } - if let Some(v) = &args.start_after { - query_params.insert(String::from("start-after"), v.to_string()); - } - if args.include_user_metadata { - query_params.insert(String::from("metadata"), String::from("true")); - } - - let resp = self - .execute( - Method::GET, - ®ion, - &mut headers, - &query_params, - Some(&args.bucket), - None, - None, - ) - .await?; - let header_map = resp.headers().clone(); - let body = resp.bytes().await?; - let mut root = Element::parse(body.reader())?; - - let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = - parse_common_list_objects_response(&root)?; - let text = get_option_text(&root, "KeyCount"); - let key_count = match text { - Some(v) => match v.is_empty() { - true => None, - false => Some(v.parse::()?), - }, - None => None, - }; - let start_after = url_decode(&encoding_type, get_option_text(&root, "StartAfter"))?; - let continuation_token = get_option_text(&root, "ContinuationToken"); - let next_continuation_token = get_option_text(&root, "NextContinuationToken"); - let mut contents: Vec = Vec::new(); - parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?; - parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; - - Ok(ListObjectsV2Response { - headers: header_map, - name, - encoding_type, - prefix, - delimiter, - is_truncated, - max_keys, - contents, - key_count, - start_after, - continuation_token, - next_continuation_token, - }) - } - - /// Executes [ListObjectVersions](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html) S3 API - pub async fn list_object_versions( - &self, - args: &ListObjectVersionsArgs, - ) -> Result { - let region = self - .get_region(&args.bucket, args.region.as_deref()) - .await?; - - let mut headers = Multimap::new(); - if let Some(v) = &args.extra_headers { - merge(&mut headers, v); - } - - let mut query_params = Multimap::new(); - if let Some(v) = &args.extra_query_params { - merge(&mut query_params, v); - } - query_params.insert(String::from("versions"), String::new()); - add_common_list_objects_query_params( - &mut query_params, - args.delimiter.as_deref(), - args.use_url_encoding_type, - args.max_keys, - args.prefix.as_deref(), - ); - if let Some(v) = &args.key_marker { - query_params.insert(String::from("key-marker"), v.to_string()); - } - if let Some(v) = &args.version_id_marker { - query_params.insert(String::from("version-id-marker"), v.to_string()); - } - - let resp = self - .execute( - Method::GET, - ®ion, - &mut headers, - &query_params, - Some(&args.bucket), - None, - None, - ) - .await?; - let header_map = resp.headers().clone(); - let body = resp.bytes().await?; - let mut root = Element::parse(body.reader())?; - - let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = - parse_common_list_objects_response(&root)?; - let key_marker = url_decode(&encoding_type, get_option_text(&root, "KeyMarker"))?; - let next_key_marker = url_decode(&encoding_type, get_option_text(&root, "NextKeyMarker"))?; - let version_id_marker = get_option_text(&root, "VersionIdMarker"); - let next_version_id_marker = get_option_text(&root, "NextVersionIdMarker"); - let mut contents: Vec = Vec::new(); - parse_list_objects_contents(&mut contents, &mut root, "Version", &encoding_type, false)?; - parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; - parse_list_objects_contents( - &mut contents, - &mut root, - "DeleteMarker", - &encoding_type, - true, - )?; - - Ok(ListObjectVersionsResponse { - headers: header_map, - name, - encoding_type, - prefix, - delimiter, - is_truncated, - max_keys, - contents, - key_marker, - next_key_marker, - version_id_marker, - next_version_id_marker, - }) - } - - async fn list_objects_v1_stream( - &self, - args: ListObjectsV1Args, - ) -> impl Stream> + Unpin { - Box::pin(futures_stream::unfold( - (self.clone(), args), - move |(client, mut args)| async move { - let resp = client.list_objects_v1(&args).await; - match resp { - Ok(resp) => { - if !resp.is_truncated { - None - } else { - args.marker = resp.next_marker.clone(); - Some((Ok(resp), (client, args))) - } - } - Err(e) => Some((Err(e), (client, args))), - } - }, - )) + pub fn list_objects_v1(&self, bucket: &str) -> ListObjectsV1 { + ListObjectsV1::new(bucket).client(self) } - async fn list_objects_v2_stream( - &self, - args: ListObjectsV2Args, - ) -> impl Stream> + Unpin { - Box::pin(futures_stream::unfold( - (self.clone(), args), - move |(client, mut args)| async move { - let resp = client.list_objects_v2(&args).await; - match resp { - Ok(resp) => { - if !resp.is_truncated { - None - } else { - args.continuation_token = resp.next_continuation_token.clone(); - Some((Ok(resp), (client, args))) - } - } - Err(e) => Some((Err(e), (client, args))), - } - }, - )) + pub fn list_objects_v2(&self, bucket: &str) -> ListObjectsV2 { + ListObjectsV2::new(bucket).client(self) } - async fn list_object_versions_stream( - &self, - args: ListObjectVersionsArgs, - ) -> impl Stream> + Unpin { - Box::pin(futures_stream::unfold( - (self.clone(), args), - move |(client, mut args)| async move { - let resp = client.list_object_versions(&args).await; - match resp { - Ok(resp) => { - if !resp.is_truncated { - None - } else { - args.key_marker = resp.next_key_marker.clone(); - args.version_id_marker = resp.next_version_id_marker.clone(); - Some((Ok(resp), (client, args))) - } - } - Err(e) => Some((Err(e), (client, args))), - } - }, - )) + pub fn list_object_versions(&self, bucket: &str) -> ListObjectVersions { + ListObjectVersions::new(bucket).client(self) } /// List objects with version information optionally. This function handles @@ -511,7 +41,7 @@ impl Client { /// use minio::s3::client::{Client, ClientBuilder}; /// use minio::s3::creds::StaticProvider; /// use minio::s3::http::BaseUrl; - /// use minio::s3::args::ListObjectsArgs; + /// use minio::s3::types::ToStream; /// use futures_util::StreamExt; /// /// #[tokio::main] @@ -529,13 +59,15 @@ impl Client { /// .unwrap(); /// /// // List all objects in a directory. - /// let mut list_objects_arg = ListObjectsArgs::new("my-bucket").unwrap(); - /// list_objects_arg.recursive = true; - /// let mut stream = client.list_objects(list_objects_arg).await; - /// while let Some(result) = stream.next().await { + /// let mut list_objects = client + /// .list_objects("my-bucket") + /// .recursive(true) + /// .to_stream() + /// .await; + /// while let Some(result) = list_objects.next().await { /// match result { - /// Ok(items) => { - /// for item in items { + /// Ok(resp) => { + /// for item in resp.contents { /// println!("{:?}", item); /// } /// } @@ -543,19 +75,7 @@ impl Client { /// } /// } /// } - pub async fn list_objects( - &self, - args: ListObjectsArgs, - ) -> Box, Error>> + Unpin> { - if args.include_versions { - let stream = self.list_object_versions_stream(args.into()).await; - Box::new(stream.map(|v| v.map(|v| v.contents))) - } else if args.use_api_v1 { - let stream = self.list_objects_v1_stream(args.into()).await; - Box::new(stream.map(|v| v.map(|v| v.contents))) - } else { - let stream = self.list_objects_v2_stream(args.into()).await; - Box::new(stream.map(|v| v.map(|v| v.contents))) - } + pub fn list_objects(&self, bucket: &str) -> ListObjects { + ListObjects::new(bucket).client(self) } } diff --git a/src/s3/error.rs b/src/s3/error.rs index b2f3ce21..bebbba09 100644 --- a/src/s3/error.rs +++ b/src/s3/error.rs @@ -109,6 +109,7 @@ pub enum Error { InvalidFilter, PostPolicyError(String), InvalidObjectLockConfig(String), + NoClientProvided, } impl std::error::Error for Error {} @@ -116,7 +117,7 @@ impl std::error::Error for Error {} impl fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Error::TimeParseError(e) => write!(f, "{}", e), + Error::TimeParseError(e) => write!(f, "{}", e), Error::InvalidUrl(e) => write!(f, "{}", e), Error::IOError(e) => write!(f, "{}", e), Error::XmlParseError(e) => write!(f, "{}", e), @@ -124,53 +125,96 @@ impl fmt::Display for Error { Error::StrError(e) => write!(f, "{}", e), Error::IntError(e) => write!(f, "{}", e), Error::BoolError(e) => write!(f, "{}", e), - Error::Utf8Error(e) => write!(f, "{}", e), - Error::JsonError(e) => write!(f, "{}", e), - Error::XmlError(m) => write!(f, "{}", m), - Error::InvalidBucketName(m) => write!(f, "{}", m), - Error::InvalidObjectName(m) => write!(f, "{}", m), - Error::InvalidUploadId(m) => write!(f, "{}", m), - Error::InvalidPartNumber(m) => write!(f, "{}", m), - Error::EmptyParts(m) => write!(f, "{}", m), - Error::InvalidRetentionMode(m) => write!(f, "invalid retention mode {}", m), - Error::InvalidRetentionConfig(m) => write!(f, "invalid retention configuration; {}", m), - Error::InvalidMinPartSize(s) => write!(f, "part size {} is not supported; minimum allowed 5MiB", s), - Error::InvalidMaxPartSize(s) => write!(f, "part size {} is not supported; maximum allowed 5GiB", s), - Error::InvalidObjectSize(s) => write!(f, "object size {} is not supported; maximum allowed 5TiB", s), - Error::MissingPartSize => write!(f, "valid part size must be provided when object size is unknown"), - Error::InvalidPartCount(os, ps, pc) => write!(f, "object size {} and part size {} make more than {} parts for upload", os, ps, pc), - Error::SseTlsRequired(m) => write!(f, "{}SSE operation must be performed over a secure connection", m.as_ref().map_or(String::new(), |v| v.clone())), - Error::InsufficientData(ps, br) => write!(f, "not enough data in the stream; expected: {}, got: {} bytes", ps, br), - Error::InvalidBaseUrl(m) => write!(f, "{}", m), - Error::UrlBuildError(m) => write!(f, "{}", m), - Error::InvalidLegalHold(s) => write!(f, "invalid legal hold {}", s), - Error::RegionMismatch(br, r) => write!(f, "region must be {}, but passed {}", br, r), - Error::S3Error(er) => write!(f, "s3 operation failed; code: {}, message: {}, resource: {}, request_id: {}, host_id: {}, bucket_name: {}, object_name: {}", er.code, er.message, er.resource, er.request_id, er.host_id, er.bucket_name, er.object_name), - Error::InvalidResponse(sc, ct) => write!(f, "invalid response received; status code: {}; content-type: {}", sc, ct), - Error::ServerError(sc) => write!(f, "server failed with HTTP status code {}", sc), - Error::InvalidSelectExpression(m) => write!(f, "{}", m), - Error::InvalidHeaderValueType(v) => write!(f, "invalid header value type {}", v), - Error::CrcMismatch(t, e, g) => write!(f, "{} CRC mismatch; expected: {}, got: {}", t, e, g), - Error::UnknownEventType(et) => write!(f, "unknown event type {}", et), - Error::SelectError(ec, em) => write!(f, "error code: {}, error message: {}", ec, em), - Error::UnsupportedApi(a) => write!(f, "{} API is not supported in Amazon AWS S3", a), - Error::InvalidComposeSource(m) => write!(f, "{}", m), - Error::InvalidComposeSourceOffset(b, o, v, of, os) => write!(f, "source {}/{}{}: offset {} is beyond object size {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), of, os), - Error::InvalidComposeSourceLength(b, o, v, l, os) => write!(f, "source {}/{}{}: length {} is beyond object size {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), l, os), - Error::InvalidComposeSourceSize(b, o, v, cs, os) => write!(f, "source {}/{}{}: compose size {} is beyond object size {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), cs, os), - Error::InvalidDirective(m) => write!(f, "{}", m), - Error::InvalidCopyDirective(m) => write!(f, "{}", m), - Error::InvalidComposeSourcePartSize(b, o, v, s, es) => write!(f, "source {}/{}{}: size {} must be greater than {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), s, es), - Error::InvalidComposeSourceMultipart(b, o, v, s, es) => write!(f, "source {}/{}{}: size {} for multipart split upload of {}, last part size is less than {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), s, s, es), - Error::InvalidMultipartCount(c) => write!(f, "Compose sources create more than allowed multipart count {}", c), - Error::MissingLifecycleAction => write!(f, "at least one of action (AbortIncompleteMultipartUpload, Expiration, NoncurrentVersionExpiration, NoncurrentVersionTransition or Transition) must be specified in a rule"), - Error::InvalidExpiredObjectDeleteMarker => write!(f, "ExpiredObjectDeleteMarker must not be provided along with Date and Days"), - Error::InvalidDateAndDays(m) => write!(f, "Only one of date or days of {} must be set", m), - Error::InvalidLifecycleRuleId => write!(f, "id must be exceed 255 characters"), - Error::InvalidFilter => write!(f, "only one of And, Prefix or Tag must be provided"), - Error::PostPolicyError(m) => write!(f, "{}", m), - Error::InvalidObjectLockConfig(m) => write!(f, "{}", m), - } + Error::Utf8Error(e) => write!(f, "{}", e), + Error::JsonError(e) => write!(f, "{}", e), + Error::XmlError(m) => write!(f, "{}", m), + Error::InvalidBucketName(m) => write!(f, "{}", m), + Error::InvalidObjectName(m) => write!(f, "{}", m), + Error::InvalidUploadId(m) => write!(f, "{}", m), + Error::InvalidPartNumber(m) => write!(f, "{}", m), + Error::EmptyParts(m) => write!(f, "{}", m), + Error::InvalidRetentionMode(m) => write!(f, "invalid retention mode {}", m), + Error::InvalidRetentionConfig(m) => write!(f, "invalid retention configuration; {}", m), + Error::InvalidMinPartSize(s) => { + write!(f, "part size {} is not supported; minimum allowed 5MiB", s) + } + Error::InvalidMaxPartSize(s) => { + write!(f, "part size {} is not supported; maximum allowed 5GiB", s) + } + Error::InvalidObjectSize(s) => write!( + f, + "object size {} is not supported; maximum allowed 5TiB", + s + ), + Error::MissingPartSize => write!( + f, + "valid part size must be provided when object size is unknown" + ), + Error::InvalidPartCount(os, ps, pc) => write!( + f, + "object size {} and part size {} make more than {} parts for upload", + os, ps, pc + ), + Error::SseTlsRequired(m) => write!( + f, + "{}SSE operation must be performed over a secure connection", + m.as_ref().map_or(String::new(), |v| v.clone()) + ), + Error::InsufficientData(ps, br) => write!( + f, + "not enough data in the stream; expected: {}, got: {} bytes", + ps, br + ), + Error::InvalidBaseUrl(m) => write!(f, "{}", m), + Error::UrlBuildError(m) => write!(f, "{}", m), + Error::InvalidLegalHold(s) => write!(f, "invalid legal hold {}", s), + Error::RegionMismatch(br, r) => write!(f, "region must be {}, but passed {}", br, r), + Error::S3Error(er) => write!( + f, + "s3 operation failed; code: {}, message: {}, resource: {}, request_id: {}, host_id: {}, bucket_name: {}, object_name: {}", + er.code, er.message, er.resource, er.request_id, er.host_id, er.bucket_name, er.object_name, + ), + Error::InvalidResponse(sc, ct) => write!( + f, + "invalid response received; status code: {}; content-type: {}", + sc, ct + ), + Error::ServerError(sc) => write!(f, "server failed with HTTP status code {}", sc), + Error::InvalidSelectExpression(m) => write!(f, "{}", m), + Error::InvalidHeaderValueType(v) => write!(f, "invalid header value type {}", v), + Error::CrcMismatch(t, e, g) => { + write!(f, "{} CRC mismatch; expected: {}, got: {}", t, e, g) + } + Error::UnknownEventType(et) => write!(f, "unknown event type {}", et), + Error::SelectError(ec, em) => write!(f, "error code: {}, error message: {}", ec, em), + Error::UnsupportedApi(a) => write!(f, "{} API is not supported in Amazon AWS S3", a), + Error::InvalidComposeSource(m) => write!(f, "{}", m), + Error::InvalidComposeSourceOffset(b, o, v, of, os) => write!(f, "source {}/{}{}: offset {} is beyond object size {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), of, os), + Error::InvalidComposeSourceLength(b, o, v, l, os) => write!(f, "source {}/{}{}: length {} is beyond object size {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), l, os), + Error::InvalidComposeSourceSize(b, o, v, cs, os) => write!(f, "source {}/{}{}: compose size {} is beyond object size {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), cs, os), + Error::InvalidDirective(m) => write!(f, "{}", m), + Error::InvalidCopyDirective(m) => write!(f, "{}", m), + Error::InvalidComposeSourcePartSize(b, o, v, s, es) => write!(f, "source {}/{}{}: size {} must be greater than {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), s, es), + Error::InvalidComposeSourceMultipart(b, o, v, s, es) => write!(f, "source {}/{}{}: size {} for multipart split upload of {}, last part size is less than {}", b, o, v.as_ref().map_or(String::new(), |v| String::from("?versionId=") + v), s, s, es), + Error::InvalidMultipartCount(c) => write!( + f, + "Compose sources create more than allowed multipart count {}", + c + ), + Error::MissingLifecycleAction => write!(f, "at least one of action (AbortIncompleteMultipartUpload, Expiration, NoncurrentVersionExpiration, NoncurrentVersionTransition or Transition) must be specified in a rule"), + Error::InvalidExpiredObjectDeleteMarker => write!( + f, + "ExpiredObjectDeleteMarker must not be provided along with Date and Days" + ), + Error::InvalidDateAndDays(m) => { + write!(f, "Only one of date or days of {} must be set", m) + } + Error::InvalidLifecycleRuleId => write!(f, "id must be exceed 255 characters"), + Error::InvalidFilter => write!(f, "only one of And, Prefix or Tag must be provided"), + Error::PostPolicyError(m) => write!(f, "{}", m), + Error::InvalidObjectLockConfig(m) => write!(f, "{}", m), + Error::NoClientProvided => write!(f, "no client provided"), + } } } diff --git a/src/s3/mod.rs b/src/s3/mod.rs index 289187e9..fdd82a8e 100644 --- a/src/s3/mod.rs +++ b/src/s3/mod.rs @@ -16,6 +16,7 @@ //! Implementation of Simple Storage Service (aka S3) client pub mod args; +pub mod builders; pub mod client; pub mod creds; pub mod error; diff --git a/src/s3/response.rs b/src/s3/response.rs index 27c86046..3e401237 100644 --- a/src/s3/response.rs +++ b/src/s3/response.rs @@ -15,19 +15,27 @@ //! Responses for [minio::s3::client::Client](crate::s3::client::Client) APIs +use std::collections::HashMap; +use std::collections::VecDeque; + +use reqwest::header::HeaderMap; +use std::io::BufReader; +use xmltree::Element; + use crate::s3::error::Error; use crate::s3::types::{ - parse_legal_hold, Bucket, LifecycleConfig, ListEntry, NotificationConfig, ObjectLockConfig, + parse_legal_hold, Bucket, LifecycleConfig, NotificationConfig, ObjectLockConfig, ReplicationConfig, RetentionMode, SelectProgress, SseConfig, }; use crate::s3::utils::{ copy_slice, crc32, from_http_header_value, from_iso8601utc, get_text, uint32, UtcTime, }; -use reqwest::header::HeaderMap; -use std::collections::HashMap; -use std::collections::VecDeque; -use std::io::BufReader; -use xmltree::Element; + +mod list_objects; + +pub use list_objects::{ + ListObjectVersionsResponse, ListObjectsResponse, ListObjectsV1Response, ListObjectsV2Response, +}; #[derive(Debug)] /// Response of [list_buckets()](crate::s3::client::Client::list_buckets) API @@ -235,84 +243,6 @@ pub struct RemoveObjectsApiResponse { /// Response of [remove_objects()](crate::s3::client::Client::remove_objects) API pub type RemoveObjectsResponse = RemoveObjectsApiResponse; -#[derive(Clone, Debug)] -/// Response of [list_objects_v1()](crate::s3::client::Client::list_objects_v1) S3 API -pub struct ListObjectsV1Response { - pub headers: HeaderMap, - pub name: String, - pub encoding_type: Option, - pub prefix: Option, - pub delimiter: Option, - pub is_truncated: bool, - pub max_keys: Option, - pub contents: Vec, - pub marker: Option, - pub next_marker: Option, -} - -#[derive(Clone, Debug)] -/// Response of [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API -pub struct ListObjectsV2Response { - pub headers: HeaderMap, - pub name: String, - pub encoding_type: Option, - pub prefix: Option, - pub delimiter: Option, - pub is_truncated: bool, - pub max_keys: Option, - pub contents: Vec, - pub key_count: Option, - pub start_after: Option, - pub continuation_token: Option, - pub next_continuation_token: Option, -} - -#[derive(Clone, Debug)] -/// Response of [list_object_versions()](crate::s3::client::Client::list_object_versions) S3 API -pub struct ListObjectVersionsResponse { - pub headers: HeaderMap, - pub name: String, - pub encoding_type: Option, - pub prefix: Option, - pub delimiter: Option, - pub is_truncated: bool, - pub max_keys: Option, - pub contents: Vec, - pub key_marker: Option, - pub next_key_marker: Option, - pub version_id_marker: Option, - pub next_version_id_marker: Option, -} - -#[derive(Clone, Debug)] -/// Response of [list_objects()](crate::s3::client::Client::list_objects) API -pub struct ListObjectsResponse { - pub headers: HeaderMap, - pub name: String, - pub encoding_type: Option, - pub prefix: Option, - pub delimiter: Option, - pub is_truncated: bool, - pub max_keys: Option, - pub contents: Vec, - - // ListObjectsV1 - pub marker: String, - pub next_marker: String, - - // ListObjectsV2 - pub key_count: u16, - pub start_after: String, - pub continuation_token: String, - pub next_continuation_token: String, - - // ListObjectVersions - pub key_marker: String, - pub next_key_marker: String, - pub version_id_marker: String, - pub next_version_id_marker: String, -} - /// Response of [select_object_content()](crate::s3::client::Client::select_object_content) API pub struct SelectObjectContentResponse { pub headers: HeaderMap, diff --git a/src/s3/response/list_objects.rs b/src/s3/response/list_objects.rs new file mode 100644 index 00000000..24b22811 --- /dev/null +++ b/src/s3/response/list_objects.rs @@ -0,0 +1,425 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Response types for ListObjects APIs + +use std::collections::HashMap; + +use async_trait::async_trait; +use bytes::Buf; +use reqwest::header::HeaderMap; +use xmltree::Element; + +use crate::s3::{ + error::Error, + types::{FromS3Response, ListEntry, S3Request}, + utils::{from_iso8601utc, get_default_text, get_option_text, get_text, urldecode}, +}; + +fn url_decode( + encoding_type: &Option, + prefix: Option, +) -> Result, Error> { + if let Some(v) = encoding_type.as_ref() { + if v == "url" { + if let Some(v) = prefix { + return Ok(Some(urldecode(&v)?.to_string())); + } + } + } + + if let Some(v) = prefix.as_ref() { + return Ok(Some(v.to_string())); + } + + Ok(None) +} + +fn parse_common_list_objects_response( + root: &Element, +) -> Result< + ( + String, + Option, + Option, + Option, + bool, + Option, + ), + Error, +> { + let encoding_type = get_option_text(root, "EncodingType"); + let prefix = url_decode(&encoding_type, Some(get_default_text(root, "Prefix")))?; + Ok(( + get_text(root, "Name")?, + encoding_type, + prefix, + get_option_text(root, "Delimiter"), + match get_option_text(root, "IsTruncated") { + Some(v) => v.to_lowercase() == "true", + None => false, + }, + match get_option_text(root, "MaxKeys") { + Some(v) => Some(v.parse::()?), + None => None, + }, + )) +} + +fn parse_list_objects_contents( + contents: &mut Vec, + root: &mut xmltree::Element, + tag: &str, + encoding_type: &Option, + is_delete_marker: bool, +) -> Result<(), Error> { + while let Some(v) = root.take_child(tag) { + let content = v; + let etype = encoding_type.as_ref().cloned(); + let key = url_decode(&etype, Some(get_text(&content, "Key")?))?.unwrap(); + let last_modified = Some(from_iso8601utc(&get_text(&content, "LastModified")?)?); + let etag = get_option_text(&content, "ETag"); + let v = get_default_text(&content, "Size"); + let size = match v.is_empty() { + true => None, + false => Some(v.parse::()?), + }; + let storage_class = get_option_text(&content, "StorageClass"); + let is_latest = get_default_text(&content, "IsLatest").to_lowercase() == "true"; + let version_id = get_option_text(&content, "VersionId"); + let (owner_id, owner_name) = match content.get_child("Owner") { + Some(v) => (get_option_text(v, "ID"), get_option_text(v, "DisplayName")), + None => (None, None), + }; + let user_metadata = match content.get_child("UserMetadata") { + Some(v) => { + let mut map: HashMap = HashMap::new(); + for xml_node in &v.children { + let u = xml_node + .as_element() + .ok_or(Error::XmlError("unable to convert to element".to_string()))?; + map.insert( + u.name.to_string(), + u.get_text().unwrap_or_default().to_string(), + ); + } + Some(map) + } + None => None, + }; + + contents.push(ListEntry { + name: key, + last_modified, + etag, + owner_id, + owner_name, + size, + storage_class, + is_latest, + version_id, + user_metadata, + is_prefix: false, + is_delete_marker, + encoding_type: etype, + }); + } + + Ok(()) +} + +fn parse_list_objects_common_prefixes( + contents: &mut Vec, + root: &mut Element, + encoding_type: &Option, +) -> Result<(), Error> { + while let Some(v) = root.take_child("CommonPrefixes") { + let common_prefix = v; + contents.push(ListEntry { + name: url_decode(encoding_type, Some(get_text(&common_prefix, "Prefix")?))?.unwrap(), + last_modified: None, + etag: None, + owner_id: None, + owner_name: None, + size: None, + storage_class: None, + is_latest: false, + version_id: None, + user_metadata: None, + is_prefix: true, + is_delete_marker: false, + encoding_type: encoding_type.as_ref().cloned(), + }); + } + + Ok(()) +} + +/// Response of [list_objects_v1()](crate::s3::client::Client::list_objects_v1) S3 API +#[derive(Clone, Debug)] +pub struct ListObjectsV1Response { + pub headers: HeaderMap, + pub name: String, + pub encoding_type: Option, + pub prefix: Option, + pub delimiter: Option, + pub is_truncated: bool, + pub max_keys: Option, + pub contents: Vec, + pub marker: Option, + pub next_marker: Option, +} + +#[async_trait] +impl FromS3Response for ListObjectsV1Response { + async fn from_s3response<'a>( + _req: S3Request<'a>, + resp: reqwest::Response, + ) -> Result { + let headers = resp.headers().clone(); + let body = resp.bytes().await?; + let mut root = Element::parse(body.reader())?; + + let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = + parse_common_list_objects_response(&root)?; + let marker = url_decode(&encoding_type, get_option_text(&root, "Marker"))?; + let mut next_marker = url_decode(&encoding_type, get_option_text(&root, "NextMarker"))?; + let mut contents: Vec = Vec::new(); + parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?; + if is_truncated && next_marker.is_none() { + next_marker = contents.last().map(|v| v.name.clone()) + } + parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; + + Ok(ListObjectsV1Response { + headers, + name, + encoding_type, + prefix, + delimiter, + is_truncated, + max_keys, + contents, + marker, + next_marker, + }) + } +} + +/// Response of [list_objects_v2()](crate::s3::client::Client::list_objects_v2) S3 API +#[derive(Clone, Debug)] +pub struct ListObjectsV2Response { + pub headers: HeaderMap, + pub name: String, + pub encoding_type: Option, + pub prefix: Option, + pub delimiter: Option, + pub is_truncated: bool, + pub max_keys: Option, + pub contents: Vec, + pub key_count: Option, + pub start_after: Option, + pub continuation_token: Option, + pub next_continuation_token: Option, +} + +#[async_trait] +impl FromS3Response for ListObjectsV2Response { + async fn from_s3response<'a>( + _req: S3Request<'a>, + resp: reqwest::Response, + ) -> Result { + let headers = resp.headers().clone(); + let body = resp.bytes().await?; + let mut root = Element::parse(body.reader())?; + + let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = + parse_common_list_objects_response(&root)?; + let text = get_option_text(&root, "KeyCount"); + let key_count = match text { + Some(v) => match v.is_empty() { + true => None, + false => Some(v.parse::()?), + }, + None => None, + }; + let start_after = url_decode(&encoding_type, get_option_text(&root, "StartAfter"))?; + let continuation_token = get_option_text(&root, "ContinuationToken"); + let next_continuation_token = get_option_text(&root, "NextContinuationToken"); + let mut contents: Vec = Vec::new(); + parse_list_objects_contents(&mut contents, &mut root, "Contents", &encoding_type, false)?; + parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; + + Ok(ListObjectsV2Response { + headers, + name, + encoding_type, + prefix, + delimiter, + is_truncated, + max_keys, + contents, + key_count, + start_after, + continuation_token, + next_continuation_token, + }) + } +} + +/// Response of [list_object_versions()](crate::s3::client::Client::list_object_versions) S3 API +#[derive(Clone, Debug)] +pub struct ListObjectVersionsResponse { + pub headers: HeaderMap, + pub name: String, + pub encoding_type: Option, + pub prefix: Option, + pub delimiter: Option, + pub is_truncated: bool, + pub max_keys: Option, + pub contents: Vec, + pub key_marker: Option, + pub next_key_marker: Option, + pub version_id_marker: Option, + pub next_version_id_marker: Option, +} + +#[async_trait] +impl FromS3Response for ListObjectVersionsResponse { + async fn from_s3response<'a>( + _req: S3Request<'a>, + resp: reqwest::Response, + ) -> Result { + let headers = resp.headers().clone(); + let body = resp.bytes().await?; + let mut root = Element::parse(body.reader())?; + + let (name, encoding_type, prefix, delimiter, is_truncated, max_keys) = + parse_common_list_objects_response(&root)?; + let key_marker = url_decode(&encoding_type, get_option_text(&root, "KeyMarker"))?; + let next_key_marker = url_decode(&encoding_type, get_option_text(&root, "NextKeyMarker"))?; + let version_id_marker = get_option_text(&root, "VersionIdMarker"); + let next_version_id_marker = get_option_text(&root, "NextVersionIdMarker"); + let mut contents: Vec = Vec::new(); + parse_list_objects_contents(&mut contents, &mut root, "Version", &encoding_type, false)?; + parse_list_objects_common_prefixes(&mut contents, &mut root, &encoding_type)?; + parse_list_objects_contents( + &mut contents, + &mut root, + "DeleteMarker", + &encoding_type, + true, + )?; + + Ok(ListObjectVersionsResponse { + headers, + name, + encoding_type, + prefix, + delimiter, + is_truncated, + max_keys, + contents, + key_marker, + next_key_marker, + version_id_marker, + next_version_id_marker, + }) + } +} + +/// Response of [list_objects()](crate::s3::client::Client::list_objects) API +#[derive(Clone, Debug, Default)] +pub struct ListObjectsResponse { + pub headers: HeaderMap, + pub name: String, + pub encoding_type: Option, + pub prefix: Option, + pub delimiter: Option, + pub is_truncated: bool, + pub max_keys: Option, + pub contents: Vec, + + // ListObjectsV1 + pub marker: Option, + pub next_marker: Option, + + // ListObjectsV2 + pub key_count: Option, + pub start_after: Option, + pub continuation_token: Option, + pub next_continuation_token: Option, + + // ListObjectVersions + pub key_marker: Option, + pub next_key_marker: Option, + pub version_id_marker: Option, + pub next_version_id_marker: Option, +} + +impl From for ListObjectsResponse { + fn from(value: ListObjectVersionsResponse) -> Self { + ListObjectsResponse { + headers: value.headers, + name: value.name, + encoding_type: value.encoding_type, + prefix: value.prefix, + delimiter: value.delimiter, + is_truncated: value.is_truncated, + max_keys: value.max_keys, + contents: value.contents, + key_marker: value.key_marker, + next_key_marker: value.next_key_marker, + version_id_marker: value.version_id_marker, + next_version_id_marker: value.next_version_id_marker, + ..Default::default() + } + } +} + +impl From for ListObjectsResponse { + fn from(value: ListObjectsV2Response) -> Self { + ListObjectsResponse { + headers: value.headers, + name: value.name, + encoding_type: value.encoding_type, + prefix: value.prefix, + delimiter: value.delimiter, + is_truncated: value.is_truncated, + max_keys: value.max_keys, + contents: value.contents, + key_count: value.key_count, + start_after: value.start_after, + continuation_token: value.continuation_token, + next_continuation_token: value.next_continuation_token, + ..Default::default() + } + } +} + +impl From for ListObjectsResponse { + fn from(value: ListObjectsV1Response) -> Self { + ListObjectsResponse { + headers: value.headers, + name: value.name, + encoding_type: value.encoding_type, + prefix: value.prefix, + delimiter: value.delimiter, + is_truncated: value.is_truncated, + max_keys: value.max_keys, + contents: value.contents, + marker: value.marker, + next_marker: value.next_marker, + ..Default::default() + } + } +} diff --git a/src/s3/types.rs b/src/s3/types.rs index ada3371c..34eaa338 100644 --- a/src/s3/types.rs +++ b/src/s3/types.rs @@ -15,14 +15,132 @@ //! Various types for S3 API requests and responses +use super::client::Client; use crate::s3::error::Error; use crate::s3::utils::{ - from_iso8601utc, get_default_text, get_option_text, get_text, to_iso8601utc, UtcTime, + from_iso8601utc, get_default_text, get_option_text, get_text, to_iso8601utc, Multimap, UtcTime, }; + +use async_trait::async_trait; +use futures_util::Stream; +use http::Method; use serde::{Deserialize, Serialize}; +use xmltree::Element; + use std::collections::HashMap; use std::fmt; -use xmltree::Element; + +pub struct S3Request<'a> { + client: &'a Client, + + method: Method, + region: Option<&'a str>, + bucket: Option<&'a str>, + object: Option<&'a str>, + query_params: Multimap, + headers: Multimap, + body: Option>, + + // Computed region + inner_region: String, +} + +impl<'a> S3Request<'a> { + pub fn new(client: &'a Client, method: Method) -> S3Request<'a> { + S3Request { + client, + method, + region: None, + bucket: None, + object: None, + query_params: Multimap::new(), + headers: Multimap::new(), + body: None, + inner_region: String::new(), + } + } + + pub fn region(mut self, region: Option<&'a str>) -> Self { + self.region = region; + self + } + + pub fn bucket(mut self, bucket: Option<&'a str>) -> Self { + self.bucket = bucket; + self + } + + pub fn object(mut self, object: Option<&'a str>) -> Self { + self.object = object; + self + } + + pub fn query_params(mut self, query_params: Multimap) -> Self { + self.query_params = query_params; + self + } + + pub fn headers(mut self, headers: Multimap) -> Self { + self.headers = headers; + self + } + + pub fn body(mut self, body: Option>) -> Self { + self.body = body; + self + } + + pub async fn execute(&mut self) -> Result { + // Lookup the region of the bucket if provided. + self.inner_region = if let Some(bucket) = self.bucket { + self.client.get_region(bucket, self.region).await? + } else { + "us-east-1".to_string() + }; + + // Execute the API request. + self.client + .execute( + self.method.clone(), + &self.inner_region, + &mut self.headers, + &self.query_params, + self.bucket, + self.object, + self.body.as_ref().map(|x| x.as_slice()), + ) + .await + } +} + +pub trait ToS3Request { + fn to_s3request(&self) -> Result; +} + +#[async_trait] +pub trait FromS3Response: Sized { + async fn from_s3response<'a>( + s3req: S3Request<'a>, + resp: reqwest::Response, + ) -> Result; +} + +#[async_trait] +pub trait S3Api: ToS3Request { + type S3Response: FromS3Response; + + async fn send(&self) -> Result { + let mut req = self.to_s3request()?; + let resp = req.execute().await?; + Self::S3Response::from_s3response(req, resp).await + } +} + +#[async_trait] +pub trait ToStream: Sized { + type Item; + async fn to_stream(self) -> Box> + Unpin + Send>; +} #[derive(Clone, Debug, Default)] /// Contains information of an item of [list_objects()](crate::s3::client::Client::list_objects) API diff --git a/tests/tests.rs b/tests/tests.rs index 7c63ea4a..18ff2af2 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -16,7 +16,7 @@ use async_std::task; use chrono::Duration; use hyper::http::Method; -use minio::s3::types::NotificationRecords; + use rand::distributions::{Alphanumeric, DistString}; use sha2::{Digest, Sha256}; use std::collections::HashMap; @@ -30,6 +30,8 @@ use minio::s3::args::*; use minio::s3::client::Client; use minio::s3::creds::StaticProvider; use minio::s3::http::BaseUrl; +use minio::s3::types::NotificationRecords; +use minio::s3::types::ToStream; use minio::s3::types::{ CsvInputSerialization, CsvOutputSerialization, DeleteObject, FileHeaderInfo, NotificationConfig, ObjectLockConfig, PrefixFilterRule, QueueConfig, QuoteFields, @@ -423,15 +425,19 @@ impl ClientTest { let mut stream = self .client - .list_objects(ListObjectsArgs::new(&self.test_bucket).unwrap()) + .list_objects(&self.test_bucket) + .to_stream() .await; + let mut count = 0; while let Some(items) = stream.next().await { - let items = items.unwrap(); + let items = items.unwrap().contents; for item in items.iter() { assert!(names.contains(&item.name)); + count += 1; } } + assert!(count == 3); let mut objects: Vec = Vec::new(); for name in names.iter() {