diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java index 1c1fb7974..5d5dbf460 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java @@ -69,7 +69,6 @@ public class S3StreamMetricsConstant { public static final String OPERATION_LATENCY_METRIC_NAME = "operation_latency"; public static final String OBJECT_COUNT_METRIC_NAME = "object_count"; public static final String OBJECT_STAGE_COST_METRIC_NAME = "object_stage_cost"; - public static final String OBJECT_UPLOAD_SIZE_METRIC_NAME = "object_upload_size"; public static final String NETWORK_INBOUND_USAGE_METRIC_NAME = "network_inbound_usage"; public static final String NETWORK_OUTBOUND_USAGE_METRIC_NAME = "network_outbound_usage"; public static final String NETWORK_INBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME = "network_inbound_available_bandwidth"; diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index d4e71d552..c50bda24d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -29,18 +29,23 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Supplier; public class S3StreamMetricsManager { private static final List BASE_ATTRIBUTES_LISTENERS = new ArrayList<>(); public static final MetricsRegistry METRICS_REGISTRY = new MetricsRegistry(); + public static final List OPERATION_LATENCY_METRICS = new CopyOnWriteArrayList<>(); + public static final List OBJECT_STAGE_METRICS = new CopyOnWriteArrayList<>(); + public static final List NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS = new CopyOnWriteArrayList<>(); + public static final List NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRICS = new CopyOnWriteArrayList<>(); + public static final List READ_AHEAD_SIZE_METRICS = new CopyOnWriteArrayList<>(); + public static final List READ_AHEAD_LIMITER_QUEUE_TIME_METRICS = new CopyOnWriteArrayList<>(); private static LongCounter s3DownloadSizeInTotal = new NoopLongCounter(); private static LongCounter s3UploadSizeInTotal = new NoopLongCounter(); private static HistogramInstrument operationLatency; private static LongCounter objectNumInTotal = new NoopLongCounter(); private static HistogramInstrument objectStageCost; - private static HistogramInstrument objectUploadSize; - private static HistogramInstrument objectDownloadSize; private static LongCounter networkInboundUsageInTotal = new NoopLongCounter(); private static LongCounter networkOutboundUsageInTotal = new NoopLongCounter(); private static ObservableLongGauge networkInboundAvailableBandwidth = new NoopObservableLongGauge(); @@ -106,14 +111,12 @@ public static void initMetrics(Meter meter, String prefix) { .setUnit("bytes") .build(); operationLatency = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OPERATION_LATENCY_METRIC_NAME, - "Operation latency", "nanoseconds"); + "Operation latency", "nanoseconds", () -> OPERATION_LATENCY_METRICS); objectNumInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.OBJECT_COUNT_METRIC_NAME) .setDescription("Objects count") .build(); objectStageCost = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OBJECT_STAGE_COST_METRIC_NAME, - "Objects stage cost", "nanoseconds"); - objectUploadSize = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OBJECT_UPLOAD_SIZE_METRIC_NAME, - "Objects upload size", "bytes"); + "Objects stage cost", "nanoseconds", () -> OBJECT_STAGE_METRICS); networkInboundUsageInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_USAGE_METRIC_NAME) .setDescription("Network inbound usage") .setUnit("bytes") @@ -157,13 +160,13 @@ public static void initMetrics(Meter meter, String prefix) { } }); networkInboundLimiterQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME, - "Network inbound limiter queue time", "nanoseconds"); + "Network inbound limiter queue time", "nanoseconds", () -> NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS); networkOutboundLimiterQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME, - "Network outbound limiter queue time", "nanoseconds"); + "Network outbound limiter queue time", "nanoseconds", () -> NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRICS); readAheadSize = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.READ_AHEAD_SIZE_METRIC_NAME, - "Read ahead size", "bytes"); + "Read ahead size", "bytes", () -> READ_AHEAD_SIZE_METRICS); readAheadLimierQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.READ_AHEAD_QUEUE_TIME_METRIC_NAME, - "Read ahead limiter queue time", "nanoseconds"); + "Read ahead limiter queue time", "nanoseconds", () -> READ_AHEAD_LIMITER_QUEUE_TIME_METRICS); deltaWalStartOffset = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.WAL_START_OFFSET) .setDescription("Delta WAL start offset") .ofLongs() @@ -330,7 +333,7 @@ public static YammerHistogramMetric buildStageOperationMetric(MetricName metricN YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig, AttributesUtils.buildAttributes(stage)); BASE_ATTRIBUTES_LISTENERS.add(metric); - operationLatency.registerYammerHistogramMetric(metric); + OPERATION_LATENCY_METRICS.add(metric); return metric; } } @@ -340,7 +343,7 @@ public static YammerHistogramMetric buildOperationMetric(MetricName metricName, YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig, AttributesUtils.buildAttributes(operation)); BASE_ATTRIBUTES_LISTENERS.add(metric); - operationLatency.registerYammerHistogramMetric(metric); + OPERATION_LATENCY_METRICS.add(metric); return metric; } } @@ -351,7 +354,7 @@ public static YammerHistogramMetric buildOperationMetric(MetricName metricName, YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig, AttributesUtils.buildAttributes(operation, status, sizeLabelName)); BASE_ATTRIBUTES_LISTENERS.add(metric); - operationLatency.registerYammerHistogramMetric(metric); + OPERATION_LATENCY_METRICS.add(metric); return metric; } } @@ -361,7 +364,7 @@ public static YammerHistogramMetric buildOperationMetric(MetricName metricName, YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig, AttributesUtils.buildAttributes(operation, status)); BASE_ATTRIBUTES_LISTENERS.add(metric); - operationLatency.registerYammerHistogramMetric(metric); + OPERATION_LATENCY_METRICS.add(metric); return metric; } } @@ -379,7 +382,7 @@ public static YammerHistogramMetric buildObjectStageCostMetric(MetricName metric YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig, AttributesUtils.buildAttributes(stage)); BASE_ATTRIBUTES_LISTENERS.add(metric); - objectStageCost.registerYammerHistogramMetric(metric); + OBJECT_STAGE_METRICS.add(metric); return metric; } } @@ -388,7 +391,7 @@ public static YammerHistogramMetric buildObjectUploadSizeMetric(MetricName metri synchronized (BASE_ATTRIBUTES_LISTENERS) { YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); - objectStageCost.registerYammerHistogramMetric(metric); + OBJECT_STAGE_METRICS.add(metric); return metric; } } @@ -413,7 +416,7 @@ public static YammerHistogramMetric buildNetworkInboundLimiterQueueTimeMetric(Me synchronized (BASE_ATTRIBUTES_LISTENERS) { YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); - networkInboundLimiterQueueTime.registerYammerHistogramMetric(metric); + NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS.add(metric); return metric; } } @@ -422,7 +425,7 @@ public static YammerHistogramMetric buildNetworkOutboundLimiterQueueTimeMetric(M synchronized (BASE_ATTRIBUTES_LISTENERS) { YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); - networkOutboundLimiterQueueTime.registerYammerHistogramMetric(metric); + NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRICS.add(metric); return metric; } } @@ -431,7 +434,7 @@ public static YammerHistogramMetric buildReadAheadSizeMetric(MetricName metricNa synchronized (BASE_ATTRIBUTES_LISTENERS) { YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); - readAheadSize.registerYammerHistogramMetric(metric); + READ_AHEAD_SIZE_METRICS.add(metric); return metric; } @@ -441,7 +444,7 @@ public static YammerHistogramMetric buildReadAheadLimiterQueueTimeMetric(MetricN synchronized (BASE_ATTRIBUTES_LISTENERS) { YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig); BASE_ATTRIBUTES_LISTENERS.add(metric); - readAheadLimierQueueTime.registerYammerHistogramMetric(metric); + READ_AHEAD_LIMITER_QUEUE_TIME_METRICS.add(metric); return metric; } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java index a0d591bc6..2b9066020 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramInstrument.java @@ -22,10 +22,9 @@ import io.opentelemetry.api.metrics.ObservableDoubleGauge; import io.opentelemetry.api.metrics.ObservableLongGauge; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Supplier; public class HistogramInstrument { - private final List histograms; private final ObservableLongGauge count; private final ObservableLongGauge sum; private final ObservableDoubleGauge histP50Value; @@ -33,12 +32,12 @@ public class HistogramInstrument { private final ObservableDoubleGauge histMeanValue; private final ObservableDoubleGauge histMaxValue; - public HistogramInstrument(Meter meter, String name, String desc, String unit) { - this.histograms = new CopyOnWriteArrayList<>(); + public HistogramInstrument(Meter meter, String name, String desc, String unit, Supplier> histogramsSupplier) { this.count = meter.gaugeBuilder(name + S3StreamMetricsConstant.COUNT_METRIC_NAME_SUFFIX) .setDescription(desc + " (count)") .ofLongs() .buildWithCallback(result -> { + List histograms = histogramsSupplier.get(); histograms.forEach(histogram -> { if (histogram.shouldRecord()) { result.record(histogram.count(), histogram.attributes); @@ -50,6 +49,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { .ofLongs() .setUnit(unit) .buildWithCallback(result -> { + List histograms = histogramsSupplier.get(); histograms.forEach(histogram -> { if (histogram.shouldRecord()) { result.record(histogram.sum(), histogram.attributes); @@ -60,6 +60,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { .setDescription(desc + " (50th percentile)") .setUnit(unit) .buildWithCallback(result -> { + List histograms = histogramsSupplier.get(); histograms.forEach(histogram -> { if (histogram.shouldRecord()) { result.record(histogram.p50(), histogram.attributes); @@ -70,6 +71,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { .setDescription(desc + " (99th percentile)") .setUnit(unit) .buildWithCallback(result -> { + List histograms = histogramsSupplier.get(); histograms.forEach(histogram -> { if (histogram.shouldRecord()) { result.record(histogram.p99(), histogram.attributes); @@ -80,6 +82,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { .setDescription(desc + " (mean)") .setUnit(unit) .buildWithCallback(result -> { + List histograms = histogramsSupplier.get(); histograms.forEach(histogram -> { if (histogram.shouldRecord()) { result.record(histogram.mean(), histogram.attributes); @@ -90,6 +93,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { .setDescription(desc + " (max)") .setUnit(unit) .buildWithCallback(result -> { + List histograms = histogramsSupplier.get(); histograms.forEach(histogram -> { if (histogram.shouldRecord()) { result.record(histogram.max(), histogram.attributes); @@ -97,8 +101,4 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) { }); }); } - - public void registerYammerHistogramMetric(YammerHistogramMetric histogram) { - histograms.add(histogram); - } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java b/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java index d08dfd4a7..fbea91eaf 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/metrics/wrapper/MetricsWrapperTest.java @@ -64,7 +64,7 @@ public void testMetricsLevel() { yammerHistogramMetric = new YammerHistogramMetric(Mockito.mock(MetricName.class), MetricsLevel.DEBUG, new MetricsConfig(), Attributes.builder().put("extra", "v").build()); Assertions.assertFalse(yammerHistogramMetric.shouldRecord()); - yammerHistogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.INFO, null)); + yammerHistogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, null)); Assertions.assertTrue(yammerHistogramMetric.shouldRecord()); } }