From cdb99548598bef5ba504302821d7ab1bca96be75 Mon Sep 17 00:00:00 2001 From: Robin Han Date: Tue, 27 Feb 2024 12:12:11 +0800 Subject: [PATCH] feat(s3stream): monitor direct memory usage Signed-off-by: Robin Han --- .../com/automq/stream/WrappedByteBuf.java | 1062 +++++++++++++++++ .../automq/stream/s3/DirectByteBufAlloc.java | 94 +- .../com/automq/stream/s3/ObjectReader.java | 3 +- .../com/automq/stream/s3/ObjectWriter.java | 10 +- .../stream/s3/StreamObjectCompactor.java | 8 +- .../stream/s3/StreamRecordBatchCodec.java | 7 +- .../s3/compact/operator/DataBlockReader.java | 6 +- .../s3/compact/operator/DataBlockWriter.java | 8 +- .../stream/s3/operator/DefaultS3Operator.java | 4 +- .../stream/s3/operator/MemoryS3Operator.java | 2 +- .../stream/s3/operator/MultiPartWriter.java | 7 +- .../stream/s3/operator/ProxyWriter.java | 13 +- .../automq/stream/s3/operator/S3Operator.java | 4 +- .../com/automq/stream/s3/operator/Writer.java | 16 + .../s3/operator/MultiPartWriterTest.java | 4 +- .../stream/s3/operator/ProxyWriterTest.java | 2 +- 16 files changed, 1212 insertions(+), 38 deletions(-) create mode 100644 s3stream/src/main/java/com/automq/stream/WrappedByteBuf.java diff --git a/s3stream/src/main/java/com/automq/stream/WrappedByteBuf.java b/s3stream/src/main/java/com/automq/stream/WrappedByteBuf.java new file mode 100644 index 000000000..cb32e041a --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/WrappedByteBuf.java @@ -0,0 +1,1062 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.automq.stream; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.util.ByteProcessor; +import io.netty.util.internal.ObjectUtil; +import io.netty.util.internal.StringUtil; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.charset.Charset; + +/** + * Modified based on io.netty.buffer.WrappedByteBuf. + * Wraps another {@link ByteBuf}. + *

+ * It's important that the {@link #readerIndex()} and {@link #writerIndex()} will not do any adjustments on the + * indices on the fly because of internal optimizations made by {@link ByteBufUtil#writeAscii(ByteBuf, CharSequence)} + * and {@link ByteBufUtil#writeUtf8(ByteBuf, CharSequence)}. + */ +public class WrappedByteBuf extends ByteBuf { + private final ByteBuf root; + protected final ByteBuf buf; + private final Runnable releaseHook; + + public WrappedByteBuf(ByteBuf buf, Runnable releaseHook) { + this(buf, buf, releaseHook); + } + + public WrappedByteBuf(ByteBuf root, ByteBuf buf, Runnable releaseHook) { + this.root = root; + this.buf = ObjectUtil.checkNotNull(buf, "buf"); + this.releaseHook = ObjectUtil.checkNotNull(releaseHook, "releaseHook"); + } + + @Override + public final boolean hasMemoryAddress() { + return buf.hasMemoryAddress(); + } + + @Override + public boolean isContiguous() { + return buf.isContiguous(); + } + + @Override + public final long memoryAddress() { + return buf.memoryAddress(); + } + + @Override + public final int capacity() { + return buf.capacity(); + } + + @Override + public ByteBuf capacity(int newCapacity) { + buf.capacity(newCapacity); + return this; + } + + @Override + public final int maxCapacity() { + return buf.maxCapacity(); + } + + @Override + public final ByteBufAllocator alloc() { + return buf.alloc(); + } + + @Override + public final ByteOrder order() { + return buf.order(); + } + + @Override + public ByteBuf order(ByteOrder endianness) { + return new WrappedByteBuf(root, buf.order(endianness), releaseHook); + } + + @Override + public final ByteBuf unwrap() { + return buf; + } + + @Override + public ByteBuf asReadOnly() { + return buf.asReadOnly(); + } + + @Override + public boolean isReadOnly() { + return buf.isReadOnly(); + } + + @Override + public final boolean isDirect() { + return buf.isDirect(); + } + + @Override + public final int readerIndex() { + return buf.readerIndex(); + } + + @Override + public final ByteBuf readerIndex(int readerIndex) { + buf.readerIndex(readerIndex); + return this; + } + + @Override + public final int writerIndex() { + return buf.writerIndex(); + } + + @Override + public final ByteBuf writerIndex(int writerIndex) { + buf.writerIndex(writerIndex); + return this; + } + + @Override + public ByteBuf setIndex(int readerIndex, int writerIndex) { + buf.setIndex(readerIndex, writerIndex); + return this; + } + + @Override + public final int readableBytes() { + return buf.readableBytes(); + } + + @Override + public final int writableBytes() { + return buf.writableBytes(); + } + + @Override + public final int maxWritableBytes() { + return buf.maxWritableBytes(); + } + + @Override + public int maxFastWritableBytes() { + return buf.maxFastWritableBytes(); + } + + @Override + public final boolean isReadable() { + return buf.isReadable(); + } + + @Override + public final boolean isWritable() { + return buf.isWritable(); + } + + @Override + public final ByteBuf clear() { + buf.clear(); + return this; + } + + @Override + public final ByteBuf markReaderIndex() { + buf.markReaderIndex(); + return this; + } + + @Override + public final ByteBuf resetReaderIndex() { + buf.resetReaderIndex(); + return this; + } + + @Override + public final ByteBuf markWriterIndex() { + buf.markWriterIndex(); + return this; + } + + @Override + public final ByteBuf resetWriterIndex() { + buf.resetWriterIndex(); + return this; + } + + @Override + public ByteBuf discardReadBytes() { + buf.discardReadBytes(); + return this; + } + + @Override + public 
ByteBuf discardSomeReadBytes() { + buf.discardSomeReadBytes(); + return this; + } + + @Override + public ByteBuf ensureWritable(int minWritableBytes) { + buf.ensureWritable(minWritableBytes); + return this; + } + + @Override + public int ensureWritable(int minWritableBytes, boolean force) { + return buf.ensureWritable(minWritableBytes, force); + } + + @Override + public boolean getBoolean(int index) { + return buf.getBoolean(index); + } + + @Override + public byte getByte(int index) { + return buf.getByte(index); + } + + @Override + public short getUnsignedByte(int index) { + return buf.getUnsignedByte(index); + } + + @Override + public short getShort(int index) { + return buf.getShort(index); + } + + @Override + public short getShortLE(int index) { + return buf.getShortLE(index); + } + + @Override + public int getUnsignedShort(int index) { + return buf.getUnsignedShort(index); + } + + @Override + public int getUnsignedShortLE(int index) { + return buf.getUnsignedShortLE(index); + } + + @Override + public int getMedium(int index) { + return buf.getMedium(index); + } + + @Override + public int getMediumLE(int index) { + return buf.getMediumLE(index); + } + + @Override + public int getUnsignedMedium(int index) { + return buf.getUnsignedMedium(index); + } + + @Override + public int getUnsignedMediumLE(int index) { + return buf.getUnsignedMediumLE(index); + } + + @Override + public int getInt(int index) { + return buf.getInt(index); + } + + @Override + public int getIntLE(int index) { + return buf.getIntLE(index); + } + + @Override + public long getUnsignedInt(int index) { + return buf.getUnsignedInt(index); + } + + @Override + public long getUnsignedIntLE(int index) { + return buf.getUnsignedIntLE(index); + } + + @Override + public long getLong(int index) { + return buf.getLong(index); + } + + @Override + public long getLongLE(int index) { + return buf.getLongLE(index); + } + + @Override + public char getChar(int index) { + return buf.getChar(index); + } + + @Override + public float getFloat(int index) { + return buf.getFloat(index); + } + + @Override + public double getDouble(int index) { + return buf.getDouble(index); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst) { + buf.getBytes(index, dst); + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int length) { + buf.getBytes(index, dst, length); + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + buf.getBytes(index, dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf getBytes(int index, byte[] dst) { + buf.getBytes(index, dst); + return this; + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + buf.getBytes(index, dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + buf.getBytes(index, dst); + return this; + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + buf.getBytes(index, out, length); + return this; + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + return buf.getBytes(index, out, length); + } + + @Override + public int getBytes(int index, FileChannel out, long position, int length) throws IOException { + return buf.getBytes(index, out, position, length); + } + + @Override + public CharSequence getCharSequence(int index, int length, Charset charset) { + return buf.getCharSequence(index, 
length, charset); + } + + @Override + public ByteBuf setBoolean(int index, boolean value) { + buf.setBoolean(index, value); + return this; + } + + @Override + public ByteBuf setByte(int index, int value) { + buf.setByte(index, value); + return this; + } + + @Override + public ByteBuf setShort(int index, int value) { + buf.setShort(index, value); + return this; + } + + @Override + public ByteBuf setShortLE(int index, int value) { + buf.setShortLE(index, value); + return this; + } + + @Override + public ByteBuf setMedium(int index, int value) { + buf.setMedium(index, value); + return this; + } + + @Override + public ByteBuf setMediumLE(int index, int value) { + buf.setMediumLE(index, value); + return this; + } + + @Override + public ByteBuf setInt(int index, int value) { + buf.setInt(index, value); + return this; + } + + @Override + public ByteBuf setIntLE(int index, int value) { + buf.setIntLE(index, value); + return this; + } + + @Override + public ByteBuf setLong(int index, long value) { + buf.setLong(index, value); + return this; + } + + @Override + public ByteBuf setLongLE(int index, long value) { + buf.setLongLE(index, value); + return this; + } + + @Override + public ByteBuf setChar(int index, int value) { + buf.setChar(index, value); + return this; + } + + @Override + public ByteBuf setFloat(int index, float value) { + buf.setFloat(index, value); + return this; + } + + @Override + public ByteBuf setDouble(int index, double value) { + buf.setDouble(index, value); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src) { + buf.setBytes(index, src); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int length) { + buf.setBytes(index, src, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + buf.setBytes(index, src, srcIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, byte[] src) { + buf.setBytes(index, src); + return this; + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + buf.setBytes(index, src, srcIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + buf.setBytes(index, src); + return this; + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + return buf.setBytes(index, in, length); + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + return buf.setBytes(index, in, length); + } + + @Override + public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + return buf.setBytes(index, in, position, length); + } + + @Override + public ByteBuf setZero(int index, int length) { + buf.setZero(index, length); + return this; + } + + @Override + public int setCharSequence(int index, CharSequence sequence, Charset charset) { + return buf.setCharSequence(index, sequence, charset); + } + + @Override + public boolean readBoolean() { + return buf.readBoolean(); + } + + @Override + public byte readByte() { + return buf.readByte(); + } + + @Override + public short readUnsignedByte() { + return buf.readUnsignedByte(); + } + + @Override + public short readShort() { + return buf.readShort(); + } + + @Override + public short readShortLE() { + return buf.readShortLE(); + } + + @Override + public int readUnsignedShort() { + return buf.readUnsignedShort(); + } + + @Override + public int readUnsignedShortLE() { + 
return buf.readUnsignedShortLE(); + } + + @Override + public int readMedium() { + return buf.readMedium(); + } + + @Override + public int readMediumLE() { + return buf.readMediumLE(); + } + + @Override + public int readUnsignedMedium() { + return buf.readUnsignedMedium(); + } + + @Override + public int readUnsignedMediumLE() { + return buf.readUnsignedMediumLE(); + } + + @Override + public int readInt() { + return buf.readInt(); + } + + @Override + public int readIntLE() { + return buf.readIntLE(); + } + + @Override + public long readUnsignedInt() { + return buf.readUnsignedInt(); + } + + @Override + public long readUnsignedIntLE() { + return buf.readUnsignedIntLE(); + } + + @Override + public long readLong() { + return buf.readLong(); + } + + @Override + public long readLongLE() { + return buf.readLongLE(); + } + + @Override + public char readChar() { + return buf.readChar(); + } + + @Override + public float readFloat() { + return buf.readFloat(); + } + + @Override + public double readDouble() { + return buf.readDouble(); + } + + @Override + public ByteBuf readBytes(int length) { + return buf.readBytes(length); + } + + @Override + public ByteBuf readSlice(int length) { + return new WrappedByteBuf(root, buf.readSlice(length), releaseHook); + } + + @Override + public ByteBuf readRetainedSlice(int length) { + return new WrappedByteBuf(root, buf.readRetainedSlice(length), releaseHook); + } + + @Override + public ByteBuf readBytes(ByteBuf dst) { + buf.readBytes(dst); + return this; + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int length) { + buf.readBytes(dst, length); + return this; + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) { + buf.readBytes(dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf readBytes(byte[] dst) { + buf.readBytes(dst); + return this; + } + + @Override + public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { + buf.readBytes(dst, dstIndex, length); + return this; + } + + @Override + public ByteBuf readBytes(ByteBuffer dst) { + buf.readBytes(dst); + return this; + } + + @Override + public ByteBuf readBytes(OutputStream out, int length) throws IOException { + buf.readBytes(out, length); + return this; + } + + @Override + public int readBytes(GatheringByteChannel out, int length) throws IOException { + return buf.readBytes(out, length); + } + + @Override + public int readBytes(FileChannel out, long position, int length) throws IOException { + return buf.readBytes(out, position, length); + } + + @Override + public CharSequence readCharSequence(int length, Charset charset) { + return buf.readCharSequence(length, charset); + } + + @Override + public ByteBuf skipBytes(int length) { + buf.skipBytes(length); + return this; + } + + @Override + public ByteBuf writeBoolean(boolean value) { + buf.writeBoolean(value); + return this; + } + + @Override + public ByteBuf writeByte(int value) { + buf.writeByte(value); + return this; + } + + @Override + public ByteBuf writeShort(int value) { + buf.writeShort(value); + return this; + } + + @Override + public ByteBuf writeShortLE(int value) { + buf.writeShortLE(value); + return this; + } + + @Override + public ByteBuf writeMedium(int value) { + buf.writeMedium(value); + return this; + } + + @Override + public ByteBuf writeMediumLE(int value) { + buf.writeMediumLE(value); + return this; + } + + @Override + public ByteBuf writeInt(int value) { + buf.writeInt(value); + return this; + } + + @Override + public ByteBuf writeIntLE(int value) { + buf.writeIntLE(value); 
+ return this; + } + + @Override + public ByteBuf writeLong(long value) { + buf.writeLong(value); + return this; + } + + @Override + public ByteBuf writeLongLE(long value) { + buf.writeLongLE(value); + return this; + } + + @Override + public ByteBuf writeChar(int value) { + buf.writeChar(value); + return this; + } + + @Override + public ByteBuf writeFloat(float value) { + buf.writeFloat(value); + return this; + } + + @Override + public ByteBuf writeDouble(double value) { + buf.writeDouble(value); + return this; + } + + @Override + public ByteBuf writeBytes(ByteBuf src) { + buf.writeBytes(src); + return this; + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int length) { + buf.writeBytes(src, length); + return this; + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) { + buf.writeBytes(src, srcIndex, length); + return this; + } + + @Override + public ByteBuf writeBytes(byte[] src) { + buf.writeBytes(src); + return this; + } + + @Override + public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { + buf.writeBytes(src, srcIndex, length); + return this; + } + + @Override + public ByteBuf writeBytes(ByteBuffer src) { + buf.writeBytes(src); + return this; + } + + @Override + public int writeBytes(InputStream in, int length) throws IOException { + return buf.writeBytes(in, length); + } + + @Override + public int writeBytes(ScatteringByteChannel in, int length) throws IOException { + return buf.writeBytes(in, length); + } + + @Override + public int writeBytes(FileChannel in, long position, int length) throws IOException { + return buf.writeBytes(in, position, length); + } + + @Override + public ByteBuf writeZero(int length) { + buf.writeZero(length); + return this; + } + + @Override + public int writeCharSequence(CharSequence sequence, Charset charset) { + return buf.writeCharSequence(sequence, charset); + } + + @Override + public int indexOf(int fromIndex, int toIndex, byte value) { + return buf.indexOf(fromIndex, toIndex, value); + } + + @Override + public int bytesBefore(byte value) { + return buf.bytesBefore(value); + } + + @Override + public int bytesBefore(int length, byte value) { + return buf.bytesBefore(length, value); + } + + @Override + public int bytesBefore(int index, int length, byte value) { + return buf.bytesBefore(index, length, value); + } + + @Override + public int forEachByte(ByteProcessor processor) { + return buf.forEachByte(processor); + } + + @Override + public int forEachByte(int index, int length, ByteProcessor processor) { + return buf.forEachByte(index, length, processor); + } + + @Override + public int forEachByteDesc(ByteProcessor processor) { + return buf.forEachByteDesc(processor); + } + + @Override + public int forEachByteDesc(int index, int length, ByteProcessor processor) { + return buf.forEachByteDesc(index, length, processor); + } + + @Override + public ByteBuf copy() { + return buf.copy(); + } + + @Override + public ByteBuf copy(int index, int length) { + return buf.copy(index, length); + } + + @Override + public ByteBuf slice() { + return new WrappedByteBuf(root, buf.slice(), releaseHook); + } + + @Override + public ByteBuf retainedSlice() { + return new WrappedByteBuf(root, buf.retainedSlice(), releaseHook); + } + + @Override + public ByteBuf slice(int index, int length) { + return new WrappedByteBuf(root, buf.slice(index, length), releaseHook); + } + + @Override + public ByteBuf retainedSlice(int index, int length) { + return new WrappedByteBuf(root, buf.retainedSlice(index, length), releaseHook); + } + + 
@Override + public ByteBuf duplicate() { + return new WrappedByteBuf(root, buf.duplicate(), releaseHook); + } + + @Override + public ByteBuf retainedDuplicate() { + return new WrappedByteBuf(root, buf.retainedDuplicate(), releaseHook); + } + + @Override + public int nioBufferCount() { + return buf.nioBufferCount(); + } + + @Override + public ByteBuffer nioBuffer() { + return buf.nioBuffer(); + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + return buf.nioBuffer(index, length); + } + + @Override + public ByteBuffer[] nioBuffers() { + return buf.nioBuffers(); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return buf.nioBuffers(index, length); + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + return buf.internalNioBuffer(index, length); + } + + @Override + public boolean hasArray() { + return buf.hasArray(); + } + + @Override + public byte[] array() { + return buf.array(); + } + + @Override + public int arrayOffset() { + return buf.arrayOffset(); + } + + @Override + public String toString(Charset charset) { + return buf.toString(charset); + } + + @Override + public String toString(int index, int length, Charset charset) { + return buf.toString(index, length, charset); + } + + @Override + public int hashCode() { + return buf.hashCode(); + } + + @Override + @SuppressWarnings("EqualsWhichDoesntCheckParameterClass") + public boolean equals(Object obj) { + return buf.equals(obj); + } + + @Override + public int compareTo(ByteBuf buffer) { + return buf.compareTo(buffer); + } + + @Override + public String toString() { + return StringUtil.simpleClassName(this) + '(' + buf.toString() + ')'; + } + + @Override + public ByteBuf retain(int increment) { + buf.retain(increment); + return this; + } + + @Override + public ByteBuf retain() { + buf.retain(); + return this; + } + + @Override + public ByteBuf touch() { + buf.touch(); + return this; + } + + @Override + public ByteBuf touch(Object hint) { + buf.touch(hint); + return this; + } + + @Override + public final boolean isReadable(int size) { + return buf.isReadable(size); + } + + @Override + public final boolean isWritable(int size) { + return buf.isWritable(size); + } + + @Override + public final int refCnt() { + return buf.refCnt(); + } + + @Override + public boolean release() { + boolean rst = buf.release(); + if (rst && root != null && root.refCnt() == 0) { + releaseHook.run(); + } + return rst; + } + + @Override + public boolean release(int decrement) { + boolean rst = buf.release(decrement); + if (rst && root != null && root.refCnt() == 0) { + releaseHook.run(); + } + return rst; + } +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java index b88ea2160..9e0a652d5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java +++ b/s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java @@ -11,40 +11,118 @@ package com.automq.stream.s3; -import com.automq.stream.s3.metrics.MetricsLevel; -import com.automq.stream.s3.metrics.stats.ByteBufStats; +import com.automq.stream.WrappedByteBuf; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.LongAdder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DirectByteBufAlloc { 
private static final Logger LOGGER = LoggerFactory.getLogger(DirectByteBufAlloc.class); private static final PooledByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT; + private static final Map<Integer, LongAdder> USAGE_STATS = new ConcurrentHashMap<>(); + private static long lastMetricLogTime = System.currentTimeMillis(); + private static final Map<Integer, String> ALLOC_TYPE = new HashMap<>(); + + public static final int DEFAULT = 0; + public static final int ENCODE_RECORD = 1; + public static final int DECODE_RECORD = 2; + public static final int WRITE_INDEX_BLOCK = 3; + public static final int READ_INDEX_BLOCK = 4; + public static final int WRITE_DATA_BLOCK_HEADER = 5; + public static final int WRITE_FOOTER = 6; + public static final int STREAM_OBJECT_COMPACTION_READ = 7; + public static final int STREAM_OBJECT_COMPACTION_WRITE = 8; + public static final int STREAM_SET_OBJECT_COMPACTION_READ = 9; + public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10; + + static { + registerAllocType(DEFAULT, "default"); + registerAllocType(ENCODE_RECORD, "write_record"); + registerAllocType(DECODE_RECORD, "read_record"); + registerAllocType(WRITE_INDEX_BLOCK, "write_index_block"); + registerAllocType(READ_INDEX_BLOCK, "read_index_block"); + registerAllocType(WRITE_DATA_BLOCK_HEADER, "write_data_block_header"); + registerAllocType(WRITE_FOOTER, "write_footer"); + registerAllocType(STREAM_OBJECT_COMPACTION_READ, "stream_object_compaction_read"); + registerAllocType(STREAM_OBJECT_COMPACTION_WRITE, "stream_object_compaction_write"); + registerAllocType(STREAM_SET_OBJECT_COMPACTION_READ, "stream_set_object_compaction_read"); + registerAllocType(STREAM_SET_OBJECT_COMPACTION_WRITE, "stream_set_object_compaction_write"); + + } public static CompositeByteBuf compositeByteBuffer() { return ALLOC.compositeDirectBuffer(Integer.MAX_VALUE); } public static ByteBuf byteBuffer(int initCapacity) { - return byteBuffer(initCapacity, null); + return byteBuffer(initCapacity, DEFAULT); } - public static ByteBuf byteBuffer(int initCapacity, String name) { + public static ByteBuf byteBuffer(int initCapacity, int type) { try { - if (name != null) { - ByteBufStats.getInstance().allocateByteBufSizeStats(name).record(MetricsLevel.DEBUG, initCapacity); + LongAdder usage = USAGE_STATS.compute(type, (k, v) -> { + if (v == null) { + v = new LongAdder(); + } + v.add(initCapacity); + return v; + }); + long now = System.currentTimeMillis(); + if (now - lastMetricLogTime > 60000) { + // it's ok that this is not thread-safe + lastMetricLogTime = now; + LOGGER.info("Direct Memory usage: netty.usedDirectMemory={}, DirectByteBufAlloc={}", ALLOC.metric().usedDirectMemory(), metric()); } - return ALLOC.directBuffer(initCapacity); + return new WrappedByteBuf(ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity)); } catch (OutOfMemoryError e) { - LOGGER.error("alloc direct buffer OOM", e); + LOGGER.error("alloc direct buffer OOM, netty.usedDirectMemory={}, DirectByteBufAlloc={}", ALLOC.metric().usedDirectMemory(), metric(), e); System.err.println("alloc direct buffer OOM"); Runtime.getRuntime().halt(1); throw e; } } + public static void registerAllocType(int type, String name) { + if (ALLOC_TYPE.containsKey(type)) { + throw new IllegalArgumentException("type already registered: " + type + "=" + ALLOC_TYPE.get(type)); + } + ALLOC_TYPE.put(type, name); + } + + public static Metric metric() { + return new Metric(); + } + + public static class Metric { + private final long usage; + private final Map<Integer, Long> detail; + + public Metric() { + Map<Integer, Long> detail = new HashMap<>(); + 
USAGE_STATS.forEach((k, v) -> detail.put(k, v.longValue())); + this.detail = detail; + this.usage = this.detail.values().stream().mapToLong(Long::longValue).sum(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("DirectByteBufAllocMetric{usage="); + sb.append(usage); + sb.append(", detail="); + for (Map.Entry<Integer, Long> entry : detail.entrySet()) { + sb.append(entry.getKey()).append("/").append(ALLOC_TYPE.get(entry.getKey())).append("=").append(entry.getValue()).append(","); + } + sb.append("}"); + return sb.toString(); + } + } + + public interface OOMHandler { /** * Try handle OOM exception. diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java index 271147a55..3e9969aeb 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.automq.stream.s3.DirectByteBufAlloc.READ_INDEX_BLOCK; import static com.automq.stream.s3.ObjectWriter.Footer.FOOTER_SIZE; import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET; @@ -144,7 +145,7 @@ public static BasicObjectInfo parse(ByteBuf objectTailBuf, // trim the ByteBuf to avoid extra memory occupy. ByteBuf indexBlockBuf = objectTailBuf.slice(objectTailBuf.readerIndex() + indexRelativePosition, indexBlockSize); - ByteBuf copy = DirectByteBufAlloc.byteBuffer(indexBlockBuf.readableBytes()); + ByteBuf copy = DirectByteBufAlloc.byteBuffer(indexBlockBuf.readableBytes(), READ_INDEX_BLOCK); indexBlockBuf.readBytes(copy, indexBlockBuf.readableBytes()); objectTailBuf.release(); indexBlockBuf = copy; diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java index 3b84025bf..0d1431a8c 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java @@ -24,6 +24,10 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import static com.automq.stream.s3.DirectByteBufAlloc.WRITE_DATA_BLOCK_HEADER; +import static com.automq.stream.s3.DirectByteBufAlloc.WRITE_FOOTER; +import static com.automq.stream.s3.DirectByteBufAlloc.WRITE_INDEX_BLOCK; + /** * Write stream records to a single object. 
*/ @@ -193,7 +197,7 @@ class IndexBlock { public IndexBlock() { long nextPosition = 0; int indexBlockSize = DataBlockIndex.BLOCK_INDEX_SIZE * completedBlocks.size(); - buf = DirectByteBufAlloc.byteBuffer(indexBlockSize, "write_index_block"); + buf = DirectByteBufAlloc.byteBuffer(indexBlockSize, WRITE_INDEX_BLOCK); for (DataBlock block : completedBlocks) { ObjectStreamRange streamRange = block.getStreamRange(); new DataBlockIndex(streamRange.getStreamId(), streamRange.getStartOffset(), (int) (streamRange.getEndOffset() - streamRange.getStartOffset()), @@ -227,7 +231,7 @@ class DataBlock { public DataBlock(long streamId, List records) { this.recordCount = records.size(); this.encodedBuf = DirectByteBufAlloc.compositeByteBuffer(); - ByteBuf header = DirectByteBufAlloc.byteBuffer(BLOCK_HEADER_SIZE); + ByteBuf header = DirectByteBufAlloc.byteBuffer(BLOCK_HEADER_SIZE, WRITE_DATA_BLOCK_HEADER); header.writeByte(DATA_BLOCK_MAGIC); header.writeByte(DATA_BLOCK_DEFAULT_FLAG); header.writeInt(recordCount); @@ -262,7 +266,7 @@ class Footer { private final ByteBuf buf; public Footer(long indexStartPosition, int indexBlockLength) { - buf = DirectByteBufAlloc.byteBuffer(FOOTER_SIZE); + buf = DirectByteBufAlloc.byteBuffer(FOOTER_SIZE, WRITE_FOOTER); // start position of index block buf.writeLong(indexStartPosition); // size of index block diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java b/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java index 73cc855a8..035ea9452 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java +++ b/s3stream/src/main/java/com/automq/stream/s3/StreamObjectCompactor.java @@ -32,6 +32,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_OBJECT_COMPACTION_READ; +import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_OBJECT_COMPACTION_WRITE; import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OBJECT_ID; import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OFFSET; @@ -173,7 +175,7 @@ public Optional compact() throws ExecutionException, long compactedEndOffset = objectGroup.get(objectGroup.size() - 1).endOffset(); List compactedObjectIds = new LinkedList<>(); CompositeByteBuf indexes = DirectByteBufAlloc.compositeByteBuffer(); - Writer writer = s3Operator.writer(ObjectUtils.genKey(0, objectId), ThrottleStrategy.THROTTLE_2); + Writer writer = s3Operator.writer(new Writer.Context(STREAM_OBJECT_COMPACTION_READ), ObjectUtils.genKey(0, objectId), ThrottleStrategy.THROTTLE_2); long groupStartOffset = -1L; long groupStartPosition = -1L; int groupSize = 0; @@ -182,7 +184,7 @@ public Optional compact() throws ExecutionException, for (S3ObjectMetadata object : objectGroup) { try (ObjectReader reader = new ObjectReader(object, s3Operator)) { ObjectReader.BasicObjectInfo basicObjectInfo = reader.basicObjectInfo().get(); - ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(basicObjectInfo.indexBlock().count() * DataBlockIndex.BLOCK_INDEX_SIZE); + ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(basicObjectInfo.indexBlock().count() * DataBlockIndex.BLOCK_INDEX_SIZE, STREAM_OBJECT_COMPACTION_WRITE); Iterator it = basicObjectInfo.indexBlock().iterator(); long validDataBlockStartPosition = 0; while (it.hasNext()) { @@ -217,7 +219,7 @@ public Optional compact() throws ExecutionException, } } if (lastIndex != null) { - ByteBuf subIndexes = DirectByteBufAlloc.byteBuffer(DataBlockIndex.BLOCK_INDEX_SIZE); + ByteBuf subIndexes = 
DirectByteBufAlloc.byteBuffer(DataBlockIndex.BLOCK_INDEX_SIZE, STREAM_OBJECT_COMPACTION_WRITE); new DataBlockIndex(streamId, groupStartOffset, (int) (lastIndex.endOffset() - groupStartOffset), groupRecordCount, groupStartPosition, groupSize).encode(subIndexes); indexes.addComponent(true, subIndexes); diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java b/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java index ef618fbfa..0a54dedea 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java +++ b/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java @@ -14,6 +14,8 @@ import com.automq.stream.s3.model.StreamRecordBatch; import io.netty.buffer.ByteBuf; +import static com.automq.stream.s3.DirectByteBufAlloc.ENCODE_RECORD; + public class StreamRecordBatchCodec { public static final byte MAGIC_V0 = 0x22; public static final int HEADER_SIZE = @@ -26,8 +28,7 @@ public class StreamRecordBatchCodec { public static ByteBuf encode(StreamRecordBatch streamRecord) { int totalLength = HEADER_SIZE + streamRecord.size(); // payload - - ByteBuf buf = DirectByteBufAlloc.byteBuffer(totalLength); + ByteBuf buf = DirectByteBufAlloc.byteBuffer(totalLength, ENCODE_RECORD); buf.writeByte(MAGIC_V0); buf.writeLong(streamRecord.getStreamId()); buf.writeLong(streamRecord.getEpoch()); @@ -52,7 +53,7 @@ public static StreamRecordBatch duplicateDecode(ByteBuf buf) { long baseOffset = buf.readLong(); int lastOffsetDelta = buf.readInt(); int payloadLength = buf.readInt(); - ByteBuf payload = DirectByteBufAlloc.byteBuffer(payloadLength); + ByteBuf payload = DirectByteBufAlloc.byteBuffer(payloadLength, DirectByteBufAlloc.DECODE_RECORD); buf.readBytes(payload); return new StreamRecordBatch(streamId, epoch, baseOffset, lastOffsetDelta, payload); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java index eb383ed27..5c785e2d5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java @@ -34,6 +34,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_READ; + //TODO: refactor to reduce duplicate code with ObjectWriter public class DataBlockReader { private static final Logger LOGGER = LoggerFactory.getLogger(DataBlockReader.class); @@ -193,7 +195,7 @@ private CompletableFuture rangeRead0(long start, long end) { if (throttleBucket == null) { return s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> { // convert heap buffer to direct buffer - ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes()); + ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ); directBuf.writeBytes(buf); buf.release(); return directBuf; @@ -203,7 +205,7 @@ private CompletableFuture rangeRead0(long start, long end) { .thenCompose(v -> s3Operator.rangeRead(objectKey, start, end, ThrottleStrategy.THROTTLE_2).thenApply(buf -> { // convert heap buffer to direct buffer - ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes()); + ByteBuf directBuf = DirectByteBufAlloc.byteBuffer(buf.readableBytes(), STREAM_SET_OBJECT_COMPACTION_READ); directBuf.writeBytes(buf); buf.release(); return directBuf; diff --git 
a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java index 5bd683609..ba946b48b 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java @@ -30,6 +30,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_OBJECT_COMPACTION_WRITE; +import static com.automq.stream.s3.DirectByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_READ; import static com.automq.stream.s3.operator.Writer.MIN_PART_SIZE; //TODO: refactor to reduce duplicate code with ObjectWriter @@ -51,7 +53,7 @@ public DataBlockWriter(long objectId, S3Operator s3Operator, int partSizeThresho waitingUploadBlocks = new LinkedList<>(); waitingUploadBlockCfs = new ConcurrentHashMap<>(); completedBlocks = new LinkedList<>(); - writer = s3Operator.writer(objectKey, ThrottleStrategy.THROTTLE_2); + writer = s3Operator.writer(new Writer.Context(STREAM_SET_OBJECT_COMPACTION_READ), objectKey, ThrottleStrategy.THROTTLE_2); } public long getObjectId() { @@ -152,7 +154,7 @@ public IndexBlock() { List dataBlockIndices = CompactionUtils.buildDataBlockIndicesFromGroup( CompactionUtils.groupStreamDataBlocks(completedBlocks, new GroupByLimitPredicate(DEFAULT_DATA_BLOCK_GROUP_SIZE_THRESHOLD))); - buf = DirectByteBufAlloc.byteBuffer(dataBlockIndices.size() * DataBlockIndex.BLOCK_INDEX_SIZE, "write_index_block"); + buf = DirectByteBufAlloc.byteBuffer(dataBlockIndices.size() * DataBlockIndex.BLOCK_INDEX_SIZE, DirectByteBufAlloc.STREAM_SET_OBJECT_COMPACTION_WRITE); for (DataBlockIndex dataBlockIndex : dataBlockIndices) { dataBlockIndex.encode(buf); } @@ -177,7 +179,7 @@ class Footer { private final ByteBuf buf; public Footer() { - buf = DirectByteBufAlloc.byteBuffer(FOOTER_SIZE); + buf = DirectByteBufAlloc.byteBuffer(FOOTER_SIZE, STREAM_OBJECT_COMPACTION_WRITE); buf.writeLong(indexBlock.position()); buf.writeInt(indexBlock.size()); buf.writeZero(40 - 8 - 4); diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index b36725f9b..c2c74fad0 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -391,8 +391,8 @@ private void write0(String path, ByteBuf data, CompletableFuture cf) { } @Override - public Writer writer(String path, ThrottleStrategy throttleStrategy) { - return new ProxyWriter(this, path, throttleStrategy); + public Writer writer(Writer.Context context, String path, ThrottleStrategy throttleStrategy) { + return new ProxyWriter(context, this, path, throttleStrategy); } @Override diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryS3Operator.java index cb179d469..8ab57b818 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MemoryS3Operator.java @@ -46,7 +46,7 @@ public CompletableFuture write(String path, ByteBuf data, ThrottleStrategy } @Override - public Writer writer(String path, ThrottleStrategy throttleStrategy) { + public Writer writer(Writer.Context context, String path, ThrottleStrategy 
throttleStrategy) { ByteBuf buf = Unpooled.buffer(); storage.put(path, buf); return new Writer() { diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java index 87774c96c..66527143a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/MultiPartWriter.java @@ -28,9 +28,11 @@ import java.util.stream.Collectors; import software.amazon.awssdk.services.s3.model.CompletedPart; + public class MultiPartWriter implements Writer { private static final long MAX_MERGE_WRITE_SIZE = 16L * 1024 * 1024; final CompletableFuture uploadIdCf = new CompletableFuture<>(); + private final Context context; private final S3Operator operator; private final String path; private final List> parts = new LinkedList<>(); @@ -46,7 +48,8 @@ public class MultiPartWriter implements Writer { private CompletableFuture closeCf; private ObjectPart objectPart = null; - public MultiPartWriter(S3Operator operator, String path, long minPartSize, ThrottleStrategy throttleStrategy) { + public MultiPartWriter(Context context, S3Operator operator, String path, long minPartSize, ThrottleStrategy throttleStrategy) { + this.context = context; this.operator = operator; this.path = path; this.minPartSize = minPartSize; @@ -200,7 +203,7 @@ public void write(ByteBuf data) { public void copyOnWrite() { int size = partBuf.readableBytes(); if (size > 0) { - ByteBuf buf = DirectByteBufAlloc.byteBuffer(size); + ByteBuf buf = DirectByteBufAlloc.byteBuffer(size, context.allocType()); buf.writeBytes(partBuf.duplicate()); CompositeByteBuf copy = DirectByteBufAlloc.compositeByteBuffer().addComponent(true, buf); this.partBuf.release(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java index 8eb579f48..3e5e6714b 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java @@ -28,21 +28,24 @@ */ class ProxyWriter implements Writer { final ObjectWriter objectWriter = new ObjectWriter(); + private final Context context; private final S3Operator operator; private final String path; private final long minPartSize; private final ThrottleStrategy throttleStrategy; Writer multiPartWriter = null; - public ProxyWriter(S3Operator operator, String path, long minPartSize, ThrottleStrategy throttleStrategy) { + public ProxyWriter(Context context, S3Operator operator, String path, long minPartSize, + ThrottleStrategy throttleStrategy) { + this.context = context; this.operator = operator; this.path = path; this.minPartSize = minPartSize; this.throttleStrategy = throttleStrategy; } - public ProxyWriter(S3Operator operator, String path, ThrottleStrategy throttleStrategy) { - this(operator, path, MIN_PART_SIZE, throttleStrategy); + public ProxyWriter(Context context, S3Operator operator, String path, ThrottleStrategy throttleStrategy) { + this(context, operator, path, MIN_PART_SIZE, throttleStrategy); } @Override @@ -103,7 +106,7 @@ public CompletableFuture release() { } private void newMultiPartWriter() { - this.multiPartWriter = new MultiPartWriter(operator, path, minPartSize, throttleStrategy); + this.multiPartWriter = new MultiPartWriter(context, operator, path, minPartSize, throttleStrategy); if (objectWriter.data.readableBytes() > 0) { 
FutureUtil.propagate(multiPartWriter.write(objectWriter.data), objectWriter.cf); } else { @@ -129,7 +132,7 @@ public CompletableFuture write(ByteBuf part) { public void copyOnWrite() { int size = data.readableBytes(); if (size > 0) { - ByteBuf buf = DirectByteBufAlloc.byteBuffer(size); + ByteBuf buf = DirectByteBufAlloc.byteBuffer(size, context.allocType()); buf.writeBytes(data.duplicate()); CompositeByteBuf copy = DirectByteBufAlloc.compositeByteBuffer().addComponent(true, buf); this.data.release(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/S3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/S3Operator.java index ee3aad93e..31ec18ec6 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/S3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/S3Operator.java @@ -56,10 +56,10 @@ default CompletableFuture write(String path, ByteBuf data) { * @param throttleStrategy throttle strategy. * @return {@link Writer} */ - Writer writer(String path, ThrottleStrategy throttleStrategy); + Writer writer(Writer.Context ctx, String path, ThrottleStrategy throttleStrategy); default Writer writer(String path) { - return writer(path, ThrottleStrategy.BYPASS); + return writer(Writer.Context.DEFAULT, path, ThrottleStrategy.BYPASS); } CompletableFuture delete(String path); diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java b/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java index d1d7d0941..6d47559ab 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/Writer.java @@ -11,6 +11,7 @@ package com.automq.stream.s3.operator; +import com.automq.stream.s3.DirectByteBufAlloc; import io.netty.buffer.ByteBuf; import java.util.concurrent.CompletableFuture; @@ -72,4 +73,19 @@ public interface Writer { * Release all resources held by this writer. 
*/ CompletableFuture release(); + + class Context { + public static final Context DEFAULT = new Context(DirectByteBufAlloc.DEFAULT); + + private final int allocType; + + public Context(int allocType) { + this.allocType = allocType; + } + + public int allocType() { + return allocType; + } + + } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java index 9bc3aa52d..370699f54 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java @@ -59,7 +59,7 @@ void setUp() { @Test void testWrite() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ExecutionException, InterruptedException { - writer = new MultiPartWriter(operator, "test-path", 100, null); + writer = new MultiPartWriter(Writer.Context.DEFAULT, operator, "test-path", 100, null); List requests = new ArrayList<>(); List contentLengths = new ArrayList<>(); @@ -110,7 +110,7 @@ void testWrite() throws NoSuchMethodException, InvocationTargetException, Illega @Test @SuppressWarnings("unchecked") void testCopyWrite() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ExecutionException, InterruptedException { - writer = new MultiPartWriter(operator, "test-path-2", 100, null); + writer = new MultiPartWriter(Writer.Context.DEFAULT, operator, "test-path-2", 100, null); List uploadPartRequests = new ArrayList<>(); List uploadPartCopyRequests = new ArrayList<>(); List writeContentLengths = new ArrayList<>(); diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/ProxyWriterTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/ProxyWriterTest.java index 62682b910..dcf4acc3f 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/operator/ProxyWriterTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/operator/ProxyWriterTest.java @@ -42,7 +42,7 @@ public class ProxyWriterTest { @BeforeEach public void setup() { operator = mock(S3Operator.class); - writer = new ProxyWriter(operator, "testpath", null); + writer = new ProxyWriter(Writer.Context.DEFAULT, operator, "testpath", null); } @Test
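
Usage sketch (not taken from the diff above): a minimal example of how the allocation-type tracking introduced by this patch is intended to be consumed, assuming only the APIs visible in the diff — DirectByteBufAlloc.byteBuffer(int, int), registerAllocType(int, String), metric(), and the WrappedByteBuf release hook. The example class, the MY_CACHE_BLOCK id, and its registered name are hypothetical and chosen only for illustration.

import com.automq.stream.s3.DirectByteBufAlloc;
import io.netty.buffer.ByteBuf;

public class DirectMemoryUsageExample {
    // Hypothetical caller-defined alloc type; the id and name are illustrative only.
    private static final int MY_CACHE_BLOCK = 100;

    static {
        // Each alloc type is registered once; re-registering an id throws IllegalArgumentException.
        DirectByteBufAlloc.registerAllocType(MY_CACHE_BLOCK, "my_cache_block");
    }

    public static void main(String[] args) {
        // The allocation is attributed to MY_CACHE_BLOCK in the per-type usage counters.
        ByteBuf buf = DirectByteBufAlloc.byteBuffer(4096, MY_CACHE_BLOCK);
        try {
            buf.writeLong(42L);
        } finally {
            // Releasing the last reference runs the WrappedByteBuf release hook,
            // which subtracts the allocated bytes from the per-type counter.
            buf.release();
        }
        // Prints something like "DirectByteBufAllocMetric{usage=..., detail=...}".
        System.out.println(DirectByteBufAlloc.metric());
    }
}

Writers follow the same idea: callers pass a Writer.Context carrying an alloc type (s3Operator.writer(Writer.Context.DEFAULT, path) is the default), so buffers copied inside MultiPartWriter/ProxyWriter copyOnWrite() are attributed to the caller's type rather than to the generic "default" bucket.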