Skip to content

Commit

Permalink
Add MultiPartStore (#4961) (#4608) (#4971)
Browse files Browse the repository at this point in the history
* Add MultiPartStore (#4961) (#4608)

* Parse CompleteMultipartUploadResult (#4965)

* More docs

* Add integration test

* Fix azure

* More docs

* Don't gate multipart behind feature flag
  • Loading branch information
tustvold authored Oct 25, 2023
1 parent a6a512f commit e78d140
Show file tree
Hide file tree
Showing 9 changed files with 329 additions and 85 deletions.
51 changes: 47 additions & 4 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::{ClientOptions, GetOptions, ListResult, MultipartId, Path, Result, RetryConfig};
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, Path, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
Expand Down Expand Up @@ -115,6 +117,9 @@ pub(crate) enum Error {
#[snafu(display("Error performing complete multipart request: {}", source))]
CompleteMultipartRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting complete multipart response body: {}", source))]
CompleteMultipartResponseBody { source: reqwest::Error },

#[snafu(display("Got invalid list response: {}", source))]
InvalidListResponse { source: quick_xml::de::DeError },

Expand Down Expand Up @@ -162,6 +167,13 @@ struct MultipartPart {
part_number: usize,
}

/// Deserialization target for the XML body returned by a complete-multipart
/// request; the root element is matched under the name
/// `CompleteMultipartUploadResult`.
///
/// The struct declares a single field, populated from the `ETag` element.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase", rename = "CompleteMultipartUploadResult")]
struct CompleteMultipartResult {
/// Value of the `ETag` element. The explicit rename is needed because the
/// `PascalCase` rule alone would map `e_tag` to `ETag` only by coincidence
/// of casing — NOTE(review): confirm whether the rename is strictly required.
#[serde(rename = "ETag")]
e_tag: String,
}

#[derive(Deserialize)]
#[serde(rename_all = "PascalCase", rename = "DeleteResult")]
struct BatchDeleteResponse {
Expand Down Expand Up @@ -506,12 +518,32 @@ impl S3Client {
Ok(response.upload_id)
}

/// Uploads `data` as one part of the multipart upload `upload_id` at `path`.
///
/// The request carries two query parameters: `partNumber`, set to
/// `part_idx + 1`, and `uploadId`, set to `upload_id`. The value produced by
/// `put_request` is stored as the `content_id` of the returned [`PartId`].
///
/// # Errors
///
/// Propagates any error returned by `put_request`.
pub async fn put_part(
    &self,
    path: &Path,
    upload_id: &MultipartId,
    part_idx: usize,
    data: Bytes,
) -> Result<PartId> {
    // The index supplied by the caller is shifted by one before being sent.
    let part_number = (part_idx + 1).to_string();
    let query = [("partNumber", &part_number), ("uploadId", upload_id)];

    let content_id = self.put_request(path, data, &query).await?;
    Ok(PartId { content_id })
}

pub async fn complete_multipart(
&self,
location: &Path,
upload_id: &str,
parts: Vec<PartId>,
) -> Result<()> {
) -> Result<PutResult> {
let parts = parts
.into_iter()
.enumerate()
Expand All @@ -527,7 +559,8 @@ impl S3Client {
let credential = self.get_credential().await?;
let url = self.config.path_url(location);

self.client
let response = self
.client
.request(Method::POST, url)
.query(&[("uploadId", upload_id)])
.body(body)
Expand All @@ -542,7 +575,17 @@ impl S3Client {
.await
.context(CompleteMultipartRequestSnafu)?;

Ok(())
let data = response
.bytes()
.await
.context(CompleteMultipartResponseBodySnafu)?;

let response: CompleteMultipartResult =
quick_xml::de::from_reader(data.reader()).context(InvalidMultipartResponseSnafu)?;

Ok(PutResult {
e_tag: Some(response.e_tag),
})
}
}

Expand Down
49 changes: 36 additions & 13 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::aws::client::S3Client;
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::{PartId, PutPart, WriteMultiPart};
use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart};
use crate::signer::Signer;
use crate::{
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutResult,
Expand Down Expand Up @@ -246,18 +246,9 @@ struct S3MultiPartUpload {
#[async_trait]
impl PutPart for S3MultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
let part = (part_idx + 1).to_string();

let content_id = self
.client
.put_request(
&self.location,
buf.into(),
&[("partNumber", &part), ("uploadId", &self.upload_id)],
)
.await?;

Ok(PartId { content_id })
self.client
.put_part(&self.location, &self.upload_id, part_idx, buf.into())
.await
}

async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
Expand All @@ -268,6 +259,36 @@ impl PutPart for S3MultiPartUpload {
}
}

/// [`MultiPartStore`] for [`AmazonS3`]: each operation is forwarded to the
/// store's underlying client.
#[async_trait]
impl MultiPartStore for AmazonS3 {
    /// Starts a multipart upload at `path` and returns its identifier.
    async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
        let client = &self.client;
        client.create_multipart(path).await
    }

    /// Uploads `data` as part `part_idx` of the upload `id` at `path`.
    async fn put_part(
        &self,
        path: &Path,
        id: &MultipartId,
        part_idx: usize,
        data: Bytes,
    ) -> Result<PartId> {
        let client = &self.client;
        client.put_part(path, id, part_idx, data).await
    }

    /// Completes the upload `id` at `path` from the given `parts`.
    async fn complete_multipart(
        &self,
        path: &Path,
        id: &MultipartId,
        parts: Vec<PartId>,
    ) -> Result<PutResult> {
        let client = &self.client;
        client.complete_multipart(path, id, parts).await
    }

    /// Aborts the upload `id` at `path` by issuing a delete request that
    /// carries the identifier as the `uploadId` query parameter.
    async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
        let query = [("uploadId", id)];
        self.client.delete_request(path, &query).await
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -293,6 +314,8 @@ mod tests {
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
stream_get(&integration).await;
multipart(&integration, &integration).await;

if test_not_exists {
copy_if_not_exists(&integration).await;
}
Expand Down
49 changes: 47 additions & 2 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ use super::credential::AzureCredential;
use crate::azure::credential::*;
use crate::azure::{AzureCredentialProvider, STORE};
use crate::client::get::GetClient;
use crate::client::header::HeaderConfig;
use crate::client::header::{get_etag, HeaderConfig};
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::util::deserialize_rfc1123;
use crate::{ClientOptions, GetOptions, ListResult, ObjectMeta, Path, Result, RetryConfig};
use crate::{
ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
Expand Down Expand Up @@ -84,6 +87,11 @@ pub(crate) enum Error {
Authorization {
source: crate::azure::credential::Error,
},

#[snafu(display("Unable to extract metadata from headers: {}", source))]
Metadata {
source: crate::client::header::Error,
},
}

impl From<Error> for crate::Error {
Expand Down Expand Up @@ -190,6 +198,43 @@ impl AzureClient {
Ok(response)
}

/// PUT a block <https://learn.microsoft.com/en-us/rest/api/storageservices/put-block>
pub async fn put_block(&self, path: &Path, part_idx: usize, data: Bytes) -> Result<PartId> {
let content_id = format!("{part_idx:20}");
let block_id: BlockId = content_id.clone().into();

self.put_request(
path,
Some(data),
true,
&[
("comp", "block"),
("blockid", &BASE64_STANDARD.encode(block_id)),
],
)
.await?;

Ok(PartId { content_id })
}

/// PUT a block list <https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list>
///
/// Converts each part's `content_id` into a [`BlockId`], serializes the
/// resulting [`BlockList`] to XML and sends it with `comp=blocklist`. The
/// e-tag extracted from the response headers is returned in the
/// [`PutResult`].
///
/// # Errors
///
/// Propagates any error from `put_request`, and returns a metadata error if
/// the e-tag cannot be extracted from the response headers.
pub async fn put_block_list(&self, path: &Path, parts: Vec<PartId>) -> Result<PutResult> {
    let payload = BlockList {
        blocks: parts
            .into_iter()
            .map(|part| BlockId::from(part.content_id))
            .collect(),
    }
    .to_xml();

    let query = [("comp", "blocklist")];
    let response = self
        .put_request(path, Some(payload.into()), true, &query)
        .await?;

    let e_tag = Some(get_etag(response.headers()).context(MetadataSnafu)?);
    Ok(PutResult { e_tag })
}

/// Make an Azure Delete request <https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob>
pub async fn delete_request<T: Serialize + ?Sized + Sync>(
&self,
Expand Down
73 changes: 35 additions & 38 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,12 @@
//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide
//! a way to drop old blocks. Instead, unused blocks are automatically cleaned up
//! after 7 days.
use self::client::{BlockId, BlockList};
use crate::{
multipart::{PartId, PutPart, WriteMultiPart},
path::Path,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, Result,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use futures::stream::BoxStream;
use std::fmt::Debug;
Expand All @@ -53,6 +50,7 @@ mod credential;
/// [`CredentialProvider`] for [`MicrosoftAzure`]
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
use crate::client::header::get_etag;
use crate::multipart::MultiPartStore;
pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
pub use credential::AzureCredential;

Expand Down Expand Up @@ -151,54 +149,52 @@ struct AzureMultiPartUpload {

#[async_trait]
impl PutPart for AzureMultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
let content_id = format!("{part_idx:20}");
let block_id: BlockId = content_id.clone().into();

self.client
.put_request(
&self.location,
Some(buf.into()),
true,
&[
("comp", "block"),
("blockid", &BASE64_STANDARD.encode(block_id)),
],
)
.await?;
async fn put_part(&self, buf: Vec<u8>, idx: usize) -> Result<PartId> {
self.client.put_block(&self.location, idx, buf.into()).await
}

Ok(PartId { content_id })
async fn complete(&self, parts: Vec<PartId>) -> Result<()> {
self.client.put_block_list(&self.location, parts).await?;
Ok(())
}
}

async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
let blocks = completed_parts
.into_iter()
.map(|part| BlockId::from(part.content_id))
.collect();
#[async_trait]
impl MultiPartStore for MicrosoftAzure {
async fn create_multipart(&self, _: &Path) -> Result<MultipartId> {
Ok(String::new())
}

let block_list = BlockList { blocks };
let block_xml = block_list.to_xml();
async fn put_part(
&self,
path: &Path,
_: &MultipartId,
part_idx: usize,
data: Bytes,
) -> Result<PartId> {
self.client.put_block(path, part_idx, data).await
}

self.client
.put_request(
&self.location,
Some(block_xml.into()),
true,
&[("comp", "blocklist")],
)
.await?;
async fn complete_multipart(
&self,
path: &Path,
_: &MultipartId,
parts: Vec<PartId>,
) -> Result<PutResult> {
self.client.put_block_list(path, parts).await
}

async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> {
// There is no way to drop blocks that have been uploaded. Instead, they simply
// expire in 7 days.
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::tests::{
copy_if_not_exists, get_opts, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list_opts, rename_and_copy, stream_get,
};
use crate::tests::*;

#[tokio::test]
async fn azure_blob_test() {
Expand All @@ -212,6 +208,7 @@ mod tests {
rename_and_copy(&integration).await;
copy_if_not_exists(&integration).await;
stream_get(&integration).await;
multipart(&integration, &integration).await;
}

#[test]
Expand Down
Loading

0 comments on commit e78d140

Please sign in to comment.