Skip to content

otel: subchannel metrics #12202

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/src/main/java/io/grpc/LoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ public abstract class LoadBalancer {
public static final Attributes.Key<Boolean> IS_PETIOLE_POLICY =
Attributes.Key.create("io.grpc.IS_PETIOLE_POLICY");

/**
* The name of the locality that this EquivalentAddressGroup is in.
*/
public static final Attributes.Key<String> ATTR_LOCALITY_NAME =
Attributes.Key.create("io.grpc.lb.locality");

/**
* A picker that always returns an erring pick.
*
Expand Down
32 changes: 32 additions & 0 deletions api/src/main/java/io/grpc/LongUpDownCounterMetricInstrument.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2025 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import java.util.List;

/**
* Represents a long-valued up down counter metric instrument.
*/
@Internal
public final class LongUpDownCounterMetricInstrument extends PartialMetricInstrument {
public LongUpDownCounterMetricInstrument(int index, String name, String description, String unit,
List<String> requiredLabelKeys,
List<String> optionalLabelKeys,
boolean enableByDefault) {
super(index, name, description, unit, requiredLabelKeys, optionalLabelKeys, enableByDefault);
}
}
41 changes: 41 additions & 0 deletions api/src/main/java/io/grpc/MetricInstrumentRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,47 @@
}
}

/**
* Registers a new Long Up Down Counter metric instrument.
*
* @param name the name of the metric
* @param description a description of the metric
* @param unit the unit of measurement for the metric
* @param requiredLabelKeys a list of required label keys
* @param optionalLabelKeys a list of optional label keys
* @param enableByDefault whether the metric should be enabled by default
* @return the newly created LongUpDownCounterMetricInstrument
* @throws IllegalStateException if a metric with the same name already exists
*/
public LongUpDownCounterMetricInstrument registerLongUpDownCounter(String name,
String description,
String unit,
List<String> requiredLabelKeys,
List<String> optionalLabelKeys,
boolean enableByDefault) {
checkArgument(!Strings.isNullOrEmpty(name), "missing metric name");
checkNotNull(description, "description");
checkNotNull(unit, "unit");
checkNotNull(requiredLabelKeys, "requiredLabelKeys");
checkNotNull(optionalLabelKeys, "optionalLabelKeys");
synchronized (lock) {
if (registeredMetricNames.contains(name)) {
throw new IllegalStateException("Metric with name " + name + " already exists");

Check warning on line 172 in api/src/main/java/io/grpc/MetricInstrumentRegistry.java

View check run for this annotation

Codecov / codecov/patch

api/src/main/java/io/grpc/MetricInstrumentRegistry.java#L172

Added line #L172 was not covered by tests
}
int index = nextAvailableMetricIndex;
if (index + 1 == metricInstruments.length) {
resizeMetricInstruments();

Check warning on line 176 in api/src/main/java/io/grpc/MetricInstrumentRegistry.java

View check run for this annotation

Codecov / codecov/patch

api/src/main/java/io/grpc/MetricInstrumentRegistry.java#L176

Added line #L176 was not covered by tests
}
LongUpDownCounterMetricInstrument instrument = new LongUpDownCounterMetricInstrument(
index, name, description, unit, requiredLabelKeys, optionalLabelKeys,
enableByDefault);
metricInstruments[index] = instrument;
registeredMetricNames.add(name);
nextAvailableMetricIndex += 1;
return instrument;
}
}

/**
* Registers a new Double Histogram metric instrument.
*
Expand Down
25 changes: 24 additions & 1 deletion api/src/main/java/io/grpc/MetricRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ default void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, do
* Adds a value for a long valued counter metric instrument.
*
* @param metricInstrument The counter metric instrument to add the value against.
* @param value The value to add.
* @param value The value to add. MUST be non-negative.
* @param requiredLabelValues A list of required label values for the metric.
* @param optionalLabelValues A list of additional, optional label values for the metric.
*/
Expand All @@ -66,6 +66,29 @@ default void addLongCounter(LongCounterMetricInstrument metricInstrument, long v
metricInstrument.getOptionalLabelKeys().size());
}

/**
* Adds a value for a long valued up down counter metric instrument.
*
* @param metricInstrument The counter metric instrument to add the value against.
* @param value The value to add. May be positive, negative or zero.
* @param requiredLabelValues A list of required label values for the metric.
* @param optionalLabelValues A list of additional, optional label values for the metric.
*/
default void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument,
long value,
List<String> requiredLabelValues,
List<String> optionalLabelValues) {
checkArgument(requiredLabelValues != null
&& requiredLabelValues.size() == metricInstrument.getRequiredLabelKeys().size(),
"Incorrect number of required labels provided. Expected: %s",
metricInstrument.getRequiredLabelKeys().size());
checkArgument(optionalLabelValues != null
&& optionalLabelValues.size() == metricInstrument.getOptionalLabelKeys().size(),
"Incorrect number of optional labels provided. Expected: %s",
metricInstrument.getOptionalLabelKeys().size());
}


/**
* Records a value for a double-precision histogram metric instrument.
*
Expand Down
18 changes: 16 additions & 2 deletions api/src/main/java/io/grpc/MetricSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,26 @@ default void addDoubleCounter(DoubleCounterMetricInstrument metricInstrument, do
* Adds a value for a long valued counter metric associated with specified metric instrument.
*
* @param metricInstrument The counter metric instrument identifies metric measure to add.
* @param value The value to record.
* @param value The value to record. MUST be non-negative.
* @param requiredLabelValues A list of required label values for the metric.
* @param optionalLabelValues A list of additional, optional label values for the metric.
*/
default void addLongCounter(LongCounterMetricInstrument metricInstrument, long value,
List<String> requiredLabelValues, List<String> optionalLabelValues) {
List<String> requiredLabelValues, List<String> optionalLabelValues) {
}

/**
* Adds a value for a long valued up down counter metric associated with specified metric
* instrument.
*
* @param metricInstrument The counter metric instrument identifies metric measure to add.
* @param value The value to record. May be positive, negative or zero.
* @param requiredLabelValues A list of required label values for the metric.
* @param optionalLabelValues A list of additional, optional label values for the metric.
*/
default void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value,
List<String> requiredLabelValues,
List<String> optionalLabelValues) {
}

/**
Expand Down
69 changes: 68 additions & 1 deletion core/src/main/java/io/grpc/internal/InternalSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
import io.grpc.LoadBalancer;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MetricRecorder;
import io.grpc.NameResolver;
import io.grpc.SecurityLevel;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
Expand Down Expand Up @@ -160,6 +163,8 @@
private Status shutdownReason;

private volatile Attributes connectedAddressAttributes;
private final SubchannelMetrics subchannelMetrics;
private final String target;

InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
BackoffPolicy.Provider backoffPolicyProvider,
Expand All @@ -168,7 +173,9 @@
Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext,
Callback callback, InternalChannelz channelz, CallTracer callsTracer,
ChannelTracer channelTracer, InternalLogId logId,
ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters) {
ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters,
String target,
MetricRecorder metricRecorder) {
List<EquivalentAddressGroup> addressGroups = args.getAddresses();
Preconditions.checkNotNull(addressGroups, "addressGroups");
Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
Expand All @@ -192,6 +199,8 @@
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
this.transportFilters = transportFilters;
this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
this.target = target;
this.subchannelMetrics = new SubchannelMetrics(metricRecorder);
}

ChannelLogger getChannelLogger() {
Expand Down Expand Up @@ -579,6 +588,15 @@
@Override
public void transportReady() {
channelLogger.log(ChannelLogLevel.INFO, "READY");
subchannelMetrics.recordConnectionAttemptSucceeded(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
null,
extractSecurityLevel(
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
));
syncContext.execute(new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -608,6 +626,13 @@
channelLogger.log(
ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
shutdownInitiated = true;
subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
null, null
));
syncContext.execute(new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -648,6 +673,15 @@
for (ClientTransportFilter filter : transportFilters) {
filter.transportTerminated(transport.getAttributes());
}
subchannelMetrics.recordDisconnection(buildLabelSet(
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
getAttributeOrDefault(
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
"Peer Pressure",
extractSecurityLevel(
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
));
syncContext.execute(new Runnable() {
@Override
public void run() {
Expand All @@ -658,6 +692,27 @@
}
});
}

private String extractSecurityLevel(SecurityLevel securityLevel) {
if (securityLevel == null) {
return "none";
}
switch (securityLevel) {
case NONE:
return "none";

Check warning on line 702 in core/src/main/java/io/grpc/internal/InternalSubchannel.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/InternalSubchannel.java#L702

Added line #L702 was not covered by tests
case INTEGRITY:
return "integrity_only";

Check warning on line 704 in core/src/main/java/io/grpc/internal/InternalSubchannel.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/InternalSubchannel.java#L704

Added line #L704 was not covered by tests
case PRIVACY_AND_INTEGRITY:
return "privacy_and_integrity";

Check warning on line 706 in core/src/main/java/io/grpc/internal/InternalSubchannel.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/InternalSubchannel.java#L706

Added line #L706 was not covered by tests
default:
throw new IllegalArgumentException("Unknown SecurityLevel: " + securityLevel);

Check warning on line 708 in core/src/main/java/io/grpc/internal/InternalSubchannel.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/InternalSubchannel.java#L708

Added line #L708 was not covered by tests
}
}

private String getAttributeOrDefault(Attributes attributes, Attributes.Key<String> key) {
String value = attributes.get(key);
return value == null ? "" : value;
}
}

// All methods are called in syncContext
Expand Down Expand Up @@ -817,6 +872,18 @@
return buffer.toString();
}

private OtelMetricsAttributes buildLabelSet(String backendService, String locality,
String disconnectError, String securityLevel) {
return new OtelMetricsAttributes(
target,
backendService,
locality,
disconnectError,
securityLevel
);
}


@VisibleForTesting
static final class TransportLogger extends ChannelLogger {
// Changed just after construction to break a cyclic dependency.
Expand Down
9 changes: 6 additions & 3 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ void exitIdleMode() {
LbHelperImpl lbHelper = new LbHelperImpl();
lbHelper.lb = loadBalancerFactory.newLoadBalancer(lbHelper);
// Delay setting lbHelper until fully initialized, since loadBalancerFactory is user code and
// may throw. We don't want to confuse our state, even if we will enter panic mode.
// may throw. We don't want to confuse our state, even if we enter panic mode.
this.lbHelper = lbHelper;

channelStateManager.gotoState(CONNECTING);
Expand Down Expand Up @@ -1464,7 +1464,9 @@ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
subchannelTracer,
subchannelLogId,
subchannelLogger,
transportFilters);
transportFilters,
target,
lbHelper.getMetricRecorder());
oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Child Subchannel created")
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
Expand Down Expand Up @@ -1895,7 +1897,8 @@ void onNotInUse(InternalSubchannel is) {
subchannelTracer,
subchannelLogId,
subchannelLogger,
transportFilters);
transportFilters, target,
lbHelper.getMetricRecorder());

channelTracer.reportEvent(new ChannelTrace.Event.Builder()
.setDescription("Child Subchannel started")
Expand Down
29 changes: 28 additions & 1 deletion core/src/main/java/io/grpc/internal/MetricRecorderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.grpc.LongCounterMetricInstrument;
import io.grpc.LongGaugeMetricInstrument;
import io.grpc.LongHistogramMetricInstrument;
import io.grpc.LongUpDownCounterMetricInstrument;
import io.grpc.MetricInstrument;
import io.grpc.MetricInstrumentRegistry;
import io.grpc.MetricRecorder;
Expand Down Expand Up @@ -82,7 +83,7 @@
* Records a long counter value.
*
* @param metricInstrument the {@link LongCounterMetricInstrument} to record.
* @param value the value to record.
* @param value the value to record. Must be non-negative.
* @param requiredLabelValues the required label values for the metric.
* @param optionalLabelValues the optional label values for the metric.
*/
Expand All @@ -103,6 +104,32 @@
}
}

/**
* Adds a long up down counter value.
*
* @param metricInstrument the {@link io.grpc.LongUpDownCounterMetricInstrument} to record.
* @param value the value to record. May be positive, negative or zero.
* @param requiredLabelValues the required label values for the metric.
* @param optionalLabelValues the optional label values for the metric.
*/
@Override
public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value,
List<String> requiredLabelValues,
List<String> optionalLabelValues) {
MetricRecorder.super.addLongUpDownCounter(metricInstrument, value, requiredLabelValues,
optionalLabelValues);
for (MetricSink sink : metricSinks) {
int measuresSize = sink.getMeasuresSize();
if (measuresSize <= metricInstrument.getIndex()) {
// Measures may need updating in two cases:
// 1. When the sink is initially created with an empty list of measures.
// 2. When new metric instruments are registered, requiring the sink to accommodate them.
sink.updateMeasures(registry.getMetricInstruments());

Check warning on line 127 in core/src/main/java/io/grpc/internal/MetricRecorderImpl.java

View check run for this annotation

Codecov / codecov/patch

core/src/main/java/io/grpc/internal/MetricRecorderImpl.java#L127

Added line #L127 was not covered by tests
}
sink.addLongUpDownCounter(metricInstrument, value, requiredLabelValues, optionalLabelValues);
}
}

/**
* Records a double histogram value.
*
Expand Down
Loading