From 46df6eac60e185cdf93d8d6acbe3972c79d94dc4 Mon Sep 17 00:00:00 2001 From: Robin Han Date: Sat, 2 Dec 2023 21:15:57 +0800 Subject: [PATCH] feat(kafka_issues500): add pooledbuf read options (#782) Signed-off-by: Robin Han --- pom.xml | 2 +- s3stream/pom.xml | 2 +- .../com/automq/stream/api/ReadOptions.java | 13 ++++ .../java/com/automq/stream/s3/S3Stream.java | 67 ++++++++++++++++--- .../stream/s3/StreamRecordBatchCodec.java | 10 ++- .../stream/s3/model/StreamRecordBatch.java | 36 ++-------- .../automq/stream/s3/ObjectWriterTest.java | 16 ++--- .../stream/s3/StreamObjectCopierTest.java | 16 ++--- 8 files changed, 97 insertions(+), 65 deletions(-) diff --git a/pom.xml b/pom.xml index bf8ec3fcd..f82856ee4 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 32.0.1-jre 2.0.9 2.2 - 0.6.5-SNAPSHOT + 0.6.6-SNAPSHOT 23.5.26 diff --git a/s3stream/pom.xml b/s3stream/pom.xml index b11778efc..70e4a6766 100644 --- a/s3stream/pom.xml +++ b/s3stream/pom.xml @@ -22,7 +22,7 @@ 4.0.0 com.automq.elasticstream s3stream - 0.6.5-SNAPSHOT + 0.6.6-SNAPSHOT 5.5.0 5.10.0 diff --git a/s3stream/src/main/java/com/automq/stream/api/ReadOptions.java b/s3stream/src/main/java/com/automq/stream/api/ReadOptions.java index d77d072ab..52bdaaf4d 100644 --- a/s3stream/src/main/java/com/automq/stream/api/ReadOptions.java +++ b/s3stream/src/main/java/com/automq/stream/api/ReadOptions.java @@ -23,11 +23,16 @@ public class ReadOptions { public static final ReadOptions DEFAULT = new ReadOptions(); private boolean fastRead; + private boolean pooledBuf; public boolean fastRead() { return fastRead; } + public boolean pooledBuf() { + return pooledBuf; + } + public static Builder builder() { return new Builder(); } @@ -43,6 +48,14 @@ public Builder fastRead(boolean fastRead) { return this; } + /** + * Use pooled buffer for reading. The caller is responsible for releasing the buffer. + */ + public Builder pooledBuf(boolean pooledBuf) { + options.pooledBuf = pooledBuf; + return this; + } + public ReadOptions build() { return options; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java index 08dd1b744..8f0609058 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java @@ -20,12 +20,12 @@ import com.automq.stream.DefaultAppendResult; import com.automq.stream.RecordBatchWithContextWrapper; import com.automq.stream.api.AppendResult; -import com.automq.stream.api.ReadOptions; -import com.automq.stream.api.exceptions.ErrorCode; import com.automq.stream.api.FetchResult; +import com.automq.stream.api.ReadOptions; import com.automq.stream.api.RecordBatch; import com.automq.stream.api.RecordBatchWithContext; import com.automq.stream.api.Stream; +import com.automq.stream.api.exceptions.ErrorCode; import com.automq.stream.api.exceptions.FastReadFailFastException; import com.automq.stream.api.exceptions.StreamClientException; import com.automq.stream.s3.cache.CacheAccessType; @@ -37,9 +37,14 @@ import com.automq.stream.s3.streams.StreamManager; import com.automq.stream.utils.FutureUtil; import io.netty.buffer.Unpooled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -50,8 +55,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static com.automq.stream.utils.FutureUtil.exec; import static com.automq.stream.utils.FutureUtil.propagate; @@ -230,14 +233,14 @@ private CompletableFuture fetch0(long startOffset, long endOffset, return FutureUtil.failedFuture(new IllegalArgumentException(String.format("fetch startOffset %s is greater than endOffset %s", startOffset, endOffset))); } if (startOffset == endOffset) { - return CompletableFuture.completedFuture(new DefaultFetchResult(Collections.emptyList(), CacheAccessType.DELTA_WAL_CACHE_HIT)); + return CompletableFuture.completedFuture(new DefaultFetchResult(Collections.emptyList(), CacheAccessType.DELTA_WAL_CACHE_HIT, false)); } return storage.read(streamId, startOffset, endOffset, maxBytes, readOptions).thenApply(dataBlock -> { List records = dataBlock.getRecords(); if (LOGGER.isTraceEnabled()) { LOGGER.trace("{} stream fetch, startOffset: {}, endOffset: {}, maxBytes: {}, records: {}", logIdent, startOffset, endOffset, maxBytes, records.size()); } - return new DefaultFetchResult(records, dataBlock.getCacheAccessType()); + return new DefaultFetchResult(records, dataBlock.getCacheAccessType(), readOptions.pooledBuf()); }); } @@ -356,13 +359,20 @@ private void updateConfirmOffset(long newOffset) { } static class DefaultFetchResult implements FetchResult { + private final List pooledRecords; private final List records; private final CacheAccessType cacheAccessType; + private final boolean pooledBuf; + private volatile boolean freed = false; - public DefaultFetchResult(List streamRecords, CacheAccessType cacheAccessType) { - this.records = streamRecords.stream().map(r -> new RecordBatchWithContextWrapper(r.getRecordBatch(), r.getBaseOffset())).collect(Collectors.toList()); + public DefaultFetchResult(List streamRecords, CacheAccessType cacheAccessType, boolean pooledBuf) { + this.pooledRecords = streamRecords; + this.pooledBuf = pooledBuf; + this.records = streamRecords.stream().map(r -> new RecordBatchWithContextWrapper(covert(r, pooledBuf), r.getBaseOffset())).collect(Collectors.toList()); this.cacheAccessType = cacheAccessType; - streamRecords.forEach(StreamRecordBatch::release); + if (!pooledBuf) { + streamRecords.forEach(StreamRecordBatch::release); + } } @Override @@ -375,6 +385,45 @@ public CacheAccessType getCacheAccessType() { return cacheAccessType; } + @Override + public void free() { + if (!freed && pooledBuf) { + pooledRecords.forEach(StreamRecordBatch::release); + } + freed = true; + } + + private static RecordBatch covert(StreamRecordBatch streamRecordBatch, boolean pooledBuf) { + ByteBuffer buf; + if (pooledBuf) { + buf = streamRecordBatch.getPayload().nioBuffer(); + } else { + buf = ByteBuffer.allocate(streamRecordBatch.size()); + streamRecordBatch.getPayload().duplicate().readBytes(buf); + buf.flip(); + } + return new RecordBatch() { + @Override + public int count() { + return streamRecordBatch.getCount(); + } + + @Override + public long baseTimestamp() { + return streamRecordBatch.getEpoch(); + } + + @Override + public Map properties() { + return Collections.emptyMap(); + } + + @Override + public ByteBuffer rawPayload() { + return buf; + } + }; + } } static class Status { diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java b/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java index a289d37f0..0dd230ef8 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java +++ b/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java @@ -22,7 +22,6 @@ import java.io.DataInputStream; import java.io.IOException; -import java.nio.ByteBuffer; public class StreamRecordBatchCodec { public static final byte MAGIC_V0 = 0x22; @@ -34,17 +33,16 @@ public static ByteBuf encode(StreamRecordBatch streamRecord) { + 8 // baseOffset + 4 // lastOffsetDelta + 4 // payload length - + streamRecord.getRecordBatch().rawPayload().remaining(); // payload + + streamRecord.size(); // payload ByteBuf buf = DirectByteBufAlloc.byteBuffer(totalLength); buf.writeByte(MAGIC_V0); buf.writeLong(streamRecord.getStreamId()); buf.writeLong(streamRecord.getEpoch()); buf.writeLong(streamRecord.getBaseOffset()); - buf.writeInt(streamRecord.getRecordBatch().count()); - ByteBuffer payload = streamRecord.getRecordBatch().rawPayload().duplicate(); - buf.writeInt(payload.remaining()); - buf.writeBytes(payload); + buf.writeInt(streamRecord.getCount()); + buf.writeInt(streamRecord.size()); + buf.writeBytes(streamRecord.getPayload().duplicate()); return buf; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java b/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java index 8728db00c..3b8934fa7 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java +++ b/s3stream/src/main/java/com/automq/stream/s3/model/StreamRecordBatch.java @@ -17,14 +17,9 @@ package com.automq.stream.s3.model; -import com.automq.stream.api.RecordBatch; import com.automq.stream.s3.StreamRecordBatchCodec; import io.netty.buffer.ByteBuf; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Map; - public class StreamRecordBatch implements Comparable { private final long streamId; private final long epoch; @@ -68,35 +63,12 @@ public long getLastOffset() { return baseOffset + count; } - public ByteBuf getPayload() { - return payload; + public int getCount() { + return count; } - public RecordBatch getRecordBatch() { - ByteBuffer buf = ByteBuffer.allocate(payload.readableBytes()); - payload.duplicate().readBytes(buf); - buf.flip(); - return new RecordBatch() { - @Override - public int count() { - return count; - } - - @Override - public long baseTimestamp() { - return 0; - } - - @Override - public Map properties() { - return Collections.emptyMap(); - } - - @Override - public ByteBuffer rawPayload() { - return buf; - } - }; + public ByteBuf getPayload() { + return payload; } public int size() { diff --git a/s3stream/src/test/java/com/automq/stream/s3/ObjectWriterTest.java b/s3stream/src/test/java/com/automq/stream/s3/ObjectWriterTest.java index 595c93af1..41029be25 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/ObjectWriterTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/ObjectWriterTest.java @@ -73,14 +73,14 @@ public void testWrite() throws ExecutionException, InterruptedException { StreamRecordBatch r = it.next(); assertEquals(233L, r.getStreamId()); assertEquals(10L, r.getBaseOffset()); - assertEquals(5L, r.getRecordBatch().count()); - assertEquals(r1.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload()); + assertEquals(5L, r.getCount()); + assertEquals(r1.getPayload(), r.getPayload()); r.release(); r = it.next(); assertEquals(233L, r.getStreamId()); assertEquals(15L, r.getBaseOffset()); - assertEquals(10L, r.getRecordBatch().count()); - assertEquals(r2.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload()); + assertEquals(10L, r.getCount()); + assertEquals(r2.getPayload(), r.getPayload()); assertFalse(it.hasNext()); r.release(); } @@ -90,8 +90,8 @@ public void testWrite() throws ExecutionException, InterruptedException { StreamRecordBatch r = it.next(); assertEquals(233L, r.getStreamId()); assertEquals(25L, r.getBaseOffset()); - assertEquals(5L, r.getRecordBatch().count()); - assertEquals(r3.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload()); + assertEquals(5L, r.getCount()); + assertEquals(r3.getPayload(), r.getPayload()); r.release(); } @@ -103,8 +103,8 @@ public void testWrite() throws ExecutionException, InterruptedException { StreamRecordBatch r = it.next(); assertEquals(234L, r.getStreamId()); assertEquals(0L, r.getBaseOffset()); - assertEquals(5L, r.getRecordBatch().count()); - assertEquals(r4.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload()); + assertEquals(5L, r.getCount()); + assertEquals(r4.getPayload(), r.getPayload()); assertFalse(it.hasNext()); r.release(); } diff --git a/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCopierTest.java b/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCopierTest.java index 350f523bf..d603ffa54 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCopierTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/StreamObjectCopierTest.java @@ -75,13 +75,13 @@ public void testCopy() throws ExecutionException, InterruptedException { StreamRecordBatch r = it.next(); assertEquals(streamId, r.getStreamId()); assertEquals(10L, r.getBaseOffset()); - assertEquals(5L, r.getRecordBatch().count()); - assertEquals(r1.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload()); + assertEquals(5L, r.getCount()); + assertEquals(r1.getPayload(), r.getPayload()); r = it.next(); assertEquals(streamId, r.getStreamId()); assertEquals(15L, r.getBaseOffset()); - assertEquals(10L, r.getRecordBatch().count()); - assertEquals(r2.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload()); + assertEquals(10L, r.getCount()); + assertEquals(r2.getPayload(), r.getPayload()); assertFalse(it.hasNext()); r.release(); } @@ -91,13 +91,13 @@ public void testCopy() throws ExecutionException, InterruptedException { StreamRecordBatch r = it.next(); assertEquals(streamId, r.getStreamId()); assertEquals(25L, r.getBaseOffset()); - assertEquals(8L, r.getRecordBatch().count()); - assertEquals(r3.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload()); + assertEquals(8L, r.getCount()); + assertEquals(r3.getPayload(), r.getPayload()); r = it.next(); assertEquals(streamId, r.getStreamId()); assertEquals(33L, r.getBaseOffset()); - assertEquals(6L, r.getRecordBatch().count()); - assertEquals(r4.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload()); + assertEquals(6L, r.getCount()); + assertEquals(r4.getPayload(), r.getPayload()); assertFalse(it.hasNext()); r.release(); }