From 472af7f0a9f1363a830ba9923a7429f5b83ee73e Mon Sep 17 00:00:00 2001 From: Gagan Juneja Date: Wed, 5 Jun 2024 19:16:48 +0530 Subject: [PATCH] Adds support to provide tags with value in Gauge metric Signed-off-by: Gagan Juneja --- CHANGELOG.md | 1 + .../metrics/DefaultMetricsRegistry.java | 5 ++ .../telemetry/metrics/MetricsRegistry.java | 12 +++++ .../metrics/ObservableMeasurement.java | 53 +++++++++++++++++++ .../metrics/noop/NoopMetricsRegistry.java | 6 +++ .../metrics/DefaultMetricsRegistryTests.java | 14 +++++ .../TelemetryMetricsEnabledSanityIT.java | 42 +++++++++++++++ .../metrics/OTelMetricsTelemetry.java | 11 ++++ .../metrics/OTelMetricsTelemetryTests.java | 29 ++++++++++ .../test/telemetry/MockTelemetry.java | 6 +++ 10 files changed, 179 insertions(+) create mode 100644 libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/ObservableMeasurement.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ce11dabb6e9c..e7743dbd69d69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650)) - Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481)) - Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636)) +- Adds support to provide tags with value in Gauge metric. ([#13994](https://github.com/opensearch-project/OpenSearch/pull/13994)) ### Deprecated diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistry.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistry.java index c861c21f89fc5..c41529f038958 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistry.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistry.java @@ -48,6 +48,11 @@ public Closeable createGauge(String name, String description, String unit, Suppl return metricsTelemetry.createGauge(name, description, unit, valueProvider, tags); } + @Override + public Closeable createGauge(String name, String description, String unit, Supplier value) { + return metricsTelemetry.createGauge(name, description, unit, value); + } + @Override public void close() throws IOException { metricsTelemetry.close(); diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricsRegistry.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricsRegistry.java index 3ab3dcf82c7a7..fd9c0f32cb835 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricsRegistry.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/MetricsRegistry.java @@ -63,4 +63,16 @@ public interface MetricsRegistry extends Closeable { */ Closeable createGauge(String name, String description, String unit, Supplier valueProvider, Tags tags); + /** + * Creates the Observable Gauge type of Metric. Where the value provider will be called at a certain frequency + * to capture the value. + * + * @param name name of the observable gauge. + * @param description any description about the metric. + * @param unit unit of the metric. + * @param value value provider. + * @return closeable to dispose/close the Gauge metric. + */ + Closeable createGauge(String name, String description, String unit, Supplier value); + } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/ObservableMeasurement.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/ObservableMeasurement.java new file mode 100644 index 0000000000000..7225feeffdc50 --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/ObservableMeasurement.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.metrics; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.telemetry.metrics.tags.Tags; + +/** + * Observable Measurement for the Asynchronous instruments. + * @opensearch.experimental + */ +@ExperimentalApi +public class ObservableMeasurement { + private final Double value; + private final Tags tags; + + /** + * Factory method to create the {@link ObservableMeasurement} object. + * @param value value. + * @param tags tags to be added per value. + * @return ObservableMeasurement + */ + public static ObservableMeasurement create(double value, Tags tags) { + return new ObservableMeasurement(value, tags); + } + + private ObservableMeasurement(double value, Tags tags) { + this.value = value; + this.tags = tags; + } + + /** + * Returns the value. + * @return value + */ + public Double getValue() { + return value; + } + + /** + * Returns the tags. + * @return tags + */ + public Tags getTags() { + return tags; + } +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/noop/NoopMetricsRegistry.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/noop/NoopMetricsRegistry.java index 9a913d25e872d..493c30791827f 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/noop/NoopMetricsRegistry.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/metrics/noop/NoopMetricsRegistry.java @@ -12,6 +12,7 @@ import org.opensearch.telemetry.metrics.Counter; import org.opensearch.telemetry.metrics.Histogram; import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.ObservableMeasurement; import org.opensearch.telemetry.metrics.tags.Tags; import java.io.Closeable; @@ -52,6 +53,11 @@ public Closeable createGauge(String name, String description, String unit, Suppl return () -> {}; } + @Override + public Closeable createGauge(String name, String description, String unit, Supplier value) { + return () -> {}; + } + @Override public void close() throws IOException { diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistryTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistryTests.java index 872f697ade09e..2c6910f66cd69 100644 --- a/libs/telemetry/src/test/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistryTests.java +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/metrics/DefaultMetricsRegistryTests.java @@ -79,4 +79,18 @@ public void testGauge() { assertSame(mockCloseable, closeable); } + public void testGaugeWithValueAndTagSupplier() { + Closeable mockCloseable = mock(Closeable.class); + when(defaultMeterRegistry.createGauge(any(String.class), any(String.class), any(String.class), any(Supplier.class))).thenReturn( + mockCloseable + ); + Closeable closeable = defaultMeterRegistry.createGauge( + "org.opensearch.telemetry.metrics.DefaultMeterRegistryTests.testObservableGauge", + "test observable gauge", + "ms", + () -> ObservableMeasurement.create(1.0, Tags.EMPTY) + ); + assertSame(mockCloseable, closeable); + } + } diff --git a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/metrics/TelemetryMetricsEnabledSanityIT.java b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/metrics/TelemetryMetricsEnabledSanityIT.java index 90143d907cd99..51c18313e4e60 100644 --- a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/metrics/TelemetryMetricsEnabledSanityIT.java +++ b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/metrics/TelemetryMetricsEnabledSanityIT.java @@ -23,10 +23,13 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; import io.opentelemetry.sdk.metrics.data.DoublePointData; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramPointData; @@ -147,6 +150,36 @@ public void testGauge() throws Exception { } + public void testGaugeWithValueAndTagSupplier() throws Exception { + String metricName = "test-gauge"; + MetricsRegistry metricsRegistry = internalCluster().getInstance(MetricsRegistry.class); + InMemorySingletonMetricsExporter.INSTANCE.reset(); + Tags tags = Tags.create().addTag("test", "integ-test"); + final AtomicInteger testValue = new AtomicInteger(0); + Supplier valueProvider = () -> { + return ObservableMeasurement.create(Double.valueOf(testValue.incrementAndGet()), tags); + }; + Closeable gaugeCloseable = metricsRegistry.createGauge(metricName, "test", "ms", valueProvider); + // Sleep for about 2.2s to wait for metrics to be published. + Thread.sleep(2200); + + InMemorySingletonMetricsExporter exporter = InMemorySingletonMetricsExporter.INSTANCE; + + assertTrue(getMaxObservableGaugeValue(exporter, metricName) >= 2.0); + + gaugeCloseable.close(); + double observableGaugeValueAfterStop = getMaxObservableGaugeValue(exporter, metricName); + + Map, Object> attributes = getMetricAttributes(exporter, metricName); + + assertEquals("integ-test", attributes.get(AttributeKey.stringKey("test"))); + + // Sleep for about 1.2s to wait for metrics to see that closed observableGauge shouldn't execute the callable. + Thread.sleep(1200); + assertEquals(observableGaugeValueAfterStop, getMaxObservableGaugeValue(exporter, metricName), 0.0); + + } + private static double getMaxObservableGaugeValue(InMemorySingletonMetricsExporter exporter, String metricName) { List dataPoints = exporter.getFinishedMetricItems() .stream() @@ -159,6 +192,15 @@ private static double getMaxObservableGaugeValue(InMemorySingletonMetricsExporte return totalValue; } + private static Map, Object> getMetricAttributes(InMemorySingletonMetricsExporter exporter, String metricName) { + List dataPoints = exporter.getFinishedMetricItems() + .stream() + .filter(a -> a.getName().contains(metricName)) + .collect(Collectors.toList()); + Attributes attributes = dataPoints.get(0).getDoubleGaugeData().getPoints().stream().findAny().get().getAttributes(); + return attributes.asMap(); + } + @After public void reset() { InMemorySingletonMetricsExporter.INSTANCE.reset(); diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetry.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetry.java index 6fe08040d7af5..98e8aea88eac6 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetry.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetry.java @@ -101,6 +101,17 @@ public Closeable createGauge(String name, String description, String unit, Suppl return () -> doubleObservableGauge.close(); } + @Override + public Closeable createGauge(String name, String description, String unit, Supplier value) { + ObservableDoubleGauge doubleObservableGauge = AccessController.doPrivileged( + (PrivilegedAction) () -> otelMeter.gaugeBuilder(name) + .setUnit(unit) + .setDescription(description) + .buildWithCallback(record -> record.record(value.get().getValue(), OTelAttributesConverter.convert(value.get().getTags()))) + ); + return () -> doubleObservableGauge.close(); + } + @Override public void close() throws IOException { meterProvider.close(); diff --git a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetryTests.java b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetryTests.java index 2e89a3c488d5c..df92124c0bddc 100644 --- a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetryTests.java +++ b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetryTests.java @@ -180,4 +180,33 @@ public void testGauge() throws Exception { closeable.close(); verify(observableDoubleGauge).close(); } + + public void testGaugeWithValueAndTagsSupplier() throws Exception { + String observableGaugeName = "test-gauge"; + String description = "test"; + String unit = "1"; + Meter mockMeter = mock(Meter.class); + OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class); + ObservableDoubleGauge observableDoubleGauge = mock(ObservableDoubleGauge.class); + DoubleGaugeBuilder mockOTelDoubleGaugeBuilder = mock(DoubleGaugeBuilder.class); + MeterProvider meterProvider = mock(MeterProvider.class); + when(meterProvider.get(OTelTelemetryPlugin.INSTRUMENTATION_SCOPE_NAME)).thenReturn(mockMeter); + MetricsTelemetry metricsTelemetry = new OTelMetricsTelemetry( + new RefCountedReleasable("telemetry", mockOpenTelemetry, () -> {}), + meterProvider + ); + when(mockMeter.gaugeBuilder(Mockito.contains(observableGaugeName))).thenReturn(mockOTelDoubleGaugeBuilder); + when(mockOTelDoubleGaugeBuilder.setDescription(description)).thenReturn(mockOTelDoubleGaugeBuilder); + when(mockOTelDoubleGaugeBuilder.setUnit(unit)).thenReturn(mockOTelDoubleGaugeBuilder); + when(mockOTelDoubleGaugeBuilder.buildWithCallback(any(Consumer.class))).thenReturn(observableDoubleGauge); + + Closeable closeable = metricsTelemetry.createGauge( + observableGaugeName, + description, + unit, + () -> ObservableMeasurement.create(1.0, Tags.EMPTY) + ); + closeable.close(); + verify(observableDoubleGauge).close(); + } } diff --git a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java index 4ba130343e889..a5fe8d1f0511a 100644 --- a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java +++ b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetry.java @@ -13,6 +13,7 @@ import org.opensearch.telemetry.metrics.Counter; import org.opensearch.telemetry.metrics.Histogram; import org.opensearch.telemetry.metrics.MetricsTelemetry; +import org.opensearch.telemetry.metrics.ObservableMeasurement; import org.opensearch.telemetry.metrics.noop.NoopCounter; import org.opensearch.telemetry.metrics.noop.NoopHistogram; import org.opensearch.telemetry.metrics.tags.Tags; @@ -62,6 +63,11 @@ public Closeable createGauge(String name, String description, String unit, Suppl return () -> {}; } + @Override + public Closeable createGauge(String name, String description, String unit, Supplier value) { + return () -> {}; + } + @Override public void close() {