Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tracing using opentelemetry for jms like kafka does. #2727

Merged
merged 9 commits into from
Sep 24, 2024
24 changes: 24 additions & 0 deletions smallrye-reactive-messaging-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
Expand All @@ -117,6 +127,20 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-instrumentation-api-incubator</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-otel</artifactId>
<version>999-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
ozangunalp marked this conversation as resolved.
Show resolved Hide resolved

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction;
Expand All @@ -50,6 +51,7 @@
@ConnectorAttribute(name = "broadcast", description = "Whether or not the JMS message should be dispatched to multiple consumers", direction = Direction.INCOMING, type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "durable", description = "Set to `true` to use a durable subscription", direction = Direction.INCOMING, type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "destination-type", description = "The type of destination. It can be either `queue` or `topic`", direction = Direction.INCOMING_AND_OUTGOING, type = "string", defaultValue = "queue")
@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true")

@ConnectorAttribute(name = "disable-message-id", description = "Omit the message id in the outbound JMS message", direction = Direction.OUTGOING, type = "boolean")
@ConnectorAttribute(name = "disable-message-timestamp", description = "Omit the message timestamp in the outbound JMS message", direction = Direction.OUTGOING, type = "boolean")
Expand Down Expand Up @@ -100,6 +102,9 @@ public class JmsConnector implements InboundConnector, OutboundConnector {
@ConfigProperty(name = "smallrye.jms.threads.ttl", defaultValue = DEFAULT_THREAD_TTL)
int ttl;

@Inject
Instance<OpenTelemetry> openTelemetryInstance;

private ExecutorService executor;
private JsonMapping jsonMapping;
private final List<JmsSource> sources = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -134,7 +139,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config);
JmsResourceHolder<JMSConsumer> holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic));
contexts.add(holder);
JmsSource source = new JmsSource(holder, ic, jsonMapping, executor);
JmsSource source = new JmsSource(holder, ic, openTelemetryInstance, jsonMapping, executor);
sources.add(source);
return source.getSource();
}
Expand All @@ -155,7 +160,7 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
JmsConnectorOutgoingConfiguration oc = new JmsConnectorOutgoingConfiguration(config);
JmsResourceHolder<JMSProducer> holder = new JmsResourceHolder<>(oc.getChannel(), () -> createJmsContext(oc));
contexts.add(holder);
return new JmsSink(holder, oc, jsonMapping, executor).getSink();
return new JmsSink(holder, oc, openTelemetryInstance, jsonMapping, executor).getSink();
}

private ConnectionFactory pickTheFactory(String factoryName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.time.Duration;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;

import jakarta.enterprise.inject.Instance;
import jakarta.jms.BytesMessage;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
Expand All @@ -16,7 +20,10 @@

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.jms.tracing.JmsTrace;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;

Expand All @@ -25,9 +32,14 @@ class JmsSink {
private final Flow.Subscriber<Message<?>> sink;
private final JsonMapping jsonMapping;
private final Executor executor;
private final JmsOpenTelemetryInstrumenter jmsInstrumenter;
private final boolean isTracingEnabled;

JmsSink(JmsResourceHolder<JMSProducer> resourceHolder, JmsConnectorOutgoingConfiguration config, JsonMapping jsonMapping,
JmsSink(JmsResourceHolder<JMSProducer> resourceHolder, JmsConnectorOutgoingConfiguration config,
Instance<OpenTelemetry> openTelemetryInstance, JsonMapping jsonMapping,
Executor executor) {
this.isTracingEnabled = config.getTracingEnabled();

String name = config.getDestination().orElseGet(config::getChannel);
String type = config.getDestinationType();
boolean retry = config.getRetry();
Expand Down Expand Up @@ -73,6 +85,12 @@ class JmsSink {
this.jsonMapping = jsonMapping;
this.executor = executor;

if (isTracingEnabled) {
jmsInstrumenter = JmsOpenTelemetryInstrumenter.createForSink(openTelemetryInstance);
} else {
jmsInstrumenter = null;
}

sink = MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(message -> send(resourceHolder, message)
.onFailure(t -> retry)
.retry()
Expand All @@ -90,6 +108,8 @@ private Uni<? extends Message<?>> send(JmsResourceHolder<JMSProducer> resourceHo
JMSContext context = resourceHolder.getContext();
// If the payload is a JMS Message, send it as it is, ignoring metadata.
if (payload instanceof jakarta.jms.Message) {
outgoingTrace(resourceHolder, message, (jakarta.jms.Message) payload);

return dispatch(message,
() -> resourceHolder.getClient().send(destination, (jakarta.jms.Message) payload));
}
Expand Down Expand Up @@ -150,12 +170,35 @@ private Uni<? extends Message<?>> send(JmsResourceHolder<JMSProducer> resourceHo
actualDestination = destination;
}

outgoingTrace(resourceHolder, message, outgoing);
return dispatch(message, () -> resourceHolder.getClient().send(actualDestination, outgoing));
} catch (JMSException e) {
return Uni.createFrom().failure(new IllegalStateException(e));
}
}

private void outgoingTrace(JmsResourceHolder<JMSProducer> resourceHolder, Message<?> message, jakarta.jms.Message payload) {
if (isTracingEnabled) {
jakarta.jms.Message jmsPayload = payload;
Map<String, Object> messageProperties = new HashMap<>();
try {
Enumeration<?> propertyNames = jmsPayload.getPropertyNames();

while (propertyNames.hasMoreElements()) {
String propertyName = (String) propertyNames.nextElement();
messageProperties.put(propertyName, jmsPayload.getObjectProperty(propertyName));
}
} catch (JMSException e) {
throw new RuntimeException(e);
}
JmsTrace kafkaTrace = new JmsTrace.Builder()
.withQueue(resourceHolder.getDestination().toString())//TODO Find the correct queue name
ozangunalp marked this conversation as resolved.
Show resolved Hide resolved
.withProperties(messageProperties)
.build();
jmsInstrumenter.traceOutgoing(message, kafkaTrace);
}
}

private boolean isPrimitiveBoxed(Class<?> c) {
return c.equals(Boolean.class)
|| c.equals(Integer.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,25 @@
import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.time.Duration;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.enterprise.inject.Instance;
import jakarta.jms.*;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.jms.tracing.JmsTrace;
import io.smallrye.reactive.messaging.json.JsonMapping;

class JmsSource {
Expand All @@ -23,9 +31,13 @@ class JmsSource {
private final JmsResourceHolder<JMSConsumer> resourceHolder;

private final JmsPublisher publisher;
private final boolean isTracingEnabled;
private final JmsOpenTelemetryInstrumenter jmsInstrumenter;

JmsSource(JmsResourceHolder<JMSConsumer> resourceHolder, JmsConnectorIncomingConfiguration config, JsonMapping jsonMapping,
JmsSource(JmsResourceHolder<JMSConsumer> resourceHolder, JmsConnectorIncomingConfiguration config,
Instance<OpenTelemetry> openTelemetryInstance, JsonMapping jsonMapping,
Executor executor) {
this.isTracingEnabled = config.getTracingEnabled();
String channel = config.getChannel();
final String destinationName = config.getDestination().orElseGet(config::getChannel);
String selector = config.getSelector().orElse(null);
Expand All @@ -46,9 +58,16 @@ class JmsSource {
}
});
resourceHolder.getClient();
if (isTracingEnabled) {
jmsInstrumenter = JmsOpenTelemetryInstrumenter.createForSource(openTelemetryInstance);
} else {
jmsInstrumenter = null;
}

this.publisher = new JmsPublisher(resourceHolder);
source = Multi.createFrom().publisher(publisher)
.<IncomingJmsMessage<?>> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping))
.onItem().invoke(this::incomingTrace)
.onFailure(t -> {
log.terminalErrorOnChannel(channel);
this.resourceHolder.close();
Expand Down Expand Up @@ -190,4 +209,42 @@ long add(long req) {
}
}
}

public void incomingTrace(IncomingJmsMessage<?> jmsMessage) {
if (isTracingEnabled) {
Optional<IncomingJmsMessageMetadata> metadata = jmsMessage.getMetadata(IncomingJmsMessageMetadata.class);
Optional<String> queueName = metadata.map(a -> {
Destination destination = a.getDestination();
if (destination instanceof Queue) {
Queue queue = (Queue) destination;
try {
return queue.getQueueName();
} catch (JMSException e) {
return null;
}
}
return null;
});
Message unwrapped = jmsMessage.unwrap(Message.class);

Map<String, Object> properties = new HashMap<>();
try {
Enumeration<?> propertyNames = unwrapped.getPropertyNames();
while (propertyNames.hasMoreElements()) {
String name = (String) propertyNames.nextElement();
Object value = unwrapped.getObjectProperty(name);
properties.put(name, value);
}
} catch (JMSException e) {
throw new RuntimeException(e);
}

JmsTrace jmsTrace = new JmsTrace.Builder()
.withQueue(queueName.orElse(null))
.withProperties(properties)
.build();

jmsInstrumenter.traceIncoming(jmsMessage, jmsTrace);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package io.smallrye.reactive.messaging.jms.tracing;

import static io.opentelemetry.semconv.SemanticAttributes.*;

import java.util.Collections;
import java.util.List;

import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;

public class JmsAttributesExtractor implements AttributesExtractor<JmsTrace, Void> {
private final MessagingAttributesGetter<JmsTrace, Void> messagingAttributesGetter;

public JmsAttributesExtractor() {
this.messagingAttributesGetter = new JmsMessagingAttributesGetter();
}

@Override
public void onStart(final AttributesBuilder attributes, final Context parentContext, final JmsTrace jmsTrace) {
String groupId = jmsTrace.getGroupId();
String clientId = jmsTrace.getClientId();
if (groupId != null && clientId != null) {
String consumerId = groupId;
if (!clientId.isEmpty()) {
consumerId += " - " + clientId;
}
attributes.put(MESSAGING_CONSUMER_ID, consumerId);
}
if (groupId != null) {
ozangunalp marked this conversation as resolved.
Show resolved Hide resolved
attributes.put(MESSAGING_KAFKA_CONSUMER_GROUP, groupId);
}
}

@Override
public void onEnd(
final AttributesBuilder attributes,
final Context context,
final JmsTrace kafkaTrace,
final Void unused,
final Throwable error) {

}

public MessagingAttributesGetter<JmsTrace, Void> getMessagingAttributesGetter() {
return messagingAttributesGetter;
}

private static final class JmsMessagingAttributesGetter implements MessagingAttributesGetter<JmsTrace, Void> {
@Override
public String getSystem(final JmsTrace jmsTrace) {
return "jms";
}

@Override
public String getDestination(final JmsTrace jmsTrace) {
return jmsTrace.getQueue();
}

@Override
public boolean isTemporaryDestination(final JmsTrace jmsTrace) {
return false;
}

@Override
public String getConversationId(final JmsTrace jmsTrace) {
return null;
}

@Override
public String getMessageId(final JmsTrace jmsTrace, final Void unused) {
return null;
}

@Override
public List<String> getMessageHeader(JmsTrace jmsTrace, String name) {
return Collections.emptyList();
}

@Override
public String getDestinationTemplate(JmsTrace jmsTrace) {
return null;
}

@Override
public boolean isAnonymousDestination(JmsTrace jmsTrace) {
return false;
}

@Override
public Long getMessageBodySize(JmsTrace jmsTrace) {
return null;
}

@Override
public Long getMessageEnvelopeSize(JmsTrace jmsTrace) {
return null;
}

@Override
public String getClientId(JmsTrace jmsTrace) {
if (jmsTrace.getClientId() == null) {
return null;
}
return jmsTrace.getClientId();
}

@Override
public Long getBatchMessageCount(JmsTrace jmsTrace, Void unused) {
return null;
}
}
}
Loading