diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 00d6ee446f2f..6e46ddf75c9b 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -27,7 +27,9 @@ use crate::client::retry::RetryExt; use crate::client::GetOptionsExt; use crate::multipart::PartId; use crate::path::DELIMITER; -use crate::{ClientOptions, GetOptions, ListResult, MultipartId, Path, Result, RetryConfig}; +use crate::{ + ClientOptions, GetOptions, ListResult, MultipartId, Path, PutResult, Result, RetryConfig, +}; use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; @@ -506,12 +508,32 @@ impl S3Client { Ok(response.upload_id) } + pub async fn put_part( + &self, + path: &Path, + upload_id: &MultipartId, + part_idx: usize, + data: Bytes, + ) -> Result { + let part = (part_idx + 1).to_string(); + + let content_id = self + .put_request( + path, + data, + &[("partNumber", &part), ("uploadId", upload_id)], + ) + .await?; + + Ok(PartId { content_id }) + } + pub async fn complete_multipart( &self, location: &Path, upload_id: &str, parts: Vec, - ) -> Result<()> { + ) -> Result { let parts = parts .into_iter() .enumerate() @@ -527,7 +549,8 @@ impl S3Client { let credential = self.get_credential().await?; let url = self.config.path_url(location); - self.client + let response = self + .client .request(Method::POST, url) .query(&[("uploadId", upload_id)]) .body(body) @@ -542,7 +565,9 @@ impl S3Client { .await .context(CompleteMultipartRequestSnafu)?; - Ok(()) + let etag = get_etag(response.headers()).context(MetadataSnafu)?; + + Ok(PutResult { e_tag: Some(etag) }) } } diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 25894a1c3445..371d14fce345 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -44,7 +44,7 @@ use crate::aws::client::S3Client; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; use crate::client::CredentialProvider; -use crate::multipart::{PartId, PutPart, WriteMultiPart}; +use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart}; use crate::signer::Signer; use crate::{ GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutResult, @@ -246,18 +246,9 @@ struct S3MultiPartUpload { #[async_trait] impl PutPart for S3MultiPartUpload { async fn put_part(&self, buf: Vec, part_idx: usize) -> Result { - let part = (part_idx + 1).to_string(); - - let content_id = self - .client - .put_request( - &self.location, - buf.into(), - &[("partNumber", &part), ("uploadId", &self.upload_id)], - ) - .await?; - - Ok(PartId { content_id }) + self.client + .put_part(&self.location, &self.upload_id, part_idx, buf.into()) + .await } async fn complete(&self, completed_parts: Vec) -> Result<()> { @@ -268,6 +259,36 @@ impl PutPart for S3MultiPartUpload { } } +#[async_trait] +impl MultiPartStore for AmazonS3 { + async fn create_multipart(&self, path: &Path) -> Result { + self.client.create_multipart(path).await + } + + async fn put_part( + &self, + path: &Path, + id: &MultipartId, + part_idx: usize, + data: Bytes, + ) -> Result { + self.client.put_part(path, id, part_idx, data).await + } + + async fn complete_multipart( + &self, + path: &Path, + id: &MultipartId, + parts: Vec, + ) -> Result { + self.client.complete_multipart(path, id, parts).await + } + + async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> { + self.client.delete_request(path, &[("uploadId", id)]).await + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs index cd3df8c7b857..9f47b9a8152b 100644 --- a/object_store/src/azure/client.rs +++ b/object_store/src/azure/client.rs @@ -19,13 +19,16 @@ use super::credential::AzureCredential; use crate::azure::credential::*; use crate::azure::{AzureCredentialProvider, STORE}; use crate::client::get::GetClient; -use crate::client::header::HeaderConfig; +use crate::client::header::{get_etag, HeaderConfig}; use crate::client::list::ListClient; use crate::client::retry::RetryExt; use crate::client::GetOptionsExt; +use crate::multipart::PartId; use crate::path::DELIMITER; use crate::util::deserialize_rfc1123; -use crate::{ClientOptions, GetOptions, ListResult, ObjectMeta, Path, Result, RetryConfig}; +use crate::{ + ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutResult, Result, RetryConfig, +}; use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; @@ -84,6 +87,11 @@ pub(crate) enum Error { Authorization { source: crate::azure::credential::Error, }, + + #[snafu(display("Unable to extract metadata from headers: {}", source))] + Metadata { + source: crate::client::header::Error, + }, } impl From for crate::Error { @@ -190,6 +198,43 @@ impl AzureClient { Ok(response) } + /// PUT a block + pub async fn put_block(&self, path: &Path, part_idx: usize, data: Bytes) -> Result { + let content_id = format!("{part_idx:20}"); + let block_id: BlockId = content_id.clone().into(); + + self.put_request( + path, + Some(data), + true, + &[ + ("comp", "block"), + ("blockid", &BASE64_STANDARD.encode(block_id)), + ], + ) + .await?; + + Ok(PartId { content_id }) + } + + /// PUT a block list + pub async fn put_block_list(&self, path: &Path, parts: Vec) -> Result { + let blocks = parts + .into_iter() + .map(|part| BlockId::from(part.content_id)) + .collect(); + + let block_list = BlockList { blocks }; + let block_xml = block_list.to_xml(); + + let response = self + .put_request(path, Some(block_xml.into()), true, &[("comp", "blocklist")]) + .await?; + + let e_tag = get_etag(response.headers()).context(MetadataSnafu)?; + Ok(PutResult { e_tag: Some(e_tag) }) + } + /// Make an Azure Delete request pub async fn delete_request( &self, diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs index 5f768756a629..67e797961441 100644 --- a/object_store/src/azure/mod.rs +++ b/object_store/src/azure/mod.rs @@ -26,15 +26,12 @@ //! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide //! a way to drop old blocks. Instead unused blocks are automatically cleaned up //! after 7 days. -use self::client::{BlockId, BlockList}; use crate::{ multipart::{PartId, PutPart, WriteMultiPart}, path::Path, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, Result, }; use async_trait::async_trait; -use base64::prelude::BASE64_STANDARD; -use base64::Engine; use bytes::Bytes; use futures::stream::BoxStream; use std::fmt::Debug; @@ -53,6 +50,7 @@ mod credential; /// [`CredentialProvider`] for [`MicrosoftAzure`] pub type AzureCredentialProvider = Arc>; use crate::client::header::get_etag; +use crate::multipart::MultiPartStore; pub use builder::{AzureConfigKey, MicrosoftAzureBuilder}; pub use credential::AzureCredential; @@ -151,43 +149,44 @@ struct AzureMultiPartUpload { #[async_trait] impl PutPart for AzureMultiPartUpload { - async fn put_part(&self, buf: Vec, part_idx: usize) -> Result { - let content_id = format!("{part_idx:20}"); - let block_id: BlockId = content_id.clone().into(); - - self.client - .put_request( - &self.location, - Some(buf.into()), - true, - &[ - ("comp", "block"), - ("blockid", &BASE64_STANDARD.encode(block_id)), - ], - ) - .await?; + async fn put_part(&self, buf: Vec, idx: usize) -> Result { + self.client.put_block(&self.location, idx, buf.into()).await + } - Ok(PartId { content_id }) + async fn complete(&self, parts: Vec) -> Result<()> { + self.client.put_block_list(&self.location, parts).await?; + Ok(()) } +} - async fn complete(&self, completed_parts: Vec) -> Result<()> { - let blocks = completed_parts - .into_iter() - .map(|part| BlockId::from(part.content_id)) - .collect(); +#[async_trait] +impl MultiPartStore for MicrosoftAzure { + async fn create_multipart(&self, _: &Path) -> Result { + Ok(String::new()) + } - let block_list = BlockList { blocks }; - let block_xml = block_list.to_xml(); + async fn put_part( + &self, + path: &Path, + _: &MultipartId, + part_idx: usize, + data: Bytes, + ) -> Result { + self.client.put_block(path, part_idx, data).await + } - self.client - .put_request( - &self.location, - Some(block_xml.into()), - true, - &[("comp", "blocklist")], - ) - .await?; + async fn complete_multipart( + &self, + path: &Path, + _: &MultipartId, + parts: Vec, + ) -> Result { + self.client.put_block_list(path, parts).await + } + async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> { + // There is no way to drop blocks that have been uploaded. Instead, they simply + // expire in 7 days. Ok(()) } } diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs index 558a6f8d2a84..8c44f9016480 100644 --- a/object_store/src/gcp/client.rs +++ b/object_store/src/gcp/client.rs @@ -24,7 +24,7 @@ use crate::client::GetOptionsExt; use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE}; use crate::multipart::PartId; use crate::path::{Path, DELIMITER}; -use crate::{ClientOptions, GetOptions, ListResult, MultipartId, Result, RetryConfig}; +use crate::{ClientOptions, GetOptions, ListResult, MultipartId, PutResult, Result, RetryConfig}; use async_trait::async_trait; use bytes::{Buf, Bytes}; use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC}; @@ -184,6 +184,30 @@ impl GoogleCloudStorageClient { Ok(get_etag(response.headers()).context(MetadataSnafu)?) } + /// Perform a put part request + /// + /// Returns the new [`PartId`] + pub async fn put_part( + &self, + path: &Path, + upload_id: &MultipartId, + part_idx: usize, + data: Bytes, + ) -> Result { + let content_id = self + .put_request( + path, + data, + &[ + ("partNumber", &format!("{}", part_idx + 1)), + ("uploadId", upload_id), + ], + ) + .await?; + + Ok(PartId { content_id }) + } + /// Initiate a multi-part upload pub async fn multipart_initiate(&self, path: &Path) -> Result { let credential = self.get_credential().await?; @@ -240,7 +264,7 @@ impl GoogleCloudStorageClient { path: &Path, multipart_id: &MultipartId, completed_parts: Vec, - ) -> Result<()> { + ) -> Result { let upload_id = multipart_id.clone(); let url = self.object_url(path); @@ -263,7 +287,8 @@ impl GoogleCloudStorageClient { // https://github.com/tafia/quick-xml/issues/350 .replace(""", "\""); - self.client + let result = self + .client .request(Method::POST, &url) .bearer_auth(&credential.bearer) .query(&[("uploadId", upload_id)]) @@ -274,7 +299,8 @@ impl GoogleCloudStorageClient { path: path.as_ref(), })?; - Ok(()) + let etag = get_etag(result.headers()).context(MetadataSnafu)?; + Ok(PutResult { e_tag: Some(etag) }) } /// Perform a delete request diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs index 6512a8b036c5..9b5c4ce38fb6 100644 --- a/object_store/src/gcp/mod.rs +++ b/object_store/src/gcp/mod.rs @@ -45,6 +45,7 @@ use tokio::io::AsyncWrite; use crate::client::get::GetClientExt; use crate::client::list::ListClientExt; +use crate::multipart::MultiPartStore; pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey}; pub use credential::GcpCredential; @@ -90,27 +91,17 @@ struct GCSMultipartUpload { impl PutPart for GCSMultipartUpload { /// Upload an object part async fn put_part(&self, buf: Vec, part_idx: usize) -> Result { - let upload_id = self.multipart_id.clone(); - let content_id = self - .client - .put_request( - &self.path, - buf.into(), - &[ - ("partNumber", format!("{}", part_idx + 1)), - ("uploadId", upload_id), - ], - ) - .await?; - - Ok(PartId { content_id }) + self.client + .put_part(&self.path, &self.multipart_id, part_idx, buf.into()) + .await } /// Complete a multipart upload async fn complete(&self, completed_parts: Vec) -> Result<()> { self.client .multipart_complete(&self.path, &self.multipart_id, completed_parts) - .await + .await?; + Ok(()) } } @@ -169,6 +160,36 @@ impl ObjectStore for GoogleCloudStorage { } } +#[async_trait] +impl MultiPartStore for GoogleCloudStorage { + async fn create_multipart(&self, path: &Path) -> Result { + self.client.multipart_initiate(path).await + } + + async fn put_part( + &self, + path: &Path, + id: &MultipartId, + part_idx: usize, + data: Bytes, + ) -> Result { + self.client.put_part(path, id, part_idx, data).await + } + + async fn complete_multipart( + &self, + path: &Path, + id: &MultipartId, + parts: Vec, + ) -> Result { + self.client.multipart_complete(path, id, parts).await + } + + async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> { + self.client.multipart_cleanup(path, id).await + } +} + #[cfg(test)] mod test { diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index d4c911fceab4..910f14f6389e 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -22,17 +22,18 @@ //! especially useful when dealing with large files or high-throughput systems. use async_trait::async_trait; +use bytes::Bytes; use futures::{stream::FuturesUnordered, Future, StreamExt}; use std::{io, pin::Pin, sync::Arc, task::Poll}; use tokio::io::AsyncWrite; -use crate::Result; +use crate::path::Path; +use crate::{MultipartId, PutResult, Result}; type BoxedTryFuture = Pin> + Send>>; -/// A trait that can be implemented by cloud-based object stores -/// and used in combination with [`WriteMultiPart`] to provide -/// multipart upload support +/// A trait used in combination with [`WriteMultiPart`] to implement +/// [`AsyncWrite`] on top of an API for multipart upload #[async_trait] pub trait PutPart: Send + Sync + 'static { /// Upload a single part @@ -52,6 +53,9 @@ pub struct PartId { } /// Wrapper around a [`PutPart`] that implements [`AsyncWrite`] +/// +/// Data will be uploaded in fixed size chunks of 10 MiB in parallel, +/// up to the configured maximum concurrency pub struct WriteMultiPart { inner: Arc, /// A list of completed parts, in sequential order. @@ -263,3 +267,42 @@ impl std::fmt::Debug for WriteMultiPart { .finish() } } + +/// A low-level interface for interacting with multipart upload APIs +/// +/// Most use-cases should prefer [`ObjectStore::put_multipart`] as this is supported by more +/// backends, including [`LocalFileSystem`], and automatically handles uploading fixed +/// size parts of sufficient size in parallel +/// +/// [`ObjectStore::put_multipart`]: crate::ObjectStore::put_multipart +/// [`LocalFileSystem`]: crate::local::LocalFileSystem +#[async_trait] +pub trait MultiPartStore: Send + Sync + 'static { + /// Creates a new multipart upload, returning the [`MultipartId`] + async fn create_multipart(&self, path: &Path) -> Result; + + /// Uploads a new part with index `part_idx` + /// + /// Most stores require that all parts apart from the final are at least 5 MiB. Additionally + /// some stores require all parts excluding the last to be the same size, e.g. [R2] + /// + /// [R2]: https://developers.cloudflare.com/r2/objects/multipart-objects/#limitations + async fn put_part( + &self, + path: &Path, + id: &MultipartId, + part_idx: usize, + data: Bytes, + ) -> Result; + + /// Completes a multipart upload + async fn complete_multipart( + &self, + path: &Path, + id: &MultipartId, + parts: Vec, + ) -> Result; + + /// Aborts a multipart upload + async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()>; +} diff --git a/object_store/src/signer.rs b/object_store/src/signer.rs index f792397a7894..ed92e28799e5 100644 --- a/object_store/src/signer.rs +++ b/object_store/src/signer.rs @@ -23,8 +23,7 @@ use reqwest::Method; use std::{fmt, time::Duration}; use url::Url; -/// Universal API to presigned URLs generated from multiple object store services. Not supported by -/// all object store services. +/// Universal API to generate presigned URLs from multiple object store services. #[async_trait] pub trait Signer: Send + Sync + fmt::Debug + 'static { /// Given the intended [`Method`] and [`Path`] to use and the desired length of time for which