Skip to content

Commit

Permalink
JmsTrace holds a reference to the Jms message
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Sep 12, 2024
1 parent caae6f8 commit 0459ff4
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private void outgoingTrace(Destination actualDestination, Message<?> message, ja
}
JmsTrace jmsTrace = new JmsTrace.Builder()
.withQueue(actualDestination.toString())
.withProperties(messageProperties)
.withMessage(jmsPayload)
.build();
jmsInstrumenter.traceOutgoing(message, jmsTrace);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public void incomingTrace(IncomingJmsMessage<?> jmsMessage) {

JmsTrace jmsTrace = new JmsTrace.Builder()
.withQueue(queueName.orElse(null))
.withProperties(properties)
.withMessage(unwrapped)
.build();

jmsInstrumenter.traceIncoming(jmsMessage, jmsTrace);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.smallrye.reactive.messaging.jms.tracing;

import static io.opentelemetry.semconv.SemanticAttributes.*;

import java.util.Collections;
import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,41 +1,73 @@
package io.smallrye.reactive.messaging.jms.tracing;

import java.util.HashMap;
import java.util.Map;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

import jakarta.jms.JMSException;
import jakarta.jms.Message;

public class JmsTrace {
private final String queue;
private final Map<String, Object> messageProperties;
private final Message jmsMessage;

private JmsTrace(final String queue, Map<String, Object> messageProperties) {
private JmsTrace(final String queue, Message jmsMessage) {
this.queue = queue;
this.messageProperties = messageProperties;
this.jmsMessage = jmsMessage;
}

public String getQueue() {
return queue;
}

public Map<String, Object> getMessageProperties() {
return messageProperties;
public Message getMessage() {
return jmsMessage;
}

public List<String> getPropertyNames() {
List<String> keys = new ArrayList<>();
Enumeration propertyNames = null;
try {
propertyNames = jmsMessage.getPropertyNames();
while (propertyNames.hasMoreElements()) {
keys.add(propertyNames.nextElement().toString());
}
} catch (JMSException ignored) {
}
return keys;
}

public String getProperty(final String key) {
try {
return jmsMessage.getStringProperty(key);
} catch (JMSException ignored) {
return null;
}
}

public void setProperty(final String key, final String value) {
try {
jmsMessage.setStringProperty(key, value);
} catch (JMSException ignored) {
}
}

public static class Builder {
private String queue;
private Map<String, Object> properties;
private Message jmsMessage;

public Builder withQueue(final String queue) {
this.queue = queue;
return this;
}

public Builder withProperties(Map<String, Object> properties) {
this.properties = new HashMap<>(properties);
public Builder withMessage(final Message message) {
this.jmsMessage = message;
return this;
}

public JmsTrace build() {
return new JmsTrace(queue, properties);
return new JmsTrace(queue, jmsMessage);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,31 +1,19 @@
package io.smallrye.reactive.messaging.jms.tracing;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import io.opentelemetry.context.propagation.TextMapGetter;

public enum JmsTraceTextMapGetter implements TextMapGetter<JmsTrace> {
INSTANCE;

@Override
public Iterable<String> keys(final JmsTrace carrier) {
List<String> keys = new ArrayList<>();
Map<String, Object> messageProperties = carrier.getMessageProperties();
for (String key : messageProperties.keySet()) {
keys.add(key);
}
return keys;
return carrier.getPropertyNames();
}

@Override
public String get(final JmsTrace carrier, final String key) {
if (carrier != null) {
Object value = carrier.getMessageProperties().get(key);
if (value != null) {
return value.toString();
}
return carrier.getProperty(key);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.smallrye.reactive.messaging.jms.tracing;

import java.util.Map;

import io.opentelemetry.context.propagation.TextMapSetter;

public enum JmsTraceTextMapSetter implements TextMapSetter<JmsTrace> {
Expand All @@ -10,8 +8,7 @@ public enum JmsTraceTextMapSetter implements TextMapSetter<JmsTrace> {
@Override
public void set(final JmsTrace carrier, final String key, final String value) {
if (carrier != null) {
Map<String, Object> messageProperties = carrier.getMessageProperties();
messageProperties.put(key, value);
carrier.setProperty(key, value);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,37 @@
package io.smallrye.reactive.messaging.jms.tracing;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.HashMap;
import java.util.Map;

import jakarta.jms.JMSException;
import jakarta.jms.Message;

import org.junit.jupiter.api.Test;

class MessagePropertiesExtractAdapterTest {
@Test
public void verifyNullHeaderHandled() {
public void verifyNullHeaderHandled() throws JMSException {
Map<String, Object> messageProperties = new HashMap<>();
messageProperties.put("test_null_header", null);

JmsTrace jmsTrace = new JmsTrace.Builder().withProperties(messageProperties).build();
// Create a mock JMS message
Message msg = mock(Message.class);
doAnswer(i -> messageProperties.get(i.getArgument(0, String.class)))
.when(msg).getStringProperty(anyString());
doAnswer(i -> messageProperties.put(i.getArgument(0, String.class), i.getArgument(1, String.class)))
.when(msg).setStringProperty(anyString(), anyString());

JmsTrace jmsTrace = new JmsTrace.Builder().withMessage(msg).build();

String headerValue = JmsTraceTextMapGetter.INSTANCE.get(jmsTrace, "test_null_header");
JmsTraceTextMapSetter.INSTANCE.set(jmsTrace, "test_other_header", "value");

assertThat(headerValue).isNull();
assertThat(messageProperties.get("test_other_header")).isEqualTo("value");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.smallrye.reactive.messaging.jms.IncomingJmsMessageMetadata;
import io.smallrye.reactive.messaging.jms.JmsConnector;
import io.smallrye.reactive.messaging.jms.PayloadConsumerBean;
import io.smallrye.reactive.messaging.jms.ProducerBean;
Expand Down Expand Up @@ -103,22 +104,26 @@ public void testFromAppToJms() {
String queue = "queue-one";
Map<String, Object> map = new HashMap<>();
map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME);
map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME);
map.put("mp.messaging.incoming.jms.destination", "queue-one");
map.put("mp.messaging.incoming.stuff.connector", JmsConnector.CONNECTOR_NAME);
map.put("mp.messaging.incoming.stuff.destination", "queue-one");
MapBasedConfig config = new MapBasedConfig(map);
addConfig(config);
WeldContainer container = deploy(PayloadConsumerBean.class, ProducerBean.class);
WeldContainer container = deploy(MyAppReceivingData.class, ProducerBean.class);

PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get();
await().until(() -> bean.list().size() >= 10);
assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
MyAppReceivingData bean = container.select(MyAppReceivingData.class).get();
await().until(() -> bean.results().size() >= 10);
assertThat(bean.results())
.extracting(m -> m.getMetadata(IncomingJmsMessageMetadata.class).get())
.allSatisfy(m -> assertThat(m.getStringProperty("traceparent")).isNotNull())
.extracting(m -> Integer.parseInt(m.getBody(String.class)))
.containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

CompletableResultCode completableResultCode = tracerProvider.forceFlush();
completableResultCode.join(10, TimeUnit.SECONDS);
List<SpanData> spans = spanExporter.getFinishedSpanItems();
assertEquals(20, spans.size());

assertEquals(20, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size());
assertEquals(10, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size());

SpanData span = spans.get(0);
assertEquals(SpanKind.PRODUCER, span.getKind());
Expand Down Expand Up @@ -316,15 +321,15 @@ public List<Integer> list() {

@ApplicationScoped
public static class MyAppReceivingData {
private final List<Integer> results = new CopyOnWriteArrayList<>();
private final List<Message<Integer>> results = new CopyOnWriteArrayList<>();

@Incoming("stuff")
public CompletionStage<Void> consume(Message<Integer> input) {
results.add(input.getPayload());
results.add(input);
return input.ack();
}

public List<Integer> results() {
public List<Message<Integer>> results() {
return results;
}
}
Expand Down

0 comments on commit 0459ff4

Please sign in to comment.