Skip to content

Commit

Permalink
feat(s3stream): add direct memory usage metrics (#947)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Feb 27, 2024
1 parent f972cfb commit b4a7909
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 83 deletions.
45 changes: 28 additions & 17 deletions s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class DirectByteBufAlloc {
public static final int STREAM_OBJECT_COMPACTION_WRITE = 8;
public static final int STREAM_SET_OBJECT_COMPACTION_READ = 9;
public static final int STREAM_SET_OBJECT_COMPACTION_WRITE = 10;
public static DirectByteBufAllocMetric directByteBufAllocMetric = null;

static {
registerAllocType(DEFAULT, "default");
Expand Down Expand Up @@ -77,11 +78,13 @@ public static ByteBuf byteBuffer(int initCapacity, int type) {
if (now - lastMetricLogTime > 60000) {
// it's ok to be not thread safe
lastMetricLogTime = now;
LOGGER.info("Direct Memory usage: netty.usedDirectMemory={}, DirectByteBufAlloc={}", ALLOC.metric().usedDirectMemory(), metric());
DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.info("Direct Memory usage: {}", DirectByteBufAlloc.directByteBufAllocMetric);
}
return new WrappedByteBuf(ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
} catch (OutOfMemoryError e) {
LOGGER.error("alloc direct buffer OOM, netty.usedDirectMemory={}, DirectByteBufAlloc={}", ALLOC.metric().usedDirectMemory(), metric(), e);
DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.error("alloc direct buffer OOM, {}", DirectByteBufAlloc.directByteBufAllocMetric, e);
System.err.println("alloc direct buffer OOM");
Runtime.getRuntime().halt(1);
throw e;
Expand All @@ -95,28 +98,36 @@ public static void registerAllocType(int type, String name) {
ALLOC_TYPE.put(type, name);
}

public static Metric metric() {
return new Metric();
}
public static class DirectByteBufAllocMetric {
private final long usedDirectMemory;
private final long allocatedDirectMemory;
private final Map<String, Long> detail = new HashMap<>();

public static class Metric {
private final long usage;
private final Map<Integer, Long> detail;
public DirectByteBufAllocMetric() {
USAGE_STATS.forEach((k, v) -> {
detail.put(k + "/" + ALLOC_TYPE.get(k), v.longValue());
});
this.usedDirectMemory = ALLOC.metric().usedDirectMemory();
this.allocatedDirectMemory = this.detail.values().stream().mapToLong(Long::longValue).sum();
}

public long getUsedDirectMemory() {
return usedDirectMemory;
}

public Metric() {
Map<Integer, Long> detail = new HashMap<>();
USAGE_STATS.forEach((k, v) -> detail.put(k, v.longValue()));
this.detail = detail;
this.usage = this.detail.values().stream().mapToLong(Long::longValue).sum();
public Map<String, Long> getDetailedMap() {
return detail;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder("DirectByteBufAllocMetric{usage=");
sb.append(usage);
StringBuilder sb = new StringBuilder("DirectByteBufAllocMetric{usedDirectMemory=");
sb.append(usedDirectMemory);
sb.append(", allocatedDirectMemory=");
sb.append(allocatedDirectMemory);
sb.append(", detail=");
for (Map.Entry<Integer, Long> entry : detail.entrySet()) {
sb.append(entry.getKey()).append("/").append(ALLOC_TYPE.get(entry.getKey())).append("=").append(entry.getValue()).append(",");
for (Map.Entry<String, Long> entry : detail.entrySet()) {
sb.append(entry.getKey()).append("=").append(entry.getValue()).append(",");
}
sb.append("}");
return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ public static Attributes buildAttributes(S3ObjectStage objectStage) {
.build();
}

public static Attributes buildAttributes(String source) {
return Attributes.builder()
.put(S3StreamMetricsConstant.LABEL_ALLOCATE_BYTE_BUF_SOURCE, source)
.build();
}

public static String getObjectBucketLabel(long objectSize) {
int index = (int) Math.ceil(Math.log((double) objectSize / (16 * 1024)) / Math.log(2));
index = Math.min(S3StreamMetricsConstant.OBJECT_SIZE_BUCKET_NAMES.length - 1, Math.max(0, index));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3.metrics;

import com.automq.stream.s3.metrics.wrapper.ConfigListener;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiAttributes<K> implements ConfigListener {
private final Map<K, Attributes> attributesMap = new ConcurrentHashMap<>();
private final AttributeKey<K> keyName;
private Attributes baseAttributes;

public MultiAttributes(Attributes baseAttributes, AttributeKey<K> keyName) {
this.baseAttributes = baseAttributes;
this.keyName = keyName;
}

public Attributes get(K key) {
return attributesMap.computeIfAbsent(key, k -> buildAttributes(baseAttributes, Attributes.of(keyName, key)));
}

private Attributes buildAttributes(Attributes baseAttributes, Attributes attributes) {
return Attributes.builder().putAll(baseAttributes).putAll(attributes).build();
}

private void reBuildAttributes(Attributes baseAttributes) {
for (Map.Entry<K, Attributes> entry : attributesMap.entrySet()) {
attributesMap.replace(entry.getKey(), buildAttributes(baseAttributes, entry.getValue()));
}
}

@Override
public void onConfigChange(MetricsConfig metricsConfig) {
this.baseAttributes = metricsConfig.getBaseAttributes();
reBuildAttributes(metricsConfig.getBaseAttributes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public class S3StreamMetricsConstant {
public static final String NETWORK_OUTBOUND_LIMITER_QUEUE_SIZE_METRIC_NAME = "network_outbound_limiter_queue_size";
public static final String NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME = "network_inbound_limiter_queue_time";
public static final String NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME = "network_outbound_limiter_queue_time";
public static final String ALLOCATE_BYTE_BUF_SIZE_METRIC_NAME = "allocate_byte_buf_size";
public static final String READ_AHEAD_SIZE_METRIC_NAME = "read_ahead_size";
public static final String WAL_START_OFFSET = "wal_start_offset";
public static final String WAL_TRIMMED_OFFSET = "wal_trimmed_offset";
Expand All @@ -92,12 +91,14 @@ public class S3StreamMetricsConstant {
public static final String INFLIGHT_WAL_UPLOAD_TASKS_COUNT_METRIC_NAME = "inflight_wal_upload_tasks_count";
public static final String COMPACTION_READ_SIZE_METRIC_NAME = "compaction_read_size";
public static final String COMPACTION_WRITE_SIZE_METRIC_NAME = "compaction_write_size";
public static final String ALLOCATED_DIRECT_MEMORY_SIZE_METRIC_NAME = "allocated_direct_memory_size";
public static final String USED_DIRECT_MEMORY_SIZE_METRIC_NAME = "used_direct_memory_size";
public static final AttributeKey<String> LABEL_OPERATION_TYPE = AttributeKey.stringKey("operation_type");
public static final AttributeKey<String> LABEL_OPERATION_NAME = AttributeKey.stringKey("operation_name");
public static final AttributeKey<String> LABEL_SIZE_NAME = AttributeKey.stringKey("size");
public static final AttributeKey<String> LABEL_STAGE = AttributeKey.stringKey("stage");
public static final AttributeKey<String> LABEL_STATUS = AttributeKey.stringKey("status");
public static final AttributeKey<String> LABEL_ALLOCATE_BYTE_BUF_SOURCE = AttributeKey.stringKey("source");
public static final AttributeKey<String> LABEL_ALLOC_TYPE = AttributeKey.stringKey("type");
public static final String LABEL_STATUS_SUCCESS = "success";
public static final String LABEL_STATUS_FAILED = "failed";
public static final String LABEL_STATUS_HIT = "hit";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package com.automq.stream.s3.metrics;

import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.metrics.wrapper.CounterMetric;
import com.automq.stream.s3.metrics.operations.S3ObjectStage;
import com.automq.stream.s3.metrics.operations.S3Operation;
Expand All @@ -25,6 +26,7 @@
import io.opentelemetry.api.metrics.ObservableLongGauge;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class S3StreamMetricsManager {
Expand All @@ -44,7 +46,6 @@ public class S3StreamMetricsManager {
private static ObservableLongGauge networkOutboundLimiterQueueSize = new NoopObservableLongGauge();
private static LongHistogram networkInboundLimiterQueueTime = new NoopLongHistogram();
private static LongHistogram networkOutboundLimiterQueueTime = new NoopLongHistogram();
private static LongHistogram allocateByteBufSize = new NoopLongHistogram();
private static LongHistogram readAheadSize = new NoopLongHistogram();
private static LongHistogram readAheadLimierQueueTime = new NoopLongHistogram();
private static ObservableLongGauge deltaWalStartOffset = new NoopObservableLongGauge();
Expand All @@ -55,6 +56,8 @@ public class S3StreamMetricsManager {
private static ObservableLongGauge availableInflightS3ReadQuota = new NoopObservableLongGauge();
private static ObservableLongGauge availableInflightS3WriteQuota = new NoopObservableLongGauge();
private static ObservableLongGauge inflightWALUploadTasksCount = new NoopObservableLongGauge();
private static ObservableLongGauge allocatedDirectMemorySize = new NoopObservableLongGauge();
private static ObservableLongGauge usedDirectMemorySize = new NoopObservableLongGauge();
private static LongCounter compactionReadSizeInTotal = new NoopLongCounter();
private static LongCounter compactionWriteSizeInTotal = new NoopLongCounter();
private static Supplier<Long> networkInboundAvailableBandwidthSupplier = () -> 0L;
Expand All @@ -70,6 +73,12 @@ public class S3StreamMetricsManager {
private static Supplier<Integer> availableInflightS3WriteQuotaSupplier = () -> 0;
private static Supplier<Integer> inflightWALUploadTasksCountSupplier = () -> 0;
private static MetricsConfig metricsConfig = new MetricsConfig(MetricsLevel.INFO, Attributes.empty());
private static final MultiAttributes<String> ALLOC_TYPE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
S3StreamMetricsConstant.LABEL_ALLOC_TYPE);

static {
BASE_ATTRIBUTES_LISTENERS.add(ALLOC_TYPE_ATTRIBUTES);
}

public static void configure(MetricsConfig metricsConfig) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
Expand Down Expand Up @@ -172,11 +181,6 @@ public static void initMetrics(Meter meter, String prefix) {
.ofLongs()
.setExplicitBucketBoundariesAdvice(S3StreamMetricsConstant.LATENCY_BOUNDARIES)
.build();
allocateByteBufSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.ALLOCATE_BYTE_BUF_SIZE_METRIC_NAME)
.setDescription("Allocate byte buf size")
.setUnit("bytes")
.ofLongs()
.build();
readAheadSize = meter.histogramBuilder(prefix + S3StreamMetricsConstant.READ_AHEAD_SIZE_METRIC_NAME)
.setDescription("Read ahead size")
.setUnit("bytes")
Expand Down Expand Up @@ -263,6 +267,27 @@ public static void initMetrics(Meter meter, String prefix) {
.setDescription("Compaction write size")
.setUnit("bytes")
.build();
allocatedDirectMemorySize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.ALLOCATED_DIRECT_MEMORY_SIZE_METRIC_NAME)
.setDescription("Allocated direct memory size")
.setUnit("bytes")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel()) && DirectByteBufAlloc.directByteBufAllocMetric != null) {
Map<String, Long> allocateSizeMap = DirectByteBufAlloc.directByteBufAllocMetric.getDetailedMap();
for (Map.Entry<String, Long> entry : allocateSizeMap.entrySet()) {
result.record(entry.getValue(), ALLOC_TYPE_ATTRIBUTES.get(entry.getKey()));
}
}
});
usedDirectMemorySize = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.USED_DIRECT_MEMORY_SIZE_METRIC_NAME)
.setDescription("Used direct memory size")
.setUnit("bytes")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.DEBUG.isWithin(metricsConfig.getMetricsLevel()) && DirectByteBufAlloc.directByteBufAllocMetric != null) {
result.record(DirectByteBufAlloc.directByteBufAllocMetric.getUsedDirectMemory(), metricsConfig.getBaseAttributes());
}
});
}

public static void registerNetworkLimiterSupplier(AsyncNetworkBandwidthLimiter.Type type,
Expand Down Expand Up @@ -425,14 +450,6 @@ public static HistogramMetric buildNetworkOutboundLimiterQueueTimeMetric() {
}
}

public static HistogramMetric buildAllocateByteBufSizeMetric(String source) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
HistogramMetric metric = new HistogramMetric(metricsConfig, AttributesUtils.buildAttributes(source), allocateByteBufSize);
BASE_ATTRIBUTES_LISTENERS.add(metric);
return metric;
}
}

public static HistogramMetric buildReadAheadSizeMetric() {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
HistogramMetric metric = new HistogramMetric(metricsConfig, readAheadSize);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.metrics.MetricsLevel;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.stats.ByteBufStats;
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.s3.wal.util.WALUtil;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -107,10 +106,8 @@ public ByteBuf data() {
data = DirectByteBufAlloc.compositeByteBuffer();
for (Supplier<ByteBuf> supplier : records) {
ByteBuf record = supplier.get();
ByteBufStats.getInstance().allocateByteBufSizeStats("wal_record").record(MetricsLevel.DEBUG, record.readableBytes());
data.addComponent(true, record);
}
ByteBufStats.getInstance().allocateByteBufSizeStats("wal_block").record(MetricsLevel.DEBUG, data.readableBytes());
return data;
}

Expand Down

0 comments on commit b4a7909

Please sign in to comment.