feat(s3stream): monitor direct memory usage (#946)
Signed-off-by: Robin Han <[email protected]>
superhx authored Feb 27, 2024
1 parent 684e272 commit f972cfb
Showing 16 changed files with 1,212 additions and 38 deletions.
1,062 changes: 1,062 additions & 0 deletions s3stream/src/main/java/com/automq/stream/WrappedByteBuf.java

Large diffs are not rendered by default.
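
The new WrappedByteBuf is not rendered above; from its use below (new WrappedByteBuf(ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity))), it is presumably a delegating ByteBuf wrapper that runs a callback once the buffer is fully released. A minimal, hypothetical sketch of that idea follows; the name ReleaseTrackingByteBuf and the composition-based shape are illustrative assumptions, not the actual class, which has to delegate every ByteBuf method and is therefore ~1,000 lines.

    import io.netty.buffer.ByteBuf;

    // Hypothetical sketch only: the real WrappedByteBuf extends ByteBuf and delegates
    // every method to the wrapped buffer. Only the accounting-relevant behaviour is
    // shown: run a hook exactly once when the underlying buffer is fully released.
    public final class ReleaseTrackingByteBuf {
        private final ByteBuf delegate;
        private final Runnable releaseHook; // e.g. () -> usage.add(-initCapacity)

        public ReleaseTrackingByteBuf(ByteBuf delegate, Runnable releaseHook) {
            this.delegate = delegate;
            this.releaseHook = releaseHook;
        }

        public ByteBuf unwrap() {
            return delegate;
        }

        public boolean release() {
            boolean deallocated = delegate.release();
            if (deallocated) {
                releaseHook.run(); // subtract this allocation from the per-type usage counter
            }
            return deallocated;
        }
    }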

@@ -11,40 +11,118 @@

package com.automq.stream.s3;

import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.stats.ByteBufStats;
import com.automq.stream.WrappedByteBuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectByteBufAlloc {
private static final Logger LOGGER = LoggerFactory.getLogger(DirectByteBufAlloc.class);
private static final PooledByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
private static final Map<Integer, LongAdder> USAGE_STATS = new ConcurrentHashMap<>();
private static long lastMetricLogTime = System.currentTimeMillis();
private static final Map<Integer, String> ALLOC_TYPE = new HashMap<>();

public static final int DEFAULT = 0;
public static final int ENCODE_RECORD = 1;
public static final int DECODE_RECORD = 2;
public static final int WRITE_INDEX_BLOCK = 3;
public static final int READ_INDEX_BLOCK = 4;
public static final int WRITE_DATA_BLOCK_HEADER = 5;
public static final int WRITE_FOOTER = 6;
public static final int STREAM_OBJECT_COMPACTION_READ = 7;
public static final int STREAM_OBJECT_COMPACTION_WRITE = 8;
public static final int STREAM_SET_OBJECT_COMPACTION_READ = 9;
public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10;

static {
registerAllocType(DEFAULT, "default");
registerAllocType(ENCODE_RECORD, "write_record");
registerAllocType(DECODE_RECORD, "read_record");
registerAllocType(WRITE_INDEX_BLOCK, "write_index_block");
registerAllocType(READ_INDEX_BLOCK, "read_index_block");
registerAllocType(WRITE_DATA_BLOCK_HEADER, "write_data_block_header");
registerAllocType(WRITE_FOOTER, "write_footer");
registerAllocType(STREAM_OBJECT_COMPACTION_READ, "stream_object_compaction_read");
registerAllocType(STREAM_OBJECT_COMPACTION_WRITE, "stream_object_compaction_write");
registerAllocType(STREAM_SET_OBJECT_COMPACTION_READ, "stream_set_object_compaction_read");
registerAllocType(STREAM_SET_OBJECT_COMPACTION_WRITE, "stream_set_object_compaction_write");

}

public static CompositeByteBuf compositeByteBuffer() {
return ALLOC.compositeDirectBuffer(Integer.MAX_VALUE);
}

public static ByteBuf byteBuffer(int initCapacity) {
-return byteBuffer(initCapacity, null);
+return byteBuffer(initCapacity, DEFAULT);
}

-public static ByteBuf byteBuffer(int initCapacity, String name) {
+public static ByteBuf byteBuffer(int initCapacity, int type) {
try {
-if (name != null) {
-ByteBufStats.getInstance().allocateByteBufSizeStats(name).record(MetricsLevel.DEBUG, initCapacity);
+LongAdder usage = USAGE_STATS.compute(type, (k, v) -> {
+if (v == null) {
+v = new LongAdder();
+}
+v.add(initCapacity);
+return v;
+});
+long now = System.currentTimeMillis();
+if (now - lastMetricLogTime > 60000) {
+// it's ok to be not thread safe
+lastMetricLogTime = now;
+LOGGER.info("Direct Memory usage: netty.usedDirectMemory={}, DirectByteBufAlloc={}", ALLOC.metric().usedDirectMemory(), metric());
+}
-return ALLOC.directBuffer(initCapacity);
+return new WrappedByteBuf(ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
} catch (OutOfMemoryError e) {
-LOGGER.error("alloc direct buffer OOM", e);
+LOGGER.error("alloc direct buffer OOM, netty.usedDirectMemory={}, DirectByteBufAlloc={}", ALLOC.metric().usedDirectMemory(), metric(), e);
System.err.println("alloc direct buffer OOM");
Runtime.getRuntime().halt(1);
throw e;
}
}

public static void registerAllocType(int type, String name) {
if (ALLOC_TYPE.containsKey(type)) {
throw new IllegalArgumentException("type already registered: " + type + "=" + ALLOC_TYPE.get(type));
}
ALLOC_TYPE.put(type, name);
}

public static Metric metric() {
return new Metric();
}

public static class Metric {
private final long usage;
private final Map<Integer, Long> detail;

public Metric() {
Map<Integer, Long> detail = new HashMap<>();
USAGE_STATS.forEach((k, v) -> detail.put(k, v.longValue()));
this.detail = detail;
this.usage = this.detail.values().stream().mapToLong(Long::longValue).sum();
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder("DirectByteBufAllocMetric{usage=");
sb.append(usage);
sb.append(", detail=");
for (Map.Entry<Integer, Long> entry : detail.entrySet()) {
sb.append(entry.getKey()).append("/").append(ALLOC_TYPE.get(entry.getKey())).append("=").append(entry.getValue()).append(",");
}
sb.append("}");
return sb.toString();
}
}

public interface OOMHandler {
/**
* Try handle OOM exception.
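
A rough usage sketch of the new API (assumed caller code, not part of this commit): allocations are tagged with a registered type, the requested size is added to that type's counter, and fully releasing the returned buffer subtracts it again, so DirectByteBufAlloc.metric() reports the direct memory currently held per caller type.

    import com.automq.stream.s3.DirectByteBufAlloc;
    import io.netty.buffer.ByteBuf;

    public class DirectMemoryUsageExample {
        public static void main(String[] args) {
            // Allocate 4 KiB attributed to the ENCODE_RECORD type; the size is added
            // to that type's LongAdder inside DirectByteBufAlloc.
            ByteBuf buf = DirectByteBufAlloc.byteBuffer(4096, DirectByteBufAlloc.ENCODE_RECORD);
            try {
                buf.writeLong(42L);
            } finally {
                buf.release(); // the WrappedByteBuf release hook subtracts 4096 again
            }
            // Components can also register their own types (ids 0-10 are taken above).
            DirectByteBufAlloc.registerAllocType(100, "example_cache");
            // Prints e.g. DirectByteBufAllocMetric{usage=0, detail=1/write_record=0,}
            System.out.println(DirectByteBufAlloc.metric());
        }
    }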
@@ -28,6 +28,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.DirectByteBufAlloc.READ_INDEX_BLOCK;
import static com.automq.stream.s3.ObjectWriter.Footer.FOOTER_SIZE;
import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET;

@@ -144,7 +145,7 @@ public static BasicObjectInfo parse(ByteBuf objectTailBuf,

// trim the ByteBuf to avoid extra memory occupy.
ByteBuf indexBlockBuf = objectTailBuf.slice(objectTailBuf.readerIndex() + indexRelativePosition, indexBlockSize);
-ByteBuf copy = DirectByteBufAlloc.byteBuffer(indexBlockBuf.readableBytes());
+ByteBuf copy = DirectByteBufAlloc.byteBuffer(indexBlockBuf.readableBytes(), READ_INDEX_BLOCK);
indexBlockBuf.readBytes(copy, indexBlockBuf.readableBytes());
objectTailBuf.release();
indexBlockBuf = copy;
10 changes: 7 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
@@ -24,6 +24,10 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static com.automq.stream.s3.DirectByteBufAlloc.WRITE_DATA_BLOCK_HEADER;
import static com.automq.stream.s3.DirectByteBufAlloc.WRITE_FOOTER;
import static com.automq.stream.s3.DirectByteBufAlloc.WRITE_INDEX_BLOCK;

/**
* Write stream records to a single object.
*/
@@ -193,7 +197,7 @@ class IndexBlock {
public IndexBlock() {
long nextPosition = 0;
int indexBlockSize = DataBlockIndex.BLOCK_INDEX_SIZE * completedBlocks.size();
-buf = DirectByteBufAlloc.byteBuffer(indexBlockSize, "write_index_block");
+buf = DirectByteBufAlloc.byteBuffer(indexBlockSize, WRITE_INDEX_BLOCK);
for (DataBlock block : completedBlocks) {
ObjectStreamRange streamRange = block.getStreamRange();
new DataBlockIndex(streamRange.getStreamId(), streamRange.getStartOffset(), (int) (streamRange.getEndOffset() - streamRange.getStartOffset()),
@@ -227,7 +231,7 @@ class DataBlock {
public DataBlock(long streamId, List<StreamRecordBatch> records) {
this.recordCount = records.size();
this.encodedBuf = DirectByteBufAlloc.compositeByteBuffer();
-ByteBuf header = DirectByteBufAlloc.byteBuffer(BLOCK_HEADER_SIZE);
+ByteBuf header = DirectByteBufAlloc.byteBuffer(BLOCK_HEADER_SIZE, WRITE_DATA_BLOCK_HEADER);
header.writeByte(DATA_BLOCK_MAGIC);
header.writeByte(DATA_BLOCK_DEFAULT_FLAG);
header.writeInt(recordCount);
@@ -262,7 +266,7 @@ class Footer {
private final ByteBuf buf;

public Footer(long indexStartPosition, int indexBlockLength) {
-buf = DirectByteBufAlloc.byteBuffer(FOOTER_SIZE);
+buf = DirectByteBufAlloc.byteBuffer(FOOTER_SIZE, WRITE_FOOTER);
// start position of index block
buf.writeLong(indexStartPosition);
// size of index block
@@ -32,6 +32,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_OBJECT_COMPACTION_READ;
import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_OBJECT_COMPACTION_WRITE;
import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OBJECT_ID;
import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET;

@@ -173,7 +175,7 @@ public Optional<CompactStreamObjectRequest> compact() throws ExecutionException,
long compactedEndOffset = objectGroup.get(objectGroup.size() - 1).endOffset();
List<Long> compactedObjectIds = new LinkedList<>();
CompositeByteBuf indexes = DirectByteBufAlloc.compositeByteBuffer();
-Writer writer = s3Operator.writer(ObjectUtils.genKey(0, objectId), ThrottleStrategy.THROTTLE_2);
+Writer writer = s3Operator.writer(new Writer.Context(STREAM_OBJECT_COMPACTION_READ), ObjectUtils.genKey(0, objectId), ThrottleStrategy.THROTTLE_2);
long groupStartOffset = -1L;
long groupStartPosition = -1L;
int groupSize = 0;
@@ -182,7 +184,7 @@
for (S3ObjectMetadata object : objectGroup) {
try (ObjectReader reader = new ObjectReader(object, s3Operator)) {
ObjectReader.BasicObjectInfo basicObjectInfo = reader.basicObjectInfo().get();
-ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(basicObjectInfo.indexBlock().count() * DataBlockIndex.BLOCK_INDEX_SIZE);
+ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(basicObjectInfo.indexBlock().count() * DataBlockIndex.BLOCK_INDEX_SIZE, STREAM_OBJECT_COMPACTION_WRITE);
Iterator<DataBlockIndex> it = basicObjectInfo.indexBlock().iterator();
long validDataBlockStartPosition = 0;
while (it.hasNext()) {
@@ -217,7 +219,7 @@ public Optional<CompactStreamObjectRequest> compact() throws ExecutionException,
}
}
if (lastIndex != null) {
-ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(DataBlockIndex.BLOCK_INDEX_SIZE);
+ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(DataBlockIndex.BLOCK_INDEX_SIZE, STREAM_OBJECT_COMPACTION_WRITE);
new DataBlockIndex(streamId, groupStartOffset, (int) (lastIndex.endOffset() - groupStartOffset),
groupRecordCount, groupStartPosition, groupSize).encode(subIndexes);
indexes.addComponent(true, subIndexes);
@@ -14,6 +14,8 @@
import com.automq.stream.s3.model.StreamRecordBatch;
import io.netty.buffer.ByteBuf;

import static com.automq.stream.s3.DirectByteBufAlloc.ENCODE_RECORD;

public class StreamRecordBatchCodec {
public static final byte MAGIC_V0 = 0x22;
public static final int HEADER_SIZE =
@@ -26,8 +28,7 @@ public class StreamRecordBatchCodec {

public static ByteBuf encode(StreamRecordBatch streamRecord) {
int totalLength = HEADER_SIZE + streamRecord.size(); // payload

-ByteBuf buf = DirectByteBufAlloc.byteBuffer(totalLength);
+ByteBuf buf = DirectByteBufAlloc.byteBuffer(totalLength, ENCODE_RECORD);
buf.writeByte(MAGIC_V0);
buf.writeLong(streamRecord.getStreamId());
buf.writeLong(streamRecord.getEpoch());
@@ -52,7 +53,7 @@ public static StreamRecordBatch duplicateDecode(ByteBuf buf) {
long baseOffset = buf.readLong();
int lastOffsetDelta = buf.readInt();
int payloadLength = buf.readInt();
-ByteBuf payload = DirectByteBufAlloc.byteBuffer(payloadLength);
+ByteBuf payload = DirectByteBufAlloc.byteBuffer(payloadLength, DirectByteBufAlloc.DECODE_RECORD);
buf.readBytes(payload);
return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload);
}
@@ -34,6 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_READ;

//TODO: refactor to reduce duplicate code with ObjectWriter
public class DataBlockReader {
private static final Logger LOGGER = LoggerFactory.getLogger(DataBlockReader.class);
@@ -193,7 +195,7 @@ private CompletableFuture<ByteBuf> rangeRead0(long start, long end) {
if (throttleBucket == null) {
return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> {
// convert heap buffer to direct buffer
-ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes());
+ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ);
directBuf.writeBytes(buf);
buf.release();
return directBuf;
@@ -203,7 +205,7 @@
.thenCompose(v ->
s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> {
// convert heap buffer to direct buffer
-ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes());
+ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ);
directBuf.writeBytes(buf);
buf.release();
return directBuf;
@@ -30,6 +30,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_OBJECT_COMPACTION_WRITE;
import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_READ;
import static com.automq.stream.s3.operator.Writer.MIN_PART_SIZE;

//TODO: refactor to reduce duplicate code with ObjectWriter
@@ -51,7 +53,7 @@ public DataBlockWriter(long objectId, S3Operator s3Operator, int partSizeThresho
waitingUploadBlocks = new LinkedList<>();
waitingUploadBlockCfs = new ConcurrentHashMap<>();
completedBlocks = new LinkedList<>();
-writer = s3Operator.writer(objectKey, ThrottleStrategy.THROTTLE_2);
+writer = s3Operator.writer(new Writer.Context(STREAM_SET_OBJECT_COMPACTION_READ), objectKey, ThrottleStrategy.THROTTLE_2);
}

public long getObjectId() {
@@ -152,7 +154,7 @@ public IndexBlock() {

List<DataBlockIndex> dataBlockIndices = CompactionUtils.buildDataBlockIndicesFromGroup(
CompactionUtils.groupStreamDataBlocks(completedBlocks, new GroupByLimitPredicate(DEFAULT_DATA_BLOCK_GROUP_SIZE_THRESHOLD)));
-buf = DirectByteBufAlloc.byteBuffer(dataBlockIndices.size() * DataBlockIndex.BLOCK_INDEX_SIZE, "write_index_block");
+buf = DirectByteBufAlloc.byteBuffer(dataBlockIndices.size() * DataBlockIndex.BLOCK_INDEX_SIZE, DirectByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_WRITE);
for (DataBlockIndex dataBlockIndex : dataBlockIndices) {
dataBlockIndex.encode(buf);
}
@@ -177,7 +179,7 @@ class Footer {
private final ByteBuf buf;

public Footer() {
-buf = DirectByteBufAlloc.byteBuffer(FOOTER_SIZE);
+buf = DirectByteBufAlloc.byteBuffer(FOOTER_SIZE, STREAM_OBJECT_COMPACTION_WRITE);
buf.writeLong(indexBlock.position());
buf.writeInt(indexBlock.size());
buf.writeZero(40 - 8 - 4);
@@ -391,8 +391,8 @@ private void write0(String path, ByteBuf data, CompletableFuture<Void> cf) {
}

@Override
-public Writer writer(String path, ThrottleStrategy throttleStrategy) {
-return new ProxyWriter(this, path, throttleStrategy);
+public Writer writer(Writer.Context context, String path, ThrottleStrategy throttleStrategy) {
+return new ProxyWriter(context, this, path, throttleStrategy);
}

@Override
@@ -46,7 +46,7 @@ public CompletableFuture<Void> write(String path, ByteBuf data, ThrottleStrategy
}

@Override
-public Writer writer(String path, ThrottleStrategy throttleStrategy) {
+public Writer writer(Writer.Context context, String path, ThrottleStrategy throttleStrategy) {
ByteBuf buf = Unpooled.buffer();
storage.put(path, buf);
return new Writer() {
@@ -28,9 +28,11 @@
import java.util.stream.Collectors;
import software.amazon.awssdk.services.s3.model.CompletedPart;


public class MultiPartWriter implements Writer {
private static final long MAX_MERGE_WRITE_SIZE = 16L * 1024 * 1024;
final CompletableFuture<String> uploadIdCf = new CompletableFuture<>();
private final Context context;
private final S3Operator operator;
private final String path;
private final List<CompletableFuture<CompletedPart>> parts = new LinkedList<>();
@@ -46,7 +48,8 @@ public class MultiPartWriter implements Writer {
private CompletableFuture<Void> closeCf;
private ObjectPart objectPart = null;

-public MultiPartWriter(S3Operator operator, String path, long minPartSize, ThrottleStrategy throttleStrategy) {
+public MultiPartWriter(Context context, S3Operator operator, String path, long minPartSize, ThrottleStrategy throttleStrategy) {
this.context = context;
this.operator = operator;
this.path = path;
this.minPartSize = minPartSize;
@@ -200,7 +203,7 @@ public void write(ByteBuf data) {
public void copyOnWrite() {
int size = partBuf.readableBytes();
if (size > 0) {
-ByteBuf buf = DirectByteBufAlloc.byteBuffer(size);
+ByteBuf buf = DirectByteBufAlloc.byteBuffer(size, context.allocType());
buf.writeBytes(partBuf.duplicate());
CompositeByteBuf copy = DirectByteBufAlloc.compositeByteBuffer().addComponent(true, buf);
this.partBuf.release();
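
Writer.Context itself is not shown in this diff; judging from new Writer.Context(STREAM_SET_OBJECT_COMPACTION_READ) and context.allocType() above, it presumably just carries the allocation type so that buffers copied inside the writer (e.g. in copyOnWrite) are attributed to the originating component. A minimal sketch under that assumption (the real definition lives in com.automq.stream.s3.operator.Writer and may carry more):

    // Assumed shape of Writer.Context, inferred from its call sites in this diff.
    public interface Writer {
        class Context {
            private final int allocType;

            public Context(int allocType) {
                this.allocType = allocType;
            }

            // Used e.g. by MultiPartWriter.copyOnWrite() to tag the copied part buffer.
            public int allocType() {
                return allocType;
            }
        }
    }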