diff --git a/build.sbt b/build.sbt index 8a31a09fb..1a47a4922 100644 --- a/build.sbt +++ b/build.sbt @@ -139,6 +139,7 @@ val instrumentationProjects = Seq[ProjectReference]( `kamon-akka-grpc`, `kamon-pekko`, `kamon-pekko-http`, + `kamon-pekko-grpc`, `kamon-play`, `kamon-okhttp`, `kamon-tapir`, @@ -526,6 +527,31 @@ lazy val `kamon-pekko-http` = (project in file("instrumentation/kamon-pekko-http ), )).dependsOn(`kamon-pekko`, `kamon-testkit` % "test") +lazy val `kamon-pekko-grpc` = (project in file("instrumentation/kamon-pekko-grpc")) + .enablePlugins(JavaAgent, PekkoGrpcPlugin) + .disablePlugins(AssemblyPlugin) + .settings(instrumentationSettings) + .settings(Seq( + PB.additionalDependencies := Seq.empty, + crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`), + libraryDependencies ++= Seq( + kanelaAgent % "provided", + + "org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % "provided", + "org.apache.pekko" %% "pekko-stream" % "1.0.1" % "provided", + "org.apache.pekko" %% "pekko-discovery"% "1.0.0" % "provided", + + "com.thesamet.scalapb" %% "scalapb-runtime" % "0.11.8" % "provided", + "org.apache.pekko" %% "pekko-grpc-runtime" % "1.0.0" % "provided", + "io.grpc" % "grpc-stub" % "1.43.2" % "provided", + + + scalatest % "test", + slf4jApi % "test", + logbackClassic % "test", + ) + )).dependsOn(`kamon-pekko-http`, `kamon-testkit` % "test") + lazy val `kamon-akka-grpc` = (project in file("instrumentation/kamon-akka-grpc")) .enablePlugins(JavaAgent, AkkaGrpcPlugin) .disablePlugins(AssemblyPlugin) @@ -998,6 +1024,7 @@ lazy val `kamon-bundle-dependencies-2-12-and-up` = (project in file("bundle/kamo `kamon-finagle`, `kamon-pekko`, `kamon-pekko-http`, + `kamon-pekko-grpc`, `kamon-tapir`, `kamon-alpakka-kafka` ) diff --git a/instrumentation/kamon-pekko-grpc/src/main/java/kamon/instrumentation/pekko/grpc/PekkoGRPCUnmarshallingContextPropagation.java b/instrumentation/kamon-pekko-grpc/src/main/java/kamon/instrumentation/pekko/grpc/PekkoGRPCUnmarshallingContextPropagation.java new file mode 100644 index 000000000..5810d1dc0 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/main/java/kamon/instrumentation/pekko/grpc/PekkoGRPCUnmarshallingContextPropagation.java @@ -0,0 +1,49 @@ +package kamon.instrumentation.pekko.grpc; + +import org.apache.pekko.http.javadsl.model.HttpEntity; +import kamon.Kamon; +import kamon.context.Context; +import kanela.agent.libs.net.bytebuddy.asm.Advice; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; + +public class PekkoGRPCUnmarshallingContextPropagation { + + @Advice.OnMethodExit() + public static void onExit( + @Advice.Return(readOnly = false) CompletionStage returnValue, + @Advice.Argument(0) Object firstArgument) { + + if(firstArgument instanceof HttpEntity && returnValue instanceof CompletableFuture) { + final Context currentContext = Kamon.currentContext(); + + // NOTES: The wrapper is only overriding thenCompose because it is the only function that gets called + // after GrpcMarshalling.unmarshall in the auto-generated HandlerFactory for gRPC services. In + // the future this might be removed if we instrument CompletionStage directly. + returnValue = new ContextPropagatingCompletionStage<>((CompletableFuture) returnValue, currentContext); + } + } + + + public static class ContextPropagatingCompletionStage extends CompletableFuture { + private final CompletableFuture wrapped; + private final Context context; + + public ContextPropagatingCompletionStage(CompletableFuture wrapped, Context context) { + this.wrapped = wrapped; + this.context = context; + } + + @Override + public CompletableFuture thenCompose(Function> fn) { + Function> wrapperFunction = (t) -> { + return Kamon.runWithContext(context, () -> fn.apply(t)); + }; + + return wrapped.thenCompose(wrapperFunction); + } + } + +} diff --git a/instrumentation/kamon-pekko-grpc/src/main/resources/reference.conf b/instrumentation/kamon-pekko-grpc/src/main/resources/reference.conf new file mode 100644 index 000000000..ba773a8b0 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/main/resources/reference.conf @@ -0,0 +1,20 @@ +# ======================================= # +# Kamon-Pekko-gRPC Reference Configuration # +# ======================================= # + +kanela.modules { + pekko-grpc { + name = "Pekko gRPC Instrumentation" + description = "Context propagation and tracing for Pekko gRPC" + enabled = yes + + instrumentations = [ + "kamon.instrumentation.pekko.grpc.PekkoGrpcServerInstrumentation" + ] + + within = [ + "^org.apache.pekko.grpc.internal..*", + "^org.apache.pekko.grpc.scaladsl.GrpcMarshalling$" + ] + } +} diff --git a/instrumentation/kamon-pekko-grpc/src/main/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcServerInstrumentation.scala b/instrumentation/kamon-pekko-grpc/src/main/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcServerInstrumentation.scala new file mode 100644 index 000000000..a7cbbc7c2 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/main/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcServerInstrumentation.scala @@ -0,0 +1,57 @@ +/* + * Copyright 2013-2021 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.pekko.grpc + +import kamon.Kamon +import kanela.agent.api.instrumentation.InstrumentationBuilder +import kanela.agent.libs.net.bytebuddy.asm.Advice + +class PekkoGrpcServerInstrumentation extends InstrumentationBuilder { + + /** + * Support for Pekko gRPC servers. + * + * gRPC requests get their spans started by the ServerFlowWrapper in the Pekko HTTP instrumentation like any other + * requests, but they never go through any instrumentation that gives a good operation name to the Span and forces + * taking a sampling decision. + * + * This instrumentation gives a proper name and tags to the span when it matches one of the exposed services, + * otherwise the span remains unchanged. Assumes no actual implementation of `pekko.grpc.internal.TelemetrySpi` is + * configured. + */ + onType("org.apache.pekko.grpc.internal.NoOpTelemetry$") + .advise(method("onRequest"), PekkoGRPCServerRequestHandler) + + + onType("org.apache.pekko.grpc.scaladsl.GrpcMarshalling") + .advise(method("unmarshal"), classOf[PekkoGRPCUnmarshallingContextPropagation]) +} + +object PekkoGRPCServerRequestHandler { + + @Advice.OnMethodEnter() + def enter(@Advice.Argument(0) serviceName: String, @Advice.Argument(1) method: String): Unit = { + val fullSpanName = serviceName + "/" + method + Kamon.currentSpan() + .name(fullSpanName) + .tagMetrics("component", "pekko.grpc.server") + .tagMetrics("rpc.system", "grpc") + .tagMetrics("rpc.service", serviceName) + .tagMetrics("rpc.method", method) + .takeSamplingDecision() + } +} diff --git a/instrumentation/kamon-pekko-grpc/src/test/protobuf/helloworld.proto b/instrumentation/kamon-pekko-grpc/src/test/protobuf/helloworld.proto new file mode 100644 index 000000000..bf204a6d0 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/protobuf/helloworld.proto @@ -0,0 +1,41 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "kamon.instrumentation.pekko.grpc"; +option java_outer_classname = "HelloWorldProto"; + +package helloworld; + +////////////////////////////////////// The greeting service definition. +service GreeterService { + ////////////////////// + // Sends a greeting // + ////////*****///////// + // HELLO // + ////////*****///////// + rpc SayHello (HelloRequest) returns (HelloReply) {} + + // Comment spanning + // on several lines + rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {} + + /* + * C style comments + */ + rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {} + + /* C style comments + * on several lines + * with non-empty heading/trailing line */ + rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} \ No newline at end of file diff --git a/instrumentation/kamon-pekko-grpc/src/test/resources/application.conf b/instrumentation/kamon-pekko-grpc/src/test/resources/application.conf new file mode 100644 index 000000000..5d726de4a --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/resources/application.conf @@ -0,0 +1 @@ +pekko.http.server.preview.enable-http2 = on \ No newline at end of file diff --git a/instrumentation/kamon-pekko-grpc/src/test/resources/logback.xml b/instrumentation/kamon-pekko-grpc/src/test/resources/logback.xml new file mode 100644 index 000000000..742815603 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/resources/logback.xml @@ -0,0 +1,12 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + \ No newline at end of file diff --git a/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/GreeterServiceImpl.scala b/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/GreeterServiceImpl.scala new file mode 100644 index 000000000..1b59ca4b5 --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/GreeterServiceImpl.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2013-2021 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.pekko.grpc + +import scala.concurrent.Future +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.Sink +import org.apache.pekko.stream.scaladsl.Source + + +class GreeterServiceImpl(implicit mat: Materializer) extends GreeterService { + import mat.executionContext + + override def sayHello(in: HelloRequest): Future[HelloReply] = { + Future.successful(HelloReply(s"Hello, ${in.name}")) + } + + override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = { + in.runWith(Sink.seq).map(elements => HelloReply(s"Hello, ${elements.map(_.name).mkString(", ")}")) + } + + override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = { + Source(s"Hello, ${in.name}".toList).map(character => HelloReply(character.toString)) + } + + override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = { + in.map(request => HelloReply(s"Hello, ${request.name}")) + } +} diff --git a/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcTracingSpec.scala b/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcTracingSpec.scala new file mode 100644 index 000000000..bcdcee56b --- /dev/null +++ b/instrumentation/kamon-pekko-grpc/src/test/scala/kamon/instrumentation/pekko/grpc/PekkoGrpcTracingSpec.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2013-2021 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.pekko.grpc + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.grpc.GrpcClientSettings +import org.apache.pekko.http.scaladsl.Http +import kamon.tag.Lookups.plain +import kamon.testkit.{InitAndStopKamonAfterAll, TestSpanReporter} +import org.scalatest.OptionValues +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import scala.concurrent.duration._ + +class PekkoGrpcTracingSpec extends AnyWordSpec with InitAndStopKamonAfterAll with Matchers with Eventually + with TestSpanReporter with OptionValues { + + implicit val system = ActorSystem("pekko-grpc-instrumentation") + implicit val ec = system.dispatcher + + val greeterService = GreeterServiceHandler(new GreeterServiceImpl()) + val serverBinding = Http() + .newServerAt("127.0.0.1", 8598) + .bind(greeterService) + + + val client = GreeterServiceClient(GrpcClientSettings.connectToServiceAt("127.0.0.1", 8598).withTls(false)) + + "the Pekko gRPC instrumentation" should { + "create spans for the server-side" in { + client.sayHello(HelloRequest("kamon")) + + eventually(timeout(5 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe "helloworld.GreeterService/SayHello" + span.metricTags.get(plain("component")) shouldBe "pekko.grpc.server" + span.metricTags.get(plain("rpc.system")) shouldBe "grpc" + span.metricTags.get(plain("rpc.service")) shouldBe "helloworld.GreeterService" + span.metricTags.get(plain("rpc.method")) shouldBe "SayHello" + } + } + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + enableFastSpanFlushing() + } +} diff --git a/project/plugins.sbt b/project/plugins.sbt index 4f0653992..437e1df70 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -14,3 +14,4 @@ addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1") addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.5.0") addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.1.3") +addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.0.0")