Implement put optimisation for small files in UploadRequest #1231
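Summary of the change in the diff below: writes are buffered in memory up to a fixed 16 MiB threshold, and if the whole object still fits when complete() is called, the in-flight multipart upload is dropped and the object is uploaded with a single put_object_single call instead; larger objects flush the buffer and fall back to the existing multipart path. The following is a minimal, illustrative sketch of that size-based routing only — the UploadPath enum and upload_path function are hypothetical and do not appear in this PR; the real logic lives in UploadRequest::write and UploadRequest::complete in the diff.

// Hypothetical sketch (not from the PR): the size-based routing decision.
const SINGLE_UPLOAD_THRESHOLD: usize = 16 * 1024 * 1024; // same 16 MiB threshold as the diff below

#[derive(Debug, PartialEq)]
enum UploadPath {
    // Everything written still fits in the buffer: one PutObject call at complete().
    SinglePut,
    // The object outgrew the buffer: flush it and continue with the multipart upload.
    Multipart,
}

fn upload_path(total_bytes_written: usize) -> UploadPath {
    if total_bytes_written <= SINGLE_UPLOAD_THRESHOLD {
        UploadPath::SinglePut
    } else {
        UploadPath::Multipart
    }
}

fn main() {
    assert_eq!(upload_path(4 * 1024 * 1024), UploadPath::SinglePut);
    assert_eq!(upload_path(64 * 1024 * 1024), UploadPath::Multipart);
}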

77 changes: 61 additions & 16 deletions mountpoint-s3/src/upload/atomic.rs
@@ -3,7 +3,8 @@ use std::fmt::Debug;
 use mountpoint_s3_client::checksums::{crc32c, crc32c_from_base64, Crc32c};
 use mountpoint_s3_client::error::{ObjectClientError, PutObjectError};
 use mountpoint_s3_client::types::{
-    ChecksumAlgorithm, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums, UploadReview,
+    ChecksumAlgorithm, PutObjectParams, PutObjectResult, PutObjectSingleParams, PutObjectTrailingChecksums,
+    UploadChecksum, UploadReview,
 };
 use mountpoint_s3_client::{ObjectClient, PutObjectRequest};
 use tracing::error;
@@ -15,6 +16,7 @@ use crate::ServerSideEncryption;
 use super::UploadError;
 
 const MAX_S3_MULTIPART_UPLOAD_PARTS: usize = 10000;
+const SINGLE_UPLOAD_THRESHOLD: usize = 16 * 1024 * 1024; // 16MB in bytes
 
 /// Manages the upload of an object to S3.
 ///
@@ -27,6 +29,11 @@ pub struct UploadRequest<Client: ObjectClient> {
     hasher: crc32c::Hasher,
     maximum_upload_size: Option<usize>,
     sse: ServerSideEncryption,
+
+    client: Client,
+    storage_class: Option<String>,
+    buffered_data: Vec<u8>,
+    // We should consider adding the field "upload_request_params: UploadRequestParams" to simplify the UploadRequest type since we use all of its fields but one.
 }
 
 /// Parameters to initialize an [UploadRequest].
@@ -40,7 +47,7 @@ pub struct UploadRequestParams {
 
 impl<Client> UploadRequest<Client>
 where
-    Client: ObjectClient + Send + 'static,
+    Client: ObjectClient + Clone + Send + 'static,
 {
     pub fn new(
         runtime: &BoxRuntime,
@@ -77,23 +84,28 @@ where
         let maximum_upload_size = client
             .write_part_size()
             .map(|ps| ps.saturating_mul(MAX_S3_MULTIPART_UPLOAD_PARTS));
+        let client_clone = client.clone();
         let request = runtime
-            .spawn_with_result(async move { client.put_object(&put_bucket, &put_key, &put_object_params).await })
+            .spawn_with_result(async move { client_clone.put_object(&put_bucket, &put_key, &put_object_params).await })
             .unwrap();
 
         Ok(UploadRequest {
             request,
-            bucket: params.bucket,
-            key: params.key,
+            bucket: params.bucket.clone(),
+            key: params.key.clone(),
             next_request_offset: 0,
             hasher: crc32c::Hasher::new(),
             maximum_upload_size,
-            sse: params.server_side_encryption,
+            sse: params.server_side_encryption.clone(),
+            // Store for potential single upload:
+            client,
+            storage_class: params.storage_class.clone(),
+            buffered_data: Vec::with_capacity(SINGLE_UPLOAD_THRESHOLD),
         })
     }
 
-    pub fn size(&self) -> u64 {
-        self.next_request_offset
+    pub fn size(&self) -> usize {
+        self.next_request_offset as usize
     }
 
     pub async fn write(&mut self, offset: i64, data: &[u8]) -> Result<usize, UploadError<Client::ClientError>> {
@@ -111,7 +123,22 @@ }
         }
 
         self.hasher.update(data);
-        self.request.get_mut().await?.unwrap().write(data).await?;
+
+        let new_size = self.size() + data.len();
+        if new_size <= SINGLE_UPLOAD_THRESHOLD {
+            // Buffer the data for potential single upload
+            self.buffered_data.extend_from_slice(data);
+        } else {
+            let request = self.request.get_mut().await?.unwrap();
+
+            if !self.buffered_data.is_empty() {
+                request.write(&self.buffered_data).await?;
+                self.buffered_data.clear();
+                self.buffered_data.shrink_to_fit();
+            }
+
+            request.write(data).await?; // 20ms overhead (once)
+        }
 
         self.next_request_offset += data.len() as u64;
         Ok(data.len())
@@ -120,13 +147,31 @@
     pub async fn complete(self) -> Result<PutObjectResult, UploadError<Client::ClientError>> {
         let size = self.size();
         let checksum = self.hasher.finalize();
-        let result = self
-            .request
-            .into_inner()
-            .await?
-            .unwrap()
-            .review_and_complete(move |review| verify_checksums(review, size, checksum))
-            .await?;
+
+        let result = if !self.buffered_data.is_empty() {
+            drop(self.request); // Abort MPU
+
+            let mut params = PutObjectSingleParams::new();
+            params = params.checksum(Some(UploadChecksum::Crc32c(checksum)));
+            if let Some(storage_class) = &self.storage_class {
+                params = params.storage_class(storage_class.clone());
+            }
+            let (sse_type, key_id) = self.sse.clone().into_inner()?;
+            params = params.server_side_encryption(sse_type);
+            params = params.ssekms_key_id(key_id);
+
+            self.client
+                .put_object_single(&self.bucket, &self.key, &params, self.buffered_data)
+                .await?
+        } else {
+            self.request
+                .into_inner()
+                .await?
+                .unwrap()
+                .review_and_complete(move |review| verify_checksums(review, size as u64, checksum))
+                .await?
+        };
 
         if let Err(err) = self
             .sse
             .verify_response(result.sse_type.as_deref(), result.sse_kms_key_id.as_deref())