diff --git a/build.sbt b/build.sbt index a69dc4b5a..77a99a2e6 100644 --- a/build.sbt +++ b/build.sbt @@ -465,31 +465,45 @@ lazy val `kamon-akka` = (project in file("instrumentation/kamon-akka")) def akkaHttpVersion(scalaVersion: String) = scalaVersion match { case "2.11" => "10.1.12" + case "3" => "10.5.0" case _ => "10.2.8" } +def akkaStreamVersion(scalaVersion: String) = scalaVersion match { + case "3" => "2.7.0" + case _ => "2.5.32" +} + +def versionedScalaSourceDirectories(sourceDir: File, scalaVersion: String): List[File] = + scalaVersion match { + case "3" => List(sourceDir / "scala-2.13+") + case "2.13" => List(sourceDir / "scala-2.13+") + case _ => Nil + } lazy val `kamon-akka-http` = (project in file("instrumentation/kamon-akka-http")) .enablePlugins(JavaAgent) .disablePlugins(AssemblyPlugin) .settings(instrumentationSettings) .settings(Seq( + Compile / unmanagedSourceDirectories ++= versionedScalaSourceDirectories((Compile / sourceDirectory).value, scalaBinaryVersion.value), resolvers += Resolver.bintrayRepo("hseeberger", "maven"), javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.10" % "test", libraryDependencies ++= Seq( kanelaAgent % "provided", "com.typesafe.akka" %% "akka-http" % akkaHttpVersion(scalaBinaryVersion.value) % "provided", "com.typesafe.akka" %% "akka-http2-support" % akkaHttpVersion(scalaBinaryVersion.value) % "provided", - "com.typesafe.akka" %% "akka-stream" % "2.5.32" % "provided", + "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion(scalaBinaryVersion.value) % "provided", scalatest % "test", slf4jApi % "test", slf4jnop % "test", okHttp % "test", "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion(scalaBinaryVersion.value) % "test", - "de.heikoseeberger" %% "akka-http-json4s" % "1.27.0" % "test", - "org.json4s" %% "json4s-native" % "3.6.7" % "test", - ), - )).dependsOn(`kamon-akka`, `kamon-testkit` % "test") + "de.heikoseeberger" %% "akka-http-json4s" % "1.27.0" % "test" cross CrossVersion.for3Use2_13 intransitive(), + "org.json4s" %% "json4s-native" % "4.0.6" % "test", + ))) + .settings(crossScalaVersions += `scala_3_version`) + .dependsOn(`kamon-akka`, `kamon-testkit` % "test") diff --git a/instrumentation/kamon-akka-http/src/main/scala-2.13/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala b/instrumentation/kamon-akka-http/src/main/scala-2.13+/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala similarity index 94% rename from instrumentation/kamon-akka-http/src/main/scala-2.13/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala rename to instrumentation/kamon-akka-http/src/main/scala-2.13+/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala index 563f687c3..3249bb74b 100644 --- a/instrumentation/kamon-akka-http/src/main/scala-2.13/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala +++ b/instrumentation/kamon-akka-http/src/main/scala-2.13+/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala @@ -8,7 +8,7 @@ import akka.http.scaladsl.server.PathMatcher.{Matched, Unmatched} import akka.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet} import akka.http.scaladsl.server.directives.RouteDirectives.reject import akka.http.scaladsl.server._ -import akka.http.scaladsl.server.util.Tupler +import akka.http.scaladsl.server.util.{Tuple, Tupler} import akka.http.scaladsl.util.FastFuture import kamon.Kamon import kamon.instrumentation.akka.http.HasMatchingContext.PathMatchingContext @@ -27,6 +27,7 @@ import akka.stream.scaladsl.Flow import kamon.context.Context import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic +import scala.annotation.static import scala.collection.immutable @@ -53,7 +54,7 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder { .advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice]) onType("akka.http.impl.engine.http2.Http2Blueprint$") - .intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor) + .intercept(method("handleWithStreamIdHeader"), classOf[Http2BlueprintInterceptor]) /** * The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free @@ -61,7 +62,7 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder { */ onType("akka.http.scaladsl.server.RequestContextImpl") .mixin(classOf[HasMatchingContext.Mixin]) - .intercept(method("copy"), RequestContextCopyInterceptor) + .intercept(method("copy"), classOf[RequestContextCopyInterceptor]) onType("akka.http.scaladsl.server.directives.PathDirectives") .intercept(method("rawPathPrefix"), classOf[PathDirectivesRawPathPrefixInterceptor]) @@ -263,10 +264,11 @@ object LastAutomaticOperationNameEdit { new LastAutomaticOperationNameEdit(operationName, allowAutomaticChanges) } +class RequestContextCopyInterceptor object RequestContextCopyInterceptor { @RuntimeType - def copy(@This context: RequestContext, @SuperCall copyCall: Callable[RequestContext]): RequestContext = { + @static def copy(@This context: RequestContext, @SuperCall copyCall: Callable[RequestContext]): RequestContext = { val copiedRequestContext = copyCall.call() copiedRequestContext.asInstanceOf[HasMatchingContext].setMatchingContext(context.asInstanceOf[HasMatchingContext].matchingContext) copiedRequestContext @@ -277,8 +279,8 @@ class PathDirectivesRawPathPrefixInterceptor object PathDirectivesRawPathPrefixInterceptor { import BasicDirectives._ - def rawPathPrefix[T](@Argument(0) matcher: PathMatcher[T]): Directive[T] = { - implicit val LIsTuple = matcher.ev + @static def rawPathPrefix[T](@Argument(0) matcher: PathMatcher[T]): Directive[T] = { + implicit val LIsTuple: Tuple[T] = matcher.ev extract { ctx => val fullPath = ctx.unmatchedPath.toString() @@ -294,7 +296,7 @@ object PathDirectivesRawPathPrefixInterceptor { (ctx, matching) } flatMap { case (ctx, Matched(rest, values)) => - tprovide(values) & mapRequestContext(_ withUnmatchedPath rest) & mapRouteResult { routeResult => + tprovide[T](values) & mapRequestContext(_ withUnmatchedPath rest) & mapRouteResult { routeResult => if(routeResult.isInstanceOf[Rejected]) ctx.asInstanceOf[HasMatchingContext].popOneMatchingContext() @@ -307,6 +309,7 @@ object PathDirectivesRawPathPrefixInterceptor { } } +class Http2BlueprintInterceptor object Http2BlueprintInterceptor { case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse]) @@ -316,7 +319,7 @@ object Http2BlueprintInterceptor { } @RuntimeType - def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse], + @static def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse], @SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]): Flow[HttpRequest, HttpResponse, NotUsed] = { handler match { diff --git a/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpClientTracingSpec.scala b/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpClientTracingSpec.scala index 1b4b036d8..0322f9de1 100644 --- a/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpClientTracingSpec.scala +++ b/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpClientTracingSpec.scala @@ -31,6 +31,7 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.OptionValues +import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ class AkkaHttpClientTracingSpec extends AnyWordSpecLike with Matchers with InitAndStopKamonAfterAll with MetricInspection.Syntax @@ -38,9 +39,9 @@ class AkkaHttpClientTracingSpec extends AnyWordSpecLike with Matchers with InitA import TestWebServer.Endpoints._ - implicit private val system = ActorSystem("http-client-instrumentation-spec") - implicit private val executor = system.dispatcher - implicit private val materializer = ActorMaterializer() + implicit private val system: ActorSystem = ActorSystem("http-client-instrumentation-spec") + implicit private val executor: ExecutionContextExecutor = system.dispatcher + implicit private val materializer: ActorMaterializer = ActorMaterializer() val timeoutTest: FiniteDuration = 5 second val interface = "127.0.0.1" diff --git a/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerMetricsSpec.scala b/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerMetricsSpec.scala index 2ed25dd7e..d76e48e25 100644 --- a/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerMetricsSpec.scala +++ b/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerMetricsSpec.scala @@ -29,7 +29,7 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.OptionValues -import scala.concurrent.Future +import scala.concurrent.{ExecutionContextExecutor, Future} import scala.concurrent.duration._ class AkkaHttpServerMetricsSpec extends AnyWordSpecLike with Matchers with InitAndStopKamonAfterAll with InstrumentInspection.Syntax @@ -37,9 +37,9 @@ class AkkaHttpServerMetricsSpec extends AnyWordSpecLike with Matchers with InitA import TestWebServer.Endpoints._ - implicit private val system = ActorSystem("http-server-metrics-instrumentation-spec") - implicit private val executor = system.dispatcher - implicit private val materializer = ActorMaterializer() + implicit private val system: ActorSystem = ActorSystem("http-server-metrics-instrumentation-spec") + implicit private val executor: ExecutionContextExecutor = system.dispatcher + implicit private val materializer: ActorMaterializer = ActorMaterializer() val port = 8083 val interface = "127.0.0.1" diff --git a/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerTracingSpec.scala b/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerTracingSpec.scala index 00a5cefc8..2f2883ab3 100644 --- a/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerTracingSpec.scala +++ b/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerTracingSpec.scala @@ -31,15 +31,17 @@ import java.util.UUID import javax.net.ssl.{HostnameVerifier, SSLSession} import scala.concurrent.duration._ import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContextExecutor +import scala.util.Try class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with ScalaFutures with Inside with InitAndStopKamonAfterAll with MetricInspection.Syntax with Reconfigure with TestWebServer with Eventually with OptionValues with TestSpanReporter { import TestWebServer.Endpoints._ - implicit private val system = ActorSystem("http-server-instrumentation-spec") - implicit private val executor = system.dispatcher - implicit private val materializer = ActorMaterializer() + implicit private val system: ActorSystem = ActorSystem("http-server-instrumentation-spec") + implicit private val executor: ExecutionContextExecutor = system.dispatcher + implicit private val materializer: ActorMaterializer = ActorMaterializer() val (sslSocketFactory, trustManager) = clientSSL() val okHttp = new OkHttpClient.Builder() @@ -228,7 +230,12 @@ class AkkaHttpServerTracingSpec extends AnyWordSpecLike with Matchers with Scala "correctly time entity transfer timings" in { val target = s"$protocol://$interface:$port/$stream" - client.newCall(new Request.Builder().url(target).build()).execute() + def probablyScala3 = util.Properties.releaseVersion.contains("2.13.10") + + def makeCall = client.newCall(new Request.Builder().url(target).build()).execute() + // akka 2.7.0 is flaky on this + if (probablyScala3) Try(makeCall).orElse(Try(makeCall)) + else makeCall val span = eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value diff --git a/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/ServerFlowWrapperSpec.scala b/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/ServerFlowWrapperSpec.scala index 27803f3e4..f8ac71d5e 100644 --- a/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/ServerFlowWrapperSpec.scala +++ b/instrumentation/kamon-akka-http/src/test/scala/kamon/akka/http/ServerFlowWrapperSpec.scala @@ -11,11 +11,13 @@ import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike +import scala.concurrent.ExecutionContextExecutor + class ServerFlowWrapperSpec extends AnyWordSpecLike with Matchers with ScalaFutures with InitAndStopKamonAfterAll { - implicit private val system = ActorSystem("http-client-instrumentation-spec") - implicit private val executor = system.dispatcher - implicit private val materializer = ActorMaterializer() + implicit private val system: ActorSystem = ActorSystem("http-client-instrumentation-spec") + implicit private val executor: ExecutionContextExecutor = system.dispatcher + implicit private val materializer: ActorMaterializer = ActorMaterializer() private val okReturningFlow = Flow[HttpRequest].map { _ => HttpResponse(status = StatusCodes.OK, entity = HttpEntity("OK")) diff --git a/instrumentation/kamon-akka-http/src/test/scala/kamon/testkit/TestWebServer.scala b/instrumentation/kamon-akka-http/src/test/scala/kamon/testkit/TestWebServer.scala index ebf719bbb..b55551733 100644 --- a/instrumentation/kamon-akka-http/src/test/scala/kamon/testkit/TestWebServer.scala +++ b/instrumentation/kamon-akka-http/src/test/scala/kamon/testkit/TestWebServer.scala @@ -36,18 +36,19 @@ import kamon.instrumentation.akka.http.TracingDirectives import org.json4s.{DefaultFormats, native} import kamon.tag.Lookups.plain import kamon.trace.Trace +import org.json4s.native.Serialization import scala.concurrent.{ExecutionContext, Future} trait TestWebServer extends TracingDirectives { - implicit val serialization = native.Serialization - implicit val formats = DefaultFormats + implicit val serialization: Serialization.type = native.Serialization + implicit val formats: DefaultFormats.type = DefaultFormats import Json4sSupport._ def startServer(interface: String, port: Int, https: Boolean = false)(implicit system: ActorSystem): WebServer = { import Endpoints._ implicit val ec: ExecutionContext = system.dispatcher - implicit val materializer = ActorMaterializer() + implicit val materializer: ActorMaterializer = ActorMaterializer() val routes = logRequest("routing-request") { get {