Skip to content

Commit

Permalink
feat(s3stream): add memory usage detect switch (#948)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Feb 27, 2024
1 parent b4a7909 commit a534c38
Showing 1 changed file with 25 additions and 15 deletions.
40 changes: 25 additions & 15 deletions s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.slf4j.LoggerFactory;

public class DirectByteBufAlloc {
public static final boolean MEMORY_USAGE_DETECT = Boolean.parseBoolean(System.getenv("AUTOMQ_MEMORY_USAGE_DETECT"));

private static final Logger LOGGER = LoggerFactory.getLogger(DirectByteBufAlloc.class);
private static final PooledByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
private static final Map<Integer, LongAdder> USAGE_STATS = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -67,24 +69,32 @@ public static ByteBuf byteBuffer(int initCapacity) {

public static ByteBuf byteBuffer(int initCapacity, int type) {
try {
LongAdder usage = USAGE_STATS.compute(type, (k, v) -> {
if (v == null) {
v = new LongAdder();
if (MEMORY_USAGE_DETECT) {
LongAdder usage = USAGE_STATS.compute(type, (k, v) -> {
if (v == null) {
v = new LongAdder();
}
v.add(initCapacity);
return v;
});
long now = System.currentTimeMillis();
if (now - lastMetricLogTime > 60000) {
// it's ok to be not thread safe
lastMetricLogTime = now;
DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.info("Direct Memory usage: {}", DirectByteBufAlloc.directByteBufAllocMetric);
}
v.add(initCapacity);
return v;
});
long now = System.currentTimeMillis();
if (now - lastMetricLogTime > 60000) {
// it's ok to be not thread safe
lastMetricLogTime = now;
DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.info("Direct Memory usage: {}", DirectByteBufAlloc.directByteBufAllocMetric);
return new WrappedByteBuf(ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
} else {
return ALLOC.directBuffer(initCapacity);
}
return new WrappedByteBuf(ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
} catch (OutOfMemoryError e) {
DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.error("alloc direct buffer OOM, {}", DirectByteBufAlloc.directByteBufAllocMetric, e);
if (MEMORY_USAGE_DETECT) {
DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.error("alloc direct buffer OOM, {}", DirectByteBufAlloc.directByteBufAllocMetric, e);
} else {
LOGGER.error("alloc direct buffer OOM", e);
}
System.err.println("alloc direct buffer OOM");
Runtime.getRuntime().halt(1);
throw e;
Expand Down

0 comments on commit a534c38

Please sign in to comment.