From 7219bcf9fbdccf7ddab56b4b15dab5d21201a34d Mon Sep 17 00:00:00 2001
From: Aditya Manthramurthy
Date: Tue, 2 Apr 2024 17:52:11 -0700
Subject: [PATCH] Add file read/write ability to `ObjectContent`

---
 Cargo.toml                        |   8 +-
 examples/file-uploader.rs         |   4 +-
 src/s3/builders/object_content.rs | 258 +++++++++++++++++++++++-------
 src/s3/builders/put_object.rs     |  63 ++------
 src/s3/client/put_object.rs       |  11 --
 src/s3/mod.rs                     |   2 +
 src/s3/response/get_object.rs     |   2 +-
 tests/tests.rs                    |   2 +-
 8 files changed, 227 insertions(+), 123 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 4141872..697fa90 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,16 +20,20 @@ chrono = "0.4.27"
 crc = "3.0.1"
 dashmap = "5.5.3"
 derivative = "2.2.0"
+env_logger = "0.11.2"
 futures-util = "0.3.28"
 hex = "0.4.3"
 hmac = "0.12.1"
+home = "0.5.9"
 http = "0.2.9"
 hyper = { version = "0.14.27", features = ["full"] }
 lazy_static = "1.4.0"
+log = "0.4.20"
 md5 = "0.7.0"
 multimap = "0.10.0"
 os_info = "3.7.0"
 percent-encoding = "2.3.0"
+rand = { version = "0.8.5", features = ["small_rng"] }
 regex = "1.9.4"
 serde = { version = "1.0.188", features = ["derive"] }
 serde_json = "1.0.105"
@@ -39,9 +43,6 @@ tokio-stream = "0.1.14"
 tokio-util = { version = "0.7.8", features = ["io"] }
 urlencoding = "2.1.3"
 xmltree = "0.10.3"
-log = "0.4.20"
-env_logger = "0.11.2"
-home = "0.5.9"
 
 [dependencies.reqwest]
 version = "0.11.20"
@@ -49,7 +50,6 @@ features = ["native-tls", "blocking", "rustls-tls", "stream"]
 
 [dev-dependencies]
 async-std = { version = "1.12.0", features = ["attributes", "tokio1"] }
-rand = { version = "0.8.5", features = ["small_rng"] }
 
 [[example]]
 name = "file-uploader"
diff --git a/examples/file-uploader.rs b/examples/file-uploader.rs
index 835ee5d..4f97afb 100644
--- a/examples/file-uploader.rs
+++ b/examples/file-uploader.rs
@@ -1,5 +1,6 @@
 use log::info;
 use minio::s3::args::{BucketExistsArgs, MakeBucketArgs};
+use minio::s3::builders::ObjectContent;
 use minio::s3::client::ClientBuilder;
 use minio::s3::creds::StaticProvider;
 use minio::s3::http::BaseUrl;
@@ -47,8 +48,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
 
     info!("filename {}", &filename.to_str().unwrap());
 
+    let content = ObjectContent::from(filename);
     client
-        .put_object_from_file(bucket_name, object_name, filename)
+        .put_object_content(bucket_name, object_name, content)
         .send()
         .await?;
 
diff --git a/src/s3/builders/object_content.rs b/src/s3/builders/object_content.rs
index 0328f71..6a566d3 100644
--- a/src/s3/builders/object_content.rs
+++ b/src/s3/builders/object_content.rs
@@ -13,55 +13,164 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::pin::Pin;
+use std::path::PathBuf;
+use std::{ffi::OsString, path::Path, pin::Pin};
 
 use bytes::{Bytes, BytesMut};
 use futures_util::Stream;
-use tokio::io::AsyncRead;
+use rand::prelude::random;
+use tokio::fs;
+use tokio::io::AsyncWriteExt;
 use tokio_stream::StreamExt;
 
 type IoResult<T> = Result<T, std::io::Error>;
 
-pub struct ObjectContent {
-    r: Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>,
-    extra: Option<Bytes>,
-    size: Option<u64>,
+/// Object content that can be uploaded or downloaded. Can be constructed from a stream of `Bytes`,
+/// a file path, or a `Bytes` object.
+pub struct ObjectContent(ObjectContentInner);
+
+enum ObjectContentInner {
+    Stream(Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>, Option<u64>),
+    FilePath(PathBuf),
+    Bytes(SegmentedBytes),
 }
 
 impl From<Bytes> for ObjectContent {
     fn from(value: Bytes) -> Self {
-        let n = value.len();
-        ObjectContent {
-            r: Box::pin(tokio_stream::iter(vec![Ok(value)])),
-            extra: None,
-            size: Some(n as u64),
-        }
+        ObjectContent(ObjectContentInner::Bytes(SegmentedBytes::from(value)))
     }
 }
 
 impl From<String> for ObjectContent {
     fn from(value: String) -> Self {
-        let n = value.len();
-        ObjectContent {
-            r: Box::pin(tokio_stream::iter(vec![Ok(Bytes::from(value))])),
-            extra: None,
-            size: Some(n as u64),
-        }
+        ObjectContent(ObjectContentInner::Bytes(SegmentedBytes::from(
+            Bytes::from(value),
+        )))
     }
 }
 
 impl From<Vec<u8>> for ObjectContent {
     fn from(value: Vec<u8>) -> Self {
-        let n = value.len();
-        ObjectContent {
-            r: Box::pin(tokio_stream::iter(vec![Ok(Bytes::from(value))])),
-            extra: None,
-            size: Some(n as u64),
-        }
+        ObjectContent(ObjectContentInner::Bytes(SegmentedBytes::from(
+            Bytes::from(value),
+        )))
+    }
+}
+
+impl From<&Path> for ObjectContent {
+    fn from(value: &Path) -> Self {
+        ObjectContent(ObjectContentInner::FilePath(value.to_path_buf()))
+    }
+}
+
+impl Default for ObjectContent {
+    fn default() -> Self {
+        ObjectContent(ObjectContentInner::Bytes(SegmentedBytes::new()))
     }
 }
 
 impl ObjectContent {
+    /// Create a new `ObjectContent` from a stream of `Bytes`.
+    pub fn new_from_stream(
+        r: impl Stream<Item = IoResult<Bytes>> + 'static,
+        size: Option<u64>,
+    ) -> Self {
+        let r = Box::pin(r);
+        ObjectContent(ObjectContentInner::Stream(r, size))
+    }
+
+    pub async fn to_stream(
+        self,
+    ) -> IoResult<(Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>, Option<u64>)> {
+        match self.0 {
+            ObjectContentInner::Stream(r, size) => Ok((r, size)),
+            ObjectContentInner::FilePath(path) => {
+                let file = fs::File::open(&path).await?;
+                let size = file.metadata().await?.len();
+                let r = tokio_util::io::ReaderStream::new(file);
+                Ok((Box::pin(r), Some(size)))
+            }
+            ObjectContentInner::Bytes(sb) => {
+                let k = sb.len();
+                let r = Box::pin(tokio_stream::iter(sb.into_iter().map(Ok)));
+                Ok((r, Some(k as u64)))
+            }
+        }
+    }
+
+    pub(crate) async fn to_content_stream(self) -> IoResult<ContentStream> {
+        let (r, size) = self.to_stream().await?;
+        Ok(ContentStream::new(r, size))
+    }
+
+    /// Load the content into memory and return a `SegmentedBytes` object.
+    pub async fn to_segmented_bytes(self) -> IoResult<SegmentedBytes> {
+        let mut segmented_bytes = SegmentedBytes::new();
+        let (mut r, _) = self.to_stream().await?;
+        while let Some(bytes) = r.next().await {
+            let bytes = bytes?;
+            if bytes.is_empty() {
+                break;
+            }
+            segmented_bytes.append(bytes);
+        }
+        Ok(segmented_bytes)
+    }
+
+    /// Write the content to a file. This function will return the total number
+    /// of bytes written to the file. It first writes the content to a temporary
+    /// file and then renames the temporary file to the final file path. The
+    /// temporary file will be located in the same directory as the final file
+    /// path.
+    ///
+    /// If the file already exists, it will be replaced. If the parent directory
+    /// does not exist, an attempt to create it will be made.
+    pub async fn to_file(self, file_path: &Path) -> IoResult<u64> {
+        if file_path.is_dir() {
+            return Err(std::io::Error::other("path is a directory"));
+        }
+        let parent_dir = file_path.parent().ok_or(std::io::Error::other(format!(
+            "path {:?} does not have a parent directory",
+            file_path
+        )))?;
+        if !parent_dir.is_dir() {
+            fs::create_dir_all(parent_dir).await?;
+        }
+        let file_name = file_path.file_name().ok_or(std::io::Error::other(
+            "could not get filename component of path",
+        ))?;
+        let mut tmp_file_name: OsString = file_name.to_os_string();
+        tmp_file_name.push(format!("_{}", random::<u64>()));
+        let tmp_file_path = parent_dir
+            .to_path_buf()
+            .join(Path::new(tmp_file_name.as_os_str()));
+
+        let mut total = 0;
+        {
+            let mut fp = fs::File::create(&tmp_file_path).await?;
+            let (mut r, _) = self.to_stream().await?;
+            while let Some(bytes) = r.next().await {
+                let bytes = bytes?;
+                if bytes.is_empty() {
+                    break;
+                }
+                total += bytes.len() as u64;
+                fp.write_all(&bytes).await?;
+            }
+            fp.flush().await?;
+        }
+        fs::rename(&tmp_file_path, file_path).await?;
+        Ok(total)
+    }
+}
+
+pub(crate) struct ContentStream {
+    r: Pin<Box<dyn Stream<Item = IoResult<Bytes>>>>,
+    extra: Option<Bytes>,
+    size: Option<u64>,
+}
+
+impl ContentStream {
     pub fn new(r: impl Stream<Item = IoResult<Bytes>> + 'static, size: Option<u64>) -> Self {
         let r = Box::pin(r);
         Self {
@@ -79,24 +188,10 @@ impl ObjectContent {
         }
     }
 
-    pub fn from_reader(r: impl AsyncRead + Send + Sync + 'static, size: Option<u64>) -> Self {
-        let pinned = Box::pin(r);
-        let r = tokio_util::io::ReaderStream::new(pinned);
-        Self {
-            r: Box::pin(r),
-            extra: None,
-            size,
-        }
-    }
-
     pub fn get_size(&self) -> Option<u64> {
         self.size
     }
 
-    pub fn stream(self) -> impl Stream<Item = IoResult<Bytes>> {
-        self.r
-    }
-
     // Read as many bytes as possible up to `n` and return a `SegmentedBytes`
     // object.
     pub async fn read_upto(&mut self, n: usize) -> IoResult<SegmentedBytes> {
@@ -134,20 +229,9 @@ impl ObjectContent {
         }
         Ok(segmented_bytes)
     }
-
-    pub async fn to_segmented_bytes(mut self) -> IoResult<SegmentedBytes> {
-        let mut segmented_bytes = SegmentedBytes::new();
-        while let Some(bytes) = self.r.next().await {
-            let bytes = bytes?;
-            if bytes.is_empty() {
-                break;
-            }
-            segmented_bytes.append(bytes);
-        }
-        Ok(segmented_bytes)
-    }
 }
 
+/// An aggregated collection of `Bytes` objects.
 #[derive(Debug, Clone)]
 pub struct SegmentedBytes {
     segments: Vec<Vec<Bytes>>,
@@ -200,6 +284,14 @@ impl SegmentedBytes {
         }
     }
 
+    pub fn into_iter(self) -> SegmentedBytesIntoIterator {
+        SegmentedBytesIntoIterator {
+            sb: self,
+            current_segment: 0,
+            current_segment_index: 0,
+        }
+    }
+
     // Copy all the content into a single `Bytes` object.
     pub fn to_bytes(&self) -> Bytes {
         let mut buf = BytesMut::with_capacity(self.total_size);
@@ -212,11 +304,42 @@ impl SegmentedBytes {
     }
 }
 
-impl From<Bytes> for SegmentedBytes {
-    fn from(bytes: Bytes) -> Self {
-        let mut sb = SegmentedBytes::new();
-        sb.append(bytes);
-        sb
+pub struct SegmentedBytesIntoIterator {
+    sb: SegmentedBytes,
+    current_segment: usize,
+    current_segment_index: usize,
+}
+
+impl Iterator for SegmentedBytesIntoIterator {
+    type Item = Bytes;
+
+    fn next(&mut self) -> Option<Bytes> {
+        if self.current_segment >= self.sb.segments.len() {
+            return None;
+        }
+        let segment = &self.sb.segments[self.current_segment];
+        if self.current_segment_index >= segment.len() {
+            self.current_segment += 1;
+            self.current_segment_index = 0;
+            return Iterator::next(self);
+        }
+        let bytes = &segment[self.current_segment_index];
+        self.current_segment_index += 1;
+        Some(bytes.clone())
+    }
+}
+
+impl IntoIterator for SegmentedBytes {
+    type Item = Bytes;
+
+    type IntoIter = SegmentedBytesIntoIterator;
+
+    fn into_iter(self) -> Self::IntoIter {
+        SegmentedBytesIntoIterator {
+            sb: self,
+            current_segment: 0,
+            current_segment_index: 0,
+        }
     }
 }
 
@@ -226,7 +349,7 @@ pub struct SegmentedBytesIterator<'a> {
     current_segment_index: usize,
 }
 
-impl Iterator for SegmentedBytesIterator<'_> {
+impl<'a> Iterator for SegmentedBytesIterator<'a> {
     type Item = Bytes;
 
     fn next(&mut self) -> Option<Bytes> {
@@ -237,10 +360,31 @@ impl Iterator for SegmentedBytesIterator<'_> {
         if self.current_segment_index >= segment.len() {
             self.current_segment += 1;
             self.current_segment_index = 0;
-            return self.next();
+            return Iterator::next(self);
         }
         let bytes = &segment[self.current_segment_index];
         self.current_segment_index += 1;
         Some(bytes.clone())
     }
 }
+
+impl<'a> IntoIterator for &'a SegmentedBytes {
+    type Item = Bytes;
+    type IntoIter = SegmentedBytesIterator<'a>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        SegmentedBytesIterator {
+            sb: self,
+            current_segment: 0,
+            current_segment_index: 0,
+        }
+    }
+}
+
+impl From<Bytes> for SegmentedBytes {
+    fn from(bytes: Bytes) -> Self {
+        let mut sb = SegmentedBytes::new();
+        sb.append(bytes);
+        sb
+    }
+}
diff --git a/src/s3/builders/put_object.rs b/src/s3/builders/put_object.rs
index 55420b2..1e09a70 100644
--- a/src/s3/builders/put_object.rs
+++ b/src/s3/builders/put_object.rs
@@ -13,17 +13,13 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::{
-    collections::HashMap,
-    path::{Path, PathBuf},
-    sync::Arc,
-};
+use std::{collections::HashMap, sync::Arc};
 
 use bytes::BytesMut;
 use http::Method;
 
 use crate::s3::{
-    builders::ObjectContent,
+    builders::ContentStream,
     client::Client,
     error::Error,
     response::{
@@ -35,7 +31,7 @@ use crate::s3::{
     utils::{check_bucket_name, md5sum_hash, merge, to_iso8601utc, urlencode, Multimap},
 };
 
-use super::SegmentedBytes;
+use super::{ObjectContent, SegmentedBytes};
 
 /// Argument for
 /// [create_multipart_upload()](crate::s3::client::Client::create_multipart_upload)
@@ -632,12 +628,11 @@ pub struct PutObjectContent {
     content_type: String,
 
     // source data
-    input_reader: Option<ObjectContent>,
-    file_path: Option<PathBuf>,
+    input_content: ObjectContent,
 
     // Computed.
     // expected_parts: Option<u16>,
-    reader: ObjectContent,
+    content_stream: ContentStream,
     part_count: u16,
 }
 
@@ -646,8 +641,7 @@ impl PutObjectContent {
         PutObjectContent {
             bucket: bucket.to_string(),
             object: object.to_string(),
-            input_reader: Some(content.into()),
-            file_path: None,
+            input_content: content.into(),
             client: None,
             extra_headers: None,
             extra_query_params: None,
@@ -659,29 +653,7 @@ impl PutObjectContent {
             legal_hold: false,
             part_size: None,
             content_type: String::from("application/octet-stream"),
-            reader: ObjectContent::empty(),
-            part_count: 0,
-        }
-    }
-
-    pub fn from_file(bucket: &str, object: &str, file_path: &Path) -> Self {
-        PutObjectContent {
-            bucket: bucket.to_string(),
-            object: object.to_string(),
-            input_reader: None,
-            file_path: Some(file_path.to_path_buf()),
-            client: None,
-            extra_headers: None,
-            extra_query_params: None,
-            region: None,
-            user_metadata: None,
-            sse: None,
-            tags: None,
-            retention: None,
-            legal_hold: false,
-            part_size: None,
-            content_type: String::from("application/octet-stream"),
-            reader: ObjectContent::empty(),
+            content_stream: ContentStream::empty(),
             part_count: 0,
         }
     }
@@ -750,18 +722,13 @@ impl PutObjectContent {
             )));
         }
 
-        if self.input_reader.is_none() {
-            // This unwrap is safe as the public API ensures that the file_path
-            // or the reader is always set.
-            let file_path = self.file_path.as_ref().unwrap();
-            let file = tokio::fs::File::open(file_path).await?;
-            let size = file.metadata().await?.len();
-            self.reader = ObjectContent::from_reader(file, Some(size));
-        } else {
-            self.reader = self.input_reader.take().unwrap();
-        }
+        let input_content = std::mem::replace(&mut self.input_content, ObjectContent::default());
+        self.content_stream = input_content
+            .to_content_stream()
+            .await
+            .map_err(|e| Error::IOError(e))?;
 
-        let object_size = self.reader.get_size();
+        let object_size = self.content_stream.get_size();
         let (psize, expected_parts) = calc_part_info(object_size, self.part_size)?;
         assert_ne!(expected_parts, Some(0));
         self.part_size = Some(psize);
@@ -775,7 +742,7 @@ impl PutObjectContent {
         }
 
         // Read the first part.
-        let seg_bytes = self.reader.read_upto(psize as usize).await?;
+        let seg_bytes = self.content_stream.read_upto(psize as usize).await?;
 
         // In the first part read, if:
         //
@@ -839,7 +806,7 @@ impl PutObjectContent {
             if let Some(v) = first_part.take() {
                 v
             } else {
-                self.reader.read_upto(psize as usize).await?
+                self.content_stream.read_upto(psize as usize).await?
             }
         };
         part_number += 1;
diff --git a/src/s3/client/put_object.rs b/src/s3/client/put_object.rs
index b0b2f2c..283e293 100644
--- a/src/s3/client/put_object.rs
+++ b/src/s3/client/put_object.rs
@@ -15,8 +15,6 @@
 
 //! S3 APIs for uploading objects.
 
-use std::path::Path;
-
 use super::Client;
 use crate::s3::{
     builders::{
@@ -76,13 +74,4 @@ impl Client {
     ) -> PutObjectContent {
         PutObjectContent::new(bucket, object, content).client(self)
     }
-
-    pub fn put_object_from_file(
-        &self,
-        bucket: &str,
-        object: &str,
-        file_path: &Path,
-    ) -> PutObjectContent {
-        PutObjectContent::from_file(bucket, object, file_path).client(self)
-    }
 }
diff --git a/src/s3/mod.rs b/src/s3/mod.rs
index fdd82a8..e899f27 100644
--- a/src/s3/mod.rs
+++ b/src/s3/mod.rs
@@ -26,3 +26,5 @@ pub mod signer;
 pub mod sse;
 pub mod types;
 pub mod utils;
+
+pub use client::{Client, ClientBuilder};
diff --git a/src/s3/response/get_object.rs b/src/s3/response/get_object.rs
index 826e544..8752df2 100644
--- a/src/s3/response/get_object.rs
+++ b/src/s3/response/get_object.rs
@@ -50,7 +50,7 @@ impl FromS3Response for GetObjectResponse2 {
             result.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
         });
 
-        let content = ObjectContent::new(body, Some(content_length));
+        let content = ObjectContent::new_from_stream(body, Some(content_length));
 
         Ok(GetObjectResponse2 {
             headers: header_map,
diff --git a/tests/tests.rs b/tests/tests.rs
index 9e913c2..a470a28 100644
--- a/tests/tests.rs
+++ b/tests/tests.rs
@@ -322,7 +322,7 @@ impl ClientTest {
             .put_object_content(
                 &self.test_bucket,
                 &object_name,
-                ObjectContent::new(data_src, Some(*size)),
+                ObjectContent::new_from_stream(data_src, Some(*size)),
             )
             .send()
             .await
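
Usage sketch (editor's note, not part of the patch): a minimal end-to-end example of the
new `ObjectContent` API introduced above. The play.min.io endpoint and its public demo
credentials, the pre-existing bucket name `my-bucket`, the local file names, and the
`#[tokio::main]` runtime attribute are assumptions for illustration, not part of this change.

    use std::path::Path;

    use minio::s3::builders::ObjectContent;
    use minio::s3::client::ClientBuilder;
    use minio::s3::creds::StaticProvider;
    use minio::s3::http::BaseUrl;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Build a client against the public MinIO play server (demo credentials).
        let base_url: BaseUrl = "https://play.min.io".parse()?;
        let provider = StaticProvider::new(
            "Q3AM3UQ867SPQQA43P2F",
            "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG",
            None,
        );
        let client = ClientBuilder::new(base_url)
            .provider(Some(Box::new(provider)))
            .build()?;

        // Upload: an `ObjectContent` built from a path streams the file when sent.
        let content = ObjectContent::from(Path::new("./asiaphotos.zip"));
        client
            .put_object_content("my-bucket", "asiaphotos-2015.zip", content)
            .send()
            .await?;

        // Local write path: any `ObjectContent` can be written to disk with `to_file`,
        // which stages the data in a temporary file and renames it into place.
        let n = ObjectContent::from(String::from("hello world"))
            .to_file(Path::new("/tmp/hello.txt"))
            .await?;
        println!("wrote {} bytes", n);
        Ok(())
    }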