From 8eda5ce2366c35f2497363c8d29e4c077d05e18f Mon Sep 17 00:00:00 2001 From: Nathan Dias Date: Fri, 19 Aug 2022 06:26:43 -0500 Subject: [PATCH 1/3] add ability to attach custom headers to traces This change adds the ability to attach custom gRPC metadata headers to trace requests. The Go library has a WithHeaders() option with this functionality: https://pkg.go.dev/contrib.go.opencensus.io/exporter/ocagent#WithHeaders --- .../trace/ocagent/OcAgentTraceExporter.java | 3 +- .../OcAgentTraceExporterConfiguration.java | 14 ++++++ .../ocagent/OcAgentTraceExporterHandler.java | 18 +++++-- ...OcAgentTraceExporterConfigurationTest.java | 2 + .../OcAgentTraceExporterIntegrationTest.java | 48 ++++++++++++++----- 5 files changed, 68 insertions(+), 17 deletions(-) diff --git a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporter.java b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporter.java index 0df00bdfc4..d83bdeba7a 100644 --- a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporter.java +++ b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporter.java @@ -80,7 +80,8 @@ public static void createAndRegister(OcAgentTraceExporterConfiguration configura configuration.getSslContext(), configuration.getRetryInterval(), configuration.getEnableConfig(), - configuration.getDeadline()); + configuration.getDeadline(), + configuration.getHeaders()); registerInternal(newHandler); } } diff --git a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfiguration.java b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfiguration.java index 3e2ed86379..05cb801542 100644 --- a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfiguration.java +++ b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfiguration.java @@ -19,6 +19,7 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import io.netty.handler.ssl.SslContext; import io.opencensus.common.Duration; import javax.annotation.Nullable; @@ -112,6 +113,12 @@ public abstract class OcAgentTraceExporterConfiguration { */ public abstract Duration getDeadline(); + /** + * Returns custom headers to attach as gRPC metadata. + * @return + */ + public abstract ImmutableMap getHeaders(); + /** * Returns a new {@link Builder}. * @@ -210,6 +217,13 @@ public abstract static class Builder { abstract Duration getDeadline(); + abstract ImmutableMap.Builder headersBuilder(); + + public final Builder addHeader(String key, String value) { + headersBuilder().put(key, value); + return this; + } + /** * Builds a {@link OcAgentTraceExporterConfiguration}. * diff --git a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterHandler.java b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterHandler.java index edc5e47fe6..9eb871cc1a 100644 --- a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterHandler.java +++ b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterHandler.java @@ -18,8 +18,10 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.MetadataUtils; import io.netty.handler.ssl.SslContext; import io.opencensus.common.Duration; import io.opencensus.exporter.trace.util.TimeLimitedHandler; @@ -28,6 +30,7 @@ import io.opencensus.proto.agent.trace.v1.TraceServiceGrpc; import io.opencensus.trace.export.SpanData; import java.util.Collection; +import java.util.Map; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -42,6 +45,7 @@ final class OcAgentTraceExporterHandler extends TimeLimitedHandler { private final Node node; private final Boolean useInsecure; @Nullable private final SslContext sslContext; + private final Map headers; @javax.annotation.Nullable private OcAgentTraceServiceExportRpcHandler exportRpcHandler; // Thread-safe @@ -53,12 +57,14 @@ final class OcAgentTraceExporterHandler extends TimeLimitedHandler { @Nullable SslContext sslContext, Duration retryInterval, boolean enableConfig, - Duration deadline) { + Duration deadline, + Map headers) { super(deadline, EXPORT_SPAN_NAME); this.endPoint = endPoint; this.node = OcAgentNodeUtils.getNodeInfo(serviceName); this.useInsecure = useInsecure; this.sslContext = sslContext; + this.headers = headers; } @Override @@ -67,7 +73,7 @@ public void timeLimitedExport(Collection spanDataList) { // If not connected, try to initiate a new connection when a new batch of spans arrive. // Export RPC doesn't respect the retry interval. TraceServiceGrpc.TraceServiceStub stub = - getTraceServiceStub(endPoint, useInsecure, sslContext); + getTraceServiceStub(endPoint, useInsecure, sslContext, headers); exportRpcHandler = createExportRpcHandlerAndConnect(stub, node); } @@ -104,7 +110,7 @@ private static OcAgentTraceServiceExportRpcHandler createExportRpcHandlerAndConn // Creates a TraceServiceStub with the given parameters. // One stub can be used for both Export RPC and Config RPC. private static TraceServiceGrpc.TraceServiceStub getTraceServiceStub( - String endPoint, Boolean useInsecure, SslContext sslContext) { + String endPoint, Boolean useInsecure, SslContext sslContext, Map headers) { ManagedChannelBuilder channelBuilder; if (useInsecure) { channelBuilder = ManagedChannelBuilder.forTarget(endPoint).usePlaintext(); @@ -115,6 +121,10 @@ private static TraceServiceGrpc.TraceServiceStub getTraceServiceStub( .sslContext(sslContext); } ManagedChannel channel = channelBuilder.build(); - return TraceServiceGrpc.newStub(channel); + Metadata metadata = new Metadata(); + for(Map.Entry e : headers.entrySet()) { + metadata.put(Metadata.Key.of(e.getKey(), Metadata.ASCII_STRING_MARSHALLER), e.getValue()); + } + return TraceServiceGrpc.newStub(channel).withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)); } } diff --git a/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfigurationTest.java b/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfigurationTest.java index daca516d82..82d22da78c 100644 --- a/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfigurationTest.java +++ b/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfigurationTest.java @@ -60,6 +60,7 @@ public void setAndGet() throws SSLException { .setRetryInterval(oneMinute) .setEnableConfig(false) .setDeadline(oneMinute) + .addHeader("foo", "bar") .build(); assertThat(configuration.getEndPoint()).isEqualTo("192.168.0.1:50051"); assertThat(configuration.getServiceName()).isEqualTo("service"); @@ -68,5 +69,6 @@ public void setAndGet() throws SSLException { assertThat(configuration.getRetryInterval()).isEqualTo(oneMinute); assertThat(configuration.getEnableConfig()).isFalse(); assertThat(configuration.getDeadline()).isEqualTo(oneMinute); + assertThat(configuration.getHeaders()).containsExactly("foo", "bar"); } } diff --git a/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterIntegrationTest.java b/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterIntegrationTest.java index 6f7e29c5f3..d7c58895f4 100644 --- a/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterIntegrationTest.java +++ b/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterIntegrationTest.java @@ -20,9 +20,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; -import io.grpc.BindableService; -import io.grpc.Server; -import io.grpc.ServerBuilder; +import io.grpc.*; import io.grpc.netty.NettyServerBuilder; import io.opencensus.common.Scope; import io.opencensus.proto.agent.common.v1.Node; @@ -37,12 +35,7 @@ import io.opencensus.trace.samplers.Samplers; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.Executor; import org.junit.After; import org.junit.Before; @@ -50,21 +43,27 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import javax.annotation.concurrent.GuardedBy; + /** End-to-end integration test for {@link OcAgentTraceExporter}. */ @RunWith(JUnit4.class) public class OcAgentTraceExporterIntegrationTest { private Server agent; private FakeOcAgentTraceServiceGrpcImpl fakeOcAgentTraceServiceGrpc; + private HeaderInterceptor headerInterceptor; private final Tracer tracer = Tracing.getTracer(); private static final String SERVICE_NAME = "integration-test"; + private static final String TEST_METADATA_HEADER = "test-header"; + private static final String TEST_METADATA_VALUE = "test-value"; @Before public void setUp() throws IOException { fakeOcAgentTraceServiceGrpc = new FakeOcAgentTraceServiceGrpcImpl(); + headerInterceptor = new HeaderInterceptor(); agent = - getServer(OcAgentTraceExporterConfiguration.DEFAULT_END_POINT, fakeOcAgentTraceServiceGrpc); + getServer(OcAgentTraceExporterConfiguration.DEFAULT_END_POINT, fakeOcAgentTraceServiceGrpc, headerInterceptor); } @After @@ -92,6 +91,7 @@ public void testExportSpans() throws InterruptedException, IOException { .setServiceName(SERVICE_NAME) .setUseInsecure(true) .setEnableConfig(false) + .addHeader(TEST_METADATA_HEADER, TEST_METADATA_VALUE) .build()); // Create one root span and 5 children. @@ -164,6 +164,12 @@ public void testExportSpans() throws InterruptedException, IOException { for (int i = 0; i < 8; i++) { assertThat(exportedSpanNames).contains("second-iteration-child-" + i); } + + for(Metadata metadata : headerInterceptor.getReceivedMetadata()) { + Metadata.Key key = Metadata.Key.of(TEST_METADATA_HEADER, Metadata.ASCII_STRING_MARSHALLER); + assertThat(metadata.containsKey(key)).isTrue(); + assertThat(metadata.get(key)).isEqualTo(TEST_METADATA_VALUE); + } } @Test @@ -195,11 +201,11 @@ private void doWork(String spanName, int i) { } } - private static Server getServer(String endPoint, BindableService service) throws IOException { + private static Server getServer(String endPoint, BindableService service, HeaderInterceptor headerInterceptor) throws IOException { ServerBuilder builder = NettyServerBuilder.forAddress(parseEndpoint(endPoint)); Executor executor = MoreExecutors.directExecutor(); builder.executor(executor); - return builder.addService(service).build(); + return builder.addService(service).intercept(headerInterceptor).build(); } private static InetSocketAddress parseEndpoint(String endPoint) { @@ -212,4 +218,22 @@ private static InetSocketAddress parseEndpoint(String endPoint) { return new InetSocketAddress("localhost", 55678); } } + + private static class HeaderInterceptor implements ServerInterceptor { + @GuardedBy("this") + private final List receivedMetadata = new ArrayList<>(); + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { + addReceivedMetadata(headers); + return next.startCall(call, headers); + } + + private synchronized void addReceivedMetadata(Metadata metadata) { + receivedMetadata.add(metadata); + } + + synchronized List getReceivedMetadata() { + return Collections.unmodifiableList(receivedMetadata); + } + } } From f0a6b0df03107e54a37f99f89ddef1f65964ee99 Mon Sep 17 00:00:00 2001 From: Nathan Dias Date: Fri, 19 Aug 2022 06:29:20 -0500 Subject: [PATCH 2/3] fix indentation --- .../opencensus/exporter/trace/ocagent/OcAgentTraceExporter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporter.java b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporter.java index d83bdeba7a..ed49b3c723 100644 --- a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporter.java +++ b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporter.java @@ -81,7 +81,7 @@ public static void createAndRegister(OcAgentTraceExporterConfiguration configura configuration.getRetryInterval(), configuration.getEnableConfig(), configuration.getDeadline(), - configuration.getHeaders()); + configuration.getHeaders()); registerInternal(newHandler); } } From 1a254d7ac809531244de86f017de130b1cbfd309 Mon Sep 17 00:00:00 2001 From: Nathan Dias Date: Fri, 19 Aug 2022 06:34:14 -0500 Subject: [PATCH 3/3] run formatter --- .../OcAgentTraceExporterConfiguration.java | 5 +---- .../ocagent/OcAgentTraceExporterHandler.java | 5 +++-- ...OcAgentTraceExporterConfigurationTest.java | 2 +- .../OcAgentTraceExporterIntegrationTest.java | 21 ++++++++++++------- 4 files changed, 19 insertions(+), 14 deletions(-) diff --git a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfiguration.java b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfiguration.java index 05cb801542..3d712f4114 100644 --- a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfiguration.java +++ b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfiguration.java @@ -113,10 +113,7 @@ public abstract class OcAgentTraceExporterConfiguration { */ public abstract Duration getDeadline(); - /** - * Returns custom headers to attach as gRPC metadata. - * @return - */ + /** Returns custom headers to attach as gRPC metadata. */ public abstract ImmutableMap getHeaders(); /** diff --git a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterHandler.java b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterHandler.java index 9eb871cc1a..c7ea267131 100644 --- a/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterHandler.java +++ b/exporters/trace/ocagent/src/main/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterHandler.java @@ -122,9 +122,10 @@ private static TraceServiceGrpc.TraceServiceStub getTraceServiceStub( } ManagedChannel channel = channelBuilder.build(); Metadata metadata = new Metadata(); - for(Map.Entry e : headers.entrySet()) { + for (Map.Entry e : headers.entrySet()) { metadata.put(Metadata.Key.of(e.getKey(), Metadata.ASCII_STRING_MARSHALLER), e.getValue()); } - return TraceServiceGrpc.newStub(channel).withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)); + return TraceServiceGrpc.newStub(channel) + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)); } } diff --git a/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfigurationTest.java b/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfigurationTest.java index 82d22da78c..e986af7845 100644 --- a/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfigurationTest.java +++ b/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterConfigurationTest.java @@ -60,7 +60,7 @@ public void setAndGet() throws SSLException { .setRetryInterval(oneMinute) .setEnableConfig(false) .setDeadline(oneMinute) - .addHeader("foo", "bar") + .addHeader("foo", "bar") .build(); assertThat(configuration.getEndPoint()).isEqualTo("192.168.0.1:50051"); assertThat(configuration.getServiceName()).isEqualTo("service"); diff --git a/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterIntegrationTest.java b/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterIntegrationTest.java index d7c58895f4..83ffc4645d 100644 --- a/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterIntegrationTest.java +++ b/exporters/trace/ocagent/src/test/java/io/opencensus/exporter/trace/ocagent/OcAgentTraceExporterIntegrationTest.java @@ -37,14 +37,13 @@ import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.Executor; +import javax.annotation.concurrent.GuardedBy; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import javax.annotation.concurrent.GuardedBy; - /** End-to-end integration test for {@link OcAgentTraceExporter}. */ @RunWith(JUnit4.class) public class OcAgentTraceExporterIntegrationTest { @@ -63,7 +62,10 @@ public void setUp() throws IOException { fakeOcAgentTraceServiceGrpc = new FakeOcAgentTraceServiceGrpcImpl(); headerInterceptor = new HeaderInterceptor(); agent = - getServer(OcAgentTraceExporterConfiguration.DEFAULT_END_POINT, fakeOcAgentTraceServiceGrpc, headerInterceptor); + getServer( + OcAgentTraceExporterConfiguration.DEFAULT_END_POINT, + fakeOcAgentTraceServiceGrpc, + headerInterceptor); } @After @@ -165,8 +167,9 @@ public void testExportSpans() throws InterruptedException, IOException { assertThat(exportedSpanNames).contains("second-iteration-child-" + i); } - for(Metadata metadata : headerInterceptor.getReceivedMetadata()) { - Metadata.Key key = Metadata.Key.of(TEST_METADATA_HEADER, Metadata.ASCII_STRING_MARSHALLER); + for (Metadata metadata : headerInterceptor.getReceivedMetadata()) { + Metadata.Key key = + Metadata.Key.of(TEST_METADATA_HEADER, Metadata.ASCII_STRING_MARSHALLER); assertThat(metadata.containsKey(key)).isTrue(); assertThat(metadata.get(key)).isEqualTo(TEST_METADATA_VALUE); } @@ -201,7 +204,9 @@ private void doWork(String spanName, int i) { } } - private static Server getServer(String endPoint, BindableService service, HeaderInterceptor headerInterceptor) throws IOException { + private static Server getServer( + String endPoint, BindableService service, HeaderInterceptor headerInterceptor) + throws IOException { ServerBuilder builder = NettyServerBuilder.forAddress(parseEndpoint(endPoint)); Executor executor = MoreExecutors.directExecutor(); builder.executor(executor); @@ -222,8 +227,10 @@ private static InetSocketAddress parseEndpoint(String endPoint) { private static class HeaderInterceptor implements ServerInterceptor { @GuardedBy("this") private final List receivedMetadata = new ArrayList<>(); + @Override - public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { + public ServerCall.Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { addReceivedMetadata(headers); return next.startCall(call, headers); }