Skip to content

Commit

Permalink
fix(s3stream): fix unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh committed Mar 6, 2024
1 parent c769562 commit 7b8b7d5
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public class S3StreamMetricsConstant {
public static final String OPERATION_LATENCY_METRIC_NAME = "operation_latency";
public static final String OBJECT_COUNT_METRIC_NAME = "object_count";
public static final String OBJECT_STAGE_COST_METRIC_NAME = "object_stage_cost";
public static final String OBJECT_UPLOAD_SIZE_METRIC_NAME = "object_upload_size";
public static final String NETWORK_INBOUND_USAGE_METRIC_NAME = "network_inbound_usage";
public static final String NETWORK_OUTBOUND_USAGE_METRIC_NAME = "network_outbound_usage";
public static final String NETWORK_INBOUND_AVAILABLE_BANDWIDTH_METRIC_NAME = "network_inbound_available_bandwidth";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,23 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

public class S3StreamMetricsManager {
private static final List<ConfigListener> BASE_ATTRIBUTES_LISTENERS = new ArrayList<>();
public static final MetricsRegistry METRICS_REGISTRY = new MetricsRegistry();
public static final List<YammerHistogramMetric> OPERATION_LATENCY_METRICS = new CopyOnWriteArrayList<>();
public static final List<YammerHistogramMetric> OBJECT_STAGE_METRICS = new CopyOnWriteArrayList<>();
public static final List<YammerHistogramMetric> NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS = new CopyOnWriteArrayList<>();
public static final List<YammerHistogramMetric> NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRICS = new CopyOnWriteArrayList<>();
public static final List<YammerHistogramMetric> READ_AHEAD_SIZE_METRICS = new CopyOnWriteArrayList<>();
public static final List<YammerHistogramMetric> READ_AHEAD_LIMITER_QUEUE_TIME_METRICS = new CopyOnWriteArrayList<>();
private static LongCounter s3DownloadSizeInTotal = new NoopLongCounter();
private static LongCounter s3UploadSizeInTotal = new NoopLongCounter();
private static HistogramInstrument operationLatency;
private static LongCounter objectNumInTotal = new NoopLongCounter();
private static HistogramInstrument objectStageCost;
private static HistogramInstrument objectUploadSize;
private static HistogramInstrument objectDownloadSize;
private static LongCounter networkInboundUsageInTotal = new NoopLongCounter();
private static LongCounter networkOutboundUsageInTotal = new NoopLongCounter();
private static ObservableLongGauge networkInboundAvailableBandwidth = new NoopObservableLongGauge();
Expand Down Expand Up @@ -106,14 +111,12 @@ public static void initMetrics(Meter meter, String prefix) {
.setUnit("bytes")
.build();
operationLatency = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OPERATION_LATENCY_METRIC_NAME,
"Operation latency", "nanoseconds");
"Operation latency", "nanoseconds", () -> OPERATION_LATENCY_METRICS);
objectNumInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.OBJECT_COUNT_METRIC_NAME)
.setDescription("Objects count")
.build();
objectStageCost = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OBJECT_STAGE_COST_METRIC_NAME,
"Objects stage cost", "nanoseconds");
objectUploadSize = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.OBJECT_UPLOAD_SIZE_METRIC_NAME,
"Objects upload size", "bytes");
"Objects stage cost", "nanoseconds", () -> OBJECT_STAGE_METRICS);
networkInboundUsageInTotal = meter.counterBuilder(prefix + S3StreamMetricsConstant.NETWORK_INBOUND_USAGE_METRIC_NAME)
.setDescription("Network inbound usage")
.setUnit("bytes")
Expand Down Expand Up @@ -157,13 +160,13 @@ public static void initMetrics(Meter meter, String prefix) {
}
});
networkInboundLimiterQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRIC_NAME,
"Network inbound limiter queue time", "nanoseconds");
"Network inbound limiter queue time", "nanoseconds", () -> NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS);
networkOutboundLimiterQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRIC_NAME,
"Network outbound limiter queue time", "nanoseconds");
"Network outbound limiter queue time", "nanoseconds", () -> NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRICS);
readAheadSize = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.READ_AHEAD_SIZE_METRIC_NAME,
"Read ahead size", "bytes");
"Read ahead size", "bytes", () -> READ_AHEAD_SIZE_METRICS);
readAheadLimierQueueTime = new HistogramInstrument(meter, prefix + S3StreamMetricsConstant.READ_AHEAD_QUEUE_TIME_METRIC_NAME,
"Read ahead limiter queue time", "nanoseconds");
"Read ahead limiter queue time", "nanoseconds", () -> READ_AHEAD_LIMITER_QUEUE_TIME_METRICS);
deltaWalStartOffset = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.WAL_START_OFFSET)
.setDescription("Delta WAL start offset")
.ofLongs()
Expand Down Expand Up @@ -330,7 +333,7 @@ public static YammerHistogramMetric buildStageOperationMetric(MetricName metricN
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel,
metricsConfig, AttributesUtils.buildAttributes(stage));
BASE_ATTRIBUTES_LISTENERS.add(metric);
operationLatency.registerYammerHistogramMetric(metric);
OPERATION_LATENCY_METRICS.add(metric);
return metric;
}
}
Expand All @@ -340,7 +343,7 @@ public static YammerHistogramMetric buildOperationMetric(MetricName metricName,
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel,
metricsConfig, AttributesUtils.buildAttributes(operation));
BASE_ATTRIBUTES_LISTENERS.add(metric);
operationLatency.registerYammerHistogramMetric(metric);
OPERATION_LATENCY_METRICS.add(metric);
return metric;
}
}
Expand All @@ -351,7 +354,7 @@ public static YammerHistogramMetric buildOperationMetric(MetricName metricName,
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig,
AttributesUtils.buildAttributes(operation, status, sizeLabelName));
BASE_ATTRIBUTES_LISTENERS.add(metric);
operationLatency.registerYammerHistogramMetric(metric);
OPERATION_LATENCY_METRICS.add(metric);
return metric;
}
}
Expand All @@ -361,7 +364,7 @@ public static YammerHistogramMetric buildOperationMetric(MetricName metricName,
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig,
AttributesUtils.buildAttributes(operation, status));
BASE_ATTRIBUTES_LISTENERS.add(metric);
operationLatency.registerYammerHistogramMetric(metric);
OPERATION_LATENCY_METRICS.add(metric);
return metric;
}
}
Expand All @@ -379,7 +382,7 @@ public static YammerHistogramMetric buildObjectStageCostMetric(MetricName metric
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig,
AttributesUtils.buildAttributes(stage));
BASE_ATTRIBUTES_LISTENERS.add(metric);
objectStageCost.registerYammerHistogramMetric(metric);
OBJECT_STAGE_METRICS.add(metric);
return metric;
}
}
Expand All @@ -388,7 +391,7 @@ public static YammerHistogramMetric buildObjectUploadSizeMetric(MetricName metri
synchronized (BASE_ATTRIBUTES_LISTENERS) {
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig);
BASE_ATTRIBUTES_LISTENERS.add(metric);
objectStageCost.registerYammerHistogramMetric(metric);
OBJECT_STAGE_METRICS.add(metric);
return metric;
}
}
Expand All @@ -413,7 +416,7 @@ public static YammerHistogramMetric buildNetworkInboundLimiterQueueTimeMetric(Me
synchronized (BASE_ATTRIBUTES_LISTENERS) {
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig);
BASE_ATTRIBUTES_LISTENERS.add(metric);
networkInboundLimiterQueueTime.registerYammerHistogramMetric(metric);
NETWORK_INBOUND_LIMITER_QUEUE_TIME_METRICS.add(metric);
return metric;
}
}
Expand All @@ -422,7 +425,7 @@ public static YammerHistogramMetric buildNetworkOutboundLimiterQueueTimeMetric(M
synchronized (BASE_ATTRIBUTES_LISTENERS) {
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig);
BASE_ATTRIBUTES_LISTENERS.add(metric);
networkOutboundLimiterQueueTime.registerYammerHistogramMetric(metric);
NETWORK_OUTBOUND_LIMITER_QUEUE_TIME_METRICS.add(metric);
return metric;
}
}
Expand All @@ -431,7 +434,7 @@ public static YammerHistogramMetric buildReadAheadSizeMetric(MetricName metricNa
synchronized (BASE_ATTRIBUTES_LISTENERS) {
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig);
BASE_ATTRIBUTES_LISTENERS.add(metric);
readAheadSize.registerYammerHistogramMetric(metric);
READ_AHEAD_SIZE_METRICS.add(metric);
return metric;
}

Expand All @@ -441,7 +444,7 @@ public static YammerHistogramMetric buildReadAheadLimiterQueueTimeMetric(MetricN
synchronized (BASE_ATTRIBUTES_LISTENERS) {
YammerHistogramMetric metric = new YammerHistogramMetric(metricName, metricsLevel, metricsConfig);
BASE_ATTRIBUTES_LISTENERS.add(metric);
readAheadLimierQueueTime.registerYammerHistogramMetric(metric);
READ_AHEAD_LIMITER_QUEUE_TIME_METRICS.add(metric);
return metric;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,22 @@
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import io.opentelemetry.api.metrics.ObservableLongGauge;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

public class HistogramInstrument {
private final List<YammerHistogramMetric> histograms;
private final ObservableLongGauge count;
private final ObservableLongGauge sum;
private final ObservableDoubleGauge histP50Value;
private final ObservableDoubleGauge histP99Value;
private final ObservableDoubleGauge histMeanValue;
private final ObservableDoubleGauge histMaxValue;

public HistogramInstrument(Meter meter, String name, String desc, String unit) {
this.histograms = new CopyOnWriteArrayList<>();
public HistogramInstrument(Meter meter, String name, String desc, String unit, Supplier<List<YammerHistogramMetric>> histogramsSupplier) {
this.count = meter.gaugeBuilder(name + S3StreamMetricsConstant.COUNT_METRIC_NAME_SUFFIX)
.setDescription(desc + " (count)")
.ofLongs()
.buildWithCallback(result -> {
List<YammerHistogramMetric> histograms = histogramsSupplier.get();
histograms.forEach(histogram -> {
if (histogram.shouldRecord()) {
result.record(histogram.count(), histogram.attributes);
Expand All @@ -50,6 +49,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) {
.ofLongs()
.setUnit(unit)
.buildWithCallback(result -> {
List<YammerHistogramMetric> histograms = histogramsSupplier.get();
histograms.forEach(histogram -> {
if (histogram.shouldRecord()) {
result.record(histogram.sum(), histogram.attributes);
Expand All @@ -60,6 +60,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) {
.setDescription(desc + " (50th percentile)")
.setUnit(unit)
.buildWithCallback(result -> {
List<YammerHistogramMetric> histograms = histogramsSupplier.get();
histograms.forEach(histogram -> {
if (histogram.shouldRecord()) {
result.record(histogram.p50(), histogram.attributes);
Expand All @@ -70,6 +71,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) {
.setDescription(desc + " (99th percentile)")
.setUnit(unit)
.buildWithCallback(result -> {
List<YammerHistogramMetric> histograms = histogramsSupplier.get();
histograms.forEach(histogram -> {
if (histogram.shouldRecord()) {
result.record(histogram.p99(), histogram.attributes);
Expand All @@ -80,6 +82,7 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) {
.setDescription(desc + " (mean)")
.setUnit(unit)
.buildWithCallback(result -> {
List<YammerHistogramMetric> histograms = histogramsSupplier.get();
histograms.forEach(histogram -> {
if (histogram.shouldRecord()) {
result.record(histogram.mean(), histogram.attributes);
Expand All @@ -90,15 +93,12 @@ public HistogramInstrument(Meter meter, String name, String desc, String unit) {
.setDescription(desc + " (max)")
.setUnit(unit)
.buildWithCallback(result -> {
List<YammerHistogramMetric> histograms = histogramsSupplier.get();
histograms.forEach(histogram -> {
if (histogram.shouldRecord()) {
result.record(histogram.max(), histogram.attributes);
}
});
});
}

public void registerYammerHistogramMetric(YammerHistogramMetric histogram) {
histograms.add(histogram);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testMetricsLevel() {
yammerHistogramMetric = new YammerHistogramMetric(Mockito.mock(MetricName.class), MetricsLevel.DEBUG, new MetricsConfig(),
Attributes.builder().put("extra", "v").build());
Assertions.assertFalse(yammerHistogramMetric.shouldRecord());
yammerHistogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.INFO, null));
yammerHistogramMetric.onConfigChange(new MetricsConfig(MetricsLevel.DEBUG, null));
Assertions.assertTrue(yammerHistogramMetric.shouldRecord());
}
}

0 comments on commit 7b8b7d5

Please sign in to comment.