Skip to content

Commit

Permalink
Add MultiPartStore (#4961) (#4608)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 21, 2023
1 parent f4a2a88 commit 9f961f4
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 78 deletions.
33 changes: 29 additions & 4 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::{ClientOptions, GetOptions, ListResult, MultipartId, Path, Result, RetryConfig};
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, Path, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
Expand Down Expand Up @@ -506,12 +508,32 @@ impl S3Client {
Ok(response.upload_id)
}

pub async fn put_part(
&self,
path: &Path,
upload_id: &MultipartId,
part_idx: usize,
data: Bytes,
) -> Result<PartId> {
let part = (part_idx + 1).to_string();

let content_id = self
.put_request(
path,
data,
&[("partNumber", &part), ("uploadId", upload_id)],
)
.await?;

Ok(PartId { content_id })
}

pub async fn complete_multipart(
&self,
location: &Path,
upload_id: &str,
parts: Vec<PartId>,
) -> Result<()> {
) -> Result<PutResult> {
let parts = parts
.into_iter()
.enumerate()
Expand All @@ -527,7 +549,8 @@ impl S3Client {
let credential = self.get_credential().await?;
let url = self.config.path_url(location);

self.client
let response = self
.client
.request(Method::POST, url)
.query(&[("uploadId", upload_id)])
.body(body)
Expand All @@ -542,7 +565,9 @@ impl S3Client {
.await
.context(CompleteMultipartRequestSnafu)?;

Ok(())
let etag = get_etag(response.headers()).context(MetadataSnafu)?;

Ok(PutResult { e_tag: Some(etag) })
}
}

Expand Down
47 changes: 34 additions & 13 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::aws::client::S3Client;
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::{PartId, PutPart, WriteMultiPart};
use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart};
use crate::signer::Signer;
use crate::{
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutResult,
Expand Down Expand Up @@ -246,18 +246,9 @@ struct S3MultiPartUpload {
#[async_trait]
impl PutPart for S3MultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
let part = (part_idx + 1).to_string();

let content_id = self
.client
.put_request(
&self.location,
buf.into(),
&[("partNumber", &part), ("uploadId", &self.upload_id)],
)
.await?;

Ok(PartId { content_id })
self.client
.put_part(&self.location, &self.upload_id, part_idx, buf.into())
.await
}

async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
Expand All @@ -268,6 +259,36 @@ impl PutPart for S3MultiPartUpload {
}
}

#[async_trait]
impl MultiPartStore for AmazonS3 {
async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
self.client.create_multipart(path).await
}

async fn put_part(
&self,
path: &Path,
id: &MultipartId,
part_idx: usize,
data: Bytes,
) -> Result<PartId> {
self.client.put_part(path, id, part_idx, data).await
}

async fn complete_multipart(
&self,
path: &Path,
id: &MultipartId,
parts: Vec<PartId>,
) -> Result<PutResult> {
self.client.complete_multipart(path, id, parts).await
}

async fn abort_multipart(&self, path: &Path, id: &MultipartId) -> Result<()> {
self.client.delete_request(path, &[("uploadId", id)]).await
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
49 changes: 47 additions & 2 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ use super::credential::AzureCredential;
use crate::azure::credential::*;
use crate::azure::{AzureCredentialProvider, STORE};
use crate::client::get::GetClient;
use crate::client::header::HeaderConfig;
use crate::client::header::{get_etag, HeaderConfig};
use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::util::deserialize_rfc1123;
use crate::{ClientOptions, GetOptions, ListResult, ObjectMeta, Path, Result, RetryConfig};
use crate::{
ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
Expand Down Expand Up @@ -84,6 +87,11 @@ pub(crate) enum Error {
Authorization {
source: crate::azure::credential::Error,
},

#[snafu(display("Unable to extract metadata from headers: {}", source))]
Metadata {
source: crate::client::header::Error,
},
}

impl From<Error> for crate::Error {
Expand Down Expand Up @@ -190,6 +198,43 @@ impl AzureClient {
Ok(response)
}

/// PUT a block <https://learn.microsoft.com/en-us/rest/api/storageservices/put-block>
pub async fn put_block(&self, path: &Path, part_idx: usize, data: Bytes) -> Result<PartId> {
let content_id = format!("{part_idx:20}");
let block_id: BlockId = content_id.clone().into();

self.put_request(
path,
Some(data),
true,
&[
("comp", "block"),
("blockid", &BASE64_STANDARD.encode(block_id)),
],
)
.await?;

Ok(PartId { content_id })
}

/// PUT a block list <https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list>
pub async fn put_block_list(&self, path: &Path, parts: Vec<PartId>) -> Result<PutResult> {
let blocks = parts
.into_iter()
.map(|part| BlockId::from(part.content_id))
.collect();

let block_list = BlockList { blocks };
let block_xml = block_list.to_xml();

let response = self
.put_request(path, Some(block_xml.into()), true, &[("comp", "blocklist")])
.await?;

let e_tag = get_etag(response.headers()).context(MetadataSnafu)?;
Ok(PutResult { e_tag: Some(e_tag) })
}

/// Make an Azure Delete request <https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob>
pub async fn delete_request<T: Serialize + ?Sized + Sync>(
&self,
Expand Down
67 changes: 33 additions & 34 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,12 @@
//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide
//! a way to drop old blocks. Instead unused blocks are automatically cleaned up
//! after 7 days.
use self::client::{BlockId, BlockList};
use crate::{
multipart::{PartId, PutPart, WriteMultiPart},
path::Path,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult, Result,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use futures::stream::BoxStream;
use std::fmt::Debug;
Expand All @@ -53,6 +50,7 @@ mod credential;
/// [`CredentialProvider`] for [`MicrosoftAzure`]
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
use crate::client::header::get_etag;
use crate::multipart::MultiPartStore;
pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
pub use credential::AzureCredential;

Expand Down Expand Up @@ -151,43 +149,44 @@ struct AzureMultiPartUpload {

#[async_trait]
impl PutPart for AzureMultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
let content_id = format!("{part_idx:20}");
let block_id: BlockId = content_id.clone().into();

self.client
.put_request(
&self.location,
Some(buf.into()),
true,
&[
("comp", "block"),
("blockid", &BASE64_STANDARD.encode(block_id)),
],
)
.await?;
async fn put_part(&self, buf: Vec<u8>, idx: usize) -> Result<PartId> {
self.client.put_block(&self.location, idx, buf.into()).await
}

Ok(PartId { content_id })
async fn complete(&self, parts: Vec<PartId>) -> Result<()> {
self.client.put_block_list(&self.location, parts).await?;
Ok(())
}
}

async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
let blocks = completed_parts
.into_iter()
.map(|part| BlockId::from(part.content_id))
.collect();
#[async_trait]
impl MultiPartStore for MicrosoftAzure {
async fn create_multipart(&self, _: &Path) -> Result<MultipartId> {
Ok(String::new())
}

let block_list = BlockList { blocks };
let block_xml = block_list.to_xml();
async fn put_part(
&self,
path: &Path,
_: &MultipartId,
part_idx: usize,
data: Bytes,
) -> Result<PartId> {
self.client.put_block(path, part_idx, data).await
}

self.client
.put_request(
&self.location,
Some(block_xml.into()),
true,
&[("comp", "blocklist")],
)
.await?;
async fn complete_multipart(
&self,
path: &Path,
_: &MultipartId,
parts: Vec<PartId>,
) -> Result<PutResult> {
self.client.put_block_list(path, parts).await
}

async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> {
// There is no way to drop blocks that have been uploaded. Instead, they simply
// expire in 7 days.
Ok(())
}
}
Expand Down
34 changes: 30 additions & 4 deletions object_store/src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::client::GetOptionsExt;
use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE};
use crate::multipart::PartId;
use crate::path::{Path, DELIMITER};
use crate::{ClientOptions, GetOptions, ListResult, MultipartId, Result, RetryConfig};
use crate::{ClientOptions, GetOptions, ListResult, MultipartId, PutResult, Result, RetryConfig};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
Expand Down Expand Up @@ -184,6 +184,30 @@ impl GoogleCloudStorageClient {
Ok(get_etag(response.headers()).context(MetadataSnafu)?)
}

/// Perform a put part request <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
///
/// Returns the new [`PartId`]
pub async fn put_part(
&self,
path: &Path,
upload_id: &MultipartId,
part_idx: usize,
data: Bytes,
) -> Result<PartId> {
let content_id = self
.put_request(
path,
data,
&[
("partNumber", &format!("{}", part_idx + 1)),
("uploadId", upload_id),
],
)
.await?;

Ok(PartId { content_id })
}

/// Initiate a multi-part upload <https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
pub async fn multipart_initiate(&self, path: &Path) -> Result<MultipartId> {
let credential = self.get_credential().await?;
Expand Down Expand Up @@ -240,7 +264,7 @@ impl GoogleCloudStorageClient {
path: &Path,
multipart_id: &MultipartId,
completed_parts: Vec<PartId>,
) -> Result<()> {
) -> Result<PutResult> {
let upload_id = multipart_id.clone();
let url = self.object_url(path);

Expand All @@ -263,7 +287,8 @@ impl GoogleCloudStorageClient {
// https://github.com/tafia/quick-xml/issues/350
.replace("&quot;", "\"");

self.client
let result = self
.client
.request(Method::POST, &url)
.bearer_auth(&credential.bearer)
.query(&[("uploadId", upload_id)])
Expand All @@ -274,7 +299,8 @@ impl GoogleCloudStorageClient {
path: path.as_ref(),
})?;

Ok(())
let etag = get_etag(result.headers()).context(MetadataSnafu)?;
Ok(PutResult { e_tag: Some(etag) })
}

/// Perform a delete request <https://cloud.google.com/storage/docs/xml-api/delete-object>
Expand Down
Loading

0 comments on commit 9f961f4

Please sign in to comment.