From 0459ff43cb013ad4db9be7cd0a4b06a4d0d27704 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 12 Sep 2024 13:49:36 +0200 Subject: [PATCH] JmsTrace holds a reference to the Jms message --- .../reactive/messaging/jms/JmsSink.java | 2 +- .../reactive/messaging/jms/JmsSource.java | 2 +- .../jms/tracing/JmsAttributesExtractor.java | 2 - .../messaging/jms/tracing/JmsTrace.java | 54 +++++++++++++++---- .../jms/tracing/JmsTraceTextMapGetter.java | 16 +----- .../jms/tracing/JmsTraceTextMapSetter.java | 5 +- .../MessagePropertiesExtractAdapterTest.java | 19 ++++++- .../jms/tracing/TracingPropagationTest.java | 25 +++++---- 8 files changed, 80 insertions(+), 45 deletions(-) diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java index 8bd3cd778..5c628d6d0 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java @@ -192,7 +192,7 @@ private void outgoingTrace(Destination actualDestination, Message message, ja } JmsTrace jmsTrace = new JmsTrace.Builder() .withQueue(actualDestination.toString()) - .withProperties(messageProperties) + .withMessage(jmsPayload) .build(); jmsInstrumenter.traceOutgoing(message, jmsTrace); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java index 6a26e351e..9d285e970 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java @@ -248,7 +248,7 @@ public void incomingTrace(IncomingJmsMessage jmsMessage) { JmsTrace jmsTrace = new JmsTrace.Builder() .withQueue(queueName.orElse(null)) - .withProperties(properties) + .withMessage(unwrapped) .build(); jmsInstrumenter.traceIncoming(jmsMessage, jmsTrace); diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java index 4bcca50be..92b85faf2 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java @@ -1,7 +1,5 @@ package io.smallrye.reactive.messaging.jms.tracing; -import static io.opentelemetry.semconv.SemanticAttributes.*; - import java.util.Collections; import java.util.List; diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java index 45a973719..177ccbf8c 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java @@ -1,41 +1,73 @@ package io.smallrye.reactive.messaging.jms.tracing; -import java.util.HashMap; -import java.util.Map; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; + +import jakarta.jms.JMSException; +import jakarta.jms.Message; public class JmsTrace { private final String queue; - private final Map messageProperties; + private final Message jmsMessage; - private JmsTrace(final String queue, Map messageProperties) { + private JmsTrace(final String queue, Message jmsMessage) { this.queue = queue; - this.messageProperties = messageProperties; + this.jmsMessage = jmsMessage; } public String getQueue() { return queue; } - public Map getMessageProperties() { - return messageProperties; + public Message getMessage() { + return jmsMessage; + } + + public List getPropertyNames() { + List keys = new ArrayList<>(); + Enumeration propertyNames = null; + try { + propertyNames = jmsMessage.getPropertyNames(); + while (propertyNames.hasMoreElements()) { + keys.add(propertyNames.nextElement().toString()); + } + } catch (JMSException ignored) { + } + return keys; + } + + public String getProperty(final String key) { + try { + return jmsMessage.getStringProperty(key); + } catch (JMSException ignored) { + return null; + } + } + + public void setProperty(final String key, final String value) { + try { + jmsMessage.setStringProperty(key, value); + } catch (JMSException ignored) { + } } public static class Builder { private String queue; - private Map properties; + private Message jmsMessage; public Builder withQueue(final String queue) { this.queue = queue; return this; } - public Builder withProperties(Map properties) { - this.properties = new HashMap<>(properties); + public Builder withMessage(final Message message) { + this.jmsMessage = message; return this; } public JmsTrace build() { - return new JmsTrace(queue, properties); + return new JmsTrace(queue, jmsMessage); } } } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java index b6978ef60..89d81be8e 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java @@ -1,9 +1,5 @@ package io.smallrye.reactive.messaging.jms.tracing; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - import io.opentelemetry.context.propagation.TextMapGetter; public enum JmsTraceTextMapGetter implements TextMapGetter { @@ -11,21 +7,13 @@ public enum JmsTraceTextMapGetter implements TextMapGetter { @Override public Iterable keys(final JmsTrace carrier) { - List keys = new ArrayList<>(); - Map messageProperties = carrier.getMessageProperties(); - for (String key : messageProperties.keySet()) { - keys.add(key); - } - return keys; + return carrier.getPropertyNames(); } @Override public String get(final JmsTrace carrier, final String key) { if (carrier != null) { - Object value = carrier.getMessageProperties().get(key); - if (value != null) { - return value.toString(); - } + return carrier.getProperty(key); } return null; } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java index ffc7409b0..471d38a30 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java @@ -1,7 +1,5 @@ package io.smallrye.reactive.messaging.jms.tracing; -import java.util.Map; - import io.opentelemetry.context.propagation.TextMapSetter; public enum JmsTraceTextMapSetter implements TextMapSetter { @@ -10,8 +8,7 @@ public enum JmsTraceTextMapSetter implements TextMapSetter { @Override public void set(final JmsTrace carrier, final String key, final String value) { if (carrier != null) { - Map messageProperties = carrier.getMessageProperties(); - messageProperties.put(key, value); + carrier.setProperty(key, value); } } } diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java index 22f77681d..ecba271e5 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java @@ -1,22 +1,37 @@ package io.smallrye.reactive.messaging.jms.tracing; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import java.util.HashMap; import java.util.Map; +import jakarta.jms.JMSException; +import jakarta.jms.Message; + import org.junit.jupiter.api.Test; class MessagePropertiesExtractAdapterTest { @Test - public void verifyNullHeaderHandled() { + public void verifyNullHeaderHandled() throws JMSException { Map messageProperties = new HashMap<>(); messageProperties.put("test_null_header", null); - JmsTrace jmsTrace = new JmsTrace.Builder().withProperties(messageProperties).build(); + // Create a mock JMS message + Message msg = mock(Message.class); + doAnswer(i -> messageProperties.get(i.getArgument(0, String.class))) + .when(msg).getStringProperty(anyString()); + doAnswer(i -> messageProperties.put(i.getArgument(0, String.class), i.getArgument(1, String.class))) + .when(msg).setStringProperty(anyString(), anyString()); + + JmsTrace jmsTrace = new JmsTrace.Builder().withMessage(msg).build(); String headerValue = JmsTraceTextMapGetter.INSTANCE.get(jmsTrace, "test_null_header"); + JmsTraceTextMapSetter.INSTANCE.set(jmsTrace, "test_other_header", "value"); assertThat(headerValue).isNull(); + assertThat(messageProperties.get("test_other_header")).isEqualTo("value"); } } diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java index 133c24c4b..89628c8f6 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java @@ -50,6 +50,7 @@ import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.smallrye.reactive.messaging.jms.IncomingJmsMessageMetadata; import io.smallrye.reactive.messaging.jms.JmsConnector; import io.smallrye.reactive.messaging.jms.PayloadConsumerBean; import io.smallrye.reactive.messaging.jms.ProducerBean; @@ -103,22 +104,26 @@ public void testFromAppToJms() { String queue = "queue-one"; Map map = new HashMap<>(); map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); - map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); - map.put("mp.messaging.incoming.jms.destination", "queue-one"); + map.put("mp.messaging.incoming.stuff.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.stuff.destination", "queue-one"); MapBasedConfig config = new MapBasedConfig(map); addConfig(config); - WeldContainer container = deploy(PayloadConsumerBean.class, ProducerBean.class); + WeldContainer container = deploy(MyAppReceivingData.class, ProducerBean.class); - PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); - await().until(() -> bean.list().size() >= 10); - assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + MyAppReceivingData bean = container.select(MyAppReceivingData.class).get(); + await().until(() -> bean.results().size() >= 10); + assertThat(bean.results()) + .extracting(m -> m.getMetadata(IncomingJmsMessageMetadata.class).get()) + .allSatisfy(m -> assertThat(m.getStringProperty("traceparent")).isNotNull()) + .extracting(m -> Integer.parseInt(m.getBody(String.class))) + .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); CompletableResultCode completableResultCode = tracerProvider.forceFlush(); completableResultCode.join(10, TimeUnit.SECONDS); List spans = spanExporter.getFinishedSpanItems(); assertEquals(20, spans.size()); - assertEquals(20, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size()); + assertEquals(10, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size()); SpanData span = spans.get(0); assertEquals(SpanKind.PRODUCER, span.getKind()); @@ -316,15 +321,15 @@ public List list() { @ApplicationScoped public static class MyAppReceivingData { - private final List results = new CopyOnWriteArrayList<>(); + private final List> results = new CopyOnWriteArrayList<>(); @Incoming("stuff") public CompletionStage consume(Message input) { - results.add(input.getPayload()); + results.add(input); return input.ack(); } - public List results() { + public List> results() { return results; } }