Skip to content

Commit

Permalink
feat: add pekko-grpc instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
inobu committed Oct 20, 2023
1 parent a96a353 commit 1bd9cd0
Show file tree
Hide file tree
Showing 10 changed files with 316 additions and 0 deletions.
27 changes: 27 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ val instrumentationProjects = Seq[ProjectReference](
`kamon-akka-grpc`,
`kamon-pekko`,
`kamon-pekko-http`,
`kamon-pekko-grpc`,
`kamon-play`,
`kamon-okhttp`,
`kamon-tapir`,
Expand Down Expand Up @@ -526,6 +527,31 @@ lazy val `kamon-pekko-http` = (project in file("instrumentation/kamon-pekko-http
),
)).dependsOn(`kamon-pekko`, `kamon-testkit` % "test")

lazy val `kamon-pekko-grpc` = (project in file("instrumentation/kamon-pekko-grpc"))
.enablePlugins(JavaAgent, PekkoGrpcPlugin)
.disablePlugins(AssemblyPlugin)
.settings(instrumentationSettings)
.settings(Seq(
PB.additionalDependencies := Seq.empty,
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
libraryDependencies ++= Seq(
kanelaAgent % "provided",

"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion % "provided",
"org.apache.pekko" %% "pekko-stream" % "1.0.1" % "provided",
"org.apache.pekko" %% "pekko-discovery"% "1.0.0" % "provided",

"com.thesamet.scalapb" %% "scalapb-runtime" % "0.11.8" % "provided",
"org.apache.pekko" %% "pekko-grpc-runtime" % "1.0.0" % "provided",
"io.grpc" % "grpc-stub" % "1.43.2" % "provided",


scalatest % "test",
slf4jApi % "test",
logbackClassic % "test",
)
)).dependsOn(`kamon-pekko-http`, `kamon-testkit` % "test")

lazy val `kamon-akka-grpc` = (project in file("instrumentation/kamon-akka-grpc"))
.enablePlugins(JavaAgent, AkkaGrpcPlugin)
.disablePlugins(AssemblyPlugin)
Expand Down Expand Up @@ -998,6 +1024,7 @@ lazy val `kamon-bundle-dependencies-2-12-and-up` = (project in file("bundle/kamo
`kamon-finagle`,
`kamon-pekko`,
`kamon-pekko-http`,
`kamon-pekko-grpc`,
`kamon-tapir`,
`kamon-alpakka-kafka`
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package kamon.instrumentation.pekko.grpc;

import org.apache.pekko.http.javadsl.model.HttpEntity;
import kamon.Kamon;
import kamon.context.Context;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class PekkoGRPCUnmarshallingContextPropagation {

@Advice.OnMethodExit()
public static void onExit(
@Advice.Return(readOnly = false) CompletionStage<?> returnValue,
@Advice.Argument(0) Object firstArgument) {

if(firstArgument instanceof HttpEntity && returnValue instanceof CompletableFuture) {
final Context currentContext = Kamon.currentContext();

// NOTES: The wrapper is only overriding thenCompose because it is the only function that gets called
// after GrpcMarshalling.unmarshall in the auto-generated HandlerFactory for gRPC services. In
// the future this might be removed if we instrument CompletionStage directly.
returnValue = new ContextPropagatingCompletionStage<>((CompletableFuture) returnValue, currentContext);
}
}


public static class ContextPropagatingCompletionStage<T> extends CompletableFuture<T> {
private final CompletableFuture<T> wrapped;
private final Context context;

public ContextPropagatingCompletionStage(CompletableFuture<T> wrapped, Context context) {
this.wrapped = wrapped;
this.context = context;
}

@Override
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
Function<? super T, ? extends CompletionStage<U>> wrapperFunction = (t) -> {
return Kamon.runWithContext(context, () -> fn.apply(t));
};

return wrapped.thenCompose(wrapperFunction);
}
}

}
20 changes: 20 additions & 0 deletions instrumentation/kamon-pekko-grpc/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# ======================================= #
# Kamon-Pekko-gRPC Reference Configuration #
# ======================================= #

kanela.modules {
pekko-grpc {
name = "Pekko gRPC Instrumentation"
description = "Context propagation and tracing for Pekko gRPC"
enabled = yes

instrumentations = [
"kamon.instrumentation.pekko.grpc.PekkoGrpcServerInstrumentation"
]

within = [
"^org.apache.pekko.grpc.internal..*",
"^org.apache.pekko.grpc.scaladsl.GrpcMarshalling$"
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kamon.instrumentation.pekko.grpc

import kamon.Kamon
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice

class PekkoGrpcServerInstrumentation extends InstrumentationBuilder {

/**
* Support for Pekko gRPC servers.
*
* gRPC requests get their spans started by the ServerFlowWrapper in the Pekko HTTP instrumentation like any other
* requests, but they never go through any instrumentation that gives a good operation name to the Span and forces
* taking a sampling decision.
*
* This instrumentation gives a proper name and tags to the span when it matches one of the exposed services,
* otherwise the span remains unchanged. Assumes no actual implementation of `pekko.grpc.internal.TelemetrySpi` is
* configured.
*/
onType("org.apache.pekko.grpc.internal.NoOpTelemetry$")
.advise(method("onRequest"), PekkoGRPCServerRequestHandler)


onType("org.apache.pekko.grpc.scaladsl.GrpcMarshalling")
.advise(method("unmarshal"), classOf[PekkoGRPCUnmarshallingContextPropagation])
}

object PekkoGRPCServerRequestHandler {

@Advice.OnMethodEnter()
def enter(@Advice.Argument(0) serviceName: String, @Advice.Argument(1) method: String): Unit = {
val fullSpanName = serviceName + "/" + method
Kamon.currentSpan()
.name(fullSpanName)
.tagMetrics("component", "pekko.grpc.server")
.tagMetrics("rpc.system", "grpc")
.tagMetrics("rpc.service", serviceName)
.tagMetrics("rpc.method", method)
.takeSamplingDecision()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "kamon.instrumentation.pekko.grpc";
option java_outer_classname = "HelloWorldProto";

package helloworld;

////////////////////////////////////// The greeting service definition.
service GreeterService {
//////////////////////
// Sends a greeting //
////////*****/////////
// HELLO //
////////*****/////////
rpc SayHello (HelloRequest) returns (HelloReply) {}

// Comment spanning
// on several lines
rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}

/*
* C style comments
*/
rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}

/* C style comments
* on several lines
* with non-empty heading/trailing line */
rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pekko.http.server.preview.enable-http2 = on
12 changes: 12 additions & 0 deletions instrumentation/kamon-pekko-grpc/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<configuration>
<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kamon.instrumentation.pekko.grpc

import scala.concurrent.Future
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.scaladsl.Source


class GreeterServiceImpl(implicit mat: Materializer) extends GreeterService {
import mat.executionContext

override def sayHello(in: HelloRequest): Future[HelloReply] = {
Future.successful(HelloReply(s"Hello, ${in.name}"))
}

override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = {
in.runWith(Sink.seq).map(elements => HelloReply(s"Hello, ${elements.map(_.name).mkString(", ")}"))
}

override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
Source(s"Hello, ${in.name}".toList).map(character => HelloReply(character.toString))
}

override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = {
in.map(request => HelloReply(s"Hello, ${request.name}"))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2013-2021 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kamon.instrumentation.pekko.grpc

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.grpc.GrpcClientSettings
import org.apache.pekko.http.scaladsl.Http
import kamon.tag.Lookups.plain
import kamon.testkit.{InitAndStopKamonAfterAll, TestSpanReporter}
import org.scalatest.OptionValues
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import scala.concurrent.duration._

class PekkoGrpcTracingSpec extends AnyWordSpec with InitAndStopKamonAfterAll with Matchers with Eventually
with TestSpanReporter with OptionValues {

implicit val system = ActorSystem("pekko-grpc-instrumentation")
implicit val ec = system.dispatcher

val greeterService = GreeterServiceHandler(new GreeterServiceImpl())
val serverBinding = Http()
.newServerAt("127.0.0.1", 8598)
.bind(greeterService)


val client = GreeterServiceClient(GrpcClientSettings.connectToServiceAt("127.0.0.1", 8598).withTls(false))

"the Pekko gRPC instrumentation" should {
"create spans for the server-side" in {
client.sayHello(HelloRequest("kamon"))

eventually(timeout(5 seconds)) {
val span = testSpanReporter().nextSpan().value
span.operationName shouldBe "helloworld.GreeterService/SayHello"
span.metricTags.get(plain("component")) shouldBe "pekko.grpc.server"
span.metricTags.get(plain("rpc.system")) shouldBe "grpc"
span.metricTags.get(plain("rpc.service")) shouldBe "helloworld.GreeterService"
span.metricTags.get(plain("rpc.method")) shouldBe "SayHello"
}
}
}

override protected def beforeAll(): Unit = {
super.beforeAll()
enableFastSpanFlushing()
}
}
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.5.0")

addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "2.1.3")
addSbtPlugin("org.apache.pekko" % "pekko-grpc-sbt-plugin" % "1.0.0")

0 comments on commit 1bd9cd0

Please sign in to comment.