Skip to content

Commit

Permalink
feat(s3stream): use sequential memory alloc for stream set object com…
Browse files Browse the repository at this point in the history
…paction

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh committed Feb 28, 2024
1 parent 336fe9e commit 856735e
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
public class DirectByteBufSeqAlloc {
public static final int HUGE_BUF_SIZE = 8 * 1024 * 1024;
// why not use ThreadLocal? the partition open has too much threads
final AtomicReference<HugeBuf>[] hugeBufArray = new AtomicReference[8];
final AtomicReference<HugeBuf>[] hugeBufArray;
private final int allocType;

public DirectByteBufSeqAlloc(int allocType) {
public DirectByteBufSeqAlloc(int allocType, int concurrency) {
this.allocType = allocType;
hugeBufArray = new AtomicReference[concurrency];
for (int i = 0; i < hugeBufArray.length; i++) {
hugeBufArray[i] = new AtomicReference<>(new HugeBuf(ByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType)));
}
Expand Down
20 changes: 10 additions & 10 deletions s3stream/src/main/java/com/automq/stream/s3/ByteBufAlloc.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ByteBufAlloc {
public static final int STREAM_OBJECT_COMPACTION_WRITE = 8;
public static final int STREAM_SET_OBJECT_COMPACTION_READ = 9;
public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10;
public static DirectByteBufAllocMetric directByteBufAllocMetric = null;
public static ByteBufAllocMetric byteBufAllocMetric = null;

static {
registerAllocType(DEFAULT, "default");
Expand Down Expand Up @@ -87,21 +87,21 @@ public static ByteBuf byteBuffer(int initCapacity, int type) {
if (now - lastMetricLogTime > 60000) {
// it's ok to be not thread safe
lastMetricLogTime = now;
ByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.info("Direct Memory usage: {}", ByteBufAlloc.directByteBufAllocMetric);
ByteBufAlloc.byteBufAllocMetric = new ByteBufAllocMetric();
LOGGER.info("Buffer usage: {}", ByteBufAlloc.byteBufAllocMetric);
}
return new WrappedByteBuf(BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
} else {
return BUFFER_USAGE_HEAPED ? ALLOC.heapBuffer(initCapacity) : ALLOC.directBuffer(initCapacity);
}
} catch (OutOfMemoryError e) {
if (MEMORY_USAGE_DETECT) {
ByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.error("alloc direct buffer OOM, {}", ByteBufAlloc.directByteBufAllocMetric, e);
ByteBufAlloc.byteBufAllocMetric = new ByteBufAllocMetric();
LOGGER.error("alloc buffer OOM, {}", ByteBufAlloc.byteBufAllocMetric, e);
} else {
LOGGER.error("alloc direct buffer OOM", e);
LOGGER.error("alloc buffer OOM", e);
}
System.err.println("alloc direct buffer OOM");
System.err.println("alloc buffer OOM");
Runtime.getRuntime().halt(1);
throw e;
}
Expand All @@ -114,12 +114,12 @@ public static void registerAllocType(int type, String name) {
ALLOC_TYPE.put(type, name);
}

public static class DirectByteBufAllocMetric {
public static class ByteBufAllocMetric {
private final long usedMemory;
private final long allocatedMemory;
private final Map<String, Long> detail = new HashMap<>();

public DirectByteBufAllocMetric() {
public ByteBufAllocMetric() {
USAGE_STATS.forEach((k, v) -> {
detail.put(k + "/" + ALLOC_TYPE.get(k), v.longValue());
});
Expand All @@ -138,7 +138,7 @@ public Map<String, Long> getDetailedMap() {

@Override
public String toString() {
StringBuilder sb = new StringBuilder("DirectByteBufAllocMetric{usedMemory=");
StringBuilder sb = new StringBuilder("ByteBufAllocMetric{usedMemory=");
sb.append(usedMemory);
sb.append(", allocatedMemory=");
sb.append(allocatedMemory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class StreamRecordBatchCodec {
+ 8 // baseOffset
+ 4 // lastOffsetDelta
+ 4; // payload length
private static final DirectByteBufSeqAlloc ENCODE_ALLOC = new DirectByteBufSeqAlloc(ENCODE_RECORD);
private static final DirectByteBufSeqAlloc ENCODE_ALLOC = new DirectByteBufSeqAlloc(ENCODE_RECORD, 8);

public static ByteBuf encode(StreamRecordBatch streamRecord) {
int totalLength = HEADER_SIZE + streamRecord.size(); // payload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package com.automq.stream.s3.compact.operator;

import com.automq.stream.DirectByteBufSeqAlloc;
import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.DataBlockIndex;
import com.automq.stream.s3.ObjectReader;
Expand Down Expand Up @@ -39,6 +40,7 @@
//TODO: refactor to reduce duplicate code with ObjectWriter
public class DataBlockReader {
private static final Logger LOGGER = LoggerFactory.getLogger(DataBlockReader.class);
private static final DirectByteBufSeqAlloc DIRECT_ALLOC = new DirectByteBufSeqAlloc(STREAM_SET_OBJECT_COMPACTION_READ, 1);
private final S3ObjectMetadata metadata;
private final String objectKey;
private final S3Operator s3Operator;
Expand Down Expand Up @@ -195,7 +197,7 @@ private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {
if (throttleBucket == null) {
return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> {
// convert heap buffer to direct buffer
ByteBuf directBuf = ByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ);
ByteBuf directBuf = DIRECT_ALLOC.byteBuffer(buf.readableBytes());
directBuf.writeBytes(buf);
buf.release();
return directBuf;
Expand All @@ -205,7 +207,7 @@ private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {
.thenCompose(v ->
s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> {
// convert heap buffer to direct buffer
ByteBuf directBuf = ByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ);
ByteBuf directBuf = DIRECT_ALLOC.byteBuffer(buf.readableBytes());
directBuf.writeBytes(buf);
buf.release();
return directBuf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ public static void initMetrics(Meter meter, String prefix) {
.setUnit("bytes")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && ByteBufAlloc.directByteBufAllocMetric != null) {
Map<String, Long> allocateSizeMap = ByteBufAlloc.directByteBufAllocMetric.getDetailedMap();
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && ByteBufAlloc.byteBufAllocMetric != null) {
Map<String, Long> allocateSizeMap = ByteBufAlloc.byteBufAllocMetric.getDetailedMap();
for (Map.Entry<String, Long> entry : allocateSizeMap.entrySet()) {
result.record(entry.getValue(), ALLOC_TYPE_ATTRIBUTES.get(entry.getKey()));
}
Expand All @@ -284,8 +284,8 @@ public static void initMetrics(Meter meter, String prefix) {
.setUnit("bytes")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel()) && ByteBufAlloc.directByteBufAllocMetric != null) {
result.record(ByteBufAlloc.directByteBufAllocMetric.getUsedMemory(), metricsConfig.getBaseAttributes());
if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel()) && ByteBufAlloc.byteBufAllocMetric != null) {
result.record(ByteBufAlloc.byteBufAllocMetric.getUsedMemory(), metricsConfig.getBaseAttributes());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class DirectByteBufSeqAllocTest {

@Test
public void testAlloc() {
DirectByteBufSeqAlloc alloc = new DirectByteBufSeqAlloc(0);
DirectByteBufSeqAlloc alloc = new DirectByteBufSeqAlloc(0, 1);

AtomicReference<DirectByteBufSeqAlloc.HugeBuf> bufRef = alloc.hugeBufArray[Math.abs(Thread.currentThread().hashCode() % alloc.hugeBufArray.length)];

Expand Down

0 comments on commit 856735e

Please sign in to comment.