Skip to content

Commit

Permalink
Merge pull request #2727 from dankristensen/feature/jms_tracing
Browse files Browse the repository at this point in the history
Added tracing using opentelemetry for jms like kafka does.
  • Loading branch information
ozangunalp authored Sep 24, 2024
2 parents a320780 + e223309 commit ecd7904
Show file tree
Hide file tree
Showing 17 changed files with 1,541 additions and 13 deletions.
15 changes: 15 additions & 0 deletions smallrye-reactive-messaging-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
Expand All @@ -117,6 +127,11 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-otel</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.reactive.messaging.jms;

import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex;
import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
Expand All @@ -13,14 +14,16 @@

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;

public class IncomingJmsMessage<T> implements org.eclipse.microprofile.reactive.messaging.Message<T> {
public class IncomingJmsMessage<T> implements ContextAwareMessage<T>, MetadataInjectableMessage<T> {
private final Message delegate;
private final Executor executor;
private final Class<T> clazz;
private final JsonMapping jsonMapping;
private final IncomingJmsMessageMetadata jmsMetadata;
private final Metadata metadata;
private Metadata metadata;

IncomingJmsMessage(Message message, Executor executor, JsonMapping jsonMapping) {
this.delegate = message;
Expand All @@ -42,7 +45,7 @@ public class IncomingJmsMessage<T> implements org.eclipse.microprofile.reactive.
}

this.jmsMetadata = new IncomingJmsMessageMetadata(message);
this.metadata = Metadata.of(this.jmsMetadata);
this.metadata = captureContextMetadata(this.jmsMetadata);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -119,6 +122,7 @@ public CompletionStage<Void> ack(Metadata metadata) {
}
})
.runSubscriptionOn(executor)
.emitOn(this::runOnMessageContext)
.subscribeAsCompletionStage();
}

Expand All @@ -139,4 +143,8 @@ public <C> C unwrap(Class<C> unwrapType) {
throw ex.illegalStateUnableToUnwrap(unwrapType);
}

@Override
public void injectMetadata(Object metadataObject) {
metadata = this.metadata.with(metadataObject);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;

@ApplicationScoped
Expand All @@ -50,6 +52,7 @@
@ConnectorAttribute(name = "broadcast", description = "Whether or not the JMS message should be dispatched to multiple consumers", direction = Direction.INCOMING, type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "durable", description = "Set to `true` to use a durable subscription", direction = Direction.INCOMING, type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "destination-type", description = "The type of destination. It can be either `queue` or `topic`", direction = Direction.INCOMING_AND_OUTGOING, type = "string", defaultValue = "queue")
@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true")

@ConnectorAttribute(name = "disable-message-id", description = "Omit the message id in the outbound JMS message", direction = Direction.OUTGOING, type = "boolean")
@ConnectorAttribute(name = "disable-message-timestamp", description = "Omit the message timestamp in the outbound JMS message", direction = Direction.OUTGOING, type = "boolean")
Expand Down Expand Up @@ -92,6 +95,9 @@ public class JmsConnector implements InboundConnector, OutboundConnector {
@Inject
Instance<JsonMapping> jsonMapper;

@Inject
ExecutionHolder executionHolders;

@Inject
@ConfigProperty(name = "smallrye.jms.threads.max-pool-size", defaultValue = DEFAULT_MAX_POOL_SIZE)
int maxPoolSize;
Expand All @@ -100,6 +106,9 @@ public class JmsConnector implements InboundConnector, OutboundConnector {
@ConfigProperty(name = "smallrye.jms.threads.ttl", defaultValue = DEFAULT_THREAD_TTL)
int ttl;

@Inject
Instance<OpenTelemetry> openTelemetryInstance;

private ExecutorService executor;
private JsonMapping jsonMapping;
private final List<JmsSource> sources = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -134,7 +143,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config);
JmsResourceHolder<JMSConsumer> holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic));
contexts.add(holder);
JmsSource source = new JmsSource(holder, ic, jsonMapping, executor);
JmsSource source = new JmsSource(executionHolders.vertx(), holder, ic, openTelemetryInstance, jsonMapping, executor);
sources.add(source);
return source.getSource();
}
Expand All @@ -155,7 +164,7 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
JmsConnectorOutgoingConfiguration oc = new JmsConnectorOutgoingConfiguration(config);
JmsResourceHolder<JMSProducer> holder = new JmsResourceHolder<>(oc.getChannel(), () -> createJmsContext(oc));
contexts.add(holder);
return new JmsSink(holder, oc, jsonMapping, executor).getSink();
return new JmsSink(holder, oc, openTelemetryInstance, jsonMapping, executor).getSink();
}

private ConnectionFactory pickTheFactory(String factoryName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.time.Duration;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;

import jakarta.enterprise.inject.Instance;
import jakarta.jms.BytesMessage;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
Expand All @@ -16,7 +20,10 @@

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.jms.tracing.JmsTrace;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;

Expand All @@ -25,9 +32,14 @@ class JmsSink {
private final Flow.Subscriber<Message<?>> sink;
private final JsonMapping jsonMapping;
private final Executor executor;
private final JmsOpenTelemetryInstrumenter jmsInstrumenter;
private final boolean isTracingEnabled;

JmsSink(JmsResourceHolder<JMSProducer> resourceHolder, JmsConnectorOutgoingConfiguration config, JsonMapping jsonMapping,
JmsSink(JmsResourceHolder<JMSProducer> resourceHolder, JmsConnectorOutgoingConfiguration config,
Instance<OpenTelemetry> openTelemetryInstance, JsonMapping jsonMapping,
Executor executor) {
this.isTracingEnabled = config.getTracingEnabled();

String name = config.getDestination().orElseGet(config::getChannel);
String type = config.getDestinationType();
boolean retry = config.getRetry();
Expand Down Expand Up @@ -73,6 +85,12 @@ class JmsSink {
this.jsonMapping = jsonMapping;
this.executor = executor;

if (isTracingEnabled) {
jmsInstrumenter = JmsOpenTelemetryInstrumenter.createForSink(openTelemetryInstance);
} else {
jmsInstrumenter = null;
}

sink = MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(message -> send(resourceHolder, message)
.onFailure(t -> retry)
.retry()
Expand All @@ -90,6 +108,7 @@ private Uni<? extends Message<?>> send(JmsResourceHolder<JMSProducer> resourceHo
JMSContext context = resourceHolder.getContext();
// If the payload is a JMS Message, send it as it is, ignoring metadata.
if (payload instanceof jakarta.jms.Message) {
outgoingTrace(destination, message, (jakarta.jms.Message) payload);
return dispatch(message,
() -> resourceHolder.getClient().send(destination, (jakarta.jms.Message) payload));
}
Expand Down Expand Up @@ -150,12 +169,35 @@ private Uni<? extends Message<?>> send(JmsResourceHolder<JMSProducer> resourceHo
actualDestination = destination;
}

outgoingTrace(actualDestination, message, outgoing);
return dispatch(message, () -> resourceHolder.getClient().send(actualDestination, outgoing));
} catch (JMSException e) {
return Uni.createFrom().failure(new IllegalStateException(e));
}
}

private void outgoingTrace(Destination actualDestination, Message<?> message, jakarta.jms.Message payload) {
if (isTracingEnabled) {
jakarta.jms.Message jmsPayload = payload;
Map<String, Object> messageProperties = new HashMap<>();
try {
Enumeration<?> propertyNames = jmsPayload.getPropertyNames();

while (propertyNames.hasMoreElements()) {
String propertyName = (String) propertyNames.nextElement();
messageProperties.put(propertyName, jmsPayload.getObjectProperty(propertyName));
}
} catch (JMSException e) {
throw new RuntimeException(e);
}
JmsTrace jmsTrace = new JmsTrace.Builder()
.withQueue(actualDestination.toString())
.withMessage(jmsPayload)
.build();
jmsInstrumenter.traceOutgoing(message, jmsTrace);
}
}

private boolean isPrimitiveBoxed(Class<?> c) {
return c.equals(Boolean.class)
|| c.equals(Integer.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,51 @@
import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.time.Duration;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.jms.*;
import jakarta.enterprise.inject.Instance;
import jakarta.jms.Destination;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.JMSRuntimeException;
import jakarta.jms.Message;
import jakarta.jms.Queue;
import jakarta.jms.Topic;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.jms.tracing.JmsTrace;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

class JmsSource {

private final Multi<IncomingJmsMessage<?>> source;
private final JmsResourceHolder<JMSConsumer> resourceHolder;

private final JmsPublisher publisher;
private final boolean isTracingEnabled;
private final JmsOpenTelemetryInstrumenter jmsInstrumenter;
private final Context context;

JmsSource(JmsResourceHolder<JMSConsumer> resourceHolder, JmsConnectorIncomingConfiguration config, JsonMapping jsonMapping,
JmsSource(Vertx vertx, JmsResourceHolder<JMSConsumer> resourceHolder, JmsConnectorIncomingConfiguration config,
Instance<OpenTelemetry> openTelemetryInstance, JsonMapping jsonMapping,
Executor executor) {
this.isTracingEnabled = config.getTracingEnabled();
String channel = config.getChannel();
final String destinationName = config.getDestination().orElseGet(config::getChannel);
String selector = config.getSelector().orElse(null);
Expand All @@ -46,9 +69,18 @@ class JmsSource {
}
});
resourceHolder.getClient();
if (isTracingEnabled) {
jmsInstrumenter = JmsOpenTelemetryInstrumenter.createForSource(openTelemetryInstance);
} else {
jmsInstrumenter = null;
}

this.publisher = new JmsPublisher(resourceHolder);
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
source = Multi.createFrom().publisher(publisher)
.emitOn(context::runOnContext)
.<IncomingJmsMessage<?>> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping))
.onItem().invoke(this::incomingTrace)
.onFailure(t -> {
log.terminalErrorOnChannel(channel);
this.resourceHolder.close();
Expand Down Expand Up @@ -190,4 +222,42 @@ long add(long req) {
}
}
}

public void incomingTrace(IncomingJmsMessage<?> jmsMessage) {
if (isTracingEnabled) {
Optional<IncomingJmsMessageMetadata> metadata = jmsMessage.getMetadata(IncomingJmsMessageMetadata.class);
Optional<String> queueName = metadata.map(a -> {
Destination destination = a.getDestination();
if (destination instanceof Queue) {
Queue queue = (Queue) destination;
try {
return queue.getQueueName();
} catch (JMSException e) {
return null;
}
}
return null;
});
Message unwrapped = jmsMessage.unwrap(Message.class);

Map<String, Object> properties = new HashMap<>();
try {
Enumeration<?> propertyNames = unwrapped.getPropertyNames();
while (propertyNames.hasMoreElements()) {
String name = (String) propertyNames.nextElement();
Object value = unwrapped.getObjectProperty(name);
properties.put(name, value);
}
} catch (JMSException e) {
throw new RuntimeException(e);
}

JmsTrace jmsTrace = new JmsTrace.Builder()
.withQueue(queueName.orElse(null))
.withMessage(unwrapped)
.build();

jmsInstrumenter.traceIncoming(jmsMessage, jmsTrace);
}
}
}
Loading

0 comments on commit ecd7904

Please sign in to comment.