diff --git a/smallrye-reactive-messaging-jms/pom.xml b/smallrye-reactive-messaging-jms/pom.xml index f460e2383..f1e713573 100644 --- a/smallrye-reactive-messaging-jms/pom.xml +++ b/smallrye-reactive-messaging-jms/pom.xml @@ -106,6 +106,16 @@ ${project.version} test + + io.opentelemetry + opentelemetry-sdk-trace + test + + + io.opentelemetry + opentelemetry-sdk-testing + test + io.smallrye.reactive smallrye-reactive-messaging-provider @@ -117,6 +127,11 @@ ${project.version} provided + + io.smallrye.reactive + smallrye-reactive-messaging-otel + ${project.version} + diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java index 69dc97c61..a4f774d28 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java @@ -1,6 +1,7 @@ package io.smallrye.reactive.messaging.jms; import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex; +import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; @@ -13,14 +14,16 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.json.JsonMapping; +import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; +import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; -public class IncomingJmsMessage implements org.eclipse.microprofile.reactive.messaging.Message { +public class IncomingJmsMessage implements ContextAwareMessage, MetadataInjectableMessage { private final Message delegate; private final Executor executor; private final Class clazz; private final JsonMapping jsonMapping; private final IncomingJmsMessageMetadata jmsMetadata; - private final Metadata metadata; + private Metadata metadata; IncomingJmsMessage(Message message, Executor executor, JsonMapping jsonMapping) { this.delegate = message; @@ -42,7 +45,7 @@ public class IncomingJmsMessage implements org.eclipse.microprofile.reactive. } this.jmsMetadata = new IncomingJmsMessageMetadata(message); - this.metadata = Metadata.of(this.jmsMetadata); + this.metadata = captureContextMetadata(this.jmsMetadata); } @SuppressWarnings("unchecked") @@ -119,6 +122,7 @@ public CompletionStage ack(Metadata metadata) { } }) .runSubscriptionOn(executor) + .emitOn(this::runOnMessageContext) .subscribeAsCompletionStage(); } @@ -139,4 +143,8 @@ public C unwrap(Class unwrapType) { throw ex.illegalStateUnableToUnwrap(unwrapType); } + @Override + public void injectMetadata(Object metadataObject) { + metadata = this.metadata.with(metadataObject); + } } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java index b54e0a9b9..8a7d201a7 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java @@ -28,12 +28,14 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.spi.Connector; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.common.annotation.Identifier; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction; import io.smallrye.reactive.messaging.connector.InboundConnector; import io.smallrye.reactive.messaging.connector.OutboundConnector; import io.smallrye.reactive.messaging.json.JsonMapping; +import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging; @ApplicationScoped @@ -50,6 +52,7 @@ @ConnectorAttribute(name = "broadcast", description = "Whether or not the JMS message should be dispatched to multiple consumers", direction = Direction.INCOMING, type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "durable", description = "Set to `true` to use a durable subscription", direction = Direction.INCOMING, type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "destination-type", description = "The type of destination. It can be either `queue` or `topic`", direction = Direction.INCOMING_AND_OUTGOING, type = "string", defaultValue = "queue") +@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true") @ConnectorAttribute(name = "disable-message-id", description = "Omit the message id in the outbound JMS message", direction = Direction.OUTGOING, type = "boolean") @ConnectorAttribute(name = "disable-message-timestamp", description = "Omit the message timestamp in the outbound JMS message", direction = Direction.OUTGOING, type = "boolean") @@ -92,6 +95,9 @@ public class JmsConnector implements InboundConnector, OutboundConnector { @Inject Instance jsonMapper; + @Inject + ExecutionHolder executionHolders; + @Inject @ConfigProperty(name = "smallrye.jms.threads.max-pool-size", defaultValue = DEFAULT_MAX_POOL_SIZE) int maxPoolSize; @@ -100,6 +106,9 @@ public class JmsConnector implements InboundConnector, OutboundConnector { @ConfigProperty(name = "smallrye.jms.threads.ttl", defaultValue = DEFAULT_THREAD_TTL) int ttl; + @Inject + Instance openTelemetryInstance; + private ExecutorService executor; private JsonMapping jsonMapping; private final List sources = new CopyOnWriteArrayList<>(); @@ -134,7 +143,7 @@ public Flow.Publisher> getPublisher(Config config) { JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config); JmsResourceHolder holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic)); contexts.add(holder); - JmsSource source = new JmsSource(holder, ic, jsonMapping, executor); + JmsSource source = new JmsSource(executionHolders.vertx(), holder, ic, openTelemetryInstance, jsonMapping, executor); sources.add(source); return source.getSource(); } @@ -155,7 +164,7 @@ public Flow.Subscriber> getSubscriber(Config config) { JmsConnectorOutgoingConfiguration oc = new JmsConnectorOutgoingConfiguration(config); JmsResourceHolder holder = new JmsResourceHolder<>(oc.getChannel(), () -> createJmsContext(oc)); contexts.add(holder); - return new JmsSink(holder, oc, jsonMapping, executor).getSink(); + return new JmsSink(holder, oc, openTelemetryInstance, jsonMapping, executor).getSink(); } private ConnectionFactory pickTheFactory(String factoryName) { diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java index 91674661c..5c628d6d0 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java @@ -4,9 +4,13 @@ import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log; import java.time.Duration; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.Flow; +import jakarta.enterprise.inject.Instance; import jakarta.jms.BytesMessage; import jakarta.jms.DeliveryMode; import jakarta.jms.Destination; @@ -16,7 +20,10 @@ import org.eclipse.microprofile.reactive.messaging.Message; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter; +import io.smallrye.reactive.messaging.jms.tracing.JmsTrace; import io.smallrye.reactive.messaging.json.JsonMapping; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; @@ -25,9 +32,14 @@ class JmsSink { private final Flow.Subscriber> sink; private final JsonMapping jsonMapping; private final Executor executor; + private final JmsOpenTelemetryInstrumenter jmsInstrumenter; + private final boolean isTracingEnabled; - JmsSink(JmsResourceHolder resourceHolder, JmsConnectorOutgoingConfiguration config, JsonMapping jsonMapping, + JmsSink(JmsResourceHolder resourceHolder, JmsConnectorOutgoingConfiguration config, + Instance openTelemetryInstance, JsonMapping jsonMapping, Executor executor) { + this.isTracingEnabled = config.getTracingEnabled(); + String name = config.getDestination().orElseGet(config::getChannel); String type = config.getDestinationType(); boolean retry = config.getRetry(); @@ -73,6 +85,12 @@ class JmsSink { this.jsonMapping = jsonMapping; this.executor = executor; + if (isTracingEnabled) { + jmsInstrumenter = JmsOpenTelemetryInstrumenter.createForSink(openTelemetryInstance); + } else { + jmsInstrumenter = null; + } + sink = MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(message -> send(resourceHolder, message) .onFailure(t -> retry) .retry() @@ -90,6 +108,7 @@ private Uni> send(JmsResourceHolder resourceHo JMSContext context = resourceHolder.getContext(); // If the payload is a JMS Message, send it as it is, ignoring metadata. if (payload instanceof jakarta.jms.Message) { + outgoingTrace(destination, message, (jakarta.jms.Message) payload); return dispatch(message, () -> resourceHolder.getClient().send(destination, (jakarta.jms.Message) payload)); } @@ -150,12 +169,35 @@ private Uni> send(JmsResourceHolder resourceHo actualDestination = destination; } + outgoingTrace(actualDestination, message, outgoing); return dispatch(message, () -> resourceHolder.getClient().send(actualDestination, outgoing)); } catch (JMSException e) { return Uni.createFrom().failure(new IllegalStateException(e)); } } + private void outgoingTrace(Destination actualDestination, Message message, jakarta.jms.Message payload) { + if (isTracingEnabled) { + jakarta.jms.Message jmsPayload = payload; + Map messageProperties = new HashMap<>(); + try { + Enumeration propertyNames = jmsPayload.getPropertyNames(); + + while (propertyNames.hasMoreElements()) { + String propertyName = (String) propertyNames.nextElement(); + messageProperties.put(propertyName, jmsPayload.getObjectProperty(propertyName)); + } + } catch (JMSException e) { + throw new RuntimeException(e); + } + JmsTrace jmsTrace = new JmsTrace.Builder() + .withQueue(actualDestination.toString()) + .withMessage(jmsPayload) + .build(); + jmsInstrumenter.traceOutgoing(message, jmsTrace); + } + } + private boolean isPrimitiveBoxed(Class c) { return c.equals(Boolean.class) || c.equals(Integer.class) diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java index 4d7dbc749..c6f69e5de 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java @@ -4,6 +4,10 @@ import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log; import java.time.Duration; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -11,11 +15,25 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import jakarta.jms.*; +import jakarta.enterprise.inject.Instance; +import jakarta.jms.Destination; +import jakarta.jms.JMSConsumer; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSException; +import jakarta.jms.JMSRuntimeException; +import jakarta.jms.Message; +import jakarta.jms.Queue; +import jakarta.jms.Topic; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter; +import io.smallrye.reactive.messaging.jms.tracing.JmsTrace; import io.smallrye.reactive.messaging.json.JsonMapping; +import io.vertx.core.impl.VertxInternal; +import io.vertx.mutiny.core.Context; +import io.vertx.mutiny.core.Vertx; class JmsSource { @@ -23,9 +41,14 @@ class JmsSource { private final JmsResourceHolder resourceHolder; private final JmsPublisher publisher; + private final boolean isTracingEnabled; + private final JmsOpenTelemetryInstrumenter jmsInstrumenter; + private final Context context; - JmsSource(JmsResourceHolder resourceHolder, JmsConnectorIncomingConfiguration config, JsonMapping jsonMapping, + JmsSource(Vertx vertx, JmsResourceHolder resourceHolder, JmsConnectorIncomingConfiguration config, + Instance openTelemetryInstance, JsonMapping jsonMapping, Executor executor) { + this.isTracingEnabled = config.getTracingEnabled(); String channel = config.getChannel(); final String destinationName = config.getDestination().orElseGet(config::getChannel); String selector = config.getSelector().orElse(null); @@ -46,9 +69,18 @@ class JmsSource { } }); resourceHolder.getClient(); + if (isTracingEnabled) { + jmsInstrumenter = JmsOpenTelemetryInstrumenter.createForSource(openTelemetryInstance); + } else { + jmsInstrumenter = null; + } + this.publisher = new JmsPublisher(resourceHolder); + this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); source = Multi.createFrom().publisher(publisher) + .emitOn(context::runOnContext) .> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping)) + .onItem().invoke(this::incomingTrace) .onFailure(t -> { log.terminalErrorOnChannel(channel); this.resourceHolder.close(); @@ -190,4 +222,42 @@ long add(long req) { } } } + + public void incomingTrace(IncomingJmsMessage jmsMessage) { + if (isTracingEnabled) { + Optional metadata = jmsMessage.getMetadata(IncomingJmsMessageMetadata.class); + Optional queueName = metadata.map(a -> { + Destination destination = a.getDestination(); + if (destination instanceof Queue) { + Queue queue = (Queue) destination; + try { + return queue.getQueueName(); + } catch (JMSException e) { + return null; + } + } + return null; + }); + Message unwrapped = jmsMessage.unwrap(Message.class); + + Map properties = new HashMap<>(); + try { + Enumeration propertyNames = unwrapped.getPropertyNames(); + while (propertyNames.hasMoreElements()) { + String name = (String) propertyNames.nextElement(); + Object value = unwrapped.getObjectProperty(name); + properties.put(name, value); + } + } catch (JMSException e) { + throw new RuntimeException(e); + } + + JmsTrace jmsTrace = new JmsTrace.Builder() + .withQueue(queueName.orElse(null)) + .withMessage(unwrapped) + .build(); + + jmsInstrumenter.traceIncoming(jmsMessage, jmsTrace); + } + } } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java new file mode 100644 index 000000000..92b85faf2 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java @@ -0,0 +1,98 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import java.util.Collections; +import java.util.List; + +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; + +public class JmsAttributesExtractor implements AttributesExtractor { + private final MessagingAttributesGetter messagingAttributesGetter; + + public JmsAttributesExtractor() { + this.messagingAttributesGetter = new JmsMessagingAttributesGetter(); + } + + @Override + public void onStart(final AttributesBuilder attributes, final Context parentContext, final JmsTrace jmsTrace) { + + } + + @Override + public void onEnd( + final AttributesBuilder attributes, + final Context context, + final JmsTrace jmsTrace, + final Void unused, + final Throwable error) { + + } + + public MessagingAttributesGetter getMessagingAttributesGetter() { + return messagingAttributesGetter; + } + + private static final class JmsMessagingAttributesGetter implements MessagingAttributesGetter { + @Override + public String getSystem(final JmsTrace jmsTrace) { + return "jms"; + } + + @Override + public String getDestination(final JmsTrace jmsTrace) { + return jmsTrace.getQueue(); + } + + @Override + public boolean isTemporaryDestination(final JmsTrace jmsTrace) { + return false; + } + + @Override + public String getConversationId(final JmsTrace jmsTrace) { + return null; + } + + @Override + public String getMessageId(final JmsTrace jmsTrace, final Void unused) { + return null; + } + + @Override + public List getMessageHeader(JmsTrace jmsTrace, String name) { + return Collections.emptyList(); + } + + @Override + public String getDestinationTemplate(JmsTrace jmsTrace) { + return null; + } + + @Override + public boolean isAnonymousDestination(JmsTrace jmsTrace) { + return false; + } + + @Override + public Long getMessageBodySize(JmsTrace jmsTrace) { + return null; + } + + @Override + public Long getMessageEnvelopeSize(JmsTrace jmsTrace) { + return null; + } + + @Override + public String getClientId(JmsTrace jmsTrace) { + return null; + } + + @Override + public Long getBatchMessageCount(JmsTrace jmsTrace, Void unused) { + return null; + } + } +} diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java new file mode 100644 index 000000000..86aa11fba --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java @@ -0,0 +1,69 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import jakarta.enterprise.inject.Instance; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.smallrye.reactive.messaging.tracing.TracingUtils; + +/** + * Encapsulates the OpenTelemetry instrumentation API so that those classes are only needed if + * users explicitly enable tracing. + */ +public class JmsOpenTelemetryInstrumenter { + + private final Instrumenter instrumenter; + + private JmsOpenTelemetryInstrumenter(Instrumenter instrumenter) { + this.instrumenter = instrumenter; + } + + public static JmsOpenTelemetryInstrumenter createForSource(Instance openTelemetryInstance) { + return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), true); + } + + public static JmsOpenTelemetryInstrumenter createForSink(Instance openTelemetryInstance) { + return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), false); + } + + private static JmsOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, boolean source) { + + MessageOperation messageOperation = source ? MessageOperation.RECEIVE : MessageOperation.PUBLISH; + + JmsAttributesExtractor jmsAttributesExtractor = new JmsAttributesExtractor(); + MessagingAttributesGetter messagingAttributesGetter = jmsAttributesExtractor + .getMessagingAttributesGetter(); + InstrumenterBuilder builder = Instrumenter.builder(openTelemetry, + "io.smallrye.reactive.messaging", + MessagingSpanNameExtractor.create(messagingAttributesGetter, messageOperation)); + + builder + .addAttributesExtractor( + MessagingAttributesExtractor.create(messagingAttributesGetter, messageOperation)) + .addAttributesExtractor(jmsAttributesExtractor); + + Instrumenter instrumenter; + if (source) { + instrumenter = builder.buildConsumerInstrumenter(JmsTraceTextMapGetter.INSTANCE); + } else { + instrumenter = builder.buildProducerInstrumenter(JmsTraceTextMapSetter.INSTANCE); + } + + return new JmsOpenTelemetryInstrumenter(instrumenter); + } + + public Message traceIncoming(Message message, JmsTrace jmsTrace) { + return TracingUtils.traceIncoming(instrumenter, message, jmsTrace); + } + + public void traceOutgoing(Message message, JmsTrace jmsTrace) { + TracingUtils.traceOutgoing(instrumenter, message, jmsTrace); + } +} diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java new file mode 100644 index 000000000..177ccbf8c --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java @@ -0,0 +1,73 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; + +import jakarta.jms.JMSException; +import jakarta.jms.Message; + +public class JmsTrace { + private final String queue; + private final Message jmsMessage; + + private JmsTrace(final String queue, Message jmsMessage) { + this.queue = queue; + this.jmsMessage = jmsMessage; + } + + public String getQueue() { + return queue; + } + + public Message getMessage() { + return jmsMessage; + } + + public List getPropertyNames() { + List keys = new ArrayList<>(); + Enumeration propertyNames = null; + try { + propertyNames = jmsMessage.getPropertyNames(); + while (propertyNames.hasMoreElements()) { + keys.add(propertyNames.nextElement().toString()); + } + } catch (JMSException ignored) { + } + return keys; + } + + public String getProperty(final String key) { + try { + return jmsMessage.getStringProperty(key); + } catch (JMSException ignored) { + return null; + } + } + + public void setProperty(final String key, final String value) { + try { + jmsMessage.setStringProperty(key, value); + } catch (JMSException ignored) { + } + } + + public static class Builder { + private String queue; + private Message jmsMessage; + + public Builder withQueue(final String queue) { + this.queue = queue; + return this; + } + + public Builder withMessage(final Message message) { + this.jmsMessage = message; + return this; + } + + public JmsTrace build() { + return new JmsTrace(queue, jmsMessage); + } + } +} diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java new file mode 100644 index 000000000..89d81be8e --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java @@ -0,0 +1,20 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import io.opentelemetry.context.propagation.TextMapGetter; + +public enum JmsTraceTextMapGetter implements TextMapGetter { + INSTANCE; + + @Override + public Iterable keys(final JmsTrace carrier) { + return carrier.getPropertyNames(); + } + + @Override + public String get(final JmsTrace carrier, final String key) { + if (carrier != null) { + return carrier.getProperty(key); + } + return null; + } +} diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java new file mode 100644 index 000000000..471d38a30 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java @@ -0,0 +1,14 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import io.opentelemetry.context.propagation.TextMapSetter; + +public enum JmsTraceTextMapSetter implements TextMapSetter { + INSTANCE; + + @Override + public void set(final JmsTrace carrier, final String key, final String value) { + if (carrier != null) { + carrier.setProperty(key, value); + } + } +} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSinkTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSinkTest.java index 542f0fc97..d831d8457 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSinkTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSinkTest.java @@ -65,6 +65,7 @@ public void testDefaultConfiguration() throws JMSException { .with("destination", "queue-one") .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); @@ -86,6 +87,7 @@ public void testDefaultConfigurationAgainstTopic() throws JMSException { .with("destination-type", "topic") .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client1 = new MyJmsClient(jms.createTopic("my-topic")); @@ -111,6 +113,7 @@ public void testWithDeliveryDelayAndMode() throws JMSException { .with("delivery-mode", "non_persistent") .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); @@ -137,6 +140,7 @@ public void testDisablingMessageIdAndTimestamp() throws JMSException { .with("disable-message-timestamp", true) .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); @@ -163,6 +167,7 @@ public void testWithCorrelationIdAndPriorityAndTTL() throws JMSException { .with("ttl", 1000L) .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); @@ -187,6 +192,7 @@ public void testWithReplyTo() throws JMSException { .with("reply-to", "my-response") .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); @@ -213,6 +219,7 @@ public void testWithReplyToTopic() throws JMSException { .with("reply-to-destination-type", "topic") .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); @@ -238,6 +245,7 @@ public void testWithReplyToWithInvalidDestinationType() { .with("reply-to-destination-type", "invalid") .with("channel-name", "jms"); assertThatThrownBy(() -> new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor)) .isInstanceOf(IllegalArgumentException.class); } @@ -308,6 +316,7 @@ public void testPropagation() throws JMSException { .with("channel-name", "jms") .with("ttl", 10000L); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java index ef5021320..e711e1765 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java @@ -27,15 +27,18 @@ import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.support.JmsTestBase; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.core.Vertx; @SuppressWarnings("ReactiveStreamsSubscriberImplementation") public class JmsSourceTest extends JmsTestBase { private JMSContext jms; private ActiveMQJMSConnectionFactory factory; + private Vertx vertx; @BeforeEach public void init() { + vertx = Vertx.vertx(); factory = new ActiveMQJMSConnectionFactory( "tcp://localhost:61616", null, null); @@ -185,9 +188,9 @@ public void testReceptionOfMultipleMessages() { @Test public void testMultipleRequests() { - JmsSource source = new JmsSource(getResourceHolder("queue"), + JmsSource source = new JmsSource(vertx, getResourceHolder("queue"), new JmsConnectorIncomingConfiguration(new MapBasedConfig().put("channel-name", "queue")), - null, null); + UnsatisfiedInstance.instance(), null, null); Publisher> publisher = source.getSource(); new Thread(() -> { @@ -238,10 +241,10 @@ public void onComplete() { @Test public void testBroadcast() { - JmsSource source = new JmsSource(getResourceHolder("queue"), + JmsSource source = new JmsSource(vertx, getResourceHolder("queue"), new JmsConnectorIncomingConfiguration(new MapBasedConfig() .with("channel-name", "queue").with("broadcast", true)), - null, null); + UnsatisfiedInstance.instance(), null, null); Flow.Publisher> publisher = source.getSource(); List> list1 = new ArrayList<>(); diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationTest.java new file mode 100644 index 000000000..71d8c481d --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationTest.java @@ -0,0 +1,652 @@ +package io.smallrye.reactive.messaging.jms; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSProducer; +import jakarta.jms.ObjectMessage; +import jakarta.jms.Queue; + +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; +import org.eclipse.microprofile.reactive.messaging.Acknowledgment; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.common.vertx.ContextLocals; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.reactive.messaging.annotations.Blocking; +import io.smallrye.reactive.messaging.annotations.Broadcast; +import io.smallrye.reactive.messaging.annotations.Merge; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.smallrye.reactive.messaging.support.JmsTestBase; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.core.impl.ConcurrentHashSet; +import io.vertx.mutiny.core.Vertx; + +public class LocalPropagationTest extends JmsTestBase { + + private JMSContext jms; + private ActiveMQJMSConnectionFactory factory; + private ExecutorService executor; + private Queue queue; + + @BeforeEach + public void init() { + factory = new ActiveMQJMSConnectionFactory( + "tcp://localhost:61616", + null, null); + jms = factory.createContext(); + executor = Executors.newFixedThreadPool(3); + queue = jms.createQueue("queue-one"); + } + + @AfterEach + public void close() { + jms.close(); + factory.close(); + executor.shutdown(); + } + + private MapBasedConfig dataconfig() { + return new MapBasedConfig() + .with("mp.messaging.incoming.data.connector", JmsConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.data.destination", "queue-one") + .with("mp.messaging.incoming.data.tracing.enabled", false); + } + + private void produceIntegers() { + JMSProducer producer = jms.createProducer(); + for (int i = 1; i < 6; i++) { + ObjectMessage message = jms.createObjectMessage(i); + producer.send(queue, message); + } + } + + private T runApplication(MapBasedConfig config, Class beanClass) { + addConfig(config); + WeldContainer container = deploy(beanClass); + return container.getBeanManager().createInstance().select(beanClass).get(); + } + + @Test + public void testLinearPipeline() { + LinearPipeline bean = runApplication(dataconfig(), LinearPipeline.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6); + } + + @Test + public void testPipelineWithABlockingStage() { + PipelineWithABlockingStage bean = runApplication(dataconfig(), PipelineWithABlockingStage.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6); + + } + + @Test + public void testPipelineWithAnUnorderedBlockingStage() { + PipelineWithAnUnorderedBlockingStage bean = runApplication(dataconfig(), PipelineWithAnUnorderedBlockingStage.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactlyInAnyOrder(2, 3, 4, 5, 6); + + } + + @Test + public void testPipelineWithMultipleBlockingStages() { + PipelineWithMultipleBlockingStages bean = runApplication(dataconfig(), PipelineWithMultipleBlockingStages.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactlyInAnyOrder(2, 3, 4, 5, 6); + } + + @Test + public void testPipelineWithBroadcastAndMerge() { + PipelineWithBroadcastAndMerge bean = runApplication(dataconfig(), PipelineWithBroadcastAndMerge.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 10; + }); + assertThat(bean.getResults()).hasSize(10).contains(2, 3, 4, 5, 6); + } + + @Test + public void testLinearPipelineWithAckOnCustomThread() { + LinearPipelineWithAckOnCustomThread bean = runApplication(dataconfig(), LinearPipelineWithAckOnCustomThread.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6); + + } + + @Test + public void testPipelineWithAnAsyncStage() { + PipelineWithAnAsyncStage bean = runApplication(dataconfig(), PipelineWithAnAsyncStage.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6); + + } + + @ApplicationScoped + public static class LinearPipeline { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + return input.withPayload(input.getPayload() + 1); + } + + @Incoming("process") + @Outgoing("after-process") + public Integer handle(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class LinearPipelineWithAckOnCustomThread { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + private final Executor executor = Executors.newFixedThreadPool(4); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + return input.withPayload(input.getPayload() + 1) + .withAck(() -> { + CompletableFuture cf = new CompletableFuture<>(); + executor.execute(() -> { + cf.complete(null); + }); + return cf; + }); + } + + @Incoming("process") + @Outgoing("after-process") + public Integer handle(int payload) { + try { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + } catch (Exception e) { + e.printStackTrace(); + } + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + try { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + } catch (Exception e) { + e.printStackTrace(); + } + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithABlockingStage { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + @Incoming("process") + @Outgoing("after-process") + @Blocking + public Integer handle(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithAnUnorderedBlockingStage { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + private final Random random = new Random(); + + @Incoming("process") + @Outgoing("after-process") + @Blocking(ordered = false) + public Integer handle(int payload) throws InterruptedException { + Thread.sleep(random.nextInt(10)); + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithMultipleBlockingStages { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + System.out.println("Processing " + input.getPayload()); + String value = UUID.randomUUID().toString(); + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + private final Random random = new Random(); + + @Incoming("process") + @Outgoing("second-blocking") + @Blocking(ordered = false) + public Integer handle(int payload) throws InterruptedException { + Thread.sleep(random.nextInt(10)); + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("second-blocking") + @Outgoing("after-process") + @Blocking + public Integer handle2(int payload) throws InterruptedException { + Thread.sleep(random.nextInt(10)); + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithBroadcastAndMerge { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set branch1 = new ConcurrentHashSet<>(); + private final Set branch2 = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + @Broadcast(2) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + private final Random random = new Random(); + + @Incoming("process") + @Outgoing("after-process") + public Integer branch1(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + assertThat(branch1.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("process") + @Outgoing("after-process") + public Integer branch2(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + assertThat(branch2.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + @Merge + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithAnAsyncStage { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + assertThat((String) ContextLocals.get("uuid", null)).isNull(); + ContextLocals.put("uuid", value); + ContextLocals.put("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + @Incoming("process") + @Outgoing("after-process") + public Uni handle(int payload) { + String uuid = ContextLocals.get("uuid", null); + assertThat(uuid).isNotNull(); + + assertThat(uuids.add(uuid)).isTrue(); + + int p = ContextLocals.get("input", null); + assertThat(p + 1).isEqualTo(payload); + return Uni.createFrom().item(() -> payload) + .runSubscriptionOn(Infrastructure.getDefaultExecutor()); + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = ContextLocals.get("uuid", null); + assertThat(uuid).isNotNull(); + + int p = ContextLocals.get("input", null); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = ContextLocals.get("uuid", null); + assertThat(uuid).isNotNull(); + + int p = ContextLocals.get("input", null); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + +} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/PayloadConsumerBean.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/PayloadConsumerBean.java index fbca51a1d..6d7881bf0 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/PayloadConsumerBean.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/PayloadConsumerBean.java @@ -18,7 +18,7 @@ public void consume(int v) { list.add(v); } - List list() { + public List list() { return new ArrayList<>(list); } diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/UnsatisfiedInstance.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/UnsatisfiedInstance.java new file mode 100644 index 000000000..6f907c5d9 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/UnsatisfiedInstance.java @@ -0,0 +1,73 @@ +package io.smallrye.reactive.messaging.jms; + +import java.lang.annotation.Annotation; +import java.util.Collections; +import java.util.Iterator; + +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.util.TypeLiteral; + +public class UnsatisfiedInstance implements Instance { + + private static final UnsatisfiedInstance INSTANCE = new UnsatisfiedInstance<>(); + + @SuppressWarnings("unchecked") + public static Instance instance() { + return (Instance) INSTANCE; + } + + private UnsatisfiedInstance() { + // avoid direct instantiation + } + + @Override + public Instance select(Annotation... qualifiers) { + return instance(); + } + + @Override + public Instance select(Class subtype, Annotation... qualifiers) { + return instance(); + } + + @Override + public Instance select(TypeLiteral subtype, + Annotation... qualifiers) { + return instance(); + } + + @Override + public boolean isUnsatisfied() { + return true; + } + + @Override + public boolean isAmbiguous() { + return false; + } + + @Override + public void destroy(T instance) { + throw new UnsupportedOperationException(); + } + + @Override + public Handle getHandle() { + return null; + } + + @Override + public Iterable> handles() { + return null; + } + + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } + + @Override + public T get() { + throw new UnsupportedOperationException(); + } +} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java new file mode 100644 index 000000000..ecba271e5 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java @@ -0,0 +1,37 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +import java.util.HashMap; +import java.util.Map; + +import jakarta.jms.JMSException; +import jakarta.jms.Message; + +import org.junit.jupiter.api.Test; + +class MessagePropertiesExtractAdapterTest { + @Test + public void verifyNullHeaderHandled() throws JMSException { + Map messageProperties = new HashMap<>(); + messageProperties.put("test_null_header", null); + + // Create a mock JMS message + Message msg = mock(Message.class); + doAnswer(i -> messageProperties.get(i.getArgument(0, String.class))) + .when(msg).getStringProperty(anyString()); + doAnswer(i -> messageProperties.put(i.getArgument(0, String.class), i.getArgument(1, String.class))) + .when(msg).setStringProperty(anyString(), anyString()); + + JmsTrace jmsTrace = new JmsTrace.Builder().withMessage(msg).build(); + + String headerValue = JmsTraceTextMapGetter.INSTANCE.get(jmsTrace, "test_null_header"); + JmsTraceTextMapSetter.INSTANCE.set(jmsTrace, "test_other_header", "value"); + + assertThat(headerValue).isNull(); + assertThat(messageProperties.get("test_other_header")).isEqualTo("value"); + } +} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java new file mode 100644 index 000000000..89628c8f6 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java @@ -0,0 +1,336 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_SYSTEM; +import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.jms.JMSConsumer; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSProducer; +import jakarta.jms.ObjectMessage; +import jakarta.jms.Queue; + +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.smallrye.reactive.messaging.jms.IncomingJmsMessageMetadata; +import io.smallrye.reactive.messaging.jms.JmsConnector; +import io.smallrye.reactive.messaging.jms.PayloadConsumerBean; +import io.smallrye.reactive.messaging.jms.ProducerBean; +import io.smallrye.reactive.messaging.support.JmsTestBase; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class TracingPropagationTest extends JmsTestBase { + private SdkTracerProvider tracerProvider; + private InMemorySpanExporter spanExporter; + + private JMSContext jms; + private ActiveMQJMSConnectionFactory factory; + + @BeforeEach + public void setup() { + factory = new ActiveMQJMSConnectionFactory( + "tcp://localhost:61616", + null, null); + jms = factory.createContext(); + + GlobalOpenTelemetry.resetForTest(); + + spanExporter = InMemorySpanExporter.create(); + SpanProcessor spanProcessor = SimpleSpanProcessor.create(spanExporter); + + tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(spanProcessor) + .setSampler(Sampler.alwaysOn()) + .build(); + + OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .setTracerProvider(tracerProvider) + .buildAndRegisterGlobal(); + } + + @AfterEach + public void close() { + jms.close(); + factory.close(); + } + + @AfterAll + static void shutdown() { + GlobalOpenTelemetry.resetForTest(); + } + + @SuppressWarnings("ConstantConditions") + @Test + public void testFromAppToJms() { + String queue = "queue-one"; + Map map = new HashMap<>(); + map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.stuff.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.stuff.destination", "queue-one"); + MapBasedConfig config = new MapBasedConfig(map); + addConfig(config); + WeldContainer container = deploy(MyAppReceivingData.class, ProducerBean.class); + + MyAppReceivingData bean = container.select(MyAppReceivingData.class).get(); + await().until(() -> bean.results().size() >= 10); + assertThat(bean.results()) + .extracting(m -> m.getMetadata(IncomingJmsMessageMetadata.class).get()) + .allSatisfy(m -> assertThat(m.getStringProperty("traceparent")).isNotNull()) + .extracting(m -> Integer.parseInt(m.getBody(String.class))) + .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + CompletableResultCode completableResultCode = tracerProvider.forceFlush(); + completableResultCode.join(10, TimeUnit.SECONDS); + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(20, spans.size()); + + assertEquals(10, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size()); + + SpanData span = spans.get(0); + assertEquals(SpanKind.PRODUCER, span.getKind()); + assertEquals(3, span.getAttributes().size()); + assertEquals("jms", span.getAttributes().get(MESSAGING_SYSTEM)); + assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION)); + assertEquals("ActiveMQQueue[" + queue + "]", span.getAttributes().get(MESSAGING_DESTINATION_NAME)); + assertEquals("ActiveMQQueue[" + queue + "] publish", span.getName()); + } + + @Test + public void testFromJmsToAppToJms() { + String queue = "queue-one"; + String parentQueue = queue + "-parent"; + String resultQueue = queue + "-result"; + Map map = new HashMap<>(); + map.put("mp.messaging.incoming.source.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.source.destination", parentQueue); + map.put("mp.messaging.outgoing.jms.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.outgoing.jms.destination", resultQueue); + MapBasedConfig config = new MapBasedConfig(map); + addConfig(config); + WeldContainer container = deploy(MyAppProcessingData.class); + + List results = new CopyOnWriteArrayList<>(); + Queue result = jms.createQueue(resultQueue); + JMSConsumer sink = jms.createConsumer(result); + sink.setMessageListener(results::add); + + MyAppProcessingData bean = container.select(MyAppProcessingData.class).get(); + + JMSProducer producer = jms.createProducer(); + Queue parent = jms.createQueue(parentQueue); + for (int i = 0; i < 10; i++) { + producer.send(parent, i); + } + + await().until(() -> results.size() >= 10); + assertThat(results).extracting(m -> Integer.valueOf(m.getBody(String.class))).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, + 10); + + CompletableResultCode completableResultCode = tracerProvider.forceFlush(); + completableResultCode.join(10, TimeUnit.SECONDS); + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(20, spans.size()); + + List parentSpans = spans.stream() + .filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid())) + .collect(toList()); + assertEquals(10, parentSpans.size()); + + // for (SpanData parentSpan : parentSpans) { + // assertEquals(1, + // spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpan.getSpanId())).count()); + // } + + SpanData consumer = parentSpans.get(0); + assertEquals(SpanKind.CONSUMER, consumer.getKind()); + assertEquals(3, consumer.getAttributes().size()); + assertEquals("jms", consumer.getAttributes().get(MESSAGING_SYSTEM)); + assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION)); + assertEquals(parentQueue, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME)); + assertEquals(parentQueue + " receive", consumer.getName()); + + for (SpanData span : spans) { + System.out.println(span.getKind() + " " + span.getSpanId() + " -> " + span.getParentSpanId()); + } + SpanData producerSpan = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(consumer.getSpanId())) + .findFirst().get(); + assertEquals(SpanKind.PRODUCER, producerSpan.getKind()); + assertEquals(3, producerSpan.getAttributes().size()); + assertEquals("jms", producerSpan.getAttributes().get(MESSAGING_SYSTEM)); + assertEquals("publish", producerSpan.getAttributes().get(MESSAGING_OPERATION)); + assertEquals("ActiveMQQueue[" + resultQueue + "]", producerSpan.getAttributes().get(MESSAGING_DESTINATION_NAME)); + assertEquals("publish", producerSpan.getAttributes().get(MESSAGING_OPERATION)); + assertEquals("ActiveMQQueue[" + resultQueue + "] publish", producerSpan.getName()); + } + + @Test + public void testFromJmsToAppWithParentSpan() { + String parentTopic = "queue-one-parent"; + Map map = new HashMap<>(); + map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.jms.destination", parentTopic); + MapBasedConfig config = new MapBasedConfig(map); + addConfig(config); + WeldContainer container = deploy(PayloadConsumerBean.class); + + PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); + + Map properties = new HashMap<>(); + try (Scope ignored = Context.current().makeCurrent()) { + Tracer tracer = GlobalOpenTelemetry.getTracerProvider().get("io.smallrye.reactive.messaging"); + Span span = tracer.spanBuilder("producer").setSpanKind(SpanKind.PRODUCER).startSpan(); + Context current = Context.current().with(span); + GlobalOpenTelemetry.getPropagators() + .getTextMapPropagator() + .inject(current, properties, (carrier, key, value) -> carrier.put(key, value)); + span.end(); + } + + JMSProducer producer = jms.createProducer(); + Queue queue = jms.createQueue(parentTopic); + for (int i = 0; i < 10; i++) { + ObjectMessage msg = jms.createObjectMessage(i); + properties.forEach((k, v) -> { + try { + msg.setObjectProperty(k, v); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + producer.send(queue, msg); + } + + await().until(() -> bean.list().size() >= 10); + + assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + CompletableResultCode completableResultCode = tracerProvider.forceFlush(); + completableResultCode.join(10, TimeUnit.SECONDS); + // 1 Parent, 10 Children + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(11, spans.size()); + + // All should use the same Trace + assertEquals(1, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size()); + + List parentSpans = spans.stream() + .filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid())).collect(toList()); + assertEquals(1, parentSpans.size()); + + for (SpanData parentSpan : parentSpans) { + assertEquals(10, + spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpan.getSpanId())).count()); + } + + SpanData producerSpan = parentSpans.get(0); + assertEquals(SpanKind.PRODUCER, producerSpan.getKind()); + + SpanData consumer = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(producerSpan.getSpanId())) + .findFirst().get(); + assertEquals(3, consumer.getAttributes().size()); + assertEquals("jms", consumer.getAttributes().get(MESSAGING_SYSTEM)); + assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION)); + assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME)); + assertEquals(parentTopic + " receive", consumer.getName()); + + } + + @Test + public void testFromJmsToAppWithNoParent() { + Map map = new HashMap<>(); + map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.jms.destination", "queue-one"); + MapBasedConfig config = new MapBasedConfig(map); + addConfig(config); + WeldContainer container = deploy(PayloadConsumerBean.class); + + PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); + + JMSProducer producer = jms.createProducer(); + for (int i = 0; i < 10; i++) { + producer.send(jms.createQueue("queue-one"), i); + } + + await().until(() -> bean.list().size() >= 10); + + CompletableResultCode completableResultCode = tracerProvider.forceFlush(); + completableResultCode.join(10, TimeUnit.SECONDS); + List spans = spanExporter.getFinishedSpanItems(); + assertThat(spans).hasSize(10).allSatisfy(span -> { + assertThat(span.getKind()).isEqualTo(SpanKind.CONSUMER); + assertThat(span.getSpanId()).isNotEqualTo(SpanId.getInvalid()); + assertThat(span.getParentSpanId()).isEqualTo(SpanId.getInvalid()); + }); + } + + @ApplicationScoped + public static class MyAppProcessingData { + + private final List list = new CopyOnWriteArrayList<>(); + + @Incoming("source") + @Outgoing("jms") + public Message processMessage(Message input) { + list.add(input.getPayload()); + return input.withPayload(input.getPayload() + 1); + } + + public List list() { + return list; + } + } + + @ApplicationScoped + public static class MyAppReceivingData { + private final List> results = new CopyOnWriteArrayList<>(); + + @Incoming("stuff") + public CompletionStage consume(Message input) { + results.add(input); + return input.ack(); + } + + public List> results() { + return results; + } + } +}