Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tracing using opentelemetry for jms like kafka does. #2727

Merged
merged 9 commits into from
Sep 24, 2024
15 changes: 15 additions & 0 deletions smallrye-reactive-messaging-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
Expand All @@ -117,6 +127,11 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-otel</artifactId>
<version>${project.version}</version>
</dependency>
ozangunalp marked this conversation as resolved.
Show resolved Hide resolved

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.smallrye.reactive.messaging.jms;

import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex;
import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
Expand All @@ -13,14 +14,16 @@

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;

public class IncomingJmsMessage<T> implements org.eclipse.microprofile.reactive.messaging.Message<T> {
public class IncomingJmsMessage<T> implements ContextAwareMessage<T>, MetadataInjectableMessage<T> {
private final Message delegate;
private final Executor executor;
private final Class<T> clazz;
private final JsonMapping jsonMapping;
private final IncomingJmsMessageMetadata jmsMetadata;
private final Metadata metadata;
private Metadata metadata;

IncomingJmsMessage(Message message, Executor executor, JsonMapping jsonMapping) {
this.delegate = message;
Expand All @@ -42,7 +45,7 @@ public class IncomingJmsMessage<T> implements org.eclipse.microprofile.reactive.
}

this.jmsMetadata = new IncomingJmsMessageMetadata(message);
this.metadata = Metadata.of(this.jmsMetadata);
this.metadata = captureContextMetadata(this.jmsMetadata);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -119,6 +122,7 @@ public CompletionStage<Void> ack(Metadata metadata) {
}
})
.runSubscriptionOn(executor)
.emitOn(this::runOnMessageContext)
.subscribeAsCompletionStage();
}

Expand All @@ -139,4 +143,8 @@ public <C> C unwrap(Class<C> unwrapType) {
throw ex.illegalStateUnableToUnwrap(unwrapType);
}

@Override
public void injectMetadata(Object metadataObject) {
metadata = this.metadata.with(metadataObject);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;

@ApplicationScoped
Expand All @@ -50,6 +52,7 @@
@ConnectorAttribute(name = "broadcast", description = "Whether or not the JMS message should be dispatched to multiple consumers", direction = Direction.INCOMING, type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "durable", description = "Set to `true` to use a durable subscription", direction = Direction.INCOMING, type = "boolean", defaultValue = "false")
@ConnectorAttribute(name = "destination-type", description = "The type of destination. It can be either `queue` or `topic`", direction = Direction.INCOMING_AND_OUTGOING, type = "string", defaultValue = "queue")
@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true")

@ConnectorAttribute(name = "disable-message-id", description = "Omit the message id in the outbound JMS message", direction = Direction.OUTGOING, type = "boolean")
@ConnectorAttribute(name = "disable-message-timestamp", description = "Omit the message timestamp in the outbound JMS message", direction = Direction.OUTGOING, type = "boolean")
Expand Down Expand Up @@ -92,6 +95,9 @@ public class JmsConnector implements InboundConnector, OutboundConnector {
@Inject
Instance<JsonMapping> jsonMapper;

@Inject
ExecutionHolder executionHolders;

@Inject
@ConfigProperty(name = "smallrye.jms.threads.max-pool-size", defaultValue = DEFAULT_MAX_POOL_SIZE)
int maxPoolSize;
Expand All @@ -100,6 +106,9 @@ public class JmsConnector implements InboundConnector, OutboundConnector {
@ConfigProperty(name = "smallrye.jms.threads.ttl", defaultValue = DEFAULT_THREAD_TTL)
int ttl;

@Inject
Instance<OpenTelemetry> openTelemetryInstance;

private ExecutorService executor;
private JsonMapping jsonMapping;
private final List<JmsSource> sources = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -134,7 +143,7 @@ public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config);
JmsResourceHolder<JMSConsumer> holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic));
contexts.add(holder);
JmsSource source = new JmsSource(holder, ic, jsonMapping, executor);
JmsSource source = new JmsSource(executionHolders.vertx(), holder, ic, openTelemetryInstance, jsonMapping, executor);
sources.add(source);
return source.getSource();
}
Expand All @@ -155,7 +164,7 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
JmsConnectorOutgoingConfiguration oc = new JmsConnectorOutgoingConfiguration(config);
JmsResourceHolder<JMSProducer> holder = new JmsResourceHolder<>(oc.getChannel(), () -> createJmsContext(oc));
contexts.add(holder);
return new JmsSink(holder, oc, jsonMapping, executor).getSink();
return new JmsSink(holder, oc, openTelemetryInstance, jsonMapping, executor).getSink();
}

private ConnectionFactory pickTheFactory(String factoryName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.time.Duration;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;

import jakarta.enterprise.inject.Instance;
import jakarta.jms.BytesMessage;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
Expand All @@ -16,7 +20,10 @@

import org.eclipse.microprofile.reactive.messaging.Message;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.jms.tracing.JmsTrace;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;

Expand All @@ -25,9 +32,14 @@ class JmsSink {
private final Flow.Subscriber<Message<?>> sink;
private final JsonMapping jsonMapping;
private final Executor executor;
private final JmsOpenTelemetryInstrumenter jmsInstrumenter;
private final boolean isTracingEnabled;

JmsSink(JmsResourceHolder<JMSProducer> resourceHolder, JmsConnectorOutgoingConfiguration config, JsonMapping jsonMapping,
JmsSink(JmsResourceHolder<JMSProducer> resourceHolder, JmsConnectorOutgoingConfiguration config,
Instance<OpenTelemetry> openTelemetryInstance, JsonMapping jsonMapping,
Executor executor) {
this.isTracingEnabled = config.getTracingEnabled();

String name = config.getDestination().orElseGet(config::getChannel);
String type = config.getDestinationType();
boolean retry = config.getRetry();
Expand Down Expand Up @@ -73,6 +85,12 @@ class JmsSink {
this.jsonMapping = jsonMapping;
this.executor = executor;

if (isTracingEnabled) {
jmsInstrumenter = JmsOpenTelemetryInstrumenter.createForSink(openTelemetryInstance);
} else {
jmsInstrumenter = null;
}

sink = MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(message -> send(resourceHolder, message)
.onFailure(t -> retry)
.retry()
Expand All @@ -90,6 +108,7 @@ private Uni<? extends Message<?>> send(JmsResourceHolder<JMSProducer> resourceHo
JMSContext context = resourceHolder.getContext();
// If the payload is a JMS Message, send it as it is, ignoring metadata.
if (payload instanceof jakarta.jms.Message) {
outgoingTrace(destination, message, (jakarta.jms.Message) payload);
return dispatch(message,
() -> resourceHolder.getClient().send(destination, (jakarta.jms.Message) payload));
}
Expand Down Expand Up @@ -150,12 +169,35 @@ private Uni<? extends Message<?>> send(JmsResourceHolder<JMSProducer> resourceHo
actualDestination = destination;
}

outgoingTrace(actualDestination, message, outgoing);
return dispatch(message, () -> resourceHolder.getClient().send(actualDestination, outgoing));
} catch (JMSException e) {
return Uni.createFrom().failure(new IllegalStateException(e));
}
}

private void outgoingTrace(Destination actualDestination, Message<?> message, jakarta.jms.Message payload) {
if (isTracingEnabled) {
jakarta.jms.Message jmsPayload = payload;
Map<String, Object> messageProperties = new HashMap<>();
try {
Enumeration<?> propertyNames = jmsPayload.getPropertyNames();

while (propertyNames.hasMoreElements()) {
String propertyName = (String) propertyNames.nextElement();
messageProperties.put(propertyName, jmsPayload.getObjectProperty(propertyName));
}
} catch (JMSException e) {
throw new RuntimeException(e);
}
JmsTrace jmsTrace = new JmsTrace.Builder()
.withQueue(actualDestination.toString())
.withMessage(jmsPayload)
.build();
jmsInstrumenter.traceOutgoing(message, jmsTrace);
}
}

private boolean isPrimitiveBoxed(Class<?> c) {
return c.equals(Boolean.class)
|| c.equals(Integer.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,51 @@
import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log;

import java.time.Duration;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.jms.*;
import jakarta.enterprise.inject.Instance;
import jakarta.jms.Destination;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.JMSRuntimeException;
import jakarta.jms.Message;
import jakarta.jms.Queue;
import jakarta.jms.Topic;

import io.opentelemetry.api.OpenTelemetry;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.jms.tracing.JmsTrace;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;

class JmsSource {

private final Multi<IncomingJmsMessage<?>> source;
private final JmsResourceHolder<JMSConsumer> resourceHolder;

private final JmsPublisher publisher;
private final boolean isTracingEnabled;
private final JmsOpenTelemetryInstrumenter jmsInstrumenter;
private final Context context;

JmsSource(JmsResourceHolder<JMSConsumer> resourceHolder, JmsConnectorIncomingConfiguration config, JsonMapping jsonMapping,
JmsSource(Vertx vertx, JmsResourceHolder<JMSConsumer> resourceHolder, JmsConnectorIncomingConfiguration config,
Instance<OpenTelemetry> openTelemetryInstance, JsonMapping jsonMapping,
Executor executor) {
this.isTracingEnabled = config.getTracingEnabled();
String channel = config.getChannel();
final String destinationName = config.getDestination().orElseGet(config::getChannel);
String selector = config.getSelector().orElse(null);
Expand All @@ -46,9 +69,18 @@ class JmsSource {
}
});
resourceHolder.getClient();
if (isTracingEnabled) {
jmsInstrumenter = JmsOpenTelemetryInstrumenter.createForSource(openTelemetryInstance);
} else {
jmsInstrumenter = null;
}

this.publisher = new JmsPublisher(resourceHolder);
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
source = Multi.createFrom().publisher(publisher)
.emitOn(context::runOnContext)
.<IncomingJmsMessage<?>> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping))
.onItem().invoke(this::incomingTrace)
.onFailure(t -> {
log.terminalErrorOnChannel(channel);
this.resourceHolder.close();
Expand Down Expand Up @@ -190,4 +222,42 @@ long add(long req) {
}
}
}

public void incomingTrace(IncomingJmsMessage<?> jmsMessage) {
if (isTracingEnabled) {
Optional<IncomingJmsMessageMetadata> metadata = jmsMessage.getMetadata(IncomingJmsMessageMetadata.class);
Optional<String> queueName = metadata.map(a -> {
Destination destination = a.getDestination();
if (destination instanceof Queue) {
Queue queue = (Queue) destination;
try {
return queue.getQueueName();
} catch (JMSException e) {
return null;
}
}
return null;
});
Message unwrapped = jmsMessage.unwrap(Message.class);

Map<String, Object> properties = new HashMap<>();
try {
Enumeration<?> propertyNames = unwrapped.getPropertyNames();
while (propertyNames.hasMoreElements()) {
String name = (String) propertyNames.nextElement();
Object value = unwrapped.getObjectProperty(name);
properties.put(name, value);
}
} catch (JMSException e) {
throw new RuntimeException(e);
}

JmsTrace jmsTrace = new JmsTrace.Builder()
.withQueue(queueName.orElse(null))
.withMessage(unwrapped)
.build();

jmsInstrumenter.traceIncoming(jmsMessage, jmsTrace);
}
}
}
Loading