Skip to content

Commit

Permalink
akka-grpc and pekko-grpc scala 3 support (#1312)
Browse files Browse the repository at this point in the history
* pekko-grpc tests passing

* akka-grpc tests passing
  • Loading branch information
hughsimpson authored Dec 1, 2023
1 parent eb3628c commit 5f36f94
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 25 deletions.
14 changes: 9 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,10 @@ def akkaStreamVersion(scalaVersion: String) = scalaVersion match {
case "3" => "2.7.0"
case _ => "2.5.32"
}
def akkaGrpcRuntimeVersion(scalaVersion: String) = scalaVersion match {
case "3" => "2.3.0"
case _ => "2.1.3"
}

def versionedScalaSourceDirectories(sourceDir: File, scalaVersion: String): List[File] =
scalaVersion match {
Expand Down Expand Up @@ -550,7 +554,7 @@ lazy val `kamon-pekko-grpc` = (project in file("instrumentation/kamon-pekko-grpc
.settings(instrumentationSettings)
.settings(Seq(
PB.additionalDependencies := Seq.empty,
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
libraryDependencies ++= Seq(
kanelaAgent % "provided",

Expand All @@ -575,18 +579,18 @@ lazy val `kamon-akka-grpc` = (project in file("instrumentation/kamon-akka-grpc")
.settings(instrumentationSettings)
.settings(Seq(
PB.additionalDependencies := Seq.empty,
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`),
crossScalaVersions := Seq(`scala_2.12_version`, `scala_2.13_version`, scala_3_version),
libraryDependencies ++= Seq(
kanelaAgent % "provided",

"com.typesafe.akka" %% "akka-http" % akkaHttpVersion(scalaBinaryVersion.value) % "provided",
"com.typesafe.akka" %% "akka-http2-support" % akkaHttpVersion(scalaBinaryVersion.value) % "provided",
"com.typesafe.akka" %% "akka-stream" % "2.5.32" % "provided",
"com.typesafe.akka" %% "akka-discovery" % "2.5.32" % "provided",
"com.typesafe.akka" %% "akka-stream" % akkaStreamVersion(scalaBinaryVersion.value) % "provided",
"com.typesafe.akka" %% "akka-discovery" % akkaStreamVersion(scalaBinaryVersion.value) % "provided",

// gRPC-specific dependencies provided by the sbt-akka-grpc plugin. We
"com.thesamet.scalapb" %% "scalapb-runtime" % "0.11.8" % "provided",
"com.lightbend.akka.grpc" %% "akka-grpc-runtime" % "2.1.3" % "provided",
"com.lightbend.akka.grpc" %% "akka-grpc-runtime" % akkaGrpcRuntimeVersion(scalaBinaryVersion.value) % "provided",
"io.grpc" % "grpc-stub" % "1.43.2" % "provided",

scalatest % "test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import kamon.Kamon
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice

import scala.annotation.static

class AkkaGrpcServerInstrumentation extends InstrumentationBuilder {

/**
Expand All @@ -33,18 +35,19 @@ class AkkaGrpcServerInstrumentation extends InstrumentationBuilder {
* otherwise the span remains unchanged. Assumes no actual implementation of `akka.grpc.internal.TelemetrySpi` is
* configured.
*/
onType("akka.grpc.internal.NoOpTelemetry$")
.advise(method("onRequest"), AkkaGRPCServerRequestHandler)
onType("akka.grpc.internal.TelemetrySpi")
.advise(method("onRequest"), classOf[AkkaGRPCServerRequestHandler])


onType("akka.grpc.javadsl.GrpcMarshalling")
.advise(method("unmarshal"), classOf[AkkaGRPCUnmarshallingContextPropagation])
}

class AkkaGRPCServerRequestHandler
object AkkaGRPCServerRequestHandler {

@Advice.OnMethodEnter()
def enter(@Advice.Argument(0) serviceName: String, @Advice.Argument(1) method: String): Unit = {
@static def enter(@Advice.Argument(0) serviceName: String, @Advice.Argument(1) method: String): Unit = {
val fullSpanName = serviceName + "/" + method
Kamon.currentSpan()
.name(fullSpanName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._

class AkkaGrpcTracingSpec extends AnyWordSpec with InitAndStopKamonAfterAll with Matchers with Eventually
with TestSpanReporter with OptionValues {

implicit val system = ActorSystem("akka-grpc-instrumentation")
implicit val ec = system.dispatcher
implicit val system: ActorSystem = ActorSystem("akka-grpc-instrumentation")
implicit val ec: ExecutionContextExecutor = system.dispatcher

val greeterService = GreeterServiceHandler(new GreeterServiceImpl())
val serverBinding = Http()
Expand All @@ -56,9 +57,4 @@ class AkkaGrpcTracingSpec extends AnyWordSpec with InitAndStopKamonAfterAll with
}
}
}

override protected def beforeAll(): Unit = {
super.beforeAll()
enableFastSpanFlushing()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import kamon.Kamon
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.libs.net.bytebuddy.asm.Advice

import scala.annotation.static

class PekkoGrpcServerInstrumentation extends InstrumentationBuilder {

/**
Expand All @@ -33,18 +35,19 @@ class PekkoGrpcServerInstrumentation extends InstrumentationBuilder {
* otherwise the span remains unchanged. Assumes no actual implementation of `pekko.grpc.internal.TelemetrySpi` is
* configured.
*/
onType("org.apache.pekko.grpc.internal.NoOpTelemetry$")
.advise(method("onRequest"), PekkoGRPCServerRequestHandler)
onType("org.apache.pekko.grpc.internal.TelemetrySpi")
.advise(method("onRequest"), classOf[PekkoGRPCServerRequestHandler])


onType("org.apache.pekko.grpc.scaladsl.GrpcMarshalling")
.advise(method("unmarshal"), classOf[PekkoGRPCUnmarshallingContextPropagation])
}

class PekkoGRPCServerRequestHandler
object PekkoGRPCServerRequestHandler {

@Advice.OnMethodEnter()
def enter(@Advice.Argument(0) serviceName: String, @Advice.Argument(1) method: String): Unit = {
@static def enter(@Advice.Argument(0) serviceName: String, @Advice.Argument(1) method: String): Unit = {
val fullSpanName = serviceName + "/" + method
Kamon.currentSpan()
.name(fullSpanName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._

class PekkoGrpcTracingSpec extends AnyWordSpec with InitAndStopKamonAfterAll with Matchers with Eventually
with TestSpanReporter with OptionValues {

implicit val system = ActorSystem("pekko-grpc-instrumentation")
implicit val ec = system.dispatcher
implicit val system: ActorSystem = ActorSystem("pekko-grpc-instrumentation")
implicit val ec: ExecutionContextExecutor = system.dispatcher

val greeterService = GreeterServiceHandler(new GreeterServiceImpl())
val serverBinding = Http()
Expand All @@ -56,9 +57,4 @@ class PekkoGrpcTracingSpec extends AnyWordSpec with InitAndStopKamonAfterAll wit
}
}
}

override protected def beforeAll(): Unit = {
super.beforeAll()
enableFastSpanFlushing()
}
}

0 comments on commit 5f36f94

Please sign in to comment.