From 9fcdda2e7d5625d96be53b49d8a30b7b6886aa40 Mon Sep 17 00:00:00 2001 From: Francois Visconte Date: Tue, 4 Jun 2024 11:28:39 +0200 Subject: [PATCH] Allocate direct buffers for multipart upload Allow allocating multipart upload buffers as direct buffers rather than on the heap. We try to set a fairly large multipart upload part size on clusters to optimize throughput and reduce the number of S3 requests. At the same time, we try to keep the Kafka JVM heap size contained on most Kafka installations in order to leave as much memory as possible for the page cache. As an example, we will use a 4GB heap size on machines with 64GB of available memory. The consequence of using a fairly large multipart upload part size with a contained JVM heap size is that we can easily run out of heap space if we suddenly have to upload many segments to tiered storage. The strategy we propose is to allocate multipart buffers in direct memory so that we can more easily configure the direct buffer budget. --- .../storage/s3/S3MultiPartOutputStream.java | 9 +++++++-- .../tieredstorage/storage/s3/S3Storage.java | 5 ++++- .../storage/s3/S3StorageConfig.java | 15 +++++++++++++++ .../storage/s3/S3MultiPartOutputStreamTest.java | 16 ++++++++-------- 4 files changed, 34 insertions(+), 11 deletions(-) diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java index af5806698..f69b00cba 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStream.java @@ -66,12 +66,17 @@ public class S3MultiPartOutputStream extends OutputStream { public S3MultiPartOutputStream(final String bucketName, final ObjectKey key, final int partSize, - final S3Client client) { + final S3Client client, + final boolean directAllocation) { this.bucketName = bucketName; 
this.key = key; this.client = client; this.partSize = partSize; - this.partBuffer = ByteBuffer.allocate(partSize); + if (directAllocation) { + this.partBuffer = ByteBuffer.allocateDirect(partSize); + } else { + this.partBuffer = ByteBuffer.allocate(partSize); + } final CreateMultipartUploadRequest initialRequest = CreateMultipartUploadRequest.builder().bucket(bucketName) .key(key.value()).build(); final CreateMultipartUploadResponse initiateResult = client.createMultipartUpload(initialRequest); diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java index 68fad748d..f1e81bfe2 100644 --- a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3Storage.java @@ -44,12 +44,15 @@ public class S3Storage implements StorageBackend { private String bucketName; private int partSize; + private boolean multipartDirectBuffers; + @Override public void configure(final Map configs) { final S3StorageConfig config = new S3StorageConfig(configs); this.s3Client = S3ClientBuilder.build(config); this.bucketName = config.bucketName(); this.partSize = config.uploadPartSize(); + this.multipartDirectBuffers = config.multipartDirectBuffers(); } @Override @@ -63,7 +66,7 @@ public long upload(final InputStream inputStream, final ObjectKey key) throws St } S3MultiPartOutputStream s3OutputStream(final ObjectKey key) { - return new S3MultiPartOutputStream(bucketName, key, partSize, s3Client); + return new S3MultiPartOutputStream(bucketName, key, partSize, s3Client, multipartDirectBuffers); } @Override diff --git a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java index cd13db260..75421054d 100644 --- 
a/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java +++ b/storage/s3/src/main/java/io/aiven/kafka/tieredstorage/storage/s3/S3StorageConfig.java @@ -58,6 +58,11 @@ public class S3StorageConfig extends AbstractConfig { private static final String S3_MULTIPART_UPLOAD_PART_SIZE_DOC = "Size of parts in bytes to use when uploading. " + "All parts but the last one will have this size. " + "Valid values: between 5MiB and 2GiB"; + + public static final String S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_CONFIG = "s3.multipart.upload.direct.buffers"; + public static final String S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_DOC = + "Allocate multipart upload buffers as direct buffers (off-heap)"; + static final int S3_MULTIPART_UPLOAD_PART_SIZE_MIN = 5 * 1024 * 1024; // 5MiB static final int S3_MULTIPART_UPLOAD_PART_SIZE_MAX = Integer.MAX_VALUE; static final int S3_MULTIPART_UPLOAD_PART_SIZE_DEFAULT = S3_MULTIPART_UPLOAD_PART_SIZE_MIN; @@ -120,6 +125,12 @@ public class S3StorageConfig extends AbstractConfig { null, ConfigDef.Importance.LOW, S3_PATH_STYLE_ENABLED_DOC) + .define( + S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_CONFIG, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.LOW, + S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_DOC) .define( S3_MULTIPART_UPLOAD_PART_SIZE_CONFIG, ConfigDef.Type.INT, @@ -261,6 +272,10 @@ public Boolean pathStyleAccessEnabled() { return getBoolean(S3_PATH_STYLE_ENABLED_CONFIG); } + public Boolean multipartDirectBuffers() { + return getBoolean(S3_MULTIPART_UPLOAD_DIRECT_BUFFERS_CONFIG); + } + public int uploadPartSize() { return getInt(S3_MULTIPART_UPLOAD_PART_SIZE_CONFIG); } diff --git a/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java b/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java index d95824584..3e23e4dfd 100644 --- a/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java +++ 
b/storage/s3/src/test/java/io/aiven/kafka/tieredstorage/storage/s3/S3MultiPartOutputStreamTest.java @@ -83,7 +83,7 @@ void sendAbortForAnyExceptionWhileWriting() { when(mockedS3.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))) .thenThrow(testException); - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 1, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 1, mockedS3, false); assertThatThrownBy(() -> out.write(new byte[] {1, 2, 3})) .isInstanceOf(IOException.class) .hasRootCause(testException); @@ -105,7 +105,7 @@ void sendAbortForAnyExceptionWhenClosingUpload() throws Exception { when(mockedS3.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))) .thenThrow(RuntimeException.class); - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3, false); final byte[] buffer = new byte[5]; random.nextBytes(buffer); @@ -132,7 +132,7 @@ void sendAbortForAnyExceptionWhenClosingComplete() throws Exception { when(mockedS3.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))) .thenThrow(RuntimeException.class); - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 10, mockedS3, false); final byte[] buffer = new byte[5]; random.nextBytes(buffer); @@ -159,7 +159,7 @@ void writesOneByte() throws Exception { when(mockedS3.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))) .thenReturn(CompleteMultipartUploadResponse.builder().eTag("SOME_ETAG").build()); - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3, false); out.write(1); out.close(); @@ -197,7 +197,7 @@ void writesMultipleMessages() throws Exception { 
.thenReturn(CompleteMultipartUploadResponse.builder().build()); final List expectedMessagesList = new ArrayList<>(); - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, bufferSize, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, bufferSize, mockedS3, false); for (int i = 0; i < 3; i++) { random.nextBytes(message); out.write(message, 0, message.length); @@ -257,7 +257,7 @@ void writesTailMessages() throws Exception { final byte[] expectedFullMessage = new byte[messageSize + 10]; final byte[] expectedTailMessage = new byte[10]; - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, messageSize + 10, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, messageSize + 10, mockedS3, false); final byte[] message = new byte[messageSize]; random.nextBytes(message); out.write(message); @@ -288,7 +288,7 @@ void writesTailMessages() throws Exception { @Test void sendAbortIfNoWritingHappened() throws IOException { - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3, false); out.close(); verify(mockedS3).abortMultipartUpload(abortMultipartUploadRequestCaptor.capture()); @@ -299,7 +299,7 @@ void sendAbortIfNoWritingHappened() throws IOException { @Test void failWhenUploadingPartAfterStreamIsClosed() throws IOException { - final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3); + final var out = new S3MultiPartOutputStream(BUCKET_NAME, FILE_KEY, 100, mockedS3, false); out.close(); verify(mockedS3).abortMultipartUpload(abortMultipartUploadRequestCaptor.capture());