diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/KafkaInstrumentation.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/KafkaInstrumentation.scala index 646d5cb61..139c49fc2 100644 --- a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/KafkaInstrumentation.scala +++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/KafkaInstrumentation.scala @@ -17,7 +17,7 @@ package kamon.instrumentation.kafka.client import com.typesafe.config.Config -import kamon.Kamon +import kamon.{ClassLoading, Kamon} import kamon.context.Context import kamon.instrumentation.context.HasContext import kamon.trace.Span @@ -50,7 +50,10 @@ object KafkaInstrumentation { log.warn("W3C TraceContext propagation should be used only with identifier-scheme = double") } SpanPropagation.W3CTraceContext() - case other => sys.error(s"Unrecognized option [$other] for the kamon.instrumentation.kafka.client.tracing.propagator config.") + case fqcn => try ClassLoading.createInstance[KafkaPropagator](fqcn) catch { + case t: Throwable => + sys.error(s"Failed to create kafka propagator instance from FQCN [$fqcn]. Reason: ${t.getMessage}") + } } ) } diff --git a/instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/client/KafkaClientsTracingInstrumentationSpec.scala b/instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/client/KafkaClientsTracingInstrumentationSpec.scala index e5dfd542c..a8a18948f 100644 --- a/instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/client/KafkaClientsTracingInstrumentationSpec.scala +++ b/instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/client/KafkaClientsTracingInstrumentationSpec.scala @@ -243,6 +243,45 @@ class KafkaClientsTracingInstrumentationSpec extends AnyWordSpec with Matchers span.parentId shouldBe sendingSpan.get.id } } + + "create a Producer/Consumer Span when publish/consume a message with custom format" in new SpanReportingTestScope(reporter) { + applyConfig("kamon.instrumentation.kafka.client.tracing.propagator = kamon.instrumentation.kafka.testutil.CustomPropagationImplementation") + + val testTopicName = "custom-context-propagation" + publishStringMessageToKafka(testTopicName, "Hello world!!!") + + val consumedRecord = consumeFirstRawRecord(testTopicName) + + consumedRecord.headers().lastHeader("x-trace-id").value() should not be empty + consumedRecord.headers().lastHeader("traceparent") shouldBe null + consumedRecord.headers().lastHeader("kctx") shouldBe null + consumedRecord.value() shouldBe "Hello world!!!" + + awaitNumReportedSpans(2) + + var sendingSpan: Option[Span.Finished] = None + assertReportedSpan(_.operationName == "producer.send") { span => + span.metricTags.get(plain("component")) shouldBe "kafka.producer" + span.metricTags.get(plain("span.kind")) shouldBe "producer" + span.tags.get(plain("kafka.topic")) shouldBe testTopicName + span.tags.get(plain("kafka.key")) shouldBe KafkaInstrumentation.Keys.Null + span.tags.get(plainLong("kafka.partition")) shouldBe 0L + sendingSpan = Some(span) + } + + assertReportedSpan(_.operationName == "consumer.process") { span => + span.metricTags.get(plain("component")) shouldBe "kafka.consumer" + span.metricTags.get(plain("span.kind")) shouldBe "consumer" + span.tags.get(plain("kafka.topic")) shouldBe testTopicName + span.tags.get(plain("kafka.client-id")) should not be empty + span.tags.get(plain("kafka.group-id")) should not be empty + span.tags.get(plainLong("kafka.partition")) shouldBe 0L + span.tags.get(plainLong("kafka.timestamp")) shouldBe consumedRecord.timestamp() + span.tags.get(plain("kafka.timestamp-type")) shouldBe consumedRecord.timestampType().name + span.trace.id shouldBe sendingSpan.get.trace.id + span.parentId shouldBe sendingSpan.get.id + } + } } private def publishStringMessageToKafka(topicName: String, message: String): Unit = { diff --git a/instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/testutil/CustomPropagationImplementation.scala b/instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/testutil/CustomPropagationImplementation.scala new file mode 100644 index 000000000..d8c852054 --- /dev/null +++ b/instrumentation/kamon-kafka/src/test/scala/kamon/instrumentation/kafka/testutil/CustomPropagationImplementation.scala @@ -0,0 +1,39 @@ +package kamon.instrumentation.kafka.testutil + +import kamon.context.Context +import kamon.instrumentation.kafka.client.KafkaPropagator +import kamon.trace.Trace.SamplingDecision +import kamon.trace.{Identifier, Span, Trace} + +import org.apache.kafka.common.header.{Headers => KafkaHeaders} + +class CustomPropagationImplementation extends KafkaPropagator { + + override def read(medium: KafkaHeaders, context: Context): Context = { + + val contextWithParent = for { + traceId <- Option(medium.lastHeader("x-trace-id")).map(_.value()) + traceIdStr = new String(traceId, "utf-8") + spanId <- Option(medium.lastHeader("x-span-id")).map(_.value()) + spanIdStr = new String(spanId, "utf-8") + sampled <- Option(medium.lastHeader("x-trace-sampled")).map(_.value()).map{ + case Array(1) => SamplingDecision.Sample + case Array(0) => SamplingDecision.DoNotSample + case _ => SamplingDecision.Unknown + } + span = Span.Remote(Identifier(spanIdStr, spanId), Identifier.Empty, Trace(Identifier(traceIdStr, traceId), sampled)) + } yield context.withEntry(Span.Key, span) + + contextWithParent.getOrElse(context) + } + + override def write(context: Context, medium: KafkaHeaders): Unit = { + val span = context.get(Span.Key) + + if (span != Span.Empty) { + medium.add("x-trace-id", span.trace.id.string.getBytes("utf-8")) + medium.add("x-span-id", span.id.string.getBytes("utf-8")) + medium.add("x-trace-sampled", if (span.trace.samplingDecision == SamplingDecision.Sample) Array(1) else Array(0)) + } + } +} \ No newline at end of file