From d29836bd5e2e28bcca78ac72b368ed64e0d6f486 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 21 Mar 2024 12:24:26 -0400 Subject: [PATCH] GH-3146: Propagate KT observation callback Fixes: #3146 Propagate `KafkaTemplate` observation callback. There is a requirement to see logs correlated in the `CompletableFuture` result after `KafkaTemplate.send()` operation. * Add `observation.openScope()` into the `buildCallback()` logic **Auto-cherry-pick to `3.1.x` & `3.0.x`** --- .../org/springframework/kafka/core/KafkaTemplate.java | 6 +++--- .../kafka/support/micrometer/ObservationTests.java | 10 +++++++++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 2162e44c5f..f6377ed961 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -829,6 +829,7 @@ private ProducerRecord interceptorProducerRecord(ProducerRecord prod return producerRecord; } + @SuppressWarnings("try") private Callback buildCallback(final ProducerRecord producerRecord, final Producer producer, final CompletableFuture> future, @Nullable Object sample, Observation observation) { @@ -841,10 +842,9 @@ private Callback buildCallback(final ProducerRecord producerRecord, final catch (Exception e) { this.logger.warn(e, () -> "Error executing interceptor onAcknowledgement callback"); } - try { + try (Observation.Scope ignored = observation.openScope()) { if (exception == null) { successTimer(sample, producerRecord); - observation.stop(); future.complete(new SendResult<>(producerRecord, metadata)); if (this.producerListener != null) { this.producerListener.onSuccess(producerRecord, metadata); @@ -855,7 +855,6 @@ private Callback buildCallback(final ProducerRecord producerRecord, final else { failureTimer(sample, exception, producerRecord); observation.error(exception); - observation.stop(); future.completeExceptionally( new KafkaProducerException(producerRecord, "Failed to send", exception)); if (this.producerListener != null) { @@ -865,6 +864,7 @@ private Callback buildCallback(final ProducerRecord producerRecord, final } } finally { + observation.stop(); closeProducer(producer, this.transactional); } }; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index b5caf4e51e..45fa015ac7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -29,6 +29,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -108,7 +109,14 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate spanFromCallback = new AtomicReference<>(); + + template.send("observation.testT1", "test") + .thenAccept((sendResult) -> spanFromCallback.set(tracer.currentSpan())) + .get(10, TimeUnit.SECONDS); + + assertThat(spanFromCallback.get()).isNotNull(); + assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(listener.record).isNotNull(); Headers headers = listener.record.headers();