Skip to content

Commit

Permalink
feat(kafka_issues500): add pooledbuf read options (#782)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Dec 2, 2023
1 parent 0d63319 commit 46df6ea
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 65 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.0.1-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.6.5-SNAPSHOT</s3stream.version>
<s3stream.version>0.6.6-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.6.5-SNAPSHOT</version>
<version>0.6.6-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
13 changes: 13 additions & 0 deletions s3stream/src/main/java/com/automq/stream/api/ReadOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ public class ReadOptions {
public static final ReadOptions DEFAULT = new ReadOptions();

private boolean fastRead;
private boolean pooledBuf;

public boolean fastRead() {
return fastRead;
}

// Whether reads should return payloads backed by pooled (reference-counted)
// buffers. When true, the caller is responsible for freeing the FetchResult.
public boolean pooledBuf() {
    return pooledBuf;
}

public static Builder builder() {
return new Builder();
}
Expand All @@ -43,6 +48,14 @@ public Builder fastRead(boolean fastRead) {
return this;
}

/**
 * Use pooled buffer for reading. The caller is responsible for releasing the buffer
 * (e.g. by freeing the returned FetchResult) once the payload is no longer needed.
 *
 * @param pooledBuf true to return payloads as views over pooled buffers instead of heap copies
 * @return this builder, for chaining
 */
public Builder pooledBuf(boolean pooledBuf) {
    options.pooledBuf = pooledBuf;
    return this;
}

// Returns the ReadOptions instance this builder has been populating.
// NOTE(review): the same mutable instance is returned on every call — reusing the
// builder after build() would mutate already-returned options; confirm callers
// create a fresh builder per ReadOptions.
public ReadOptions build() {
    return options;
}
Expand Down
67 changes: 58 additions & 9 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import com.automq.stream.DefaultAppendResult;
import com.automq.stream.RecordBatchWithContextWrapper;
import com.automq.stream.api.AppendResult;
import com.automq.stream.api.ReadOptions;
import com.automq.stream.api.exceptions.ErrorCode;
import com.automq.stream.api.FetchResult;
import com.automq.stream.api.ReadOptions;
import com.automq.stream.api.RecordBatch;
import com.automq.stream.api.RecordBatchWithContext;
import com.automq.stream.api.Stream;
import com.automq.stream.api.exceptions.ErrorCode;
import com.automq.stream.api.exceptions.FastReadFailFastException;
import com.automq.stream.api.exceptions.StreamClientException;
import com.automq.stream.s3.cache.CacheAccessType;
Expand All @@ -37,9 +37,14 @@
import com.automq.stream.s3.streams.StreamManager;
import com.automq.stream.utils.FutureUtil;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -50,8 +55,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.utils.FutureUtil.exec;
import static com.automq.stream.utils.FutureUtil.propagate;
Expand Down Expand Up @@ -230,14 +233,14 @@ private CompletableFuture<FetchResult> fetch0(long startOffset, long endOffset,
return FutureUtil.failedFuture(new IllegalArgumentException(String.format("fetch startOffset %s is greater than endOffset %s", startOffset, endOffset)));
}
if (startOffset == endOffset) {
return CompletableFuture.completedFuture(new DefaultFetchResult(Collections.emptyList(), CacheAccessType.DELTA_WAL_CACHE_HIT));
return CompletableFuture.completedFuture(new DefaultFetchResult(Collections.emptyList(), CacheAccessType.DELTA_WAL_CACHE_HIT, false));
}
return storage.read(streamId, startOffset, endOffset, maxBytes, readOptions).thenApply(dataBlock -> {
List<StreamRecordBatch> records = dataBlock.getRecords();
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("{} stream fetch, startOffset: {}, endOffset: {}, maxBytes: {}, records: {}", logIdent, startOffset, endOffset, maxBytes, records.size());
}
return new DefaultFetchResult(records, dataBlock.getCacheAccessType());
return new DefaultFetchResult(records, dataBlock.getCacheAccessType(), readOptions.pooledBuf());
});
}

Expand Down Expand Up @@ -356,13 +359,20 @@ private void updateConfirmOffset(long newOffset) {
}

static class DefaultFetchResult implements FetchResult {
private final List<StreamRecordBatch> pooledRecords;
private final List<RecordBatchWithContext> records;
private final CacheAccessType cacheAccessType;
private final boolean pooledBuf;
private volatile boolean freed = false;

public DefaultFetchResult(List<StreamRecordBatch> streamRecords, CacheAccessType cacheAccessType) {
this.records = streamRecords.stream().map(r -> new RecordBatchWithContextWrapper(r.getRecordBatch(), r.getBaseOffset())).collect(Collectors.toList());
public DefaultFetchResult(List<StreamRecordBatch> streamRecords, CacheAccessType cacheAccessType, boolean pooledBuf) {
this.pooledRecords = streamRecords;
this.pooledBuf = pooledBuf;
this.records = streamRecords.stream().map(r -> new RecordBatchWithContextWrapper(covert(r, pooledBuf), r.getBaseOffset())).collect(Collectors.toList());
this.cacheAccessType = cacheAccessType;
streamRecords.forEach(StreamRecordBatch::release);
if (!pooledBuf) {
streamRecords.forEach(StreamRecordBatch::release);
}
}

@Override
Expand All @@ -375,6 +385,45 @@ public CacheAccessType getCacheAccessType() {
return cacheAccessType;
}

@Override
public void free() {
    // Release the pooled payloads at most once; later calls become no-ops.
    // NOTE(review): the check-then-act on the volatile flag is not atomic, so
    // concurrent free() calls could double-release — confirm free() is only
    // invoked from a single thread, or guard with an AtomicBoolean.
    if (pooledBuf && !freed) {
        pooledRecords.forEach(StreamRecordBatch::release);
    }
    freed = true;
}

/**
 * Converts a {@link StreamRecordBatch} into the public {@link RecordBatch} view.
 * <p>
 * When {@code pooledBuf} is true the returned payload is an NIO view over the
 * pooled buffer — it stays valid only until the batch is released (see free()).
 * Otherwise the payload is copied into a standalone heap buffer.
 * (NOTE(review): method name "covert" looks like a typo for "convert"; kept to
 * avoid touching the call site.)
 */
private static RecordBatch covert(StreamRecordBatch streamRecordBatch, boolean pooledBuf) {
    final ByteBuffer payload;
    if (pooledBuf) {
        // Zero-copy view sharing memory with the pooled buffer.
        payload = streamRecordBatch.getPayload().nioBuffer();
    } else {
        // Deep copy so the result outlives the pooled source buffer.
        ByteBuffer copy = ByteBuffer.allocate(streamRecordBatch.size());
        streamRecordBatch.getPayload().duplicate().readBytes(copy);
        copy.flip();
        payload = copy;
    }
    return new RecordBatch() {
        @Override
        public int count() {
            return streamRecordBatch.getCount();
        }

        @Override
        public long baseTimestamp() {
            // The stream epoch doubles as the batch's base timestamp here.
            return streamRecordBatch.getEpoch();
        }

        @Override
        public Map<String, String> properties() {
            return Collections.emptyMap();
        }

        @Override
        public ByteBuffer rawPayload() {
            return payload;
        }
    };
}
}

static class Status {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class StreamRecordBatchCodec {
public static final byte MAGIC_V0 = 0x22;
Expand All @@ -34,17 +33,16 @@ public static ByteBuf encode(StreamRecordBatch streamRecord) {
+ 8 // baseOffset
+ 4 // lastOffsetDelta
+ 4 // payload length
+ streamRecord.getRecordBatch().rawPayload().remaining(); // payload
+ streamRecord.size(); // payload

ByteBuf buf = DirectByteBufAlloc.byteBuffer(totalLength);
buf.writeByte(MAGIC_V0);
buf.writeLong(streamRecord.getStreamId());
buf.writeLong(streamRecord.getEpoch());
buf.writeLong(streamRecord.getBaseOffset());
buf.writeInt(streamRecord.getRecordBatch().count());
ByteBuffer payload = streamRecord.getRecordBatch().rawPayload().duplicate();
buf.writeInt(payload.remaining());
buf.writeBytes(payload);
buf.writeInt(streamRecord.getCount());
buf.writeInt(streamRecord.size());
buf.writeBytes(streamRecord.getPayload().duplicate());
return buf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,9 @@

package com.automq.stream.s3.model;

import com.automq.stream.api.RecordBatch;
import com.automq.stream.s3.StreamRecordBatchCodec;
import io.netty.buffer.ByteBuf;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;

public class StreamRecordBatch implements Comparable<StreamRecordBatch> {
private final long streamId;
private final long epoch;
Expand Down Expand Up @@ -68,35 +63,12 @@ public long getLastOffset() {
return baseOffset + count;
}

public ByteBuf getPayload() {
return payload;
public int getCount() {
return count;
}

public RecordBatch getRecordBatch() {
ByteBuffer buf = ByteBuffer.allocate(payload.readableBytes());
payload.duplicate().readBytes(buf);
buf.flip();
return new RecordBatch() {
@Override
public int count() {
return count;
}

@Override
public long baseTimestamp() {
return 0;
}

@Override
public Map<String, String> properties() {
return Collections.emptyMap();
}

@Override
public ByteBuffer rawPayload() {
return buf;
}
};
// Backing payload buffer of this record batch. NOTE(review): presumably
// reference-counted and owned by this batch (released via release()) — confirm
// callers do not release the returned ByteBuf themselves.
public ByteBuf getPayload() {
    return payload;
}

public int size() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ public void testWrite() throws ExecutionException, InterruptedException {
StreamRecordBatch r = it.next();
assertEquals(233L, r.getStreamId());
assertEquals(10L, r.getBaseOffset());
assertEquals(5L, r.getRecordBatch().count());
assertEquals(r1.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
assertEquals(5L, r.getCount());
assertEquals(r1.getPayload(), r.getPayload());
r.release();
r = it.next();
assertEquals(233L, r.getStreamId());
assertEquals(15L, r.getBaseOffset());
assertEquals(10L, r.getRecordBatch().count());
assertEquals(r2.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
assertEquals(10L, r.getCount());
assertEquals(r2.getPayload(), r.getPayload());
assertFalse(it.hasNext());
r.release();
}
Expand All @@ -90,8 +90,8 @@ public void testWrite() throws ExecutionException, InterruptedException {
StreamRecordBatch r = it.next();
assertEquals(233L, r.getStreamId());
assertEquals(25L, r.getBaseOffset());
assertEquals(5L, r.getRecordBatch().count());
assertEquals(r3.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
assertEquals(5L, r.getCount());
assertEquals(r3.getPayload(), r.getPayload());
r.release();
}

Expand All @@ -103,8 +103,8 @@ public void testWrite() throws ExecutionException, InterruptedException {
StreamRecordBatch r = it.next();
assertEquals(234L, r.getStreamId());
assertEquals(0L, r.getBaseOffset());
assertEquals(5L, r.getRecordBatch().count());
assertEquals(r4.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
assertEquals(5L, r.getCount());
assertEquals(r4.getPayload(), r.getPayload());
assertFalse(it.hasNext());
r.release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ public void testCopy() throws ExecutionException, InterruptedException {
StreamRecordBatch r = it.next();
assertEquals(streamId, r.getStreamId());
assertEquals(10L, r.getBaseOffset());
assertEquals(5L, r.getRecordBatch().count());
assertEquals(r1.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
assertEquals(5L, r.getCount());
assertEquals(r1.getPayload(), r.getPayload());
r = it.next();
assertEquals(streamId, r.getStreamId());
assertEquals(15L, r.getBaseOffset());
assertEquals(10L, r.getRecordBatch().count());
assertEquals(r2.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
assertEquals(10L, r.getCount());
assertEquals(r2.getPayload(), r.getPayload());
assertFalse(it.hasNext());
r.release();
}
Expand All @@ -91,13 +91,13 @@ public void testCopy() throws ExecutionException, InterruptedException {
StreamRecordBatch r = it.next();
assertEquals(streamId, r.getStreamId());
assertEquals(25L, r.getBaseOffset());
assertEquals(8L, r.getRecordBatch().count());
assertEquals(r3.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
assertEquals(8L, r.getCount());
assertEquals(r3.getPayload(), r.getPayload());
r = it.next();
assertEquals(streamId, r.getStreamId());
assertEquals(33L, r.getBaseOffset());
assertEquals(6L, r.getRecordBatch().count());
assertEquals(r4.getRecordBatch().rawPayload(), r.getRecordBatch().rawPayload());
assertEquals(6L, r.getCount());
assertEquals(r4.getPayload(), r.getPayload());
assertFalse(it.hasNext());
r.release();
}
Expand Down

0 comments on commit 46df6ea

Please sign in to comment.