Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allocate direct buffers for multipart upload #559

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,17 @@ public class S3MultiPartOutputStream extends OutputStream {
public S3MultiPartOutputStream(final String bucketName,
final ObjectKey key,
final int partSize,
final S3Client client) {
final S3Client client,
final boolean directAllocation) {
this.bucketName = bucketName;
this.key = key;
this.client = client;
this.partSize = partSize;
this.partBuffer = ByteBuffer.allocate(partSize);
if (directAllocation) {
this.partBuffer = ByteBuffer.allocateDirect(partSize);
} else {
this.partBuffer = ByteBuffer.allocate(partSize);
}
final CreateMultipartUploadRequest initialRequest = CreateMultipartUploadRequest.builder().bucket(bucketName)
.key(key.value()).build();
final CreateMultipartUploadResponse initiateResult = client.createMultipartUpload(initialRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,15 @@ public class S3Storage implements StorageBackend {
private String bucketName;
private int partSize;

private boolean multipartDirectBuffers;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
private boolean multipartDirectBuffers;
private boolean bufferAllocationDirect;


/**
 * Configures this storage backend from the supplied settings.
 *
 * <p>The raw entries are wrapped in an {@link S3StorageConfig}; the S3 client is built from it,
 * and the bucket name, upload part size and multipart direct-buffer flag are read from it and
 * kept for later use.
 *
 * @param configs raw configuration entries handed to {@link S3StorageConfig}
 */
@Override
public void configure(final Map<String, ?> configs) {
    final S3StorageConfig storageConfig = new S3StorageConfig(configs);
    // The client is built first, before any field is assigned.
    s3Client = S3ClientBuilder.build(storageConfig);
    bucketName = storageConfig.bucketName();
    partSize = storageConfig.uploadPartSize();
    multipartDirectBuffers = storageConfig.multipartDirectBuffers();
}

@Override
Expand All @@ -63,7 +66,7 @@ public long upload(final InputStream inputStream, final ObjectKey key) throws St
}

S3MultiPartOutputStream s3OutputStream(final ObjectKey key) {
return new S3MultiPartOutputStream(bucketName, key, partSize, s3Client);
return new S3MultiPartOutputStream(bucketName, key, partSize, s3Client, multipartDirectBuffers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public class S3StorageConfig extends AbstractConfig {
private static final String S3_MULTIPART_UPLOAD_PART_SIZE_DOC = "Size of parts in bytes to use when uploading. "
+ "All parts but the last one will have this size. "
+ "Valid values: between 5MiB and 2GiB";

public static final String S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_CONFIG = "s3.multipart.upload.direct.buffers";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
public static final String S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_CONFIG = "s3.multipart.upload.direct.buffers";
public static final String S3_MULTIPART_UPLOAD_BUFFER_ALLOCATION_DIRECT_CONFIG = "s3.multipart.upload.buffer.allocation.direct";

public static final String S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_DOC =
"Allocate multipart upload buffers as direct buffers (off-heap)";

static final int S3_MULTIPART_UPLOAD_PART_SIZE_MIN = 5 * 1024 * 1024; // 5MiB
static final int S3_MULTIPART_UPLOAD_PART_SIZE_MAX = Integer.MAX_VALUE;
static final int S3_MULTIPART_UPLOAD_PART_SIZE_DEFAULT = S3_MULTIPART_UPLOAD_PART_SIZE_MIN;
Expand Down Expand Up @@ -120,6 +125,12 @@ public class S3StorageConfig extends AbstractConfig {
null,
ConfigDef.Importance.LOW,
S3_PATH_STYLE_ENABLED_DOC)
.define(
S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_CONFIG,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_DOC)
.define(
S3_MULTIPART_UPLOAD_PART_SIZE_CONFIG,
ConfigDef.Type.INT,
Expand Down Expand Up @@ -261,6 +272,10 @@ public Boolean pathStyleAccessEnabled() {
return getBoolean(S3_PATH_STYLE_ENABLED_CONFIG);
}

public Boolean multipartDirectBuffers() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
public Boolean multipartDirectBuffers() {
public Boolean uploadBufferAllocationDirect() {

return getBoolean(S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_CONFIG);
}

/**
 * Returns the configured multipart upload part size.
 *
 * @return the integer stored under {@code S3_MULTIPART_UPLOAD_PART_SIZE_CONFIG}
 *         (documented in this class as a size in bytes)
 */
public int uploadPartSize() {
    final int partSizeBytes = getInt(S3_MULTIPART_UPLOAD_PART_SIZE_CONFIG);
    return partSizeBytes;
}
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could these tests be parameterized so that they also cover direct allocation?

Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void sendAbortForAnyExceptionWhileWriting() {
when(mockedS3.uploadPart(any(UploadPartRequest.class), any(RequestBody.class)))
.thenThrow(testException);

final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 1, mockedS3);
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 1, mockedS3, false);
assertThatThrownBy(() -> out.write(new byte[] {1, 2, 3}))
.isInstanceOf(IOException.class)
.hasRootCause(testException);
Expand All @@ -105,7 +105,7 @@ void sendAbortForAnyExceptionWhenClosingUpload() throws Exception {
when(mockedS3.uploadPart(any(UploadPartRequest.class), any(RequestBody.class)))
.thenThrow(RuntimeException.class);

final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3);
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3, false);

final byte[] buffer = new byte[5];
random.nextBytes(buffer);
Expand All @@ -132,7 +132,7 @@ void sendAbortForAnyExceptionWhenClosingComplete() throws Exception {
when(mockedS3.completeMultipartUpload(any(CompleteMultipartUploadRequest.class)))
.thenThrow(RuntimeException.class);

final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3);
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3, false);

final byte[] buffer = new byte[5];
random.nextBytes(buffer);
Expand All @@ -159,7 +159,7 @@ void writesOneByte() throws Exception {
when(mockedS3.completeMultipartUpload(any(CompleteMultipartUploadRequest.class)))
.thenReturn(CompleteMultipartUploadResponse.builder().eTag("SOME_ETAG").build());

final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3);
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3, false);
out.write(1);
out.close();

Expand Down Expand Up @@ -197,7 +197,7 @@ void writesMultipleMessages() throws Exception {
.thenReturn(CompleteMultipartUploadResponse.builder().build());

final List<byte[]> expectedMessagesList = new ArrayList<>();
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, bufferSize, mockedS3);
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, bufferSize, mockedS3, false);
for (int i = 0; i < 3; i++) {
random.nextBytes(message);
out.write(message, 0, message.length);
Expand Down Expand Up @@ -257,7 +257,7 @@ void writesTailMessages() throws Exception {
final byte[] expectedFullMessage = new byte[messageSize + 10];
final byte[] expectedTailMessage = new byte[10];

final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, messageSize + 10, mockedS3);
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, messageSize + 10, mockedS3, false);
final byte[] message = new byte[messageSize];
random.nextBytes(message);
out.write(message);
Expand Down Expand Up @@ -288,7 +288,7 @@ void writesTailMessages() throws Exception {

@Test
void sendAbortIfNoWritingHappened() throws IOException {
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3);
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3, false);
out.close();

verify(mockedS3).abortMultipartUpload(abortMultipartUploadRequestCaptor.capture());
Expand All @@ -299,7 +299,7 @@ void sendAbortIfNoWritingHappened() throws IOException {

@Test
void failWhenUploadingPartAfterStreamIsClosed() throws IOException {
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3);
final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3, false);
out.close();

verify(mockedS3).abortMultipartUpload(abortMultipartUploadRequestCaptor.capture());
Expand Down
Loading