From 5ac354351c767a658e3bfc44f14ffcc11d406c90 Mon Sep 17 00:00:00 2001 From: atyutin Date: Thu, 28 Mar 2024 20:57:48 +0300 Subject: [PATCH] add: instrumentation for apache cxf client --- build.sbt | 29 ++++- .../src/main/resources/reference.conf | 99 ++++++++++++++ .../cxf/client/ApacheCxfClientHelper.scala | 103 +++++++++++++++ ...lientProxyFactoryBeanInstrumentation.scala | 32 +++++ .../apache/cxf/client/TraceScopeHolder.scala | 17 +++ .../cxf/client/TracingClientFeature.scala | 22 ++++ .../TracingClientFeatureInitializer.scala | 16 +++ .../cxf/client/TracingClientInterceptor.scala | 121 ++++++++++++++++++ .../src/test/resources/application.conf | 19 +++ .../src/test/resources/logback.xml | 17 +++ .../apache/cxf/client/MessageSpec.scala | 105 +++++++++++++++ .../cxf/client/util/FailingInterceptor.scala | 25 ++++ .../cxf/client/util/HelloWorldService.scala | 9 ++ .../client/util/MockServerExpectations.scala | 104 +++++++++++++++ 14 files changed, 715 insertions(+), 3 deletions(-) create mode 100644 instrumentation/kamon-apache-cxf/src/main/resources/reference.conf create mode 100644 instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ApacheCxfClientHelper.scala create mode 100644 instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ClientProxyFactoryBeanInstrumentation.scala create mode 100644 instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TraceScopeHolder.scala create mode 100644 instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeature.scala create mode 100644 instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeatureInitializer.scala create mode 100644 instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientInterceptor.scala create mode 100644 instrumentation/kamon-apache-cxf/src/test/resources/application.conf create mode 100644 instrumentation/kamon-apache-cxf/src/test/resources/logback.xml create mode 100644 instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/MessageSpec.scala create mode 100644 instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/FailingInterceptor.scala create mode 100644 instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/HelloWorldService.scala create mode 100644 instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/MockServerExpectations.scala diff --git a/build.sbt b/build.sbt index a35db90e4..4240e0486 100644 --- a/build.sbt +++ b/build.sbt @@ -148,7 +148,8 @@ val instrumentationProjects = Seq[ProjectReference]( `kamon-alpakka-kafka`, `kamon-http4s-1_0`, `kamon-http4s-0_23`, - `kamon-apache-httpclient` + `kamon-apache-httpclient`, + `kamon-apache-cxf` ) lazy val instrumentation = (project in file("instrumentation")) @@ -839,6 +840,26 @@ lazy val `kamon-apache-httpclient` = (project in file("instrumentation/kamon-apa ) ).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test") +lazy val `kamon-apache-cxf` = (project in file("instrumentation/kamon-apache-cxf")) + .disablePlugins(AssemblyPlugin) + .enablePlugins(JavaAgent) + .settings(instrumentationSettings) + .settings( + crossScalaVersions := Seq(`scala_2.13_version`, `scala_3_version`), + libraryDependencies ++= Seq( + kanelaAgent % "provided", + "org.apache.cxf" % "cxf-rt-frontend-simple" % "3.3.6" % "provided", + slf4jApi % "provided", + + scalatest % "test", + logbackClassic % "test", + "org.mock-server" % "mockserver-client-java" % "5.13.2" % "test", + "com.dimafeng" %% "testcontainers-scala" % "0.41.0" % "test", + "com.dimafeng" %% "testcontainers-scala-mockserver" % "0.41.0" % "test", + "org.apache.cxf" % "cxf-rt-frontend-jaxws" % "3.3.6" % "test", + "org.apache.cxf" % "cxf-rt-transports-http" % "3.3.6" % "test", + ) + ).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test") /** * Reporters @@ -1107,7 +1128,8 @@ lazy val `kamon-bundle-dependencies-all` = (project in file("bundle/kamon-bundle `kamon-caffeine`, `kamon-lagom`, `kamon-aws-sdk`, - `kamon-apache-httpclient` + `kamon-apache-httpclient`, + `kamon-apache-cxf` ) /** @@ -1172,7 +1194,8 @@ lazy val `kamon-bundle-dependencies-3` = (project in file("bundle/kamon-bundle-d `kamon-pekko`, `kamon-pekko-http`, `kamon-pekko-grpc`, - `kamon-apache-httpclient` + `kamon-apache-httpclient`, + `kamon-apache-cxf` ) lazy val `kamon-bundle` = (project in file("bundle/kamon-bundle")) diff --git a/instrumentation/kamon-apache-cxf/src/main/resources/reference.conf b/instrumentation/kamon-apache-cxf/src/main/resources/reference.conf new file mode 100644 index 000000000..cc2d862e2 --- /dev/null +++ b/instrumentation/kamon-apache-cxf/src/main/resources/reference.conf @@ -0,0 +1,99 @@ +# ================================================== # +# kamon Apache CXF client reference configuration # +# ================================================== # + +# Settings to control the CXF Client instrumentation +# +# IMPORTANT: The entire configuration of the CXF Client Instrumentation is based on the constructs provided by the +# Kamon Instrumentation Common library which will always fallback to the settings found under the +# "kamon.instrumentation.http-client.default" path. The default settings have been included here to make them easy to +# find and understand in the context of this project and commented out so that any changes to the default settings +# will actually have effect. +# +kamon.instrumentation.apache.cxf { + + # + # Configuration for CXF context propagation. + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP client instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + #enabled = yes + + # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + #channel = "default" + } + + tracing { + + # Enables HTTP request tracing. When enabled the instrumentation will create Spans for outgoing requests + # and finish them when the response is received from the server. + #enabled = yes + + # Enables collection of span metrics using the `span.processing-time` metric. + #span-metrics = on + + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { + + # Use the http.url tag. + #url = span + + # Use the http.method tag. + #method = metric + + # Use the http.status_code tag. + #status-code = metric + + # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type + # tag from the context into the HTTP Client Span created by the instrumentation, the following configuration + # should be added: + # + # from-context { + # customer_type = span + # } + # + from-context { + + } + } + + operations { + + # The default operation name to be used when creating Spans to handle the HTTP client requests. The HTTP + # Client instrumentation will always try to use the HTTP Operation Name Generator configured below to get + # a name, but if it fails to generate it then this name will be used. + default = "apache.cxf.client" + + # FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms: + # - hostname: Uses the request Host as the operation name. + # - method: Uses the request HTTP method as the operation name. + # + #name-generator = "method" + } + } + +} + +kanela { + modules { + apache-cxf { + name = "Apache CXF Client" + description = "Provides tracing of client calls made with the official Apache CXF library." + instrumentations = [ + "kamon.instrumentation.apache.cxf.client.ClientProxyFactoryBeanInstrumentation" + ] + enabled = true + within = [ + "org.apache.cxf..*", + ] + } + } +} diff --git a/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ApacheCxfClientHelper.scala b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ApacheCxfClientHelper.scala new file mode 100644 index 000000000..04cbeef17 --- /dev/null +++ b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ApacheCxfClientHelper.scala @@ -0,0 +1,103 @@ +package kamon.instrumentation.apache.cxf.client + +import kamon.instrumentation.http.HttpMessage +import org.apache.cxf.message.Message +import org.apache.cxf.message.Message.{HTTP_REQUEST_METHOD, PROTOCOL_HEADERS, RESPONSE_CODE} +import org.slf4j.LoggerFactory + +import java.net.{URI, URISyntaxException} +import java.util.Collections.{emptyMap => jEmptyMap, singletonList => jList} +import java.util.{List => JList, Map => JMap} +import scala.collection.mutable +import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava, MapHasAsScala} + +class ApacheCxfClientHelper + +object ApacheCxfClientHelper { + + private val _logger = LoggerFactory.getLogger(classOf[ApacheCxfClientHelper]) + + def toRequestBuilder(request: Message): HttpMessage.RequestBuilder[Message] = + new RequestReader with HttpMessage.RequestBuilder[Message] { + + val delegate: Message = request + + val uri: URI = getUri(request) + + override def write(header: String, value: String): Unit = { + val headers: mutable.Map[String, String] = getAllHeaders(delegate).to(mutable.Map) + headers.put(header, value) + delegate.put(Message.PROTOCOL_HEADERS, headers.map(m => m._1 -> jList(m._2)).toMap.asJava) + } + + override def build(): Message = { + _logger.trace("Prepared request for instrumentation: {}", this) + delegate + } + + override def toString(): String = s"RequestReader(host=$host,port=$port,method=$method,path=$path)" + } + + def toResponse(message: Message): HttpMessage.Response = new HttpMessage.Response { + override def statusCode: Int = message.get(RESPONSE_CODE) match { + case code: Integer => code + case _ => + _logger.debug("Not able to retrieve status code from response") + -1 + } + } + private def getUri(message: Message): URI = + try { + getUriAsString(message).map(s => new URI(s)).orNull + } catch { + case e: URISyntaxException => throw new RuntimeException(e.getMessage, e) + } + + private def safeGet(message: Message, key: String): Option[String] = + if (message.containsKey(key)) { + message.get(key) match { + case value: String => Option.apply(value) + case _ => Option.empty + } + } else Option.empty + + private def getUriAsString(message: Message): Option[String] = { + val requestUrl: Option[String] = safeGet(message, Message.REQUEST_URL).orElse { + var address = safeGet(message, Message.ENDPOINT_ADDRESS) + val requestUri = safeGet(message, Message.REQUEST_URI) + if (requestUri.exists(r => r.startsWith("/"))) { + if (!address.exists(a => a.startsWith(requestUri.get))) { + if (address.exists(t => t.endsWith("/") && t.length > 1)) { + address = address.map(a => a.substring(0, a.length)) + } + address.map(a => a + requestUri.getOrElse("")) + } else requestUri + } else address + } + safeGet(message, Message.QUERY_STRING).map(q => requestUrl.map(u => s"$u?$q")).getOrElse(requestUrl) + } + + private def getAllHeaders(message: Message): Map[String, String] = (message.get(PROTOCOL_HEADERS) match { + case hs: JMap[String, JList[String]] => hs + case _ => jEmptyMap[String, JList[String]]() + }).asScala.map { case (key, values) => key -> values.asScala.mkString(", ") }.toMap + + private trait RequestReader extends HttpMessage.Request { + def uri: URI + def delegate: Message + + override def host: String = if (uri != null) uri.getHost else null + + override def port: Int = if (uri != null) uri.getPort else 0 + + override def method: String = delegate.get(HTTP_REQUEST_METHOD).asInstanceOf[String] + + override def path: String = if (uri != null) uri.getPath else null + + override def read(header: String): Option[String] = getAllHeaders(delegate).get(header) + + override def readAll(): Map[String, String] = getAllHeaders(delegate) + + override def url: String = if (uri != null) uri.toString else null + } +} diff --git a/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ClientProxyFactoryBeanInstrumentation.scala b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ClientProxyFactoryBeanInstrumentation.scala new file mode 100644 index 000000000..8e0138f6b --- /dev/null +++ b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/ClientProxyFactoryBeanInstrumentation.scala @@ -0,0 +1,32 @@ +package kamon.instrumentation.apache.cxf.client + +import kamon.Kamon +import kamon.instrumentation.http.HttpClientInstrumentation +import kamon.instrumentation.http.HttpClientInstrumentation.RequestHandler +import kanela.agent.api.instrumentation.InstrumentationBuilder +import org.apache.cxf.message.Message + +class ClientProxyFactoryBeanInstrumentation extends InstrumentationBuilder { + + onSubTypesOf("org.apache.cxf.frontend.ClientProxyFactoryBean") + .advise(method("initFeatures"), classOf[TracingClientFeatureInitializer]) +} + +object ClientProxyFactoryBeanInstrumentation { + Kamon.onReconfigure(_ => + ClientProxyFactoryBeanInstrumentation.rebuildHttpClientInstrumentation(): Unit + ) + + @volatile var cxfClientInstrumentation: HttpClientInstrumentation = rebuildHttpClientInstrumentation() + + private def rebuildHttpClientInstrumentation(): HttpClientInstrumentation = { + val httpClientConfig = Kamon.config().getConfig("kamon.instrumentation.apache.cxf") + cxfClientInstrumentation = HttpClientInstrumentation.from(httpClientConfig, "apache.cxf.client") + cxfClientInstrumentation + } + + def processResponse(handler: RequestHandler[_], message: Message, t: Throwable = null): Unit = { + if (t != null) handler.span.fail(t).finish() + else handler.processResponse(ApacheCxfClientHelper.toResponse(message)) + } +} diff --git a/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TraceScopeHolder.scala b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TraceScopeHolder.scala new file mode 100644 index 000000000..0ed2ea365 --- /dev/null +++ b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TraceScopeHolder.scala @@ -0,0 +1,17 @@ +package kamon.instrumentation.apache.cxf.client + +import kamon.context.Storage.Scope +import kamon.instrumentation.http.HttpClientInstrumentation.RequestHandler +import org.apache.cxf.message.Message + +import java.io.Closeable + +private class TraceScopeHolder(val traceScope: Option[TraceScope], val detached: Boolean = false) extends Serializable + +private case class TraceScope(handler: RequestHandler[Message], scope: Option[Scope]) extends Closeable { + + override def close(): Unit = { + if (handler != null && handler.span != null) handler.span.finish() + if (scope.nonEmpty) scope.get.close() + } +} diff --git a/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeature.scala b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeature.scala new file mode 100644 index 000000000..735c143b2 --- /dev/null +++ b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeature.scala @@ -0,0 +1,22 @@ +package kamon.instrumentation.apache.cxf.client + +import org.apache.cxf.Bus +import org.apache.cxf.feature.AbstractFeature +import org.apache.cxf.interceptor.InterceptorProvider + +private class TracingClientFeature extends AbstractFeature { + + private val setup = new TracingClientSetupInterceptor() + private val receive = new TracingClientReceiveInterceptor() + private val postInvoke = new TracingClientPostInvokeInterceptor() + + override def initializeProvider(provider: InterceptorProvider, bus: Bus): Unit = { + provider.getOutInterceptors.add(0, setup) + provider.getOutFaultInterceptors.add(0, setup) + provider.getInInterceptors.add(0, receive) + provider.getInInterceptors.add(postInvoke) + provider.getInFaultInterceptors.add(0, receive) + provider.getInFaultInterceptors.add(postInvoke) + super.initializeProvider(provider, bus) + } +} diff --git a/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeatureInitializer.scala b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeatureInitializer.scala new file mode 100644 index 000000000..48708862a --- /dev/null +++ b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientFeatureInitializer.scala @@ -0,0 +1,16 @@ +package kamon.instrumentation.apache.cxf.client + +import kanela.agent.libs.net.bytebuddy.asm.Advice +import kanela.agent.libs.net.bytebuddy.asm.Advice.This +import org.apache.cxf.frontend.ClientProxyFactoryBean + +import scala.annotation.static + +class TracingClientFeatureInitializer +object TracingClientFeatureInitializer { + + @Advice.OnMethodEnter + @static def onEnter(@This clientProxyFactoryBean: Any) = clientProxyFactoryBean match { + case c: ClientProxyFactoryBean => c.getFeatures.add(new TracingClientFeature) + } +} diff --git a/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientInterceptor.scala b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientInterceptor.scala new file mode 100644 index 000000000..b0281f7f8 --- /dev/null +++ b/instrumentation/kamon-apache-cxf/src/main/scala/kamon/instrumentation/apache/cxf/client/TracingClientInterceptor.scala @@ -0,0 +1,121 @@ +package kamon.instrumentation.apache.cxf.client + +import kamon.Kamon +import kamon.context.Storage.Scope +import kamon.trace.Span +import org.apache.cxf.message.Message +import org.apache.cxf.phase.Phase.{POST_INVOKE, RECEIVE, SETUP} +import org.apache.cxf.phase.{PhaseInterceptor, PhaseInterceptorChain} + +import java.util.Collections + +private sealed abstract class AbstractTracingClientInterceptor(phase: String) extends PhaseInterceptor[Message] { + + protected val TRACE_SCOPE = "org.apache.cxf.tracing.client.kamon.traceScope" + + override def getAfter: java.util.Set[String] = Collections.emptySet() + + override def getBefore: java.util.Set[String] = Collections.emptySet() + + override def getId: String = getClass.getName + + override def getPhase: String = phase + + override def getAdditionalInterceptors: java.util.Collection[PhaseInterceptor[_ <: Message]] = null + + override def handleFault(message: Message): Unit = {} + + protected def isAsyncResponse: Boolean = !PhaseInterceptorChain.getCurrentMessage.getExchange.isSynchronous + + protected def processFailed(message: Message): Unit = { + message.getExchange.get(TRACE_SCOPE) match { + case holder: TraceScopeHolder => + if (holder != null) { + holder.traceScope.foreach(t => + if (t.handler != null) { + val exception = message.getContent(classOf[Exception]) + ClientProxyFactoryBeanInstrumentation.processResponse(t.handler, message, exception) + t.close() + } + ) + } + } + } +} + +private class TracingClientSetupInterceptor extends AbstractTracingClientInterceptor(SETUP) { + + override def handleMessage(message: Message): Unit = { + val parentContext = Kamon.currentContext() + val builder = ApacheCxfClientHelper.toRequestBuilder(message) + val handler = ClientProxyFactoryBeanInstrumentation.cxfClientInstrumentation.createHandler(builder, parentContext) + val scope = Kamon.storeContext(parentContext.withEntry(Span.Key, handler.span)) + + val holder: TraceScopeHolder = + // In case of asynchronous client invocation, the span should be detached as JAX-WS + // client request / response interceptors are going to be executed in different threads. + if (isAsyncResponse) new TraceScopeHolder(Option(TraceScope(handler, Option(scope))), true) + else new TraceScopeHolder(Option(TraceScope(handler, Option(scope)))) + + message.getExchange.put(TRACE_SCOPE, holder) + } + + override def handleFault(message: Message): Unit = message.getExchange.get(TRACE_SCOPE) match { + case holder: TraceScopeHolder => + if (holder != null) { + holder.traceScope.foreach(t => { + var newScope: Scope = null + try { + // If the client invocation was asynchronous, the trace span has been created + // in another thread and should be re-attached to the current one. + if (holder.detached) { + newScope = t.scope.map(s => Kamon.storeContext(s.context.withEntry(Span.Key, t.handler.span))).orNull + } + val exception = message.getContent(classOf[Exception]) + ClientProxyFactoryBeanInstrumentation.processResponse(t.handler, message, exception) + } finally { + if (newScope != null) newScope.close() + t.close() + } + }) + } + } +} + +private class TracingClientReceiveInterceptor extends AbstractTracingClientInterceptor(RECEIVE) { + + override def handleMessage(message: Message): Unit = message.getExchange.get(TRACE_SCOPE) match { + case holder: TraceScopeHolder => + if (holder != null) { + holder.traceScope.foreach(t => + // If the client invocation was asynchronous, the trace span has been created + // in another thread and should be re-attached to the current one. + if (holder.detached) { + // Close scope which has been created in another thread + t.scope.foreach(s => s.close()) + val newScope = t.scope.map(s => Kamon.storeContext(s.context.withEntry(Span.Key, t.handler.span))) + val modifyHolder = new TraceScopeHolder(Option(TraceScope(t.handler, newScope)), holder.detached) + message.getExchange.put(TRACE_SCOPE, modifyHolder) + } + ) + } + } + override def handleFault(message: Message): Unit = processFailed(message) +} + +private class TracingClientPostInvokeInterceptor extends AbstractTracingClientInterceptor(POST_INVOKE) { + + override def handleMessage(message: Message): Unit = message.getExchange.get(TRACE_SCOPE) match { + case holder: TraceScopeHolder => + if (holder != null) { + holder.traceScope.foreach(t => + try { + ClientProxyFactoryBeanInstrumentation.processResponse(t.handler, message) + } finally { + t.close() + } + ) + } + } + override def handleFault(message: Message): Unit = processFailed(message) +} diff --git a/instrumentation/kamon-apache-cxf/src/test/resources/application.conf b/instrumentation/kamon-apache-cxf/src/test/resources/application.conf new file mode 100644 index 000000000..3deb48156 --- /dev/null +++ b/instrumentation/kamon-apache-cxf/src/test/resources/application.conf @@ -0,0 +1,19 @@ +kamon { + trace.sampler = "always" +} + +kanela { + # debug-mode = true + # log-level = "DEBUG" +} + +kamon.instrumentation.apache.cxf { + tracing { + operations { + mappings { + "/CustomHelloWorldService" = "custom-named-from-config" + } + } + } +} + diff --git a/instrumentation/kamon-apache-cxf/src/test/resources/logback.xml b/instrumentation/kamon-apache-cxf/src/test/resources/logback.xml new file mode 100644 index 000000000..6823d19eb --- /dev/null +++ b/instrumentation/kamon-apache-cxf/src/test/resources/logback.xml @@ -0,0 +1,17 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + diff --git a/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/MessageSpec.scala b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/MessageSpec.scala new file mode 100644 index 000000000..9ddf9c379 --- /dev/null +++ b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/MessageSpec.scala @@ -0,0 +1,105 @@ +package kamon.instrumentation.apache.cxf.client + +import com.dimafeng.testcontainers.{ForAllTestContainer, MockServerContainer} +import kamon.instrumentation.apache.cxf.client.util.MockServerExpectations +import kamon.tag.Lookups.{plain, plainBoolean, plainLong} +import kamon.testkit.{InitAndStopKamonAfterAll, Reconfigure, TestSpanReporter} +import org.scalatest.OptionValues +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.SpanSugar +import org.scalatest.wordspec.AnyWordSpec +import org.slf4j.LoggerFactory + +class MessageSpec + extends AnyWordSpec + with Matchers + with Eventually + with SpanSugar + with Reconfigure + with OptionValues + with TestSpanReporter + with InitAndStopKamonAfterAll + with ForAllTestContainer { + + private val _logger = LoggerFactory.getLogger(classOf[MessageSpec]) + + "The apache cxf client making call" should { + "create client span when call method sayHello()" in { + clientExpectation.simpleExpectation() + val client = clientExpectation.simpleClient + val target = clientExpectation.simpleTarget + val response = client.sayHello() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + _logger.info("Received Span: {}", span) + span.operationName shouldBe "apache.cxf.client" + span.tags.get(plain("http.url")) shouldBe target + span.metricTags.get(plain("component")) shouldBe "apache.cxf.client" + } + } + "replace operation name from config" in { + clientExpectation.customExpectation() + val client = clientExpectation.customClient + val target = clientExpectation.customTarget + val response = client.sayHello() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + _logger.debug("Received Span: {}", span) + span.operationName shouldBe "custom-named-from-config" + span.tags.get(plain("http.url")) shouldBe target + span.metricTags.get(plain("component")) shouldBe "apache.cxf.client" + } + } + "mark spans as errors when request fails" in { + clientExpectation.test500Expectation() + val client = clientExpectation.test500Client + val target = clientExpectation.test500Target + val response = client.sayHello() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + _logger.debug("Received Span: {}", span) + span.operationName shouldBe "apache.cxf.client" + span.tags.get(plain("http.url")) shouldBe target + span.metricTags.get(plain("component")) shouldBe "apache.cxf.client" + span.metricTags.get(plainBoolean("error")) shouldBe true + span.metricTags.get(plainLong("http.status_code")) shouldBe 500 + span.hasError shouldBe true + } + } + "mark spans as errors when while processing the response throws an exception" in { + clientExpectation.failingExpectation() + val client = clientExpectation.failingClient + val target = clientExpectation.failingTarget + assertThrows[RuntimeException] { + val response = client.sayHello() + } + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + _logger.info("Received Span: {}", span) + span.operationName shouldBe "apache.cxf.client" + span.tags.get(plain("http.url")) shouldBe target + span.tags.get(plain("error.message")) should not be null + span.metricTags.get(plain("component")) shouldBe "apache.cxf.client" + span.metricTags.get(plainBoolean("error")) shouldBe true + } + } + } + + override val container: MockServerContainer = MockServerContainer() + lazy val clientExpectation: MockServerExpectations = new MockServerExpectations("localhost", container.serverPort) + + override protected def beforeAll(): Unit = { + super.beforeAll() + container.start() + } + + override protected def afterAll(): Unit = { + container.stop() + super.afterAll() + } +} diff --git a/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/FailingInterceptor.scala b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/FailingInterceptor.scala new file mode 100644 index 000000000..70015a8b7 --- /dev/null +++ b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/FailingInterceptor.scala @@ -0,0 +1,25 @@ +package kamon.instrumentation.apache.cxf.client.util + +import org.apache.cxf.message.Message +import org.apache.cxf.phase.Phase.{POST_INVOKE, RECEIVE} +import org.apache.cxf.phase.PhaseInterceptor + +import java.util.Collections + +class FailingInterceptor extends PhaseInterceptor[Message] { + override def handleMessage(message: Message): Unit = { + throw new RuntimeException("Dummy Exception") + } + + override def getAfter: java.util.Set[String] = Collections.emptySet() + + override def getBefore: java.util.Set[String] = Collections.emptySet() + + override def getId: String = getClass.getName + + override def getPhase: String = RECEIVE + + override def getAdditionalInterceptors: java.util.Collection[PhaseInterceptor[_ <: Message]] = null + + override def handleFault(message: Message): Unit = {} +} diff --git a/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/HelloWorldService.scala b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/HelloWorldService.scala new file mode 100644 index 000000000..b0fdf5393 --- /dev/null +++ b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/HelloWorldService.scala @@ -0,0 +1,9 @@ +package kamon.instrumentation.apache.cxf.client.util + +import javax.jws.{WebMethod, WebService} + +@WebService +trait HelloWorldService { + @WebMethod + def sayHello(): String +} diff --git a/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/MockServerExpectations.scala b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/MockServerExpectations.scala new file mode 100644 index 000000000..252e15d06 --- /dev/null +++ b/instrumentation/kamon-apache-cxf/src/test/scala/kamon/instrumentation/apache/cxf/client/util/MockServerExpectations.scala @@ -0,0 +1,104 @@ +package kamon.instrumentation.apache.cxf.client.util + +import org.apache.cxf.interceptor.Interceptor +import org.apache.cxf.jaxws.JaxWsProxyFactoryBean +import org.apache.cxf.message.Message +import org.mockserver.client.MockServerClient +import org.mockserver.model.HttpRequest.request +import org.mockserver.model.HttpResponse.response +import org.slf4j.LoggerFactory + +import scala.jdk.CollectionConverters.SeqHasAsJava + +class MockServerExpectations(private val host: String, private val port: Int) { + + private val _logger = LoggerFactory.getLogger(classOf[MockServerExpectations]) + + private[util] def client: MockServerClient = new MockServerClient(host, port) + def endpoint = s"http://$host:$port" + def simplePath: String = "/HelloWorldService" + def customPath: String = "/CustomHelloWorldService" + def test500Path: String = "/Test500HelloWorldService" + + def failingPath: String = "/FailingHelloWorldService" + + def simpleTarget = s"$endpoint$simplePath" + def customTarget = s"$endpoint$customPath" + def test500Target = s"$endpoint$test500Path" + def failingTarget = s"$endpoint$failingPath" + + val simpleClient = new HelloWorldServiceClient(simpleTarget) + + val customClient = new HelloWorldServiceClient(customTarget) + + val test500Client = new HelloWorldServiceClient(test500Target) + + val failingClient = + new HelloWorldServiceClient(address = failingTarget, inInterceptors = List(new FailingInterceptor)) + + private val RESPONSE: String = + "Hello!" + + def simpleExpectation(): Unit = client.when( + request() + .withMethod("POST") + .withPath(simplePath) + ).respond( + response() + .withStatusCode(200) + .withBody(RESPONSE) + ) + + def customExpectation(): Unit = client.when( + request() + .withMethod("POST") + .withPath(customPath) + ).respond( + response() + .withStatusCode(200) + .withBody(RESPONSE) + ) + + def test500Expectation(): Unit = client.when( + request() + .withMethod("POST") + .withPath(test500Path) + ).respond( + response() + .withStatusCode(500) + .withBody(RESPONSE) + ) + def failingExpectation(): Unit = client.when( + request() + .withMethod("POST") + .withPath(failingPath) + ).respond( + response() + .withStatusCode(200) + .withBody(RESPONSE) + ) +} + +class HelloWorldServiceClient( + address: String, + outInterceptors: List[Interceptor[Message]] = List.empty, + inInterceptors: List[Interceptor[Message]] = List.empty +) { + val client: HelloWorldService = HelloWorldServiceClientFactory.create(address, outInterceptors, inInterceptors) + def sayHello(): String = client.sayHello() +} + +object HelloWorldServiceClientFactory { + def create( + address: String, + outInterceptors: List[Interceptor[Message]], + inInterceptors: List[Interceptor[Message]] + ): HelloWorldService = { + val factory = new JaxWsProxyFactoryBean() + factory.setServiceClass(classOf[HelloWorldService]) + factory.setAddress(address) + factory.getInInterceptors.addAll(inInterceptors.asJava) + factory.getOutInterceptors.addAll(outInterceptors.asJava) + factory.create().asInstanceOf[HelloWorldService] + } +}