Skip to content

Commit

Permalink
JmsTrace holds a reference to the Jms message
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Sep 12, 2024
1 parent caae6f8 commit 1e7221e
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private void outgoingTrace(Destination actualDestination, Message<?> message, ja
}
JmsTrace jmsTrace = new JmsTrace.Builder()
.withQueue(actualDestination.toString())
.withProperties(messageProperties)
.withMessage(jmsPayload)
.build();
jmsInstrumenter.traceOutgoing(message, jmsTrace);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public void incomingTrace(IncomingJmsMessage<?> jmsMessage) {

JmsTrace jmsTrace = new JmsTrace.Builder()
.withQueue(queueName.orElse(null))
.withProperties(properties)
.withMessage(unwrapped)
.build();

jmsInstrumenter.traceIncoming(jmsMessage, jmsTrace);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.smallrye.reactive.messaging.jms.tracing;

import static io.opentelemetry.semconv.SemanticAttributes.*;

import java.util.Collections;
import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,41 +1,73 @@
package io.smallrye.reactive.messaging.jms.tracing;

import java.util.HashMap;
import java.util.Map;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

import jakarta.jms.JMSException;
import jakarta.jms.Message;

public class JmsTrace {
private final String queue;
private final Map<String, Object> messageProperties;
private final Message jmsMessage;

private JmsTrace(final String queue, Map<String, Object> messageProperties) {
private JmsTrace(final String queue, Message jmsMessage) {
this.queue = queue;
this.messageProperties = messageProperties;
this.jmsMessage = jmsMessage;
}

public String getQueue() {
return queue;
}

public Map<String, Object> getMessageProperties() {
return messageProperties;
public Message getMessage() {
return jmsMessage;
}

public List<String> getPropertyNames() {
List<String> keys = new ArrayList<>();
Enumeration propertyNames = null;
try {
propertyNames = jmsMessage.getPropertyNames();
while (propertyNames.hasMoreElements()) {
keys.add(propertyNames.nextElement().toString());
}
} catch (JMSException ignored) {
}
return keys;
}

public String getProperty(final String key) {
try {
return jmsMessage.getStringProperty(key);
} catch (JMSException ignored) {
return null;
}
}

public void setProperty(final String key, final String value) {
try {
jmsMessage.setStringProperty(key, value);
} catch (JMSException ignored) {
}
}

public static class Builder {
private String queue;
private Map<String, Object> properties;
private Message jmsMessage;

public Builder withQueue(final String queue) {
this.queue = queue;
return this;
}

public Builder withProperties(Map<String, Object> properties) {
this.properties = new HashMap<>(properties);
public Builder withMessage(final Message message) {
this.jmsMessage = message;
return this;
}

public JmsTrace build() {
return new JmsTrace(queue, properties);
return new JmsTrace(queue, jmsMessage);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,31 +1,19 @@
package io.smallrye.reactive.messaging.jms.tracing;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import io.opentelemetry.context.propagation.TextMapGetter;

public enum JmsTraceTextMapGetter implements TextMapGetter<JmsTrace> {
INSTANCE;

@Override
public Iterable<String> keys(final JmsTrace carrier) {
List<String> keys = new ArrayList<>();
Map<String, Object> messageProperties = carrier.getMessageProperties();
for (String key : messageProperties.keySet()) {
keys.add(key);
}
return keys;
return carrier.getPropertyNames();
}

@Override
public String get(final JmsTrace carrier, final String key) {
if (carrier != null) {
Object value = carrier.getMessageProperties().get(key);
if (value != null) {
return value.toString();
}
return carrier.getProperty(key);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.smallrye.reactive.messaging.jms.tracing;

import java.util.Map;

import io.opentelemetry.context.propagation.TextMapSetter;

public enum JmsTraceTextMapSetter implements TextMapSetter<JmsTrace> {
Expand All @@ -10,8 +8,7 @@ public enum JmsTraceTextMapSetter implements TextMapSetter<JmsTrace> {
@Override
public void set(final JmsTrace carrier, final String key, final String value) {
if (carrier != null) {
Map<String, Object> messageProperties = carrier.getMessageProperties();
messageProperties.put(key, value);
carrier.setProperty(key, value);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,37 @@
package io.smallrye.reactive.messaging.jms.tracing;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.HashMap;
import java.util.Map;

import jakarta.jms.JMSException;
import jakarta.jms.Message;

import org.junit.jupiter.api.Test;

class MessagePropertiesExtractAdapterTest {
@Test
public void verifyNullHeaderHandled() {
public void verifyNullHeaderHandled() throws JMSException {
Map<String, Object> messageProperties = new HashMap<>();
messageProperties.put("test_null_header", null);

JmsTrace jmsTrace = new JmsTrace.Builder().withProperties(messageProperties).build();
// Create a mock JMS message
Message msg = mock(Message.class);
doAnswer(i -> messageProperties.get(i.getArgument(0, String.class)))
.when(msg).getStringProperty(anyString());
doAnswer(i -> messageProperties.put(i.getArgument(0, String.class), i.getArgument(1, String.class)))
.when(msg).setStringProperty(anyString(), anyString());

JmsTrace jmsTrace = new JmsTrace.Builder().withMessage(msg).build();

String headerValue = JmsTraceTextMapGetter.INSTANCE.get(jmsTrace, "test_null_header");
JmsTraceTextMapSetter.INSTANCE.set(jmsTrace, "test_other_header", "value");

assertThat(headerValue).isNull();
assertThat(messageProperties.get("test_other_header")).isEqualTo("value");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void testFromAppToJms() {
List<SpanData> spans = spanExporter.getFinishedSpanItems();
assertEquals(20, spans.size());

assertEquals(20, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size());
assertEquals(10, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size());

SpanData span = spans.get(0);
assertEquals(SpanKind.PRODUCER, span.getKind());
Expand Down

0 comments on commit 1e7221e

Please sign in to comment.