From 856735e8a15e5dfd821642887025b049dc884a2d Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Wed, 28 Feb 2024 10:43:56 +0800 Subject: [PATCH] feat(s3stream): use sequential memory alloc for stream set object compaction Signed-off-by: Shichao Nie --- .../automq/stream/DirectByteBufSeqAlloc.java | 5 +++-- .../com/automq/stream/s3/ByteBufAlloc.java | 20 +++++++++---------- .../stream/s3/StreamRecordBatchCodec.java | 2 +- .../s3/compact/operator/DataBlockReader.java | 6 ++++-- .../s3/metrics/S3StreamMetricsManager.java | 8 ++++---- .../stream/DirectByteBufSeqAllocTest.java | 2 +- 6 files changed, 23 insertions(+), 20 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java b/s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java index 854ae2120..81c934f5e 100644 --- a/s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java +++ b/s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java @@ -19,11 +19,12 @@ public class DirectByteBufSeqAlloc { public static final int HUGE_BUF_SIZE = 8 * 1024 * 1024; // why not use ThreadLocal? the partition open has too much threads - final AtomicReference[] hugeBufArray = new AtomicReference[8]; + final AtomicReference[] hugeBufArray; private final int allocType; - public DirectByteBufSeqAlloc(int allocType) { + public DirectByteBufSeqAlloc(int allocType, int concurrency) { this.allocType = allocType; + hugeBufArray = new AtomicReference[concurrency]; for (int i = 0; i < hugeBufArray.length; i++) { hugeBufArray[i] = new AtomicReference<>(new HugeBuf(ByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType))); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java index faa45a37e..dfe72bede 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java @@ -48,7 +48,7 @@ public class ByteBufAlloc { public static final int STREAM_OBJECT_COMPACTION_WRITE = 8; public static final int STREAM_SET_OBJECT_COMPACTION_READ = 9; public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10; - public static DirectByteBufAllocMetric directByteBufAllocMetric = null; + public static ByteBufAllocMetric byteBufAllocMetric = null; static { registerAllocType(DEFAULT, "default"); @@ -87,8 +87,8 @@ public static ByteBuf byteBuffer(int initCapacity, int type) { if (now - lastMetricLogTime > 60000) { // it's ok to be not thread safe lastMetricLogTime = now; - ByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric(); - LOGGER.info("Direct Memory usage: {}", ByteBufAlloc.directByteBufAllocMetric); + ByteBufAlloc.byteBufAllocMetric = new ByteBufAllocMetric(); + LOGGER.info("Buffer usage: {}", ByteBufAlloc.byteBufAllocMetric); } return new WrappedByteBuf(BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity)); } else { @@ -96,12 +96,12 @@ public static ByteBuf byteBuffer(int initCapacity, int type) { } } catch (OutOfMemoryError e) { if (MEMORY_USAGE_DETECT) { - ByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric(); - LOGGER.error("alloc direct buffer OOM, {}", ByteBufAlloc.directByteBufAllocMetric, e); + ByteBufAlloc.byteBufAllocMetric = new ByteBufAllocMetric(); + LOGGER.error("alloc buffer OOM, {}", ByteBufAlloc.byteBufAllocMetric, e); } else { - LOGGER.error("alloc direct buffer OOM", e); + LOGGER.error("alloc buffer OOM", e); } - System.err.println("alloc direct buffer OOM"); + System.err.println("alloc buffer OOM"); Runtime.getRuntime().halt(1); throw e; } @@ -114,12 +114,12 @@ public static void registerAllocType(int type, String name) { ALLOC_TYPE.put(type, name); } - public static class DirectByteBufAllocMetric { + public static class ByteBufAllocMetric { private final long usedMemory; private final long allocatedMemory; private final Map detail = new HashMap<>(); - public DirectByteBufAllocMetric() { + public ByteBufAllocMetric() { USAGE_STATS.forEach((k, v) -> { detail.put(k + "/" + ALLOC_TYPE.get(k), v.longValue()); }); @@ -138,7 +138,7 @@ public Map getDetailedMap() { @Override public String toString() { - StringBuilder sb = new StringBuilder("DirectByteBufAllocMetric{usedMemory="); + StringBuilder sb = new StringBuilder("ByteBufAllocMetric{usedMemory="); sb.append(usedMemory); sb.append(", allocatedMemory="); sb.append(allocatedMemory); diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java b/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java index eae64872e..ab56514f6 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java +++ b/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java @@ -26,7 +26,7 @@ public class StreamRecordBatchCodec { + 8 // baseOffset + 4 // lastOffsetDelta + 4; // payload length - private static final DirectByteBufSeqAlloc ENCODE_ALLOC = new DirectByteBufSeqAlloc(ENCODE_RECORD); + private static final DirectByteBufSeqAlloc ENCODE_ALLOC = new DirectByteBufSeqAlloc(ENCODE_RECORD, 8); public static ByteBuf encode(StreamRecordBatch streamRecord) { int totalLength = HEADER_SIZE + streamRecord.size(); // payload diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java index 17cf005df..9d5c961bc 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java @@ -11,6 +11,7 @@ package com.automq.stream.s3.compact.operator; +import com.automq.stream.DirectByteBufSeqAlloc; import com.automq.stream.s3.ByteBufAlloc; import com.automq.stream.s3.DataBlockIndex; import com.automq.stream.s3.ObjectReader; @@ -39,6 +40,7 @@ //TODO: refactor to reduce duplicate code with ObjectWriter public class DataBlockReader { private static final Logger LOGGER = LoggerFactory.getLogger(DataBlockReader.class); + private static final DirectByteBufSeqAlloc DIRECT_ALLOC = new DirectByteBufSeqAlloc(STREAM_SET_OBJECT_COMPACTION_READ, 1); private final S3ObjectMetadata metadata; private final String objectKey; private final S3Operator s3Operator; @@ -195,7 +197,7 @@ private CompletableFuture rangeRead0(long start, long end) { if (throttleBucket == null) { return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> { // convert heap buffer to direct buffer - ByteBuf directBuf = ByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ); + ByteBuf directBuf = DIRECT_ALLOC.byteBuffer(buf.readableBytes()); directBuf.writeBytes(buf); buf.release(); return directBuf; @@ -205,7 +207,7 @@ private CompletableFuture rangeRead0(long start, long end) { .thenCompose(v -> s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> { // convert heap buffer to direct buffer - ByteBuf directBuf = ByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ); + ByteBuf directBuf = DIRECT_ALLOC.byteBuffer(buf.readableBytes()); directBuf.writeBytes(buf); buf.release(); return directBuf; diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 74571a852..5da77fbcf 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -272,8 +272,8 @@ public static void initMetrics(Meter meter, String prefix) { .setUnit("bytes") .ofLongs() .buildWithCallback(result -> { - if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && ByteBufAlloc.directByteBufAllocMetric != null) { - Map allocateSizeMap = ByteBufAlloc.directByteBufAllocMetric.getDetailedMap(); + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && ByteBufAlloc.byteBufAllocMetric != null) { + Map allocateSizeMap = ByteBufAlloc.byteBufAllocMetric.getDetailedMap(); for (Map.Entry entry : allocateSizeMap.entrySet()) { result.record(entry.getValue(), ALLOC_TYPE_ATTRIBUTES.get(entry.getKey())); } @@ -284,8 +284,8 @@ public static void initMetrics(Meter meter, String prefix) { .setUnit("bytes") .ofLongs() .buildWithCallback(result -> { - if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel()) && ByteBufAlloc.directByteBufAllocMetric != null) { - result.record(ByteBufAlloc.directByteBufAllocMetric.getUsedMemory(), metricsConfig.getBaseAttributes()); + if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel()) && ByteBufAlloc.byteBufAllocMetric != null) { + result.record(ByteBufAlloc.byteBufAllocMetric.getUsedMemory(), metricsConfig.getBaseAttributes()); } }); } diff --git a/s3stream/src/test/java/com/automq/stream/DirectByteBufSeqAllocTest.java b/s3stream/src/test/java/com/automq/stream/DirectByteBufSeqAllocTest.java index 5764e9304..7d801980a 100644 --- a/s3stream/src/test/java/com/automq/stream/DirectByteBufSeqAllocTest.java +++ b/s3stream/src/test/java/com/automq/stream/DirectByteBufSeqAllocTest.java @@ -23,7 +23,7 @@ public class DirectByteBufSeqAllocTest { @Test public void testAlloc() { - DirectByteBufSeqAlloc alloc = new DirectByteBufSeqAlloc(0); + DirectByteBufSeqAlloc alloc = new DirectByteBufSeqAlloc(0, 1); AtomicReference bufRef = alloc.hugeBufArray[Math.abs(Thread.currentThread().hashCode() % alloc.hugeBufArray.length)];