Skip to content

Commit

Permalink
feat(s3stream): sequential allocate memory for record write (#949)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Feb 27, 2024
1 parent a534c38 commit b38b1d0
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream;

import com.automq.stream.s3.DirectByteBufAlloc;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.util.concurrent.atomic.AtomicReference;

public class DirectByteBufSeqAlloc {
public static final int HUGE_BUF_SIZE = 8 * 1024 * 1024;
// why not use ThreadLocal? the partition open has too much threads
final AtomicReference<HugeBuf>[] hugeBufArray = new AtomicReference[8];
private final int allocType;

public DirectByteBufSeqAlloc(int allocType) {
this.allocType = allocType;
for (int i = 0; i < hugeBufArray.length; i++) {
hugeBufArray[i] = new AtomicReference<>(new HugeBuf(DirectByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType)));
}
}

public ByteBuf byteBuffer(int capacity) {
if (capacity >= HUGE_BUF_SIZE) {
// if the request capacity is larger than HUGE_BUF_SIZE, just allocate a new ByteBuf
return DirectByteBufAlloc.byteBuffer(capacity, allocType);
}
int bufIndex = Math.abs(Thread.currentThread().hashCode() % hugeBufArray.length);

AtomicReference<HugeBuf> bufRef = hugeBufArray[bufIndex];
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (bufRef) {
HugeBuf hugeBuf = bufRef.get();

if (hugeBuf.nextIndex + capacity <= hugeBuf.buf.capacity()) {
// if the request capacity can be satisfied by the current hugeBuf, return a slice of it
int nextIndex = hugeBuf.nextIndex;
hugeBuf.nextIndex += capacity;
ByteBuf slice = hugeBuf.buf.retainedSlice(nextIndex, capacity);
return slice.writerIndex(slice.readerIndex());
}

// if the request capacity cannot be satisfied by the current hugeBuf
// 1. slice the remaining of the current hugeBuf and release the hugeBuf
// 2. create a new hugeBuf and slice the remaining of the required capacity
// 3. return the composite ByteBuf of the two slices
CompositeByteBuf cbf = DirectByteBufAlloc.compositeByteBuffer();
int readLength = hugeBuf.buf.capacity() - hugeBuf.nextIndex;
cbf.addComponent(false, hugeBuf.buf.retainedSlice(hugeBuf.nextIndex, readLength));
capacity -= readLength;
hugeBuf.buf.release();

HugeBuf newHugeBuf = new HugeBuf(DirectByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType));
bufRef.set(newHugeBuf);

cbf.addComponent(false, newHugeBuf.buf.retainedSlice(0, capacity));
newHugeBuf.nextIndex = capacity;

return cbf;
}
}

static class HugeBuf {
final ByteBuf buf;
int nextIndex;

HugeBuf(ByteBuf buf) {
this.buf = buf;
this.nextIndex = 0;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package com.automq.stream.s3;

import com.automq.stream.DirectByteBufSeqAlloc;
import com.automq.stream.s3.model.StreamRecordBatch;
import io.netty.buffer.ByteBuf;

Expand All @@ -25,10 +26,12 @@ public class StreamRecordBatchCodec {
+ 8 // baseOffset
+ 4 // lastOffsetDelta
+ 4; // payload length
private static final DirectByteBufSeqAlloc ENCODE_ALLOC = new DirectByteBufSeqAlloc(ENCODE_RECORD);

public static ByteBuf encode(StreamRecordBatch streamRecord) {
int totalLength = HEADER_SIZE + streamRecord.size(); // payload
ByteBuf buf = DirectByteBufAlloc.byteBuffer(totalLength, ENCODE_RECORD);
// use sequential allocator to avoid memory fragmentation
ByteBuf buf = ENCODE_ALLOC.byteBuffer(totalLength);
buf.writeByte(MAGIC_V0);
buf.writeLong(streamRecord.getStreamId());
buf.writeLong(streamRecord.getEpoch());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
package com.automq.stream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class DirectByteBufSeqAllocTest {

@Test
public void testAlloc() {
DirectByteBufSeqAlloc alloc = new DirectByteBufSeqAlloc(0);

AtomicReference<DirectByteBufSeqAlloc.HugeBuf> bufRef = alloc.hugeBufArray[Math.abs(Thread.currentThread().hashCode() % alloc.hugeBufArray.length)];

ByteBuf buf1 = alloc.byteBuffer(12);
buf1.writeLong(1);
buf1.writeInt(2);

ByteBuf buf2 = alloc.byteBuffer(20);
buf2.writeLong(3);
buf2.writeInt(4);
buf2.writeLong(5);

ByteBuf buf3 = alloc.byteBuffer(DirectByteBufSeqAlloc.HUGE_BUF_SIZE - 12 - 20 - 4);

ByteBuf oldHugeBuf = bufRef.get().buf;

ByteBuf buf4 = alloc.byteBuffer(16);
buf4.writeLong(6);
buf4.writeLong(7);

assertTrue(oldHugeBuf != bufRef.get().buf);

assertEquals(1, buf1.readLong());
assertEquals(2, buf1.readInt());
assertEquals(3, buf2.readLong());
assertEquals(4, buf2.readInt());
assertEquals(5, buf2.readLong());
assertInstanceOf(CompositeByteBuf.class, buf4);
assertEquals(6, buf4.readLong());
assertEquals(7, buf4.readLong());

buf1.release();
buf2.release();
buf3.release();
buf4.release();
assertEquals(0, oldHugeBuf.refCnt());
assertEquals(1, bufRef.get().buf.refCnt());

ByteBuf oldHugeBuf2 = bufRef.get().buf;

alloc.byteBuffer(DirectByteBufSeqAlloc.HUGE_BUF_SIZE - 12).release();
alloc.byteBuffer(12).release();
assertEquals(0, oldHugeBuf2.refCnt());
}

}

0 comments on commit b38b1d0

Please sign in to comment.