diff --git a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java index 917a44ab4f9..4e994eea822 100644 --- a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java +++ b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ProcessBesuNodeRunner.java @@ -204,6 +204,10 @@ public void startNode(final BesuNode node) { params.add("--metrics-category"); params.add(((Enum) category).name()); } + if (node.isMetricsEnabled() || metricsConfiguration.isPushEnabled()) { + params.add("--metrics-protocol"); + params.add(metricsConfiguration.getProtocol().name()); + } if (metricsConfiguration.isPushEnabled()) { params.add("--metrics-push-enabled"); params.add("--metrics-push-host"); diff --git a/acceptance-tests/tests/build.gradle b/acceptance-tests/tests/build.gradle index a2502913359..e4f26ba4670 100644 --- a/acceptance-tests/tests/build.gradle +++ b/acceptance-tests/tests/build.gradle @@ -24,6 +24,7 @@ dependencies { testImplementation project(path: ':ethereum:core', configuration: 'testSupportArtifacts') testImplementation project(':ethereum:permissioning') testImplementation project(':ethereum:rlp') + testImplementation project(':metrics:core') testImplementation project(':plugin-api') testImplementation project(':privacy-contracts') testImplementation project(':testutil') @@ -31,6 +32,15 @@ dependencies { testImplementation 'com.github.tomakehurst:wiremock-jre8-standalone' testImplementation 'commons-io:commons-io' + testImplementation 'io.grpc:grpc-core' + testImplementation 'io.grpc:grpc-netty' + testImplementation 'io.grpc:grpc-stub' + testImplementation 'io.netty:netty-all' + testImplementation 'io.opentelemetry:opentelemetry-api' + testImplementation 'io.opentelemetry:opentelemetry-proto' + testImplementation 'io.opentelemetry:opentelemetry-sdk' + testImplementation 'io.opentelemetry:opentelemetry-sdk-trace' + testImplementation 'io.opentelemetry:opentelemetry-exporter-otlp' testImplementation 'junit:junit' testImplementation 'net.consensys:orion' testImplementation 'org.apache.commons:commons-compress' diff --git a/acceptance-tests/tests/src/test/java/org/hyperledger/besu/tests/acceptance/OpenTelemetryAcceptanceTest.java b/acceptance-tests/tests/src/test/java/org/hyperledger/besu/tests/acceptance/OpenTelemetryAcceptanceTest.java new file mode 100644 index 00000000000..2496fb84ee6 --- /dev/null +++ b/acceptance-tests/tests/src/test/java/org/hyperledger/besu/tests/acceptance/OpenTelemetryAcceptanceTest.java @@ -0,0 +1,171 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.tests.acceptance; + +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; + +import org.hyperledger.besu.metrics.MetricsProtocol; +import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration; +import org.hyperledger.besu.tests.acceptance.dsl.AcceptanceTestBase; +import org.hyperledger.besu.tests.acceptance.dsl.WaitUtils; +import org.hyperledger.besu.tests.acceptance.dsl.node.BesuNode; +import org.hyperledger.besu.tests.acceptance.dsl.node.configuration.BesuNodeConfigurationBuilder; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.io.Closer; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.stub.StreamObserver; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; +import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest; +import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse; +import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc; +import io.opentelemetry.proto.metrics.v1.ResourceMetrics; +import io.opentelemetry.proto.trace.v1.ResourceSpans; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class OpenTelemetryAcceptanceTest extends AcceptanceTestBase { + + private static final class FakeCollector extends TraceServiceGrpc.TraceServiceImplBase { + private final List receivedSpans = new ArrayList<>(); + private Status returnedStatus = Status.OK; + + @Override + public void export( + final ExportTraceServiceRequest request, + final StreamObserver responseObserver) { + receivedSpans.addAll(request.getResourceSpansList()); + responseObserver.onNext(ExportTraceServiceResponse.newBuilder().build()); + if (!returnedStatus.isOk()) { + if (returnedStatus.getCode() == Status.Code.DEADLINE_EXCEEDED) { + // Do not call onCompleted to simulate a deadline exceeded. + return; + } + responseObserver.onError(returnedStatus.asRuntimeException()); + return; + } + responseObserver.onCompleted(); + } + + List getReceivedSpans() { + return receivedSpans; + } + + void setReturnedStatus(final Status returnedStatus) { + this.returnedStatus = returnedStatus; + } + } + + private static final class FakeMetricsCollector + extends MetricsServiceGrpc.MetricsServiceImplBase { + private final List receivedMetrics = new ArrayList<>(); + private Status returnedStatus = Status.OK; + + @Override + public void export( + final ExportMetricsServiceRequest request, + final StreamObserver responseObserver) { + + receivedMetrics.addAll(request.getResourceMetricsList()); + responseObserver.onNext(ExportMetricsServiceResponse.newBuilder().build()); + if (!returnedStatus.isOk()) { + if (returnedStatus.getCode() == Status.Code.DEADLINE_EXCEEDED) { + // Do not call onCompleted to simulate a deadline exceeded. + return; + } + responseObserver.onError(returnedStatus.asRuntimeException()); + return; + } + responseObserver.onCompleted(); + } + + List getReceivedMetrics() { + return receivedMetrics; + } + + void setReturnedStatus(final Status returnedStatus) { + this.returnedStatus = returnedStatus; + } + } + + private final FakeMetricsCollector fakeMetricsCollector = new FakeMetricsCollector(); + private final FakeCollector fakeTracesCollector = new FakeCollector(); + private final Closer closer = Closer.create(); + + private BesuNode metricsNode; + + @Before + public void setUp() throws Exception { + Server server = + NettyServerBuilder.forPort(4317) + .addService(fakeTracesCollector) + .addService(fakeMetricsCollector) + .build() + .start(); + closer.register(server::shutdownNow); + + MetricsConfiguration configuration = + MetricsConfiguration.builder() + .protocol(MetricsProtocol.OPENTELEMETRY) + .enabled(true) + .port(0) + .hostsAllowlist(singletonList("*")) + .build(); + metricsNode = + besu.create( + new BesuNodeConfigurationBuilder() + .name("metrics-node") + .jsonRpcEnabled() + .metricsConfiguration(configuration) + .build()); + cluster.start(metricsNode); + } + + @After + public void tearDown() throws Exception { + closer.close(); + } + + @Test + public void metricsReporting() { + WaitUtils.waitFor( + 30, + () -> { + List resourceMetrics = fakeMetricsCollector.getReceivedMetrics(); + assertThat(resourceMetrics.isEmpty()).isFalse(); + }); + } + + @Test + public void traceReporting() { + + WaitUtils.waitFor( + 30, + () -> { + // call the json RPC endpoint to generate a trace. + net.netVersion().verify(metricsNode); + List spans = fakeTracesCollector.getReceivedSpans(); + assertThat(spans.isEmpty()).isFalse(); + }); + } +} diff --git a/build.gradle b/build.gradle index 431248c23a7..b91e7355dd5 100644 --- a/build.gradle +++ b/build.gradle @@ -489,8 +489,6 @@ applicationDefaultJvmArgs = [ run { args project.hasProperty("besu.run.args") ? project.property("besu.run.args").toString().split("\\s+") : [] doFirst { - applicationDefaultJvmArgs.add(0, "-Dotel.resource.attributes=service.name=besu-dev") - applicationDefaultJvmArgs.add(0, "-javaagent:"+ configurations.javaAgent.singleFile.toPath() + "") applicationDefaultJvmArgs = applicationDefaultJvmArgs.collect { it.replace('BESU_HOME', "$buildDir/besu") } @@ -504,19 +502,6 @@ def tweakStartScript(createScriptTask) { createScriptTask.unixScript.text = createScriptTask.unixScript.text.replace('BESU_HOME', '\$APP_HOME') createScriptTask.windowsScript.text = createScriptTask.windowsScript.text.replace('BESU_HOME', '%~dp0..') - // OpenTelemetry Wiring for unix scripts - def agentFileName = configurations.javaAgent.singleFile.toPath().getFileName() - def unixRegex = $/exec "$$JAVACMD" /$ - def forwardSlash = "/" - def unixReplacement = $/if [ -n "$$TRACING" ];then - TRACING_AGENT="-javaagent:$$APP_HOME/agent${forwardSlash}${agentFileName}" -fi -exec "$$JAVACMD" $$TRACING_AGENT /$ - createScriptTask.unixScript.text = createScriptTask.unixScript.text.replace(unixRegex, unixReplacement) - // OpenTelemetry Wiring for windows scripts - def windowsRegex = $/"%JAVA_EXE%" %DEFAULT_JVM_OPTS%/$ - def windowsReplacement = $/if Defined TRACING (TRACING_AGENT="-javaagent:" "%APP_HOME%\agent\/$ + agentFileName + '")\r\n"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %TRACING_AGENT%' - createScriptTask.windowsScript.text = createScriptTask.windowsScript.text.replace(windowsRegex, windowsReplacement) // Prevent the error originating from the 8191 chars limit on Windows createScriptTask.windowsScript.text = @@ -790,14 +775,9 @@ tasks.register("verifyDistributions") { } } -configurations { - javaAgent -} - dependencies { implementation project(':besu') implementation project(':ethereum:evmtool') - javaAgent group: 'io.opentelemetry.instrumentation.auto', name: 'opentelemetry-javaagent', classifier: 'all' errorprone 'com.google.errorprone:error_prone_core' } @@ -808,7 +788,6 @@ distributions { from("build/reports/license/license-dependency.html") { into "." } from("./docs/GettingStartedBinaries.md") { into "." } from("./docs/DocsArchive0.8.0.html") { into "." } - from(configurations.javaAgent.singleFile.toPath()) { into "agent"} } } } diff --git a/docker/graalvm/Dockerfile b/docker/graalvm/Dockerfile index 983e410adc2..e813121d421 100644 --- a/docker/graalvm/Dockerfile +++ b/docker/graalvm/Dockerfile @@ -21,11 +21,7 @@ ENV BESU_RPC_HTTP_HOST 0.0.0.0 ENV BESU_RPC_WS_HOST 0.0.0.0 ENV BESU_GRAPHQL_HTTP_HOST 0.0.0.0 ENV BESU_PID_PATH "/tmp/pid" -# Tracing defaults -# To enable tracing, uncomment next line -#ENV TRACING=ENABLED -ENV OTEL_EXPORTER=otlp -ENV OTEL_OTLP_ENDPOINT="0.0.0.0:55680" + ENV OTEL_RESOURCE_ATTRIBUTES="service.name=besu-$VERSION" ENV PATH="/opt/besu/bin:${PATH}" diff --git a/docker/openjdk-11/Dockerfile b/docker/openjdk-11/Dockerfile index ada2e42d5fe..61da5830c5e 100644 --- a/docker/openjdk-11/Dockerfile +++ b/docker/openjdk-11/Dockerfile @@ -21,11 +21,7 @@ ENV BESU_RPC_HTTP_HOST 0.0.0.0 ENV BESU_RPC_WS_HOST 0.0.0.0 ENV BESU_GRAPHQL_HTTP_HOST 0.0.0.0 ENV BESU_PID_PATH "/tmp/pid" -# Tracing defaults -# To enable tracing, uncomment next line -#ENV TRACING=ENABLED -ENV OTEL_EXPORTER=otlp -ENV OTEL_OTLP_ENDPOINT="0.0.0.0:55680" + ENV OTEL_RESOURCE_ATTRIBUTES="service.name=besu-$VERSION" ENV PATH="/opt/besu/bin:${PATH}" diff --git a/docker/openjdk-latest/Dockerfile b/docker/openjdk-latest/Dockerfile index 74b65721f98..4d29841ca59 100644 --- a/docker/openjdk-latest/Dockerfile +++ b/docker/openjdk-latest/Dockerfile @@ -21,11 +21,7 @@ ENV BESU_RPC_HTTP_HOST 0.0.0.0 ENV BESU_RPC_WS_HOST 0.0.0.0 ENV BESU_GRAPHQL_HTTP_HOST 0.0.0.0 ENV BESU_PID_PATH "/tmp/pid" -# Tracing defaults -# To enable tracing, uncomment next line -#ENV TRACING=ENABLED -ENV OTEL_EXPORTER=otlp -ENV OTEL_OTLP_ENDPOINT="0.0.0.0:55680" + ENV OTEL_RESOURCE_ATTRIBUTES="service.name=besu-$VERSION" ENV PATH="/opt/besu/bin:${PATH}" diff --git a/docs/tracing/README.md b/docs/tracing/README.md index bbf1249dc53..0acac729d4f 100644 --- a/docs/tracing/README.md +++ b/docs/tracing/README.md @@ -10,7 +10,7 @@ To try out this example, start the Open Telemetry Collector and the Zipkin servi Start besu with: -`$> ./gradlew run --args="--network=dev --rpc-http-enabled"` +`$> OTEL_RESOURCE_ATTRIBUTES="service.name=besu-dev" OTEL_EXPORTER_OTLP_METRIC_INSECURE=true OTEL_EXPORTER_OTLP_SPAN_INSECURE=true ./gradlew run --args="--network=dev --rpc-http-enabled --metrics-enabled --metrics-protocol=opentelemetry"` Try interacting with the JSON-RPC API. Here is a simple example using cURL: @@ -19,3 +19,6 @@ Try interacting with the JSON-RPC API. Here is a simple example using cURL: Open the Zipkin UI by browsing to http://localhost:9411/ You will be able to see the detail of your traces. + +References: +* [OpenTelemetry Environment Variable Specification](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md) diff --git a/docs/tracing/docker-compose.yml b/docs/tracing/docker-compose.yml index 79e1ecf533a..7a351408447 100644 --- a/docs/tracing/docker-compose.yml +++ b/docs/tracing/docker-compose.yml @@ -1,7 +1,8 @@ version: "3" services: - otel-collector: - image: otel/opentelemetry-collector-contrib:0.11.0 + otelcollector: + image: otel/opentelemetry-collector-contrib:0.17.0 + container_name: otelcollector command: ["--config=/etc/otel-collector-config.yml"] volumes: - ./otel-collector-config.yml:/etc/otel-collector-config.yml @@ -10,13 +11,20 @@ services: - "8888:8888" # Prometheus metrics exposed by the collector - "8889:8889" # Prometheus exporter metrics - "13133:13133" # health_check extension - - "55680:55680" # zpages extension + - "4317:4317" # OTLP GRPC receiver depends_on: - zipkin + - metricsviewer - #Zipkin + # Zipkin zipkin: image: openzipkin/zipkin container_name: zipkin ports: - 9411:9411 + + metricsviewer: + image: docker.io/tmio/metrics-ui + container_name: metricsviewer + ports: + - 8080:8080 diff --git a/docs/tracing/otel-collector-config.yml b/docs/tracing/otel-collector-config.yml index 2d8f2d98390..86df95ea776 100644 --- a/docs/tracing/otel-collector-config.yml +++ b/docs/tracing/otel-collector-config.yml @@ -2,11 +2,17 @@ receivers: otlp: protocols: grpc: - http: exporters: zipkin: endpoint: "http://zipkin:9411/api/v2/spans" + otlp: + endpoint: "metricsviewer:4317" + insecure: true + logging: + loglevel: debug + sampling_initial: 5 + sampling_thereafter: 200 processors: batch: @@ -23,3 +29,7 @@ service: receivers: [otlp] exporters: [zipkin] processors: [batch] + metrics: + receivers: [otlp] + exporters: [otlp, logging] + processors: [batch] diff --git a/ethereum/api/build.gradle b/ethereum/api/build.gradle index caee178bbea..9dba37ab09f 100644 --- a/ethereum/api/build.gradle +++ b/ethereum/api/build.gradle @@ -49,6 +49,8 @@ dependencies { implementation 'com.google.guava:guava' implementation 'com.graphql-java:graphql-java' + implementation 'io.opentelemetry:opentelemetry-api' + implementation 'io.opentelemetry:opentelemetry-extension-trace-propagators' implementation 'io.vertx:vertx-auth-jwt' implementation 'io.vertx:vertx-core' implementation 'io.vertx:vertx-unit' diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonRpcHttpService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonRpcHttpService.java index 1a099d329a0..45f423a0392 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonRpcHttpService.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/JsonRpcHttpService.java @@ -61,11 +61,19 @@ import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; +import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; import io.netty.handler.codec.http.HttpResponseStatus; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.extension.trace.propagation.B3Propagator; import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Handler; @@ -75,12 +83,14 @@ import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; import io.vertx.core.json.DecodeException; import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.core.net.PfxOptions; +import io.vertx.core.net.SocketAddress; import io.vertx.ext.auth.User; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; @@ -93,17 +103,36 @@ public class JsonRpcHttpService { private static final Logger LOG = LogManager.getLogger(); + private static final String SPAN_CONTEXT = "span_context"; private static final InetSocketAddress EMPTY_SOCKET_ADDRESS = new InetSocketAddress("0.0.0.0", 0); private static final String APPLICATION_JSON = "application/json"; private static final JsonRpcResponse NO_RESPONSE = new JsonRpcNoResponse(); private static final String EMPTY_RESPONSE = ""; + private static final TextMapPropagator.Getter requestAttributesGetter = + new TextMapPropagator.Getter<>() { + @Override + public Iterable keys(final HttpServerRequest carrier) { + return carrier.headers().names(); + } + + @Nullable + @Override + public String get(final @Nullable HttpServerRequest carrier, final String key) { + if (carrier == null) { + return null; + } + return carrier.headers().get(key); + } + }; + private final Vertx vertx; private final JsonRpcConfiguration config; private final Map rpcMethods; private final NatService natService; private final Path dataDir; private final LabelledMetric requestTimer; + private final Tracer tracer; @VisibleForTesting public final Optional authenticationService; @@ -169,6 +198,7 @@ private JsonRpcHttpService( this.authenticationService = authenticationService; this.livenessService = livenessService; this.readinessService = readinessService; + this.tracer = GlobalOpenTelemetry.getTracer("org.hyperledger.besu.jsonrpc", "1.0.0"); } private void validateConfig(final JsonRpcConfiguration config) { @@ -229,6 +259,7 @@ public CompletableFuture start() { private Router buildRouter() { // Handle json rpc requests final Router router = Router.router(vertx); + router.route().handler(this::createSpan); // Verify Host header to avoid rebind attack. router.route().handler(checkAllowlistHostHeader()); @@ -279,6 +310,32 @@ private Router buildRouter() { return router; } + private void createSpan(final RoutingContext routingContext) { + final SocketAddress address = routingContext.request().connection().remoteAddress(); + + Context parent = + B3Propagator.getInstance() + .extract(Context.current(), routingContext.request(), requestAttributesGetter); + final Span serverSpan = + tracer + .spanBuilder(address.host() + ":" + address.port()) + .setParent(parent) + .setSpanKind(Span.Kind.SERVER) + .startSpan(); + routingContext.put(SPAN_CONTEXT, Context.root().with(serverSpan)); + + routingContext.addBodyEndHandler(event -> serverSpan.end()); + routingContext.addEndHandler( + event -> { + if (event.failed()) { + serverSpan.recordException(event.cause()); + serverSpan.setStatus(StatusCode.ERROR); + } + serverSpan.end(); + }); + routingContext.next(); + } + private HttpServerOptions getHttpServerOptions() { final HttpServerOptions httpServerOptions = new HttpServerOptions() @@ -590,40 +647,55 @@ private JsonRpcResponse process( } catch (final IllegalArgumentException exception) { return errorResponse(id, INVALID_REQUEST); } - // Handle notifications - if (requestBody.isNotification()) { - // Notifications aren't handled so create empty result for now. - return NO_RESPONSE; - } + Span span = + tracer + .spanBuilder(requestBody.getMethod()) + .setSpanKind(Span.Kind.INTERNAL) + .setParent(ctx.get(SPAN_CONTEXT)) + .startSpan(); + try { + // Handle notifications + if (requestBody.isNotification()) { + // Notifications aren't handled so create empty result for now. + return NO_RESPONSE; + } - final Optional unavailableMethod = validateMethodAvailability(requestBody); - if (unavailableMethod.isPresent()) { - return errorResponse(id, unavailableMethod.get()); - } + final Optional unavailableMethod = validateMethodAvailability(requestBody); + if (unavailableMethod.isPresent()) { + span.setStatus(StatusCode.ERROR, "method unavailable"); + return errorResponse(id, unavailableMethod.get()); + } - final JsonRpcMethod method = rpcMethods.get(requestBody.getMethod()); + final JsonRpcMethod method = rpcMethods.get(requestBody.getMethod()); - if (AuthenticationUtils.isPermitted(authenticationService, user, method)) { - // Generate response - try (final OperationTimer.TimingContext ignored = - requestTimer.labels(requestBody.getMethod()).startTimer()) { - if (user.isPresent()) { + if (AuthenticationUtils.isPermitted(authenticationService, user, method)) { + // Generate response + try (final OperationTimer.TimingContext ignored = + requestTimer.labels(requestBody.getMethod()).startTimer()) { + if (user.isPresent()) { + return method.response( + new JsonRpcRequestContext(requestBody, user.get(), () -> !ctx.response().closed())); + } return method.response( - new JsonRpcRequestContext(requestBody, user.get(), () -> !ctx.response().closed())); + new JsonRpcRequestContext(requestBody, () -> !ctx.response().closed())); + } catch (final InvalidJsonRpcParameters e) { + LOG.debug("Invalid Params", e); + span.setStatus(StatusCode.ERROR, "Invalid Params"); + return errorResponse(id, JsonRpcError.INVALID_PARAMS); + } catch (final MultiTenancyValidationException e) { + span.setStatus(StatusCode.ERROR, "Unauthorized"); + return unauthorizedResponse(id, JsonRpcError.UNAUTHORIZED); + } catch (final RuntimeException e) { + LOG.error("Error processing JSON-RPC requestBody", e); + span.setStatus(StatusCode.ERROR, "Error processing JSON-RPC requestBody"); + return errorResponse(id, JsonRpcError.INTERNAL_ERROR); } - return method.response( - new JsonRpcRequestContext(requestBody, () -> !ctx.response().closed())); - } catch (final InvalidJsonRpcParameters e) { - LOG.debug("Invalid Params", e); - return errorResponse(id, JsonRpcError.INVALID_PARAMS); - } catch (final MultiTenancyValidationException e) { + } else { + span.setStatus(StatusCode.ERROR, "Unauthorized"); return unauthorizedResponse(id, JsonRpcError.UNAUTHORIZED); - } catch (final RuntimeException e) { - LOG.error("Error processing JSON-RPC requestBody", e); - return errorResponse(id, JsonRpcError.INTERNAL_ERROR); } - } else { - return unauthorizedResponse(id, JsonRpcError.UNAUTHORIZED); + } finally { + span.end(); } } diff --git a/ethereum/core/build.gradle b/ethereum/core/build.gradle index a64a893c214..ab6c2ace82c 100644 --- a/ethereum/core/build.gradle +++ b/ethereum/core/build.gradle @@ -41,6 +41,7 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.google.guava:guava' + implementation 'io.opentelemetry:opentelemetry-api' implementation 'io.vertx:vertx-core' implementation 'net.java.dev.jna:jna' implementation 'org.apache.logging.log4j:log4j-api' diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java index cb92dd3aa2d..795b26e910e 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/mainnet/AbstractBlockProcessor.java @@ -34,6 +34,9 @@ import java.util.List; import com.google.common.collect.ImmutableList; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -50,6 +53,9 @@ TransactionReceipt create( private static final Logger LOG = LogManager.getLogger(); + private static final Tracer tracer = + OpenTelemetry.getGlobalTracer("org.hyperledger.besu.block", "1.0.0"); + static final int MAX_GENERATION = 6; public static class Result implements BlockProcessor.Result { @@ -121,66 +127,71 @@ public AbstractBlockProcessor.Result processBlock( final List transactions, final List ommers, final PrivateMetadataUpdater privateMetadataUpdater) { - - final List receipts = new ArrayList<>(); - long currentGasUsed = 0; - for (final Transaction transaction : transactions) { - final long remainingGasBudget = blockHeader.getGasLimit() - currentGasUsed; - if (!gasBudgetCalculator.hasBudget( - transaction, blockHeader.getNumber(), blockHeader.getGasLimit(), currentGasUsed)) { - LOG.info( - "Block processing error: transaction gas limit {} exceeds available block budget" - + " remaining {}. Block {} Transaction {}", - transaction.getGasLimit(), - remainingGasBudget, - blockHeader.getHash().toHexString(), - transaction.getHash().toHexString()); - return AbstractBlockProcessor.Result.failed(); + Span globalProcessBlock = + tracer.spanBuilder("processBlock").setSpanKind(Span.Kind.INTERNAL).startSpan(); + try { + final List receipts = new ArrayList<>(); + long currentGasUsed = 0; + for (final Transaction transaction : transactions) { + final long remainingGasBudget = blockHeader.getGasLimit() - currentGasUsed; + if (!gasBudgetCalculator.hasBudget( + transaction, blockHeader.getNumber(), blockHeader.getGasLimit(), currentGasUsed)) { + LOG.info( + "Block processing error: transaction gas limit {} exceeds available block budget" + + " remaining {}. Block {} Transaction {}", + transaction.getGasLimit(), + remainingGasBudget, + blockHeader.getHash().toHexString(), + transaction.getHash().toHexString()); + return AbstractBlockProcessor.Result.failed(); + } + + final WorldUpdater worldStateUpdater = worldState.updater(); + final BlockHashLookup blockHashLookup = new BlockHashLookup(blockHeader, blockchain); + final Address miningBeneficiary = + miningBeneficiaryCalculator.calculateBeneficiary(blockHeader); + + final TransactionProcessingResult result = + transactionProcessor.processTransaction( + blockchain, + worldStateUpdater, + blockHeader, + transaction, + miningBeneficiary, + OperationTracer.NO_TRACING, + blockHashLookup, + true, + TransactionValidationParams.processingBlock(), + privateMetadataUpdater); + if (result.isInvalid()) { + LOG.info( + "Block processing error: transaction invalid '{}'. Block {} Transaction {}", + result.getValidationResult().getInvalidReason(), + blockHeader.getHash().toHexString(), + transaction.getHash().toHexString()); + return AbstractBlockProcessor.Result.failed(); + } + + worldStateUpdater.commit(); + + currentGasUsed += transaction.getGasLimit() - result.getGasRemaining(); + + final TransactionReceipt transactionReceipt = + transactionReceiptFactory.create( + transaction.getType(), result, worldState, currentGasUsed); + receipts.add(transactionReceipt); } - final WorldUpdater worldStateUpdater = worldState.updater(); - final BlockHashLookup blockHashLookup = new BlockHashLookup(blockHeader, blockchain); - final Address miningBeneficiary = - miningBeneficiaryCalculator.calculateBeneficiary(blockHeader); - - final TransactionProcessingResult result = - transactionProcessor.processTransaction( - blockchain, - worldStateUpdater, - blockHeader, - transaction, - miningBeneficiary, - OperationTracer.NO_TRACING, - blockHashLookup, - true, - TransactionValidationParams.processingBlock(), - privateMetadataUpdater); - if (result.isInvalid()) { - LOG.info( - "Block processing error: transaction invalid '{}'. Block {} Transaction {}", - result.getValidationResult().getInvalidReason(), - blockHeader.getHash().toHexString(), - transaction.getHash().toHexString()); + if (!rewardCoinbase(worldState, blockHeader, ommers, skipZeroBlockRewards)) { + // no need to log, rewardCoinbase logs the error. return AbstractBlockProcessor.Result.failed(); } - worldStateUpdater.commit(); - - currentGasUsed += transaction.getGasLimit() - result.getGasRemaining(); - - final TransactionReceipt transactionReceipt = - transactionReceiptFactory.create( - transaction.getType(), result, worldState, currentGasUsed); - receipts.add(transactionReceipt); - } - - if (!rewardCoinbase(worldState, blockHeader, ommers, skipZeroBlockRewards)) { - // no need to log, rewardCoinbase logs the error. - return AbstractBlockProcessor.Result.failed(); + worldState.persist(blockHeader.getHash()); + return AbstractBlockProcessor.Result.successful(receipts); + } finally { + globalProcessBlock.end(); } - - worldState.persist(blockHeader.getHash()); - return AbstractBlockProcessor.Result.successful(receipts); } protected MiningBeneficiaryCalculator getMiningBeneficiaryCalculator() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java index 4e40a4971d8..26723804f6a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fastsync/FastSyncDownloadPipelineFactory.java @@ -144,7 +144,9 @@ public Pipeline createDownloadPipelineForSyncTarget(final SyncT "chain_download_pipeline_processed_total", "Number of entries process by each chain download pipeline stage", "step", - "action")) + "action"), + true, + "fastSync") .thenProcessAsyncOrdered("downloadHeaders", downloadHeadersStep, downloaderParallelism) .thenFlatMap("validateHeadersJoin", validateHeadersJoinUpStep, singleHeaderBufferSize) .inBatches(headerRequestSize) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java index 075ca19ccd0..34c5d1f5de5 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/fullsync/FullSyncDownloadPipelineFactory.java @@ -101,7 +101,9 @@ public Pipeline createDownloadPipelineForSyncTarget(final SyncTarget target) "chain_download_pipeline_processed_total", "Number of entries process by each chain download pipeline stage", "step", - "action")) + "action"), + true, + "fullSync") .thenProcessAsyncOrdered("downloadHeaders", downloadHeadersStep, downloaderParallelism) .thenFlatMap("validateHeadersJoin", validateHeadersJoinUpStep, singleHeaderBufferSize) .inBatches(headerRequestSize) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java index 69f07c1c091..ec06e789b64 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/worldstate/WorldStateDownloadProcess.java @@ -206,7 +206,7 @@ public WorldStateDownloadProcess build() { final Pipeline> completionPipeline = PipelineBuilder.>createPipeline( - "requestDataAvailable", bufferCapacity, outputCounter) + "requestDataAvailable", bufferCapacity, outputCounter, true, "node_data_request") .andFinishWith( "requestCompleteTask", task -> @@ -219,7 +219,9 @@ public WorldStateDownloadProcess build() { "requestDequeued", new TaskQueueIterator(downloadState), bufferCapacity, - outputCounter) + outputCounter, + true, + "world_state_download") .thenFlatMapInParallel( "requestLoadLocalData", task -> loadLocalDataStep.loadLocalData(task, requestsToComplete), diff --git a/ethereum/trie/build.gradle b/ethereum/trie/build.gradle index 7bcf850c15b..f5d00fbc868 100644 --- a/ethereum/trie/build.gradle +++ b/ethereum/trie/build.gradle @@ -33,6 +33,7 @@ dependencies { implementation project(':services:kvstore') implementation 'com.google.guava:guava' + implementation 'io.opentelemetry:opentelemetry-api' implementation 'org.apache.tuweni:bytes' implementation 'org.bouncycastle:bcprov-jdk15on' diff --git a/gradle/versions.gradle b/gradle/versions.gradle index e0260acf034..e6959fe459a 100644 --- a/gradle/versions.gradle +++ b/gradle/versions.gradle @@ -47,18 +47,25 @@ dependencyManagement { dependency 'info.picocli:picocli:4.5.0' - dependency 'io.grpc:grpc-netty:1.33.0' + dependency 'io.grpc:grpc-core:1.33.1' + dependency 'io.grpc:grpc-netty:1.33.1' + dependency 'io.grpc:grpc-stub:1.33.1' + dependency 'io.netty:netty-tcnative-boringssl-static:2.0.31.Final' + dependency group: 'io.netty', name: 'netty-transport-native-epoll', version:'4.1.54.Final', classifier: 'linux-x86_64' + dependency 'io.netty:netty-all:4.1.54.Final' + dependency 'io.kubernetes:client-java:5.0.0' dependency 'io.pkts:pkts-core:3.0.7' - dependency group: 'io.opentelemetry.instrumentation.auto', name: 'opentelemetry-javaagent', version: '0.8.0', classifier: 'all' - - dependency 'io.opentelemetry:opentelemetry-api:0.9.1' - dependency 'io.opentelemetry:opentelemetry-sdk:0.9.1' - dependency 'io.opentelemetry:opentelemetry-exporters-otlp:0.9.1' - dependency 'io.opentelemetry:opentelemetry-extension-runtime-metrics:0.9.1' + dependency 'io.opentelemetry:opentelemetry-api:0.13.1' + dependency 'io.opentelemetry:opentelemetry-proto:0.13.1' + dependency 'io.opentelemetry:opentelemetry-sdk:0.13.1' + dependency 'io.opentelemetry:opentelemetry-sdk-trace:0.13.1' + dependency 'io.opentelemetry:opentelemetry-exporter-otlp:0.13.1' + dependency 'io.opentelemetry:opentelemetry-exporter-otlp-metrics:0.13.1-alpha' + dependency 'io.opentelemetry:opentelemetry-extension-trace-propagators:0.13.1' dependency 'io.prometheus:simpleclient:0.9.0' dependency 'io.prometheus:simpleclient_common:0.9.0' diff --git a/metrics/core/build.gradle b/metrics/core/build.gradle index 6be781d625d..87460afb830 100644 --- a/metrics/core/build.gradle +++ b/metrics/core/build.gradle @@ -39,10 +39,15 @@ dependencies { implementation 'com.google.guava:guava' implementation 'io.grpc:grpc-netty' + implementation 'io.grpc:grpc-core' + implementation 'io.netty:netty-tcnative-boringssl-static' + implementation 'io.netty:netty-transport-native-epoll' + implementation 'io.netty:netty-all' implementation 'io.opentelemetry:opentelemetry-api' implementation 'io.opentelemetry:opentelemetry-sdk' - implementation 'io.opentelemetry:opentelemetry-extension-runtime-metrics' - implementation 'io.opentelemetry:opentelemetry-exporters-otlp' + implementation 'io.opentelemetry:opentelemetry-sdk-trace' + implementation 'io.opentelemetry:opentelemetry-exporter-otlp' + implementation 'io.opentelemetry:opentelemetry-exporter-otlp-metrics' implementation 'io.prometheus:simpleclient' implementation 'io.prometheus:simpleclient_common' diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/MetricsService.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/MetricsService.java index 57bdaf94c60..82f5fa887e2 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/MetricsService.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/MetricsService.java @@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; import io.vertx.core.Vertx; +import org.apache.logging.log4j.LogManager; /** * Service responsible for exposing metrics to the outside, either through a port and network @@ -36,6 +37,7 @@ static Optional create( final Vertx vertx, final MetricsConfiguration configuration, final MetricsSystem metricsSystem) { + LogManager.getLogger().trace("Creating metrics service {}", configuration.getProtocol()); if (configuration.getProtocol() == MetricsProtocol.PROMETHEUS) { if (configuration.isEnabled()) { return Optional.of(new MetricsHttpService(vertx, configuration, metricsSystem)); diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/MetricsSystemFactory.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/MetricsSystemFactory.java index b80b6179734..872ad3ab22a 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/MetricsSystemFactory.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/MetricsSystemFactory.java @@ -22,9 +22,14 @@ import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration; import org.hyperledger.besu.metrics.prometheus.PrometheusMetricsSystem; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + /** Creates a new metric system based on configuration. */ public class MetricsSystemFactory { + private static final Logger LOG = LogManager.getLogger(); + private MetricsSystemFactory() {} /** @@ -34,6 +39,7 @@ private MetricsSystemFactory() {} * @return a new metric system */ public static ObservableMetricsSystem create(final MetricsConfiguration metricsConfiguration) { + LOG.trace("Creating a metric system with {}", metricsConfiguration.getProtocol()); if (!metricsConfiguration.isEnabled() && !metricsConfiguration.isPushEnabled()) { return new NoOpMetricsSystem(); } diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/MetricsOtelGrpcPushService.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/MetricsOtelGrpcPushService.java index 337d83388e4..15d00fe9883 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/MetricsOtelGrpcPushService.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/MetricsOtelGrpcPushService.java @@ -22,23 +22,35 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; -import io.opentelemetry.exporters.otlp.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.metrics.export.IntervalMetricReader; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class MetricsOtelGrpcPushService implements MetricsService { + private static final Logger LOG = LogManager.getLogger(); + private final MetricsConfiguration configuration; private final OpenTelemetrySystem metricsSystem; private IntervalMetricReader periodicReader; + private SpanProcessor spanProcessor; public MetricsOtelGrpcPushService( final MetricsConfiguration configuration, final OpenTelemetrySystem metricsSystem) { + this.configuration = configuration; this.metricsSystem = metricsSystem; } @Override public CompletableFuture start() { + LOG.info("Starting OpenTelemetry push service"); OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.getDefault(); IntervalMetricReader.Builder builder = IntervalMetricReader.builder() @@ -49,6 +61,14 @@ public CompletableFuture start() { Collections.singleton(metricsSystem.getMeterSdkProvider().getMetricProducer())) .setMetricExporter(exporter); this.periodicReader = builder.build(); + this.spanProcessor = + BatchSpanProcessor.builder( + OtlpGrpcSpanExporter.builder() + .readSystemProperties() + .readEnvironmentVariables() + .build()) + .build(); + OpenTelemetrySdk.get().getTracerManagement().addSpanProcessor(spanProcessor); return CompletableFuture.completedFuture(null); } @@ -57,6 +77,12 @@ public CompletableFuture stop() { if (periodicReader != null) { periodicReader.shutdown(); } + if (spanProcessor != null) { + CompletableResultCode result = spanProcessor.shutdown(); + CompletableFuture future = new CompletableFuture<>(); + result.whenComplete(() -> future.complete(null)); + return future; + } return CompletableFuture.completedFuture(null); } diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryCounter.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryCounter.java index 73a718357d4..78b297c6201 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryCounter.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryCounter.java @@ -20,8 +20,8 @@ import java.util.ArrayList; import java.util.List; -import io.opentelemetry.common.Labels; -import io.opentelemetry.metrics.LongCounter; +import io.opentelemetry.api.common.Labels; +import io.opentelemetry.api.metrics.LongCounter; public class OpenTelemetryCounter implements LabelledMetric { diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetrySystem.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetrySystem.java index 4f1c9fd06f0..717c7d0f0c0 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetrySystem.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetrySystem.java @@ -40,21 +40,23 @@ import java.util.stream.Stream; import com.google.common.collect.ImmutableSet; -import io.opentelemetry.common.Attributes; -import io.opentelemetry.common.Labels; -import io.opentelemetry.metrics.DoubleValueObserver; -import io.opentelemetry.metrics.DoubleValueRecorder; -import io.opentelemetry.metrics.LongCounter; -import io.opentelemetry.metrics.LongSumObserver; -import io.opentelemetry.metrics.LongUpDownSumObserver; -import io.opentelemetry.metrics.Meter; -import io.opentelemetry.sdk.metrics.MeterSdkProvider; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.Labels; +import io.opentelemetry.api.metrics.DoubleValueRecorder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.resources.ResourceAttributes; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /** Metrics system relying on the native OpenTelemetry format. */ public class OpenTelemetrySystem implements ObservableMetricsSystem { + + private static final Logger LOG = LogManager.getLogger(); + private static final String TYPE_LABEL_KEY = "type"; private static final String AREA_LABEL_KEY = "area"; private static final String POOL_LABEL_KEY = "pool"; @@ -69,25 +71,24 @@ public class OpenTelemetrySystem implements ObservableMetricsSystem { private final Map> cachedCounters = new ConcurrentHashMap<>(); private final Map> cachedTimers = new ConcurrentHashMap<>(); - private final MeterSdkProvider meterSdkProvider; + private final SdkMeterProvider meterSdkProvider; public OpenTelemetrySystem( final Set enabledCategories, final boolean timersEnabled, final String jobName) { + LOG.info("Starting OpenTelemetry metrics system"); this.enabledCategories = ImmutableSet.copyOf(enabledCategories); this.timersEnabled = timersEnabled; Resource resource = Resource.getDefault() .merge( Resource.create( - Attributes.newBuilder() - .setAttribute(ResourceAttributes.SERVICE_NAME, jobName) - .build())); - this.meterSdkProvider = MeterSdkProvider.builder().setResource(resource).build(); + Attributes.builder().put(ResourceAttributes.SERVICE_NAME, jobName).build())); + this.meterSdkProvider = SdkMeterProvider.builder().setResource(resource).build(); } - MeterSdkProvider getMeterSdkProvider() { + SdkMeterProvider getMeterSdkProvider() { return meterSdkProvider; } @@ -132,14 +133,13 @@ private MetricCategory categoryNameToMetricCategory(final String name) { private Object extractValue(final MetricData.Type type, final MetricData.Point point) { switch (type) { - case NON_MONOTONIC_LONG: - case MONOTONIC_LONG: + case LONG_GAUGE: + case LONG_SUM: return ((MetricData.LongPoint) point).getValue(); - case NON_MONOTONIC_DOUBLE: - case MONOTONIC_DOUBLE: + case DOUBLE_GAUGE: return ((MetricData.DoublePoint) point).getValue(); case SUMMARY: - return ((MetricData.SummaryPoint) point).getPercentileValues(); + return ((MetricData.DoubleSummaryPoint) point).getPercentileValues(); default: throw new UnsupportedOperationException("Unsupported type " + type); } @@ -151,6 +151,7 @@ public LabelledMetric createLabelledCounter( final String name, final String help, final String... labelNames) { + LOG.trace("Creating a counter {}", name); return cachedCounters.computeIfAbsent( name, (k) -> { @@ -171,6 +172,7 @@ public LabelledMetric createLabelledTimer( final String name, final String help, final String... labelNames) { + LOG.trace("Creating a timer {}", name); return cachedTimers.computeIfAbsent( name, (k) -> { @@ -192,11 +194,17 @@ public void createGauge( final String name, final String help, final DoubleSupplier valueSupplier) { + LOG.trace("Creating a gauge {}", name); if (isCategoryEnabled(category)) { final Meter meter = meterSdkProvider.get(category.getName()); - DoubleValueObserver observer = - meter.doubleValueObserverBuilder(name).setDescription(help).build(); - observer.setCallback(result -> result.observe(valueSupplier.getAsDouble(), Labels.empty())); + meter + .doubleValueObserverBuilder(name) + .setDescription(help) + .setUpdater( + res -> { + res.observe(valueSupplier.getAsDouble(), Labels.empty()); + }) + .build(); } } @@ -217,31 +225,22 @@ private void collectGC() { final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean(); final List poolBeans = ManagementFactory.getMemoryPoolMXBeans(); final Meter meter = meterSdkProvider.get(StandardMetricCategory.JVM.getName()); - final LongSumObserver gcMetric = - meter - .longSumObserverBuilder("jvm.gc.collection") - .setDescription("Time spent in a given JVM garbage collector in milliseconds.") - .setUnit("ms") - .build(); final List labelSets = new ArrayList<>(garbageCollectors.size()); for (final GarbageCollectorMXBean gc : garbageCollectors) { labelSets.add(Labels.of("gc", gc.getName())); } - - gcMetric.setCallback( - resultLongObserver -> { - for (int i = 0; i < garbageCollectors.size(); i++) { - resultLongObserver.observe( - garbageCollectors.get(i).getCollectionTime(), labelSets.get(i)); - } - }); - - final LongUpDownSumObserver areaMetric = - meter - .longUpDownSumObserverBuilder("jvm.memory.area") - .setDescription("Bytes of a given JVM memory area.") - .setUnit("By") - .build(); + meter + .longSumObserverBuilder("jvm.gc.collection") + .setDescription("Time spent in a given JVM garbage collector in milliseconds.") + .setUnit("ms") + .setUpdater( + resultLongObserver -> { + for (int i = 0; i < garbageCollectors.size(); i++) { + resultLongObserver.observe( + garbageCollectors.get(i).getCollectionTime(), labelSets.get(i)); + } + }) + .build(); final Labels usedHeap = Labels.of(TYPE_LABEL_KEY, USED, AREA_LABEL_KEY, HEAP); final Labels usedNonHeap = Labels.of(TYPE_LABEL_KEY, USED, AREA_LABEL_KEY, NON_HEAP); final Labels committedHeap = Labels.of(TYPE_LABEL_KEY, COMMITTED, AREA_LABEL_KEY, HEAP); @@ -249,24 +248,22 @@ private void collectGC() { // TODO: Decide if max is needed or not. May be derived with some approximation from max(used). final Labels maxHeap = Labels.of(TYPE_LABEL_KEY, MAX, AREA_LABEL_KEY, HEAP); final Labels maxNonHeap = Labels.of(TYPE_LABEL_KEY, MAX, AREA_LABEL_KEY, NON_HEAP); - areaMetric.setCallback( - resultLongObserver -> { - MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage(); - MemoryUsage nonHeapUsage = memoryBean.getNonHeapMemoryUsage(); - resultLongObserver.observe(heapUsage.getUsed(), usedHeap); - resultLongObserver.observe(nonHeapUsage.getUsed(), usedNonHeap); - resultLongObserver.observe(heapUsage.getUsed(), committedHeap); - resultLongObserver.observe(nonHeapUsage.getUsed(), committedNonHeap); - resultLongObserver.observe(heapUsage.getUsed(), maxHeap); - resultLongObserver.observe(nonHeapUsage.getUsed(), maxNonHeap); - }); - - final LongUpDownSumObserver poolMetric = - meter - .longUpDownSumObserverBuilder("jvm.memory.pool") - .setDescription("Bytes of a given JVM memory pool.") - .setUnit("By") - .build(); + meter + .longUpDownSumObserverBuilder("jvm.memory.area") + .setDescription("Bytes of a given JVM memory area.") + .setUnit("By") + .setUpdater( + resultLongObserver -> { + MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage(); + MemoryUsage nonHeapUsage = memoryBean.getNonHeapMemoryUsage(); + resultLongObserver.observe(heapUsage.getUsed(), usedHeap); + resultLongObserver.observe(nonHeapUsage.getUsed(), usedNonHeap); + resultLongObserver.observe(heapUsage.getUsed(), committedHeap); + resultLongObserver.observe(nonHeapUsage.getUsed(), committedNonHeap); + resultLongObserver.observe(heapUsage.getUsed(), maxHeap); + resultLongObserver.observe(nonHeapUsage.getUsed(), maxNonHeap); + }) + .build(); final List usedLabelSets = new ArrayList<>(poolBeans.size()); final List committedLabelSets = new ArrayList<>(poolBeans.size()); final List maxLabelSets = new ArrayList<>(poolBeans.size()); @@ -275,16 +272,22 @@ private void collectGC() { committedLabelSets.add(Labels.of(TYPE_LABEL_KEY, COMMITTED, POOL_LABEL_KEY, pool.getName())); maxLabelSets.add(Labels.of(TYPE_LABEL_KEY, MAX, POOL_LABEL_KEY, pool.getName())); } - poolMetric.setCallback( - resultLongObserver -> { - for (int i = 0; i < poolBeans.size(); i++) { - MemoryUsage poolUsage = poolBeans.get(i).getUsage(); - resultLongObserver.observe(poolUsage.getUsed(), usedLabelSets.get(i)); - resultLongObserver.observe(poolUsage.getCommitted(), committedLabelSets.get(i)); - // TODO: Decide if max is needed or not. May be derived with some approximation from - // max(used). - resultLongObserver.observe(poolUsage.getMax(), maxLabelSets.get(i)); - } - }); + + meter + .longUpDownSumObserverBuilder("jvm.memory.pool") + .setDescription("Bytes of a given JVM memory pool.") + .setUnit("By") + .setUpdater( + resultLongObserver -> { + for (int i = 0; i < poolBeans.size(); i++) { + MemoryUsage poolUsage = poolBeans.get(i).getUsage(); + resultLongObserver.observe(poolUsage.getUsed(), usedLabelSets.get(i)); + resultLongObserver.observe(poolUsage.getCommitted(), committedLabelSets.get(i)); + // TODO: Decide if max is needed or not. May be derived with some approximation from + // max(used). + resultLongObserver.observe(poolUsage.getMax(), maxLabelSets.get(i)); + } + }) + .build(); } } diff --git a/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryTimer.java b/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryTimer.java index 359ad82b7c2..1ae1bc475a5 100644 --- a/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryTimer.java +++ b/metrics/core/src/main/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryTimer.java @@ -20,8 +20,8 @@ import java.util.ArrayList; import java.util.List; -import io.opentelemetry.common.Labels; -import io.opentelemetry.metrics.DoubleValueRecorder; +import io.opentelemetry.api.common.Labels; +import io.opentelemetry.api.metrics.DoubleValueRecorder; public class OpenTelemetryTimer implements LabelledMetric { diff --git a/metrics/core/src/test/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryMetricsSystemTest.java b/metrics/core/src/test/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryMetricsSystemTest.java index b3a91723895..93323116deb 100644 --- a/metrics/core/src/test/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryMetricsSystemTest.java +++ b/metrics/core/src/test/java/org/hyperledger/besu/metrics/opentelemetry/OpenTelemetryMetricsSystemTest.java @@ -34,13 +34,10 @@ import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; import org.hyperledger.besu.plugin.services.metrics.OperationTimer; -import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.List; import com.google.common.collect.ImmutableSet; -import io.opentelemetry.sdk.metrics.data.MetricData; import org.junit.Test; public class OpenTelemetryMetricsSystemTest { @@ -174,13 +171,10 @@ public void shouldCreateObservationFromGauge() { .build(); final ObservableMetricsSystem localMetricSystem = MetricsSystemFactory.create(metricsConfiguration); - localMetricSystem.createGauge(RPC, "myValue", "Help", () -> 7d); - List values = new ArrayList<>(); - values.add(MetricData.ValueAtPercentile.create(0, 7d)); - values.add(MetricData.ValueAtPercentile.create(100, 7d)); + localMetricSystem.createGauge(RPC, "myValue", "Help", () -> 7.0); assertThat(localMetricSystem.streamObservations()) - .containsExactlyInAnyOrder(new Observation(RPC, "myValue", values, emptyList())); + .containsExactlyInAnyOrder(new Observation(RPC, "myValue", 7.0, emptyList())); } @Test diff --git a/metrics/core/src/test/java/org/hyperledger/besu/metrics/prometheus/PrometheusMetricsSystemTest.java b/metrics/core/src/test/java/org/hyperledger/besu/metrics/prometheus/PrometheusMetricsSystemTest.java index 59dea6114f5..8d9aa601bf4 100644 --- a/metrics/core/src/test/java/org/hyperledger/besu/metrics/prometheus/PrometheusMetricsSystemTest.java +++ b/metrics/core/src/test/java/org/hyperledger/besu/metrics/prometheus/PrometheusMetricsSystemTest.java @@ -57,11 +57,11 @@ public void shouldCreateObservationFromCounter() { counter.inc(); assertThat(metricsSystem.streamObservations()) - .containsExactly(new Observation(PEERS, "connected", 1d, emptyList())); + .containsExactly(new Observation(PEERS, "connected", 1.0, emptyList())); counter.inc(); assertThat(metricsSystem.streamObservations()) - .containsExactly(new Observation(PEERS, "connected", 2d, emptyList())); + .containsExactly(new Observation(PEERS, "connected", 2.0, emptyList())); } @Test @@ -74,11 +74,11 @@ public void shouldHandleDuplicateCounterCreation() { counter1.labels().inc(); assertThat(metricsSystem.streamObservations()) - .containsExactly(new Observation(PEERS, "connected", 1d, emptyList())); + .containsExactly(new Observation(PEERS, "connected", 1.0, emptyList())); counter2.labels().inc(); assertThat(metricsSystem.streamObservations()) - .containsExactly(new Observation(PEERS, "connected", 2d, emptyList())); + .containsExactly(new Observation(PEERS, "connected", 2.0, emptyList())); } @Test @@ -92,8 +92,8 @@ public void shouldCreateSeparateObservationsForEachCounterLabelValue() { assertThat(metricsSystem.streamObservations()) .containsExactlyInAnyOrder( - new Observation(PEERS, "connected", 2d, singletonList("value1")), - new Observation(PEERS, "connected", 1d, singletonList("value2"))); + new Observation(PEERS, "connected", 2.0, singletonList("value1")), + new Observation(PEERS, "connected", 1.0, singletonList("value2"))); } @Test @@ -102,11 +102,11 @@ public void shouldIncrementCounterBySpecifiedAmount() { counter.inc(5); assertThat(metricsSystem.streamObservations()) - .containsExactly(new Observation(PEERS, "connected", 5d, emptyList())); + .containsExactly(new Observation(PEERS, "connected", 5.0, emptyList())); counter.inc(6); assertThat(metricsSystem.streamObservations()) - .containsExactly(new Observation(PEERS, "connected", 11d, emptyList())); + .containsExactly(new Observation(PEERS, "connected", 11.0, emptyList())); } @Test @@ -174,10 +174,10 @@ public void shouldNotCreateObservationsFromTimerWhenTimersDisabled() { @Test public void shouldCreateObservationFromGauge() { - metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7d); + metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7.0); assertThat(metricsSystem.streamObservations()) - .containsExactlyInAnyOrder(new Observation(JVM, "myValue", 7d, emptyList())); + .containsExactlyInAnyOrder(new Observation(JVM, "myValue", 7.0, emptyList())); } @Test @@ -185,8 +185,8 @@ public void shouldNotAllowDuplicateGaugeCreation() { // Gauges have a reference to the source of their data so creating it twice will still only // pull data from the first instance, possibly leaking memory and likely returning the wrong // results. - metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7d); - assertThatThrownBy(() -> metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7d)) + metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7.0); + assertThatThrownBy(() -> metricsSystem.createGauge(JVM, "myValue", "Help", () -> 7.0)) .isInstanceOf(IllegalArgumentException.class); } diff --git a/plugins/rocksdb/build.gradle b/plugins/rocksdb/build.gradle index 1968b6682ec..1fa87ba4b59 100644 --- a/plugins/rocksdb/build.gradle +++ b/plugins/rocksdb/build.gradle @@ -41,6 +41,7 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.google.guava:guava' implementation 'info.picocli:picocli' + implementation 'io.opentelemetry:opentelemetry-api' implementation 'io.prometheus:simpleclient' implementation 'org.apache.logging.log4j:log4j-api' implementation 'org.apache.tuweni:bytes' diff --git a/services/pipeline/build.gradle b/services/pipeline/build.gradle index 1dc2cd1ba0a..7b208ee4a89 100644 --- a/services/pipeline/build.gradle +++ b/services/pipeline/build.gradle @@ -33,6 +33,7 @@ dependencies { implementation project(':metrics:core') implementation project(':plugin-api') + implementation 'io.opentelemetry:opentelemetry-api' implementation 'com.google.guava:guava' implementation 'org.apache.logging.log4j:log4j-api' diff --git a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/Pipeline.java b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/Pipeline.java index c3059309984..7ca90dcbc2d 100644 --- a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/Pipeline.java +++ b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/Pipeline.java @@ -27,6 +27,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -37,6 +41,8 @@ public class Pipeline { private final Collection> pipes; private final CompleterStage completerStage; private final AtomicBoolean started = new AtomicBoolean(false); + private final Tracer tracer = + OpenTelemetry.getGlobalTracer("org.hyperledger.besu.services.pipeline", "1.0.0"); /** * Flags that the pipeline is being completed so that when we abort we can close the streams @@ -47,14 +53,20 @@ public class Pipeline { private final AtomicBoolean completing = new AtomicBoolean(false); private final CompletableFuture overallFuture = new CompletableFuture<>(); + private final String name; + private final boolean tracingEnabled; private volatile List> futures; Pipeline( final Pipe inputPipe, + final String name, + final boolean tracingEnabled, final Collection stages, final Collection> pipes, final CompleterStage completerStage) { this.inputPipe = inputPipe; + this.tracingEnabled = tracingEnabled; + this.name = name; this.stages = stages; this.pipes = pipes; this.completerStage = completerStage; @@ -123,12 +135,24 @@ public void abort() { private Future runWithErrorHandling(final ExecutorService executorService, final Stage task) { return executorService.submit( () -> { + Span taskSpan = null; + if (tracingEnabled) { + taskSpan = + tracer + .spanBuilder(task.getName()) + .setAttribute("pipeline", name) + .setSpanKind(Span.Kind.INTERNAL) + .startSpan(); + } final Thread thread = Thread.currentThread(); final String originalName = thread.getName(); try { thread.setName(originalName + " (" + task.getName() + ")"); task.run(); } catch (final Throwable t) { + if (tracingEnabled) { + taskSpan.setStatus(StatusCode.ERROR); + } LOG.debug("Unhandled exception in pipeline. Aborting.", t); try { abort(t); @@ -139,6 +163,9 @@ private Future runWithErrorHandling(final ExecutorService executorService, fi LOG.error("Failed to abort pipeline after error", t2); } } finally { + if (tracingEnabled) { + taskSpan.end(); + } thread.setName(originalName); } }); diff --git a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java index 0bff48c36b4..61dcdac8831 100644 --- a/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java +++ b/services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java @@ -51,6 +51,8 @@ public class PipelineBuilder { private final ReadPipe pipeEnd; private final int bufferSize; private final LabelledMetric outputCounter; + private final boolean tracingEnabled; + private final String pipelineName; public PipelineBuilder( final Pipe inputPipe, @@ -59,7 +61,9 @@ public PipelineBuilder( final String lastStageName, final ReadPipe pipeEnd, final int bufferSize, - final LabelledMetric outputCounter) { + final LabelledMetric outputCounter, + final boolean tracingEnabled, + final String pipelineName) { checkArgument(!pipes.isEmpty(), "Must have at least one pipe in a pipeline"); this.lastStageName = lastStageName; this.outputCounter = outputCounter; @@ -68,6 +72,8 @@ public PipelineBuilder( this.pipes = pipes; this.pipeEnd = pipeEnd; this.bufferSize = bufferSize; + this.tracingEnabled = tracingEnabled; + this.pipelineName = pipelineName; } /** @@ -81,17 +87,29 @@ public PipelineBuilder( * @param itemCounter the counter to increment for each output of a stage. Must accept two labels, * the stage name and action (output or drained). * @param the type of items input into the pipeline. + * @param tracingEnabled whether this pipeline should be traced + * @param pipelineName the name of the pipeline for tracing purposes * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. */ public static PipelineBuilder createPipelineFrom( final String sourceName, final Iterator source, final int bufferSize, - final LabelledMetric itemCounter) { + final LabelledMetric itemCounter, + final boolean tracingEnabled, + final String pipelineName) { final Pipe pipe = createPipe(bufferSize, sourceName, itemCounter); final IteratorSourceStage sourceStage = new IteratorSourceStage<>(sourceName, source, pipe); return new PipelineBuilder<>( - pipe, singleton(sourceStage), singleton(pipe), sourceName, pipe, bufferSize, itemCounter); + pipe, + singleton(sourceStage), + singleton(pipe), + sourceName, + pipe, + bufferSize, + itemCounter, + tracingEnabled, + pipelineName); } /** @@ -103,13 +121,27 @@ public static PipelineBuilder createPipelineFrom( * @param outputCounter the counter to increment for each output of a stage. Must have a single * label which will be filled with the stage name. * @param the type of items input into the pipeline. + * @param tracingEnabled whether this pipeline should be traced + * @param pipelineName the name of the pipeline for tracing purposes * @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages. */ public static PipelineBuilder createPipeline( - final String sourceName, final int bufferSize, final LabelledMetric outputCounter) { + final String sourceName, + final int bufferSize, + final LabelledMetric outputCounter, + final boolean tracingEnabled, + final String pipelineName) { final Pipe pipe = createPipe(bufferSize, sourceName, outputCounter); return new PipelineBuilder<>( - pipe, emptyList(), singleton(pipe), sourceName, pipe, bufferSize, outputCounter); + pipe, + emptyList(), + singleton(pipe), + sourceName, + pipe, + bufferSize, + outputCounter, + tracingEnabled, + pipelineName); } /** @@ -216,7 +248,9 @@ public PipelineBuilder> inBatches(final int maximumBatchSize) { maximumBatchSize, outputCounter.labels(lastStageName + "_outputPipe", "batches")), (int) Math.ceil(((double) bufferSize) / maximumBatchSize), - outputCounter); + outputCounter, + tracingEnabled, + pipelineName); } /** @@ -274,7 +308,12 @@ public PipelineBuilder thenFlatMapInParallel( */ public Pipeline andFinishWith(final String stageName, final Consumer completer) { return new Pipeline<>( - inputPipe, stages, pipes, new CompleterStage<>(stageName, pipeEnd, completer)); + inputPipe, + pipelineName, + tracingEnabled, + stages, + pipes, + new CompleterStage<>(stageName, pipeEnd, completer)); } private PipelineBuilder thenProcessInParallel( @@ -297,7 +336,9 @@ private PipelineBuilder thenProcessInParallel( stageName, newPipeEnd, newBufferSize, - outputCounter); + outputCounter, + tracingEnabled, + pipelineName); } private PipelineBuilder addStage( @@ -317,7 +358,9 @@ private PipelineBuilder addStage( processStage.getName(), outputPipe, newBufferSize, - outputCounter); + outputCounter, + tracingEnabled, + pipelineName); } private List concat(final Collection existing, final X newItem) { diff --git a/services/pipeline/src/test/java/org/hyperledger/besu/services/pipeline/PipelineBuilderTest.java b/services/pipeline/src/test/java/org/hyperledger/besu/services/pipeline/PipelineBuilderTest.java index 839bccb9b04..9b33ba24708 100644 --- a/services/pipeline/src/test/java/org/hyperledger/besu/services/pipeline/PipelineBuilderTest.java +++ b/services/pipeline/src/test/java/org/hyperledger/besu/services/pipeline/PipelineBuilderTest.java @@ -78,7 +78,8 @@ public void afterClass() throws Exception { public void shouldPipeTasksFromSupplierToCompleter() throws Exception { final List output = new ArrayList<>(); final Pipeline pipeline = - PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER) + PipelineBuilder.createPipelineFrom( + "input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test") .andFinishWith("end", output::add); final CompletableFuture result = pipeline.start(executorService); result.get(10, SECONDS); @@ -89,7 +90,8 @@ public void shouldPipeTasksFromSupplierToCompleter() throws Exception { public void shouldPassInputThroughIntermediateStage() throws Exception { final List output = new ArrayList<>(); final Pipeline pipeline = - PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER) + PipelineBuilder.createPipelineFrom( + "input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test") .thenProcess("toString", Object::toString) .andFinishWith("end", output::add); @@ -104,7 +106,8 @@ public void shouldPassInputThroughIntermediateStage() throws Exception { public void shouldCombineIntoBatches() throws Exception { final BlockingQueue> output = new ArrayBlockingQueue<>(10); final Pipeline pipeline = - PipelineBuilder.createPipeline("source", 20, NO_OP_LABELLED_2_COUNTER) + PipelineBuilder.createPipeline( + "source", 20, NO_OP_LABELLED_2_COUNTER, false, "test") .inBatches(6) .andFinishWith("end", output::offer); @@ -135,7 +138,8 @@ public void shouldCombineIntoBatches() throws Exception { public void shouldProcessAsync() throws Exception { final List output = new ArrayList<>(); final Pipeline pipeline = - PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER) + PipelineBuilder.createPipelineFrom( + "input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test") .thenProcessAsync("toString", value -> completedFuture(Integer.toString(value)), 3) .andFinishWith("end", output::add); final CompletableFuture result = pipeline.start(executorService); @@ -150,7 +154,8 @@ public void shouldProcessAsyncOrdered() throws Exception { final Map> futures = new ConcurrentHashMap<>(); final List output = new CopyOnWriteArrayList<>(); final Pipeline pipeline = - PipelineBuilder.createPipelineFrom("input", tasks, 15, NO_OP_LABELLED_2_COUNTER) + PipelineBuilder.createPipelineFrom( + "input", tasks, 15, NO_OP_LABELLED_2_COUNTER, false, "test") .thenProcessAsyncOrdered( "toString", value -> { @@ -198,7 +203,12 @@ public void shouldLimitInFlightProcessesWhenProcessingAsync() throws Exception { final List> futures = new CopyOnWriteArrayList<>(); final Pipeline pipeline = PipelineBuilder.createPipelineFrom( - "input", asList(1, 2, 3, 4, 5, 6, 7).iterator(), 10, NO_OP_LABELLED_2_COUNTER) + "input", + asList(1, 2, 3, 4, 5, 6, 7).iterator(), + 10, + NO_OP_LABELLED_2_COUNTER, + false, + "test") .thenProcessAsync( "createFuture", value -> { @@ -235,7 +245,8 @@ public void shouldLimitInFlightProcessesWhenProcessingAsync() throws Exception { public void shouldFlatMapItems() throws Exception { final List output = new ArrayList<>(); final Pipeline pipeline = - PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER) + PipelineBuilder.createPipelineFrom( + "input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test") .thenFlatMap("flatMap", input -> Stream.of(input, input * 2), 20) .andFinishWith("end", output::add); @@ -252,7 +263,8 @@ public void shouldProcessInParallel() throws Exception { final List output = synchronizedList(new ArrayList<>()); final CountDownLatch latch = new CountDownLatch(1); final Pipeline pipeline = - PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER) + PipelineBuilder.createPipelineFrom( + "input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test") .thenProcessInParallel( "stageName", value -> { @@ -287,7 +299,8 @@ public void shouldFlatMapInParallel() throws Exception { final List output = synchronizedList(new ArrayList<>()); final CountDownLatch latch = new CountDownLatch(1); final Pipeline pipeline = - PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER) + PipelineBuilder.createPipelineFrom( + "input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test") .thenFlatMapInParallel( "stageName", value -> { @@ -327,7 +340,8 @@ public void shouldAbortPipeline() throws Exception { final List output = synchronizedList(new ArrayList<>()); final CountDownLatch startedProcessingValueSix = new CountDownLatch(1); final Pipeline pipeline = - PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER) + PipelineBuilder.createPipelineFrom( + "input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test") .thenProcess( "stageName", value -> { @@ -363,7 +377,8 @@ public void shouldAbortPipelineWhenFutureIsCancelled() throws Exception { final List output = synchronizedList(new ArrayList<>()); final CountDownLatch startedProcessingValueSix = new CountDownLatch(1); final Pipeline pipeline = - PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER) + PipelineBuilder.createPipelineFrom( + "input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test") .thenProcess( "stageName", value -> { @@ -396,7 +411,8 @@ public void shouldAbortPipelineWhenFutureIsCancelled() throws Exception { public void shouldAbortPipelineWhenProcessorThrowsException() { final RuntimeException expectedError = new RuntimeException("Oops"); final Pipeline pipeline = - PipelineBuilder.createPipelineFrom("input", tasks, 10, NO_OP_LABELLED_2_COUNTER) + PipelineBuilder.createPipelineFrom( + "input", tasks, 10, NO_OP_LABELLED_2_COUNTER, false, "test") .thenProcess( "stageName", (Function) @@ -421,7 +437,7 @@ public void shouldTrackTaskCountMetrics() throws Exception { labels -> counters.computeIfAbsent(labels[0] + "-" + labels[1], label -> new SimpleCounter()); final Pipeline pipeline = - PipelineBuilder.createPipelineFrom("input", tasks, 10, labelledCounter) + PipelineBuilder.createPipelineFrom("input", tasks, 10, labelledCounter, false, "test") .thenProcess("map", Function.identity()) .thenProcessInParallel("parallel", Function.identity(), 3) .thenProcessAsync("async", CompletableFuture::completedFuture, 3)