From 6d765c879ed0e6a9210ef6ef4a34152b5c9043cf Mon Sep 17 00:00:00 2001 From: Dan Kristensen <1816596+dankristensen@users.noreply.github.com> Date: Fri, 16 Aug 2024 12:36:03 +0200 Subject: [PATCH 1/8] Added tracing using opentelemetry for jms like kafka does. --- smallrye-reactive-messaging-jms/pom.xml | 24 ++ .../reactive/messaging/jms/JmsConnector.java | 9 +- .../reactive/messaging/jms/JmsSink.java | 45 ++- .../reactive/messaging/jms/JmsSource.java | 59 +++- .../jms/tracing/JmsAttributesExtractor.java | 114 +++++++ .../tracing/JmsOpenTelemetryInstrumenter.java | 69 ++++ .../messaging/jms/tracing/JmsTrace.java | 65 ++++ .../jms/tracing/JmsTraceTextMapGetter.java | 32 ++ .../jms/tracing/JmsTraceTextMapSetter.java | 17 + .../reactive/messaging/jms/JmsSinkTest.java | 9 + .../reactive/messaging/jms/JmsSourceTest.java | 4 +- .../messaging/jms/PayloadConsumerBean.java | 2 +- .../messaging/jms/UnsatisfiedInstance.java | 73 +++++ .../tracing/BatchTracingPropagationTest.java | 172 ++++++++++ .../MessagePropertiesExtractAdapterTest.java | 22 ++ .../jms/tracing/TracingPropagationTest.java | 299 ++++++++++++++++++ 16 files changed, 1008 insertions(+), 7 deletions(-) create mode 100644 smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java create mode 100644 smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java create mode 100644 smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java create mode 100644 smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java create mode 100644 smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java create mode 100644 smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/UnsatisfiedInstance.java create mode 100644 smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/BatchTracingPropagationTest.java create mode 100644 smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java create mode 100644 smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java diff --git a/smallrye-reactive-messaging-jms/pom.xml b/smallrye-reactive-messaging-jms/pom.xml index 9093296dbd..332cb46d3a 100644 --- a/smallrye-reactive-messaging-jms/pom.xml +++ b/smallrye-reactive-messaging-jms/pom.xml @@ -106,6 +106,16 @@ ${project.version} test + + io.opentelemetry + opentelemetry-sdk-trace + test + + + io.opentelemetry + opentelemetry-sdk-testing + test + io.smallrye.reactive smallrye-reactive-messaging-provider @@ -117,6 +127,20 @@ ${project.version} provided + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api + + + io.opentelemetry.instrumentation + opentelemetry-instrumentation-api-incubator + + + io.smallrye.reactive + smallrye-reactive-messaging-otel + 999-SNAPSHOT + compile + diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java index b54e0a9b91..8d3a091cac 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java @@ -28,6 +28,7 @@ import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.spi.Connector; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.common.annotation.Identifier; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction; @@ -50,6 +51,7 @@ @ConnectorAttribute(name = "broadcast", description = "Whether or not the JMS message should be dispatched to multiple consumers", direction = Direction.INCOMING, type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "durable", description = "Set to `true` to use a durable subscription", direction = Direction.INCOMING, type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "destination-type", description = "The type of destination. It can be either `queue` or `topic`", direction = Direction.INCOMING_AND_OUTGOING, type = "string", defaultValue = "queue") +@ConnectorAttribute(name = "tracing-enabled", type = "boolean", direction = Direction.INCOMING_AND_OUTGOING, description = "Whether tracing is enabled (default) or disabled", defaultValue = "true") @ConnectorAttribute(name = "disable-message-id", description = "Omit the message id in the outbound JMS message", direction = Direction.OUTGOING, type = "boolean") @ConnectorAttribute(name = "disable-message-timestamp", description = "Omit the message timestamp in the outbound JMS message", direction = Direction.OUTGOING, type = "boolean") @@ -100,6 +102,9 @@ public class JmsConnector implements InboundConnector, OutboundConnector { @ConfigProperty(name = "smallrye.jms.threads.ttl", defaultValue = DEFAULT_THREAD_TTL) int ttl; + @Inject + Instance openTelemetryInstance; + private ExecutorService executor; private JsonMapping jsonMapping; private final List sources = new CopyOnWriteArrayList<>(); @@ -134,7 +139,7 @@ public Flow.Publisher> getPublisher(Config config) { JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config); JmsResourceHolder holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic)); contexts.add(holder); - JmsSource source = new JmsSource(holder, ic, jsonMapping, executor); + JmsSource source = new JmsSource(holder, ic, openTelemetryInstance, jsonMapping, executor); sources.add(source); return source.getSource(); } @@ -155,7 +160,7 @@ public Flow.Subscriber> getSubscriber(Config config) { JmsConnectorOutgoingConfiguration oc = new JmsConnectorOutgoingConfiguration(config); JmsResourceHolder holder = new JmsResourceHolder<>(oc.getChannel(), () -> createJmsContext(oc)); contexts.add(holder); - return new JmsSink(holder, oc, jsonMapping, executor).getSink(); + return new JmsSink(holder, oc, openTelemetryInstance, jsonMapping, executor).getSink(); } private ConnectionFactory pickTheFactory(String factoryName) { diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java index 91674661c6..f0b31b793d 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java @@ -4,9 +4,13 @@ import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log; import java.time.Duration; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.Flow; +import jakarta.enterprise.inject.Instance; import jakarta.jms.BytesMessage; import jakarta.jms.DeliveryMode; import jakarta.jms.Destination; @@ -16,7 +20,10 @@ import org.eclipse.microprofile.reactive.messaging.Message; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter; +import io.smallrye.reactive.messaging.jms.tracing.JmsTrace; import io.smallrye.reactive.messaging.json.JsonMapping; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; @@ -25,9 +32,14 @@ class JmsSink { private final Flow.Subscriber> sink; private final JsonMapping jsonMapping; private final Executor executor; + private final JmsOpenTelemetryInstrumenter jmsInstrumenter; + private final boolean isTracingEnabled; - JmsSink(JmsResourceHolder resourceHolder, JmsConnectorOutgoingConfiguration config, JsonMapping jsonMapping, + JmsSink(JmsResourceHolder resourceHolder, JmsConnectorOutgoingConfiguration config, + Instance openTelemetryInstance, JsonMapping jsonMapping, Executor executor) { + this.isTracingEnabled = config.getTracingEnabled(); + String name = config.getDestination().orElseGet(config::getChannel); String type = config.getDestinationType(); boolean retry = config.getRetry(); @@ -73,6 +85,12 @@ class JmsSink { this.jsonMapping = jsonMapping; this.executor = executor; + if (isTracingEnabled) { + jmsInstrumenter = JmsOpenTelemetryInstrumenter.createForSink(openTelemetryInstance); + } else { + jmsInstrumenter = null; + } + sink = MultiUtils.via(m -> m.onItem().transformToUniAndConcatenate(message -> send(resourceHolder, message) .onFailure(t -> retry) .retry() @@ -90,6 +108,8 @@ private Uni> send(JmsResourceHolder resourceHo JMSContext context = resourceHolder.getContext(); // If the payload is a JMS Message, send it as it is, ignoring metadata. if (payload instanceof jakarta.jms.Message) { + outgoingTrace(resourceHolder, message, (jakarta.jms.Message) payload); + return dispatch(message, () -> resourceHolder.getClient().send(destination, (jakarta.jms.Message) payload)); } @@ -150,12 +170,35 @@ private Uni> send(JmsResourceHolder resourceHo actualDestination = destination; } + outgoingTrace(resourceHolder, message, outgoing); return dispatch(message, () -> resourceHolder.getClient().send(actualDestination, outgoing)); } catch (JMSException e) { return Uni.createFrom().failure(new IllegalStateException(e)); } } + private void outgoingTrace(JmsResourceHolder resourceHolder, Message message, jakarta.jms.Message payload) { + if (isTracingEnabled) { + jakarta.jms.Message jmsPayload = payload; + Map messageProperties = new HashMap<>(); + try { + Enumeration propertyNames = jmsPayload.getPropertyNames(); + + while (propertyNames.hasMoreElements()) { + String propertyName = (String) propertyNames.nextElement(); + messageProperties.put(propertyName, jmsPayload.getObjectProperty(propertyName)); + } + } catch (JMSException e) { + throw new RuntimeException(e); + } + JmsTrace kafkaTrace = new JmsTrace.Builder() + .withQueue(resourceHolder.getDestination().toString())//TODO Find the correct queue name + .withProperties(messageProperties) + .build(); + jmsInstrumenter.traceOutgoing(message, kafkaTrace); + } + } + private boolean isPrimitiveBoxed(Class c) { return c.equals(Boolean.class) || c.equals(Integer.class) diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java index 4d7dbc7493..41b71742b3 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java @@ -4,6 +4,10 @@ import static io.smallrye.reactive.messaging.jms.i18n.JmsLogging.log; import java.time.Duration; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -11,10 +15,14 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import jakarta.enterprise.inject.Instance; import jakarta.jms.*; +import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter; +import io.smallrye.reactive.messaging.jms.tracing.JmsTrace; import io.smallrye.reactive.messaging.json.JsonMapping; class JmsSource { @@ -23,9 +31,13 @@ class JmsSource { private final JmsResourceHolder resourceHolder; private final JmsPublisher publisher; + private final boolean isTracingEnabled; + private final JmsOpenTelemetryInstrumenter jmsInstrumenter; - JmsSource(JmsResourceHolder resourceHolder, JmsConnectorIncomingConfiguration config, JsonMapping jsonMapping, + JmsSource(JmsResourceHolder resourceHolder, JmsConnectorIncomingConfiguration config, + Instance openTelemetryInstance, JsonMapping jsonMapping, Executor executor) { + this.isTracingEnabled = config.getTracingEnabled(); String channel = config.getChannel(); final String destinationName = config.getDestination().orElseGet(config::getChannel); String selector = config.getSelector().orElse(null); @@ -46,9 +58,16 @@ class JmsSource { } }); resourceHolder.getClient(); + if (isTracingEnabled) { + jmsInstrumenter = JmsOpenTelemetryInstrumenter.createForSource(openTelemetryInstance); + } else { + jmsInstrumenter = null; + } + this.publisher = new JmsPublisher(resourceHolder); source = Multi.createFrom().publisher(publisher) .> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping)) + .onItem().invoke(this::incomingTrace) .onFailure(t -> { log.terminalErrorOnChannel(channel); this.resourceHolder.close(); @@ -190,4 +209,42 @@ long add(long req) { } } } + + public void incomingTrace(IncomingJmsMessage jmsMessage) { + if (isTracingEnabled) { + Optional metadata = jmsMessage.getMetadata(IncomingJmsMessageMetadata.class); + Optional queueName = metadata.map(a -> { + Destination destination = a.getDestination(); + if (destination instanceof Queue) { + Queue queue = (Queue) destination; + try { + return queue.getQueueName(); + } catch (JMSException e) { + return null; + } + } + return null; + }); + Message unwrapped = jmsMessage.unwrap(Message.class); + + Map properties = new HashMap<>(); + try { + Enumeration propertyNames = unwrapped.getPropertyNames(); + while (propertyNames.hasMoreElements()) { + String name = (String) propertyNames.nextElement(); + Object value = unwrapped.getObjectProperty(name); + properties.put(name, value); + } + } catch (JMSException e) { + throw new RuntimeException(e); + } + + JmsTrace jmsTrace = new JmsTrace.Builder() + .withQueue(queueName.orElse(null)) + .withProperties(properties) + .build(); + + jmsInstrumenter.traceIncoming(jmsMessage, jmsTrace); + } + } } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java new file mode 100644 index 0000000000..667fb5ab5b --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java @@ -0,0 +1,114 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import static io.opentelemetry.semconv.SemanticAttributes.*; + +import java.util.Collections; +import java.util.List; + +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; + +public class JmsAttributesExtractor implements AttributesExtractor { + private final MessagingAttributesGetter messagingAttributesGetter; + + public JmsAttributesExtractor() { + this.messagingAttributesGetter = new JmsMessagingAttributesGetter(); + } + + @Override + public void onStart(final AttributesBuilder attributes, final Context parentContext, final JmsTrace jmsTrace) { + String groupId = jmsTrace.getGroupId(); + String clientId = jmsTrace.getClientId(); + if (groupId != null && clientId != null) { + String consumerId = groupId; + if (!clientId.isEmpty()) { + consumerId += " - " + clientId; + } + attributes.put(MESSAGING_CONSUMER_ID, consumerId); + } + if (groupId != null) { + attributes.put(MESSAGING_KAFKA_CONSUMER_GROUP, groupId); + } + } + + @Override + public void onEnd( + final AttributesBuilder attributes, + final Context context, + final JmsTrace kafkaTrace, + final Void unused, + final Throwable error) { + + } + + public MessagingAttributesGetter getMessagingAttributesGetter() { + return messagingAttributesGetter; + } + + private static final class JmsMessagingAttributesGetter implements MessagingAttributesGetter { + @Override + public String getSystem(final JmsTrace jmsTrace) { + return "jms"; + } + + @Override + public String getDestination(final JmsTrace jmsTrace) { + return jmsTrace.getQueue(); + } + + @Override + public boolean isTemporaryDestination(final JmsTrace jmsTrace) { + return false; + } + + @Override + public String getConversationId(final JmsTrace jmsTrace) { + return null; + } + + @Override + public String getMessageId(final JmsTrace jmsTrace, final Void unused) { + return null; + } + + @Override + public List getMessageHeader(JmsTrace jmsTrace, String name) { + return Collections.emptyList(); + } + + @Override + public String getDestinationTemplate(JmsTrace jmsTrace) { + return null; + } + + @Override + public boolean isAnonymousDestination(JmsTrace jmsTrace) { + return false; + } + + @Override + public Long getMessageBodySize(JmsTrace jmsTrace) { + return null; + } + + @Override + public Long getMessageEnvelopeSize(JmsTrace jmsTrace) { + return null; + } + + @Override + public String getClientId(JmsTrace jmsTrace) { + if (jmsTrace.getClientId() == null) { + return null; + } + return jmsTrace.getClientId(); + } + + @Override + public Long getBatchMessageCount(JmsTrace jmsTrace, Void unused) { + return null; + } + } +} diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java new file mode 100644 index 0000000000..b40ec174d8 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java @@ -0,0 +1,69 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import jakarta.enterprise.inject.Instance; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.smallrye.reactive.messaging.tracing.TracingUtils; + +/** + * Encapsulates the OpenTelemetry instrumentation API so that those classes are only needed if + * users explicitly enable tracing. + */ +public class JmsOpenTelemetryInstrumenter { + + private final Instrumenter instrumenter; + + private JmsOpenTelemetryInstrumenter(Instrumenter instrumenter) { + this.instrumenter = instrumenter; + } + + public static JmsOpenTelemetryInstrumenter createForSource(Instance openTelemetryInstance) { + return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), true); + } + + public static JmsOpenTelemetryInstrumenter createForSink(Instance openTelemetryInstance) { + return create(TracingUtils.getOpenTelemetry(openTelemetryInstance), false); + } + + private static JmsOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, boolean source) { + + MessageOperation messageOperation = source ? MessageOperation.RECEIVE : MessageOperation.PUBLISH; + + JmsAttributesExtractor jmsAttributesExtractor = new JmsAttributesExtractor(); + MessagingAttributesGetter messagingAttributesGetter = jmsAttributesExtractor + .getMessagingAttributesGetter(); + InstrumenterBuilder builder = Instrumenter.builder(openTelemetry, + "io.smallrye.reactive.messaging", + MessagingSpanNameExtractor.create(messagingAttributesGetter, messageOperation)); + + builder + .addAttributesExtractor( + MessagingAttributesExtractor.create(messagingAttributesGetter, messageOperation)) + .addAttributesExtractor(jmsAttributesExtractor); + + Instrumenter instrumenter; + if (source) { + instrumenter = builder.buildConsumerInstrumenter(JmsTraceTextMapGetter.INSTANCE); + } else { + instrumenter = builder.buildProducerInstrumenter(JmsTraceTextMapSetter.INSTANCE); + } + + return new JmsOpenTelemetryInstrumenter(instrumenter); + } + + public Message traceIncoming(Message message, JmsTrace kafkaTrace) { + return TracingUtils.traceIncoming(instrumenter, message, kafkaTrace, true); + } + + public void traceOutgoing(Message message, JmsTrace kafkaTrace) { + TracingUtils.traceOutgoing(instrumenter, message, kafkaTrace); + } +} diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java new file mode 100644 index 0000000000..2428f3afe3 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java @@ -0,0 +1,65 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import java.util.HashMap; +import java.util.Map; + +public class JmsTrace { + private final String groupId; + private final String clientId; + private final String queue; + private final Map messageProperties; + + private JmsTrace(final String groupId, final String clientId, final String queue, Map messageProperties) { + this.groupId = groupId; + this.clientId = clientId; + this.queue = queue; + this.messageProperties = messageProperties; + } + + public String getGroupId() { + return groupId; + } + + public String getClientId() { + return clientId; + } + + public String getQueue() { + return queue; + } + + public Map getMessageProperties() { + return messageProperties; + } + + public static class Builder { + private String groupId; + private String clientId; + private String queue; + private Map properties; + + public Builder withGroupId(final String groupId) { + this.groupId = groupId; + return this; + } + + public Builder withClientId(final String clientId) { + this.clientId = clientId; + return this; + } + + public Builder withQueue(final String queue) { + this.queue = queue; + return this; + } + + public Builder withProperties(Map properties) { + this.properties = new HashMap<>(properties); + return this; + } + + public JmsTrace build() { + return new JmsTrace(groupId, clientId, queue, properties); + } + } +} diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java new file mode 100644 index 0000000000..b6978ef609 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java @@ -0,0 +1,32 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import io.opentelemetry.context.propagation.TextMapGetter; + +public enum JmsTraceTextMapGetter implements TextMapGetter { + INSTANCE; + + @Override + public Iterable keys(final JmsTrace carrier) { + List keys = new ArrayList<>(); + Map messageProperties = carrier.getMessageProperties(); + for (String key : messageProperties.keySet()) { + keys.add(key); + } + return keys; + } + + @Override + public String get(final JmsTrace carrier, final String key) { + if (carrier != null) { + Object value = carrier.getMessageProperties().get(key); + if (value != null) { + return value.toString(); + } + } + return null; + } +} diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java new file mode 100644 index 0000000000..ffc7409b0c --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java @@ -0,0 +1,17 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import java.util.Map; + +import io.opentelemetry.context.propagation.TextMapSetter; + +public enum JmsTraceTextMapSetter implements TextMapSetter { + INSTANCE; + + @Override + public void set(final JmsTrace carrier, final String key, final String value) { + if (carrier != null) { + Map messageProperties = carrier.getMessageProperties(); + messageProperties.put(key, value); + } + } +} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSinkTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSinkTest.java index 542f0fc970..d831d8457c 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSinkTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSinkTest.java @@ -65,6 +65,7 @@ public void testDefaultConfiguration() throws JMSException { .with("destination", "queue-one") .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); @@ -86,6 +87,7 @@ public void testDefaultConfigurationAgainstTopic() throws JMSException { .with("destination-type", "topic") .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client1 = new MyJmsClient(jms.createTopic("my-topic")); @@ -111,6 +113,7 @@ public void testWithDeliveryDelayAndMode() throws JMSException { .with("delivery-mode", "non_persistent") .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); @@ -137,6 +140,7 @@ public void testDisablingMessageIdAndTimestamp() throws JMSException { .with("disable-message-timestamp", true) .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); @@ -163,6 +167,7 @@ public void testWithCorrelationIdAndPriorityAndTTL() throws JMSException { .with("ttl", 1000L) .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); @@ -187,6 +192,7 @@ public void testWithReplyTo() throws JMSException { .with("reply-to", "my-response") .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); @@ -213,6 +219,7 @@ public void testWithReplyToTopic() throws JMSException { .with("reply-to-destination-type", "topic") .with("channel-name", "jms"); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); @@ -238,6 +245,7 @@ public void testWithReplyToWithInvalidDestinationType() { .with("reply-to-destination-type", "invalid") .with("channel-name", "jms"); assertThatThrownBy(() -> new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor)) .isInstanceOf(IllegalArgumentException.class); } @@ -308,6 +316,7 @@ public void testPropagation() throws JMSException { .with("channel-name", "jms") .with("ttl", 10000L); JmsSink sink = new JmsSink(getResourceHolder(), new JmsConnectorOutgoingConfiguration(config), + UnsatisfiedInstance.instance(), jsonMapping, executor); MyJmsClient client = new MyJmsClient(jms.createQueue("queue-one")); diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java index ef5021320d..174a1de083 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java @@ -187,7 +187,7 @@ public void testReceptionOfMultipleMessages() { public void testMultipleRequests() { JmsSource source = new JmsSource(getResourceHolder("queue"), new JmsConnectorIncomingConfiguration(new MapBasedConfig().put("channel-name", "queue")), - null, null); + UnsatisfiedInstance.instance(), null, null); Publisher> publisher = source.getSource(); new Thread(() -> { @@ -241,7 +241,7 @@ public void testBroadcast() { JmsSource source = new JmsSource(getResourceHolder("queue"), new JmsConnectorIncomingConfiguration(new MapBasedConfig() .with("channel-name", "queue").with("broadcast", true)), - null, null); + UnsatisfiedInstance.instance(), null, null); Flow.Publisher> publisher = source.getSource(); List> list1 = new ArrayList<>(); diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/PayloadConsumerBean.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/PayloadConsumerBean.java index fbca51a1d7..6d7881bf07 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/PayloadConsumerBean.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/PayloadConsumerBean.java @@ -18,7 +18,7 @@ public void consume(int v) { list.add(v); } - List list() { + public List list() { return new ArrayList<>(list); } diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/UnsatisfiedInstance.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/UnsatisfiedInstance.java new file mode 100644 index 0000000000..6f907c5d97 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/UnsatisfiedInstance.java @@ -0,0 +1,73 @@ +package io.smallrye.reactive.messaging.jms; + +import java.lang.annotation.Annotation; +import java.util.Collections; +import java.util.Iterator; + +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.util.TypeLiteral; + +public class UnsatisfiedInstance implements Instance { + + private static final UnsatisfiedInstance INSTANCE = new UnsatisfiedInstance<>(); + + @SuppressWarnings("unchecked") + public static Instance instance() { + return (Instance) INSTANCE; + } + + private UnsatisfiedInstance() { + // avoid direct instantiation + } + + @Override + public Instance select(Annotation... qualifiers) { + return instance(); + } + + @Override + public Instance select(Class subtype, Annotation... qualifiers) { + return instance(); + } + + @Override + public Instance select(TypeLiteral subtype, + Annotation... qualifiers) { + return instance(); + } + + @Override + public boolean isUnsatisfied() { + return true; + } + + @Override + public boolean isAmbiguous() { + return false; + } + + @Override + public void destroy(T instance) { + throw new UnsupportedOperationException(); + } + + @Override + public Handle getHandle() { + return null; + } + + @Override + public Iterable> handles() { + return null; + } + + @Override + public Iterator iterator() { + return Collections.emptyIterator(); + } + + @Override + public T get() { + throw new UnsupportedOperationException(); + } +} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/BatchTracingPropagationTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/BatchTracingPropagationTest.java new file mode 100644 index 0000000000..708ce1d1fb --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/BatchTracingPropagationTest.java @@ -0,0 +1,172 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Flow; + +import jakarta.jms.JMSContext; +import jakarta.jms.JMSProducer; + +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.smallrye.reactive.messaging.jms.*; +import io.smallrye.reactive.messaging.jms.PayloadConsumerBean; +import io.smallrye.reactive.messaging.support.JmsTestBase; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class BatchTracingPropagationTest extends JmsTestBase { + private SdkTracerProvider tracerProvider; + private InMemorySpanExporter spanExporter; + + private JMSContext jms; + private ActiveMQJMSConnectionFactory factory; + private Flow.Subscriber> subscriber; + + @BeforeEach + public void init() { + factory = new ActiveMQJMSConnectionFactory( + "tcp://localhost:61616", + null, null); + jms = factory.createContext(); + } + + @AfterEach + public void close() { + jms.close(); + factory.close(); + } + + private JmsResourceHolder getResourceHolder() { + return new JmsResourceHolder<>("jms", () -> jms); + } + + @BeforeEach + public void setup() { + GlobalOpenTelemetry.resetForTest(); + + spanExporter = InMemorySpanExporter.create(); + SpanProcessor spanProcessor = SimpleSpanProcessor.create(spanExporter); + + tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(spanProcessor) + .setSampler(Sampler.alwaysOn()) + .build(); + + OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .setTracerProvider(tracerProvider) + .buildAndRegisterGlobal(); + } + + @AfterAll + static void shutdown() { + GlobalOpenTelemetry.resetForTest(); + } + + @SuppressWarnings("ConstantConditions") + @Test + public void testFromAppToJms() { + Map map = new HashMap<>(); + map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.jms.destination", "queue-one"); + MapBasedConfig config = new MapBasedConfig(map); + addConfig(config); + WeldContainer container = deploy(PayloadConsumerBean.class, ProducerBean.class); + + PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); + await().until(() -> bean.list().size() >= 10); + + assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + CompletableResultCode completableResultCode = tracerProvider.forceFlush(); + completableResultCode.whenComplete(() -> { + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(20, spans.size()); + SpanData spanData = spans.get(19); + assertEquals(SpanKind.CONSUMER, spanData.getKind()); + }); + } + + @Test + public void testFromKafkaToAppWithParentSpan() { + String parentTopic = "queue-one-parent"; + Map map = new HashMap<>(); + map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.jms.destination", "queue-one"); + MapBasedConfig config = new MapBasedConfig(map); + addConfig(config); + WeldContainer container = deploy(PayloadConsumerBean.class, ProducerBean.class); + + PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); + + Map headers = new HashMap<>(); + try (Scope ignored = Context.current().makeCurrent()) { + Tracer tracer = GlobalOpenTelemetry.getTracerProvider().get("io.smallrye.reactive.messaging"); + Span span = tracer.spanBuilder("producer").setSpanKind(SpanKind.PRODUCER).startSpan(); + Context current = Context.current().with(span); + GlobalOpenTelemetry.getPropagators() + .getTextMapPropagator() + .inject(current, headers, (carrier, key, value) -> carrier.put(key, value.getBytes())); + span.end(); + } + + await().until(() -> bean.list().size() >= 10); + + assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + CompletableResultCode completableResultCode = tracerProvider.forceFlush(); + completableResultCode.whenComplete(() -> { + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(21, spans.size()); + }); + } + + @Test + public void testFromKafkaToAppWithNoParent() { + Map map = new HashMap<>(); + map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.jms.destination", "queue-one"); + MapBasedConfig config = new MapBasedConfig(map); + addConfig(config); + WeldContainer container = deploy(PayloadConsumerBean.class, ProducerBean.class); + + PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); + + await().until(() -> bean.list().size() >= 10); + + CompletableResultCode completableResultCode = tracerProvider.forceFlush(); + completableResultCode.whenComplete(() -> { + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(20, spans.size()); + }); + } +} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java new file mode 100644 index 0000000000..22f77681d0 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java @@ -0,0 +1,22 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +class MessagePropertiesExtractAdapterTest { + @Test + public void verifyNullHeaderHandled() { + Map messageProperties = new HashMap<>(); + messageProperties.put("test_null_header", null); + + JmsTrace jmsTrace = new JmsTrace.Builder().withProperties(messageProperties).build(); + + String headerValue = JmsTraceTextMapGetter.INSTANCE.get(jmsTrace, "test_null_header"); + + assertThat(headerValue).isNull(); + } +} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java new file mode 100644 index 0000000000..40d3f2bba6 --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java @@ -0,0 +1,299 @@ +package io.smallrye.reactive.messaging.jms.tracing; + +import static io.opentelemetry.semconv.SemanticAttributes.*; +import static java.util.stream.Collectors.toList; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.assertj.core.api.Assertions; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.SpanProcessor; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.smallrye.reactive.messaging.jms.JmsConnector; +import io.smallrye.reactive.messaging.jms.PayloadConsumerBean; +import io.smallrye.reactive.messaging.jms.ProducerBean; +import io.smallrye.reactive.messaging.support.JmsTestBase; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class TracingPropagationTest extends JmsTestBase { + private SdkTracerProvider tracerProvider; + private InMemorySpanExporter spanExporter; + + @BeforeEach + public void setup() { + GlobalOpenTelemetry.resetForTest(); + + spanExporter = InMemorySpanExporter.create(); + SpanProcessor spanProcessor = SimpleSpanProcessor.create(spanExporter); + + tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(spanProcessor) + .setSampler(Sampler.alwaysOn()) + .build(); + + OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .setTracerProvider(tracerProvider) + .buildAndRegisterGlobal(); + } + + @AfterAll + static void shutdown() { + GlobalOpenTelemetry.resetForTest(); + } + + @SuppressWarnings("ConstantConditions") + @Test + public void testFromAppToJms() { + String queue = "queue-one"; + Map map = new HashMap<>(); + map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.jms.destination", "queue-one"); + MapBasedConfig config = new MapBasedConfig(map); + addConfig(config); + WeldContainer container = deploy(PayloadConsumerBean.class, ProducerBean.class); + + PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); + await().until(() -> bean.list().size() >= 10); + Assertions.assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + CompletableResultCode completableResultCode = tracerProvider.forceFlush(); + completableResultCode.whenComplete(() -> { + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(20, spans.size()); + + assertEquals(20, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size()); + + SpanData span = spans.get(0); + assertEquals(SpanKind.PRODUCER, span.getKind()); + assertEquals(3, span.getAttributes().size()); + assertEquals("jms", span.getAttributes().get(MESSAGING_SYSTEM)); + assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION)); + assertEquals("ActiveMQQueue[" + queue + "]", span.getAttributes().get(MESSAGING_DESTINATION_NAME)); + assertEquals("ActiveMQQueue[" + queue + "] publish", span.getName()); + }); + } + + @Test + @Disabled("Not working yet. ") + public void testFromKafkaToAppToKafka() { + String queue = "queue-one"; + String resultTopic = queue + "-result"; + String parentTopic = queue + "-parent"; + Map map = new HashMap<>(); + map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.jms.destination", "queue-one"); + MapBasedConfig config = new MapBasedConfig(map); + addConfig(config); + WeldContainer container = deploy(PayloadConsumerBean.class, ProducerBean.class); + + PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); + await().until(() -> bean.list().size() >= 10); + Assertions.assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + CompletableResultCode completableResultCode = tracerProvider.forceFlush(); + completableResultCode.whenComplete(() -> { + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(20, spans.size()); + + List parentSpans = spans.stream() + .filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid())) + .collect(toList()); + assertEquals(20, parentSpans.size()); + + for (SpanData parentSpan : parentSpans) { + assertEquals(1, + spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpan.getSpanId())).count()); + } + + SpanData consumer = parentSpans.get(0); + assertEquals(SpanKind.CONSUMER, consumer.getKind()); + assertEquals(8, consumer.getAttributes().size()); + assertEquals("jms", consumer.getAttributes().get(MESSAGING_SYSTEM)); + assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION)); + assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME)); + assertEquals("jms-consumer-source", consumer.getAttributes().get(MESSAGING_CLIENT_ID)); + assertEquals(0, consumer.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET)); + assertEquals(parentTopic + " receive", consumer.getName()); + + SpanData producer = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(consumer.getSpanId())) + .findFirst().get(); + assertEquals(SpanKind.PRODUCER, producer.getKind()); + assertEquals(5, producer.getAttributes().size()); + assertEquals("jms", producer.getAttributes().get(MESSAGING_SYSTEM)); + assertEquals("publish", producer.getAttributes().get(MESSAGING_OPERATION)); + assertEquals(resultTopic, producer.getAttributes().get(MESSAGING_DESTINATION_NAME)); + assertEquals("publish", producer.getAttributes().get(MESSAGING_OPERATION)); + assertEquals("jms-producer-jms", producer.getAttributes().get(MESSAGING_CLIENT_ID)); + assertEquals(resultTopic + " publish", producer.getName()); + }); + } + + // @Test + // public void testFromKafkaToAppWithParentSpan() { + // String parentTopic = topic + "-parent"; + // MyAppReceivingData bean = runApplication(getKafkaSinkConfigForMyAppReceivingData(parentTopic), + // MyAppReceivingData.class); + // + // RecordHeaders headers = new RecordHeaders(); + // try (Scope ignored = Context.current().makeCurrent()) { + // Tracer tracer = GlobalOpenTelemetry.getTracerProvider().get("io.smallrye.reactive.messaging"); + // Span span = tracer.spanBuilder("producer").setSpanKind(SpanKind.PRODUCER).startSpan(); + // Context current = Context.current().with(span); + // GlobalOpenTelemetry.getPropagators() + // .getTextMapPropagator() + // .inject(current, headers, (carrier, key, value) -> carrier.add(key, value.getBytes())); + // span.end(); + // } + // companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(parentTopic, null, null, "a-key", i, headers), 10); + // + // await().until(() -> bean.results().size() >= 10); + // assertThat(bean.results()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + // + // CompletableResultCode completableResultCode = tracerProvider.forceFlush(); + // completableResultCode.whenComplete(() -> { + // // 1 Parent, 10 Children + // List spans = spanExporter.getFinishedSpanItems(); + // assertEquals(11, spans.size()); + // + // // All should use the same Trace + // assertEquals(1, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size()); + // + // List parentSpans = spans.stream() + // .filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid())).collect(toList()); + // assertEquals(1, parentSpans.size()); + // + // for (SpanData parentSpan : parentSpans) { + // assertEquals(10, + // spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpan.getSpanId())).count()); + // } + // + // SpanData producer = parentSpans.get(0); + // assertEquals(SpanKind.PRODUCER, producer.getKind()); + // + // SpanData consumer = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(producer.getSpanId())) + // .findFirst().get(); + // assertEquals(8, consumer.getAttributes().size()); + // assertEquals("kafka", consumer.getAttributes().get(MESSAGING_SYSTEM)); + // assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION)); + // assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME)); + // assertEquals("kafka-consumer-stuff", consumer.getAttributes().get(MESSAGING_CLIENT_ID)); + // assertEquals(0, consumer.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET)); + // assertEquals(parentTopic + " receive", consumer.getName()); + // }); + // } + // + // @Test + // public void testFromKafkaToAppWithNoParent() { + // MyAppReceivingData bean = runApplication( + // getKafkaSinkConfigForMyAppReceivingData(topic), MyAppReceivingData.class); + // + // companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, null, null, "a-key", i), 10); + // + // await().until(() -> bean.results().size() >= 10); + // assertThat(bean.results()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + // + // CompletableResultCode completableResultCode = tracerProvider.forceFlush(); + // completableResultCode.whenComplete(() -> { + // List spans = spanExporter.getFinishedSpanItems(); + // assertEquals(10, spans.size()); + // + // for (SpanData span : spans) { + // assertEquals(SpanKind.CONSUMER, span.getKind()); + // assertEquals(SpanId.getInvalid(), span.getParentSpanId()); + // } + // }); + // } + // + // private KafkaMapBasedConfig getKafkaSinkConfigForMyAppGeneratingData() { + // return kafkaConfig("mp.messaging.outgoing.kafka", true) + // .put("value.serializer", IntegerSerializer.class.getName()) + // .put("topic", topic); + // } + // + // private KafkaMapBasedConfig getKafkaSinkConfigForMyAppGeneratingDataWithStructuredCloudEvent(String mode) { + // return kafkaConfig("mp.messaging.outgoing.kafka", true) + // .put("value.serializer", StringSerializer.class.getName()) + // .put("topic", topic) + // .put("cloud-events-mode", mode); + // } + // + // private MapBasedConfig getKafkaSinkConfigForMyAppProcessingData(String resultTopic, String parentTopic) { + // return kafkaConfig("mp.messaging.outgoing.kafka", true) + // .put("value.serializer", IntegerSerializer.class.getName()) + // .put("topic", resultTopic) + // .withPrefix("mp.messaging.incoming.source") + // .put("value.deserializer", IntegerDeserializer.class.getName()) + // .put("key.deserializer", StringDeserializer.class.getName()) + // .put("topic", parentTopic) + // .put("auto.offset.reset", "earliest"); + // } + // + // private KafkaMapBasedConfig getKafkaSinkConfigForMyAppReceivingData(String topic) { + // return kafkaConfig("mp.messaging.incoming.stuff", true) + // .put("value.deserializer", IntegerDeserializer.class.getName()) + // .put("key.deserializer", StringDeserializer.class.getName()) + // .put("topic", topic) + // .put("auto.offset.reset", "earliest"); + // } + // + // @ApplicationScoped + // public static class MyAppGeneratingData { + // @Outgoing("kafka") + // public Flow.Publisher source() { + // return Multi.createFrom().range(0, 10); + // } + // } + // + @ApplicationScoped + public static class MyAppProcessingData { + @Incoming("source") + @Outgoing("jms") + public Message processMessage(Message input) { + return input.withPayload(input.getPayload() + 1); + } + } + // + // @ApplicationScoped + // public static class MyAppReceivingData { + // private final List results = new CopyOnWriteArrayList<>(); + // + // @Incoming("stuff") + // public CompletionStage consume(Message input) { + // results.add(input.getPayload()); + // return input.ack(); + // } + // + // public List results() { + // return results; + // } + // } +} From c8bbefa7c3da39e20c6dbc29a99f081e2511e9a6 Mon Sep 17 00:00:00 2001 From: Dan Kristensen <1816596+dankristensen@users.noreply.github.com> Date: Tue, 20 Aug 2024 15:11:27 +0200 Subject: [PATCH 2/8] Removed kafka naming --- .../java/io/smallrye/reactive/messaging/jms/JmsSink.java | 4 ++-- .../messaging/jms/tracing/JmsAttributesExtractor.java | 2 +- .../jms/tracing/JmsOpenTelemetryInstrumenter.java | 8 ++++---- .../jms/tracing/BatchTracingPropagationTest.java | 4 ++-- .../messaging/jms/tracing/TracingPropagationTest.java | 2 +- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java index f0b31b793d..e9daf4056e 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java @@ -191,11 +191,11 @@ private void outgoingTrace(JmsResourceHolder resourceHolder, Messag } catch (JMSException e) { throw new RuntimeException(e); } - JmsTrace kafkaTrace = new JmsTrace.Builder() + JmsTrace jmsTrace = new JmsTrace.Builder() .withQueue(resourceHolder.getDestination().toString())//TODO Find the correct queue name .withProperties(messageProperties) .build(); - jmsInstrumenter.traceOutgoing(message, kafkaTrace); + jmsInstrumenter.traceOutgoing(message, jmsTrace); } } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java index 667fb5ab5b..26659b2abd 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java @@ -37,7 +37,7 @@ public void onStart(final AttributesBuilder attributes, final Context parentCont public void onEnd( final AttributesBuilder attributes, final Context context, - final JmsTrace kafkaTrace, + final JmsTrace jmsTrace, final Void unused, final Throwable error) { diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java index b40ec174d8..f62a92564a 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java @@ -59,11 +59,11 @@ private static JmsOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, return new JmsOpenTelemetryInstrumenter(instrumenter); } - public Message traceIncoming(Message message, JmsTrace kafkaTrace) { - return TracingUtils.traceIncoming(instrumenter, message, kafkaTrace, true); + public Message traceIncoming(Message message, JmsTrace jmsTrace) { + return TracingUtils.traceIncoming(instrumenter, message, jmsTrace, true); } - public void traceOutgoing(Message message, JmsTrace kafkaTrace) { - TracingUtils.traceOutgoing(instrumenter, message, kafkaTrace); + public void traceOutgoing(Message message, JmsTrace jmsTrace) { + TracingUtils.traceOutgoing(instrumenter, message, jmsTrace); } } diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/BatchTracingPropagationTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/BatchTracingPropagationTest.java index 708ce1d1fb..dec38684e2 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/BatchTracingPropagationTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/BatchTracingPropagationTest.java @@ -115,7 +115,7 @@ public void testFromAppToJms() { } @Test - public void testFromKafkaToAppWithParentSpan() { + public void testFromJmsToAppWithParentSpan() { String parentTopic = "queue-one-parent"; Map map = new HashMap<>(); map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); @@ -150,7 +150,7 @@ public void testFromKafkaToAppWithParentSpan() { } @Test - public void testFromKafkaToAppWithNoParent() { + public void testFromJmsToAppWithNoParent() { Map map = new HashMap<>(); map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java index 40d3f2bba6..e643623799 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java @@ -103,7 +103,7 @@ public void testFromAppToJms() { @Test @Disabled("Not working yet. ") - public void testFromKafkaToAppToKafka() { + public void testFromJmsToAppToKafka() { String queue = "queue-one"; String resultTopic = queue + "-result"; String parentTopic = queue + "-parent"; From 1a4e5c97987c314efb4e3f6fdb24e1743bf6447c Mon Sep 17 00:00:00 2001 From: Dan Kristensen <1816596+dankristensen@users.noreply.github.com> Date: Thu, 22 Aug 2024 15:57:14 +0200 Subject: [PATCH 3/8] Removed MESSAGING_KAFKA_CONSUMER_GROUP --- .../reactive/messaging/jms/tracing/JmsAttributesExtractor.java | 3 --- .../reactive/messaging/jms/tracing/TracingPropagationTest.java | 2 -- 2 files changed, 5 deletions(-) diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java index 26659b2abd..2c752b4e2e 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java @@ -28,9 +28,6 @@ public void onStart(final AttributesBuilder attributes, final Context parentCont } attributes.put(MESSAGING_CONSUMER_ID, consumerId); } - if (groupId != null) { - attributes.put(MESSAGING_KAFKA_CONSUMER_GROUP, groupId); - } } @Override diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java index e643623799..82772586d3 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java @@ -141,7 +141,6 @@ public void testFromJmsToAppToKafka() { assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION)); assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME)); assertEquals("jms-consumer-source", consumer.getAttributes().get(MESSAGING_CLIENT_ID)); - assertEquals(0, consumer.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET)); assertEquals(parentTopic + " receive", consumer.getName()); SpanData producer = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(consumer.getSpanId())) @@ -206,7 +205,6 @@ public void testFromJmsToAppToKafka() { // assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION)); // assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME)); // assertEquals("kafka-consumer-stuff", consumer.getAttributes().get(MESSAGING_CLIENT_ID)); - // assertEquals(0, consumer.getAttributes().get(MESSAGING_KAFKA_MESSAGE_OFFSET)); // assertEquals(parentTopic + " receive", consumer.getName()); // }); // } From 2560c11ac18563c25b3b039d43e5c4e8f353f645 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Mon, 26 Aug 2024 16:54:45 +0200 Subject: [PATCH 4/8] Jms tracing smallrye-reactive-messaging-otel dependency --- smallrye-reactive-messaging-jms/pom.xml | 11 +---------- .../io/smallrye/reactive/messaging/jms/JmsSink.java | 9 ++++----- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/smallrye-reactive-messaging-jms/pom.xml b/smallrye-reactive-messaging-jms/pom.xml index 332cb46d3a..bcbaec7988 100644 --- a/smallrye-reactive-messaging-jms/pom.xml +++ b/smallrye-reactive-messaging-jms/pom.xml @@ -127,19 +127,10 @@ ${project.version} provided - - io.opentelemetry.instrumentation - opentelemetry-instrumentation-api - - - io.opentelemetry.instrumentation - opentelemetry-instrumentation-api-incubator - io.smallrye.reactive smallrye-reactive-messaging-otel - 999-SNAPSHOT - compile + ${project.version} diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java index e9daf4056e..8bd3cd7780 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java @@ -108,8 +108,7 @@ private Uni> send(JmsResourceHolder resourceHo JMSContext context = resourceHolder.getContext(); // If the payload is a JMS Message, send it as it is, ignoring metadata. if (payload instanceof jakarta.jms.Message) { - outgoingTrace(resourceHolder, message, (jakarta.jms.Message) payload); - + outgoingTrace(destination, message, (jakarta.jms.Message) payload); return dispatch(message, () -> resourceHolder.getClient().send(destination, (jakarta.jms.Message) payload)); } @@ -170,14 +169,14 @@ private Uni> send(JmsResourceHolder resourceHo actualDestination = destination; } - outgoingTrace(resourceHolder, message, outgoing); + outgoingTrace(actualDestination, message, outgoing); return dispatch(message, () -> resourceHolder.getClient().send(actualDestination, outgoing)); } catch (JMSException e) { return Uni.createFrom().failure(new IllegalStateException(e)); } } - private void outgoingTrace(JmsResourceHolder resourceHolder, Message message, jakarta.jms.Message payload) { + private void outgoingTrace(Destination actualDestination, Message message, jakarta.jms.Message payload) { if (isTracingEnabled) { jakarta.jms.Message jmsPayload = payload; Map messageProperties = new HashMap<>(); @@ -192,7 +191,7 @@ private void outgoingTrace(JmsResourceHolder resourceHolder, Messag throw new RuntimeException(e); } JmsTrace jmsTrace = new JmsTrace.Builder() - .withQueue(resourceHolder.getDestination().toString())//TODO Find the correct queue name + .withQueue(actualDestination.toString()) .withProperties(messageProperties) .build(); jmsInstrumenter.traceOutgoing(message, jmsTrace); From 8794852d1dba32e3204d1a890be847f9c078afc5 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 29 Aug 2024 12:26:11 +0200 Subject: [PATCH 5/8] Jms TracingPropagationTest --- .../messaging/jms/IncomingJmsMessage.java | 9 +- .../reactive/messaging/jms/JmsSource.java | 9 +- .../jms/tracing/JmsAttributesExtractor.java | 2 +- .../tracing/JmsOpenTelemetryInstrumenter.java | 2 +- .../tracing/BatchTracingPropagationTest.java | 172 -------- .../jms/tracing/TracingPropagationTest.java | 410 ++++++++++-------- 6 files changed, 239 insertions(+), 365 deletions(-) delete mode 100644 smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/BatchTracingPropagationTest.java diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java index 69dc97c613..c6abaea919 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java @@ -13,14 +13,15 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.json.JsonMapping; +import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; -public class IncomingJmsMessage implements org.eclipse.microprofile.reactive.messaging.Message { +public class IncomingJmsMessage implements MetadataInjectableMessage { private final Message delegate; private final Executor executor; private final Class clazz; private final JsonMapping jsonMapping; private final IncomingJmsMessageMetadata jmsMetadata; - private final Metadata metadata; + private Metadata metadata; IncomingJmsMessage(Message message, Executor executor, JsonMapping jsonMapping) { this.delegate = message; @@ -139,4 +140,8 @@ public C unwrap(Class unwrapType) { throw ex.illegalStateUnableToUnwrap(unwrapType); } + @Override + public void injectMetadata(Object metadataObject) { + metadata = this.metadata.with(metadataObject); + } } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java index 41b71742b3..6a26e351ec 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java @@ -16,7 +16,14 @@ import java.util.concurrent.atomic.AtomicReference; import jakarta.enterprise.inject.Instance; -import jakarta.jms.*; +import jakarta.jms.Destination; +import jakarta.jms.JMSConsumer; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSException; +import jakarta.jms.JMSRuntimeException; +import jakarta.jms.Message; +import jakarta.jms.Queue; +import jakarta.jms.Topic; import io.opentelemetry.api.OpenTelemetry; import io.smallrye.mutiny.Multi; diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java index 2c752b4e2e..9512442398 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java @@ -26,7 +26,7 @@ public void onStart(final AttributesBuilder attributes, final Context parentCont if (!clientId.isEmpty()) { consumerId += " - " + clientId; } - attributes.put(MESSAGING_CONSUMER_ID, consumerId); + attributes.put(MESSAGING_CLIENT_ID, consumerId); } } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java index f62a92564a..86aa11fbab 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsOpenTelemetryInstrumenter.java @@ -60,7 +60,7 @@ private static JmsOpenTelemetryInstrumenter create(OpenTelemetry openTelemetry, } public Message traceIncoming(Message message, JmsTrace jmsTrace) { - return TracingUtils.traceIncoming(instrumenter, message, jmsTrace, true); + return TracingUtils.traceIncoming(instrumenter, message, jmsTrace); } public void traceOutgoing(Message message, JmsTrace jmsTrace) { diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/BatchTracingPropagationTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/BatchTracingPropagationTest.java deleted file mode 100644 index dec38684e2..0000000000 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/BatchTracingPropagationTest.java +++ /dev/null @@ -1,172 +0,0 @@ -package io.smallrye.reactive.messaging.jms.tracing; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Flow; - -import jakarta.jms.JMSContext; -import jakarta.jms.JMSProducer; - -import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; -import org.jboss.weld.environment.se.WeldContainer; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.context.propagation.ContextPropagators; -import io.opentelemetry.sdk.OpenTelemetrySdk; -import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; -import io.opentelemetry.sdk.trace.SdkTracerProvider; -import io.opentelemetry.sdk.trace.SpanProcessor; -import io.opentelemetry.sdk.trace.data.SpanData; -import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; -import io.opentelemetry.sdk.trace.samplers.Sampler; -import io.smallrye.reactive.messaging.jms.*; -import io.smallrye.reactive.messaging.jms.PayloadConsumerBean; -import io.smallrye.reactive.messaging.support.JmsTestBase; -import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; - -public class BatchTracingPropagationTest extends JmsTestBase { - private SdkTracerProvider tracerProvider; - private InMemorySpanExporter spanExporter; - - private JMSContext jms; - private ActiveMQJMSConnectionFactory factory; - private Flow.Subscriber> subscriber; - - @BeforeEach - public void init() { - factory = new ActiveMQJMSConnectionFactory( - "tcp://localhost:61616", - null, null); - jms = factory.createContext(); - } - - @AfterEach - public void close() { - jms.close(); - factory.close(); - } - - private JmsResourceHolder getResourceHolder() { - return new JmsResourceHolder<>("jms", () -> jms); - } - - @BeforeEach - public void setup() { - GlobalOpenTelemetry.resetForTest(); - - spanExporter = InMemorySpanExporter.create(); - SpanProcessor spanProcessor = SimpleSpanProcessor.create(spanExporter); - - tracerProvider = SdkTracerProvider.builder() - .addSpanProcessor(spanProcessor) - .setSampler(Sampler.alwaysOn()) - .build(); - - OpenTelemetrySdk.builder() - .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) - .setTracerProvider(tracerProvider) - .buildAndRegisterGlobal(); - } - - @AfterAll - static void shutdown() { - GlobalOpenTelemetry.resetForTest(); - } - - @SuppressWarnings("ConstantConditions") - @Test - public void testFromAppToJms() { - Map map = new HashMap<>(); - map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); - map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); - map.put("mp.messaging.incoming.jms.destination", "queue-one"); - MapBasedConfig config = new MapBasedConfig(map); - addConfig(config); - WeldContainer container = deploy(PayloadConsumerBean.class, ProducerBean.class); - - PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); - await().until(() -> bean.list().size() >= 10); - - assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - - CompletableResultCode completableResultCode = tracerProvider.forceFlush(); - completableResultCode.whenComplete(() -> { - List spans = spanExporter.getFinishedSpanItems(); - assertEquals(20, spans.size()); - SpanData spanData = spans.get(19); - assertEquals(SpanKind.CONSUMER, spanData.getKind()); - }); - } - - @Test - public void testFromJmsToAppWithParentSpan() { - String parentTopic = "queue-one-parent"; - Map map = new HashMap<>(); - map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); - map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); - map.put("mp.messaging.incoming.jms.destination", "queue-one"); - MapBasedConfig config = new MapBasedConfig(map); - addConfig(config); - WeldContainer container = deploy(PayloadConsumerBean.class, ProducerBean.class); - - PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); - - Map headers = new HashMap<>(); - try (Scope ignored = Context.current().makeCurrent()) { - Tracer tracer = GlobalOpenTelemetry.getTracerProvider().get("io.smallrye.reactive.messaging"); - Span span = tracer.spanBuilder("producer").setSpanKind(SpanKind.PRODUCER).startSpan(); - Context current = Context.current().with(span); - GlobalOpenTelemetry.getPropagators() - .getTextMapPropagator() - .inject(current, headers, (carrier, key, value) -> carrier.put(key, value.getBytes())); - span.end(); - } - - await().until(() -> bean.list().size() >= 10); - - assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - - CompletableResultCode completableResultCode = tracerProvider.forceFlush(); - completableResultCode.whenComplete(() -> { - List spans = spanExporter.getFinishedSpanItems(); - assertEquals(21, spans.size()); - }); - } - - @Test - public void testFromJmsToAppWithNoParent() { - Map map = new HashMap<>(); - map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); - map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); - map.put("mp.messaging.incoming.jms.destination", "queue-one"); - MapBasedConfig config = new MapBasedConfig(map); - addConfig(config); - WeldContainer container = deploy(PayloadConsumerBean.class, ProducerBean.class); - - PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); - - await().until(() -> bean.list().size() >= 10); - - CompletableResultCode completableResultCode = tracerProvider.forceFlush(); - completableResultCode.whenComplete(() -> { - List spans = spanExporter.getFinishedSpanItems(); - assertEquals(20, spans.size()); - }); - } -} diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java index 82772586d3..133c24c4b9 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java @@ -1,31 +1,46 @@ package io.smallrye.reactive.messaging.jms.tracing; -import static io.opentelemetry.semconv.SemanticAttributes.*; +import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.SemanticAttributes.MESSAGING_SYSTEM; import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.jms.JMSConsumer; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSProducer; +import jakarta.jms.ObjectMessage; +import jakarta.jms.Queue; -import org.assertj.core.api.Assertions; +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.jboss.weld.environment.se.WeldContainer; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanId; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.context.propagation.ContextPropagators; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.common.CompletableResultCode; @@ -45,8 +60,16 @@ public class TracingPropagationTest extends JmsTestBase { private SdkTracerProvider tracerProvider; private InMemorySpanExporter spanExporter; + private JMSContext jms; + private ActiveMQJMSConnectionFactory factory; + @BeforeEach public void setup() { + factory = new ActiveMQJMSConnectionFactory( + "tcp://localhost:61616", + null, null); + jms = factory.createContext(); + GlobalOpenTelemetry.resetForTest(); spanExporter = InMemorySpanExporter.create(); @@ -63,6 +86,12 @@ public void setup() { .buildAndRegisterGlobal(); } + @AfterEach + public void close() { + jms.close(); + factory.close(); + } + @AfterAll static void shutdown() { GlobalOpenTelemetry.resetForTest(); @@ -82,216 +111,221 @@ public void testFromAppToJms() { PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); await().until(() -> bean.list().size() >= 10); - Assertions.assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); CompletableResultCode completableResultCode = tracerProvider.forceFlush(); - completableResultCode.whenComplete(() -> { - List spans = spanExporter.getFinishedSpanItems(); - assertEquals(20, spans.size()); - - assertEquals(20, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size()); - - SpanData span = spans.get(0); - assertEquals(SpanKind.PRODUCER, span.getKind()); - assertEquals(3, span.getAttributes().size()); - assertEquals("jms", span.getAttributes().get(MESSAGING_SYSTEM)); - assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION)); - assertEquals("ActiveMQQueue[" + queue + "]", span.getAttributes().get(MESSAGING_DESTINATION_NAME)); - assertEquals("ActiveMQQueue[" + queue + "] publish", span.getName()); - }); + completableResultCode.join(10, TimeUnit.SECONDS); + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(20, spans.size()); + + assertEquals(20, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size()); + + SpanData span = spans.get(0); + assertEquals(SpanKind.PRODUCER, span.getKind()); + assertEquals(3, span.getAttributes().size()); + assertEquals("jms", span.getAttributes().get(MESSAGING_SYSTEM)); + assertEquals("publish", span.getAttributes().get(MESSAGING_OPERATION)); + assertEquals("ActiveMQQueue[" + queue + "]", span.getAttributes().get(MESSAGING_DESTINATION_NAME)); + assertEquals("ActiveMQQueue[" + queue + "] publish", span.getName()); } @Test - @Disabled("Not working yet. ") - public void testFromJmsToAppToKafka() { + public void testFromJmsToAppToJms() { String queue = "queue-one"; - String resultTopic = queue + "-result"; - String parentTopic = queue + "-parent"; + String parentQueue = queue + "-parent"; + String resultQueue = queue + "-result"; + Map map = new HashMap<>(); + map.put("mp.messaging.incoming.source.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.source.destination", parentQueue); + map.put("mp.messaging.outgoing.jms.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.outgoing.jms.destination", resultQueue); + MapBasedConfig config = new MapBasedConfig(map); + addConfig(config); + WeldContainer container = deploy(MyAppProcessingData.class); + + List results = new CopyOnWriteArrayList<>(); + Queue result = jms.createQueue(resultQueue); + JMSConsumer sink = jms.createConsumer(result); + sink.setMessageListener(results::add); + + MyAppProcessingData bean = container.select(MyAppProcessingData.class).get(); + + JMSProducer producer = jms.createProducer(); + Queue parent = jms.createQueue(parentQueue); + for (int i = 0; i < 10; i++) { + producer.send(parent, i); + } + + await().until(() -> results.size() >= 10); + assertThat(results).extracting(m -> Integer.valueOf(m.getBody(String.class))).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, + 10); + + CompletableResultCode completableResultCode = tracerProvider.forceFlush(); + completableResultCode.join(10, TimeUnit.SECONDS); + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(20, spans.size()); + + List parentSpans = spans.stream() + .filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid())) + .collect(toList()); + assertEquals(10, parentSpans.size()); + + // for (SpanData parentSpan : parentSpans) { + // assertEquals(1, + // spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpan.getSpanId())).count()); + // } + + SpanData consumer = parentSpans.get(0); + assertEquals(SpanKind.CONSUMER, consumer.getKind()); + assertEquals(3, consumer.getAttributes().size()); + assertEquals("jms", consumer.getAttributes().get(MESSAGING_SYSTEM)); + assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION)); + assertEquals(parentQueue, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME)); + assertEquals(parentQueue + " receive", consumer.getName()); + + for (SpanData span : spans) { + System.out.println(span.getKind() + " " + span.getSpanId() + " -> " + span.getParentSpanId()); + } + SpanData producerSpan = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(consumer.getSpanId())) + .findFirst().get(); + assertEquals(SpanKind.PRODUCER, producerSpan.getKind()); + assertEquals(3, producerSpan.getAttributes().size()); + assertEquals("jms", producerSpan.getAttributes().get(MESSAGING_SYSTEM)); + assertEquals("publish", producerSpan.getAttributes().get(MESSAGING_OPERATION)); + assertEquals("ActiveMQQueue[" + resultQueue + "]", producerSpan.getAttributes().get(MESSAGING_DESTINATION_NAME)); + assertEquals("publish", producerSpan.getAttributes().get(MESSAGING_OPERATION)); + assertEquals("ActiveMQQueue[" + resultQueue + "] publish", producerSpan.getName()); + } + + @Test + public void testFromJmsToAppWithParentSpan() { + String parentTopic = "queue-one-parent"; + Map map = new HashMap<>(); + map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.jms.destination", parentTopic); + MapBasedConfig config = new MapBasedConfig(map); + addConfig(config); + WeldContainer container = deploy(PayloadConsumerBean.class); + + PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); + + Map properties = new HashMap<>(); + try (Scope ignored = Context.current().makeCurrent()) { + Tracer tracer = GlobalOpenTelemetry.getTracerProvider().get("io.smallrye.reactive.messaging"); + Span span = tracer.spanBuilder("producer").setSpanKind(SpanKind.PRODUCER).startSpan(); + Context current = Context.current().with(span); + GlobalOpenTelemetry.getPropagators() + .getTextMapPropagator() + .inject(current, properties, (carrier, key, value) -> carrier.put(key, value)); + span.end(); + } + + JMSProducer producer = jms.createProducer(); + Queue queue = jms.createQueue(parentTopic); + for (int i = 0; i < 10; i++) { + ObjectMessage msg = jms.createObjectMessage(i); + properties.forEach((k, v) -> { + try { + msg.setObjectProperty(k, v); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + producer.send(queue, msg); + } + + await().until(() -> bean.list().size() >= 10); + + assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + CompletableResultCode completableResultCode = tracerProvider.forceFlush(); + completableResultCode.join(10, TimeUnit.SECONDS); + // 1 Parent, 10 Children + List spans = spanExporter.getFinishedSpanItems(); + assertEquals(11, spans.size()); + + // All should use the same Trace + assertEquals(1, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size()); + + List parentSpans = spans.stream() + .filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid())).collect(toList()); + assertEquals(1, parentSpans.size()); + + for (SpanData parentSpan : parentSpans) { + assertEquals(10, + spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpan.getSpanId())).count()); + } + + SpanData producerSpan = parentSpans.get(0); + assertEquals(SpanKind.PRODUCER, producerSpan.getKind()); + + SpanData consumer = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(producerSpan.getSpanId())) + .findFirst().get(); + assertEquals(3, consumer.getAttributes().size()); + assertEquals("jms", consumer.getAttributes().get(MESSAGING_SYSTEM)); + assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION)); + assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME)); + assertEquals(parentTopic + " receive", consumer.getName()); + + } + + @Test + public void testFromJmsToAppWithNoParent() { Map map = new HashMap<>(); - map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); map.put("mp.messaging.incoming.jms.destination", "queue-one"); MapBasedConfig config = new MapBasedConfig(map); addConfig(config); - WeldContainer container = deploy(PayloadConsumerBean.class, ProducerBean.class); + WeldContainer container = deploy(PayloadConsumerBean.class); PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); + + JMSProducer producer = jms.createProducer(); + for (int i = 0; i < 10; i++) { + producer.send(jms.createQueue("queue-one"), i); + } + await().until(() -> bean.list().size() >= 10); - Assertions.assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); CompletableResultCode completableResultCode = tracerProvider.forceFlush(); - completableResultCode.whenComplete(() -> { - List spans = spanExporter.getFinishedSpanItems(); - assertEquals(20, spans.size()); - - List parentSpans = spans.stream() - .filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid())) - .collect(toList()); - assertEquals(20, parentSpans.size()); - - for (SpanData parentSpan : parentSpans) { - assertEquals(1, - spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpan.getSpanId())).count()); - } - - SpanData consumer = parentSpans.get(0); - assertEquals(SpanKind.CONSUMER, consumer.getKind()); - assertEquals(8, consumer.getAttributes().size()); - assertEquals("jms", consumer.getAttributes().get(MESSAGING_SYSTEM)); - assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION)); - assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME)); - assertEquals("jms-consumer-source", consumer.getAttributes().get(MESSAGING_CLIENT_ID)); - assertEquals(parentTopic + " receive", consumer.getName()); - - SpanData producer = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(consumer.getSpanId())) - .findFirst().get(); - assertEquals(SpanKind.PRODUCER, producer.getKind()); - assertEquals(5, producer.getAttributes().size()); - assertEquals("jms", producer.getAttributes().get(MESSAGING_SYSTEM)); - assertEquals("publish", producer.getAttributes().get(MESSAGING_OPERATION)); - assertEquals(resultTopic, producer.getAttributes().get(MESSAGING_DESTINATION_NAME)); - assertEquals("publish", producer.getAttributes().get(MESSAGING_OPERATION)); - assertEquals("jms-producer-jms", producer.getAttributes().get(MESSAGING_CLIENT_ID)); - assertEquals(resultTopic + " publish", producer.getName()); + completableResultCode.join(10, TimeUnit.SECONDS); + List spans = spanExporter.getFinishedSpanItems(); + assertThat(spans).hasSize(10).allSatisfy(span -> { + assertThat(span.getKind()).isEqualTo(SpanKind.CONSUMER); + assertThat(span.getSpanId()).isNotEqualTo(SpanId.getInvalid()); + assertThat(span.getParentSpanId()).isEqualTo(SpanId.getInvalid()); }); } - // @Test - // public void testFromKafkaToAppWithParentSpan() { - // String parentTopic = topic + "-parent"; - // MyAppReceivingData bean = runApplication(getKafkaSinkConfigForMyAppReceivingData(parentTopic), - // MyAppReceivingData.class); - // - // RecordHeaders headers = new RecordHeaders(); - // try (Scope ignored = Context.current().makeCurrent()) { - // Tracer tracer = GlobalOpenTelemetry.getTracerProvider().get("io.smallrye.reactive.messaging"); - // Span span = tracer.spanBuilder("producer").setSpanKind(SpanKind.PRODUCER).startSpan(); - // Context current = Context.current().with(span); - // GlobalOpenTelemetry.getPropagators() - // .getTextMapPropagator() - // .inject(current, headers, (carrier, key, value) -> carrier.add(key, value.getBytes())); - // span.end(); - // } - // companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(parentTopic, null, null, "a-key", i, headers), 10); - // - // await().until(() -> bean.results().size() >= 10); - // assertThat(bean.results()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - // - // CompletableResultCode completableResultCode = tracerProvider.forceFlush(); - // completableResultCode.whenComplete(() -> { - // // 1 Parent, 10 Children - // List spans = spanExporter.getFinishedSpanItems(); - // assertEquals(11, spans.size()); - // - // // All should use the same Trace - // assertEquals(1, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size()); - // - // List parentSpans = spans.stream() - // .filter(spanData -> spanData.getParentSpanId().equals(SpanId.getInvalid())).collect(toList()); - // assertEquals(1, parentSpans.size()); - // - // for (SpanData parentSpan : parentSpans) { - // assertEquals(10, - // spans.stream().filter(spanData -> spanData.getParentSpanId().equals(parentSpan.getSpanId())).count()); - // } - // - // SpanData producer = parentSpans.get(0); - // assertEquals(SpanKind.PRODUCER, producer.getKind()); - // - // SpanData consumer = spans.stream().filter(spanData -> spanData.getParentSpanId().equals(producer.getSpanId())) - // .findFirst().get(); - // assertEquals(8, consumer.getAttributes().size()); - // assertEquals("kafka", consumer.getAttributes().get(MESSAGING_SYSTEM)); - // assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION)); - // assertEquals(parentTopic, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME)); - // assertEquals("kafka-consumer-stuff", consumer.getAttributes().get(MESSAGING_CLIENT_ID)); - // assertEquals(parentTopic + " receive", consumer.getName()); - // }); - // } - // - // @Test - // public void testFromKafkaToAppWithNoParent() { - // MyAppReceivingData bean = runApplication( - // getKafkaSinkConfigForMyAppReceivingData(topic), MyAppReceivingData.class); - // - // companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, null, null, "a-key", i), 10); - // - // await().until(() -> bean.results().size() >= 10); - // assertThat(bean.results()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); - // - // CompletableResultCode completableResultCode = tracerProvider.forceFlush(); - // completableResultCode.whenComplete(() -> { - // List spans = spanExporter.getFinishedSpanItems(); - // assertEquals(10, spans.size()); - // - // for (SpanData span : spans) { - // assertEquals(SpanKind.CONSUMER, span.getKind()); - // assertEquals(SpanId.getInvalid(), span.getParentSpanId()); - // } - // }); - // } - // - // private KafkaMapBasedConfig getKafkaSinkConfigForMyAppGeneratingData() { - // return kafkaConfig("mp.messaging.outgoing.kafka", true) - // .put("value.serializer", IntegerSerializer.class.getName()) - // .put("topic", topic); - // } - // - // private KafkaMapBasedConfig getKafkaSinkConfigForMyAppGeneratingDataWithStructuredCloudEvent(String mode) { - // return kafkaConfig("mp.messaging.outgoing.kafka", true) - // .put("value.serializer", StringSerializer.class.getName()) - // .put("topic", topic) - // .put("cloud-events-mode", mode); - // } - // - // private MapBasedConfig getKafkaSinkConfigForMyAppProcessingData(String resultTopic, String parentTopic) { - // return kafkaConfig("mp.messaging.outgoing.kafka", true) - // .put("value.serializer", IntegerSerializer.class.getName()) - // .put("topic", resultTopic) - // .withPrefix("mp.messaging.incoming.source") - // .put("value.deserializer", IntegerDeserializer.class.getName()) - // .put("key.deserializer", StringDeserializer.class.getName()) - // .put("topic", parentTopic) - // .put("auto.offset.reset", "earliest"); - // } - // - // private KafkaMapBasedConfig getKafkaSinkConfigForMyAppReceivingData(String topic) { - // return kafkaConfig("mp.messaging.incoming.stuff", true) - // .put("value.deserializer", IntegerDeserializer.class.getName()) - // .put("key.deserializer", StringDeserializer.class.getName()) - // .put("topic", topic) - // .put("auto.offset.reset", "earliest"); - // } - // - // @ApplicationScoped - // public static class MyAppGeneratingData { - // @Outgoing("kafka") - // public Flow.Publisher source() { - // return Multi.createFrom().range(0, 10); - // } - // } - // @ApplicationScoped public static class MyAppProcessingData { + + private final List list = new CopyOnWriteArrayList<>(); + @Incoming("source") @Outgoing("jms") public Message processMessage(Message input) { + list.add(input.getPayload()); return input.withPayload(input.getPayload() + 1); } + + public List list() { + return list; + } + } + + @ApplicationScoped + public static class MyAppReceivingData { + private final List results = new CopyOnWriteArrayList<>(); + + @Incoming("stuff") + public CompletionStage consume(Message input) { + results.add(input.getPayload()); + return input.ack(); + } + + public List results() { + return results; + } } - // - // @ApplicationScoped - // public static class MyAppReceivingData { - // private final List results = new CopyOnWriteArrayList<>(); - // - // @Incoming("stuff") - // public CompletionStage consume(Message input) { - // results.add(input.getPayload()); - // return input.ack(); - // } - // - // public List results() { - // return results; - // } - // } } From caae6f8e12d0f05eb48cf8e8f69898b86d9614cc Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 29 Aug 2024 12:34:19 +0200 Subject: [PATCH 6/8] Remove clientid and groupid from jmstrace --- .../jms/tracing/JmsAttributesExtractor.java | 15 ++-------- .../messaging/jms/tracing/JmsTrace.java | 28 ++----------------- 2 files changed, 4 insertions(+), 39 deletions(-) diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java index 9512442398..4bcca50be9 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java @@ -19,15 +19,7 @@ public JmsAttributesExtractor() { @Override public void onStart(final AttributesBuilder attributes, final Context parentContext, final JmsTrace jmsTrace) { - String groupId = jmsTrace.getGroupId(); - String clientId = jmsTrace.getClientId(); - if (groupId != null && clientId != null) { - String consumerId = groupId; - if (!clientId.isEmpty()) { - consumerId += " - " + clientId; - } - attributes.put(MESSAGING_CLIENT_ID, consumerId); - } + } @Override @@ -97,10 +89,7 @@ public Long getMessageEnvelopeSize(JmsTrace jmsTrace) { @Override public String getClientId(JmsTrace jmsTrace) { - if (jmsTrace.getClientId() == null) { - return null; - } - return jmsTrace.getClientId(); + return null; } @Override diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java index 2428f3afe3..45a9737194 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java @@ -4,26 +4,14 @@ import java.util.Map; public class JmsTrace { - private final String groupId; - private final String clientId; private final String queue; private final Map messageProperties; - private JmsTrace(final String groupId, final String clientId, final String queue, Map messageProperties) { - this.groupId = groupId; - this.clientId = clientId; + private JmsTrace(final String queue, Map messageProperties) { this.queue = queue; this.messageProperties = messageProperties; } - public String getGroupId() { - return groupId; - } - - public String getClientId() { - return clientId; - } - public String getQueue() { return queue; } @@ -33,21 +21,9 @@ public Map getMessageProperties() { } public static class Builder { - private String groupId; - private String clientId; private String queue; private Map properties; - public Builder withGroupId(final String groupId) { - this.groupId = groupId; - return this; - } - - public Builder withClientId(final String clientId) { - this.clientId = clientId; - return this; - } - public Builder withQueue(final String queue) { this.queue = queue; return this; @@ -59,7 +35,7 @@ public Builder withProperties(Map properties) { } public JmsTrace build() { - return new JmsTrace(groupId, clientId, queue, properties); + return new JmsTrace(queue, properties); } } } From 0459ff43cb013ad4db9be7cd0a4b06a4d0d27704 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 12 Sep 2024 13:49:36 +0200 Subject: [PATCH 7/8] JmsTrace holds a reference to the Jms message --- .../reactive/messaging/jms/JmsSink.java | 2 +- .../reactive/messaging/jms/JmsSource.java | 2 +- .../jms/tracing/JmsAttributesExtractor.java | 2 - .../messaging/jms/tracing/JmsTrace.java | 54 +++++++++++++++---- .../jms/tracing/JmsTraceTextMapGetter.java | 16 +----- .../jms/tracing/JmsTraceTextMapSetter.java | 5 +- .../MessagePropertiesExtractAdapterTest.java | 19 ++++++- .../jms/tracing/TracingPropagationTest.java | 25 +++++---- 8 files changed, 80 insertions(+), 45 deletions(-) diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java index 8bd3cd7780..5c628d6d07 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSink.java @@ -192,7 +192,7 @@ private void outgoingTrace(Destination actualDestination, Message message, ja } JmsTrace jmsTrace = new JmsTrace.Builder() .withQueue(actualDestination.toString()) - .withProperties(messageProperties) + .withMessage(jmsPayload) .build(); jmsInstrumenter.traceOutgoing(message, jmsTrace); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java index 6a26e351ec..9d285e970e 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java @@ -248,7 +248,7 @@ public void incomingTrace(IncomingJmsMessage jmsMessage) { JmsTrace jmsTrace = new JmsTrace.Builder() .withQueue(queueName.orElse(null)) - .withProperties(properties) + .withMessage(unwrapped) .build(); jmsInstrumenter.traceIncoming(jmsMessage, jmsTrace); diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java index 4bcca50be9..92b85faf2d 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsAttributesExtractor.java @@ -1,7 +1,5 @@ package io.smallrye.reactive.messaging.jms.tracing; -import static io.opentelemetry.semconv.SemanticAttributes.*; - import java.util.Collections; import java.util.List; diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java index 45a9737194..177ccbf8c0 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTrace.java @@ -1,41 +1,73 @@ package io.smallrye.reactive.messaging.jms.tracing; -import java.util.HashMap; -import java.util.Map; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; + +import jakarta.jms.JMSException; +import jakarta.jms.Message; public class JmsTrace { private final String queue; - private final Map messageProperties; + private final Message jmsMessage; - private JmsTrace(final String queue, Map messageProperties) { + private JmsTrace(final String queue, Message jmsMessage) { this.queue = queue; - this.messageProperties = messageProperties; + this.jmsMessage = jmsMessage; } public String getQueue() { return queue; } - public Map getMessageProperties() { - return messageProperties; + public Message getMessage() { + return jmsMessage; + } + + public List getPropertyNames() { + List keys = new ArrayList<>(); + Enumeration propertyNames = null; + try { + propertyNames = jmsMessage.getPropertyNames(); + while (propertyNames.hasMoreElements()) { + keys.add(propertyNames.nextElement().toString()); + } + } catch (JMSException ignored) { + } + return keys; + } + + public String getProperty(final String key) { + try { + return jmsMessage.getStringProperty(key); + } catch (JMSException ignored) { + return null; + } + } + + public void setProperty(final String key, final String value) { + try { + jmsMessage.setStringProperty(key, value); + } catch (JMSException ignored) { + } } public static class Builder { private String queue; - private Map properties; + private Message jmsMessage; public Builder withQueue(final String queue) { this.queue = queue; return this; } - public Builder withProperties(Map properties) { - this.properties = new HashMap<>(properties); + public Builder withMessage(final Message message) { + this.jmsMessage = message; return this; } public JmsTrace build() { - return new JmsTrace(queue, properties); + return new JmsTrace(queue, jmsMessage); } } } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java index b6978ef609..89d81be8ec 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapGetter.java @@ -1,9 +1,5 @@ package io.smallrye.reactive.messaging.jms.tracing; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - import io.opentelemetry.context.propagation.TextMapGetter; public enum JmsTraceTextMapGetter implements TextMapGetter { @@ -11,21 +7,13 @@ public enum JmsTraceTextMapGetter implements TextMapGetter { @Override public Iterable keys(final JmsTrace carrier) { - List keys = new ArrayList<>(); - Map messageProperties = carrier.getMessageProperties(); - for (String key : messageProperties.keySet()) { - keys.add(key); - } - return keys; + return carrier.getPropertyNames(); } @Override public String get(final JmsTrace carrier, final String key) { if (carrier != null) { - Object value = carrier.getMessageProperties().get(key); - if (value != null) { - return value.toString(); - } + return carrier.getProperty(key); } return null; } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java index ffc7409b0c..471d38a307 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/tracing/JmsTraceTextMapSetter.java @@ -1,7 +1,5 @@ package io.smallrye.reactive.messaging.jms.tracing; -import java.util.Map; - import io.opentelemetry.context.propagation.TextMapSetter; public enum JmsTraceTextMapSetter implements TextMapSetter { @@ -10,8 +8,7 @@ public enum JmsTraceTextMapSetter implements TextMapSetter { @Override public void set(final JmsTrace carrier, final String key, final String value) { if (carrier != null) { - Map messageProperties = carrier.getMessageProperties(); - messageProperties.put(key, value); + carrier.setProperty(key, value); } } } diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java index 22f77681d0..ecba271e56 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/MessagePropertiesExtractAdapterTest.java @@ -1,22 +1,37 @@ package io.smallrye.reactive.messaging.jms.tracing; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import java.util.HashMap; import java.util.Map; +import jakarta.jms.JMSException; +import jakarta.jms.Message; + import org.junit.jupiter.api.Test; class MessagePropertiesExtractAdapterTest { @Test - public void verifyNullHeaderHandled() { + public void verifyNullHeaderHandled() throws JMSException { Map messageProperties = new HashMap<>(); messageProperties.put("test_null_header", null); - JmsTrace jmsTrace = new JmsTrace.Builder().withProperties(messageProperties).build(); + // Create a mock JMS message + Message msg = mock(Message.class); + doAnswer(i -> messageProperties.get(i.getArgument(0, String.class))) + .when(msg).getStringProperty(anyString()); + doAnswer(i -> messageProperties.put(i.getArgument(0, String.class), i.getArgument(1, String.class))) + .when(msg).setStringProperty(anyString(), anyString()); + + JmsTrace jmsTrace = new JmsTrace.Builder().withMessage(msg).build(); String headerValue = JmsTraceTextMapGetter.INSTANCE.get(jmsTrace, "test_null_header"); + JmsTraceTextMapSetter.INSTANCE.set(jmsTrace, "test_other_header", "value"); assertThat(headerValue).isNull(); + assertThat(messageProperties.get("test_other_header")).isEqualTo("value"); } } diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java index 133c24c4b9..89628c8f61 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/tracing/TracingPropagationTest.java @@ -50,6 +50,7 @@ import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.samplers.Sampler; +import io.smallrye.reactive.messaging.jms.IncomingJmsMessageMetadata; import io.smallrye.reactive.messaging.jms.JmsConnector; import io.smallrye.reactive.messaging.jms.PayloadConsumerBean; import io.smallrye.reactive.messaging.jms.ProducerBean; @@ -103,22 +104,26 @@ public void testFromAppToJms() { String queue = "queue-one"; Map map = new HashMap<>(); map.put("mp.messaging.outgoing.queue-one.connector", JmsConnector.CONNECTOR_NAME); - map.put("mp.messaging.incoming.jms.connector", JmsConnector.CONNECTOR_NAME); - map.put("mp.messaging.incoming.jms.destination", "queue-one"); + map.put("mp.messaging.incoming.stuff.connector", JmsConnector.CONNECTOR_NAME); + map.put("mp.messaging.incoming.stuff.destination", "queue-one"); MapBasedConfig config = new MapBasedConfig(map); addConfig(config); - WeldContainer container = deploy(PayloadConsumerBean.class, ProducerBean.class); + WeldContainer container = deploy(MyAppReceivingData.class, ProducerBean.class); - PayloadConsumerBean bean = container.select(PayloadConsumerBean.class).get(); - await().until(() -> bean.list().size() >= 10); - assertThat(bean.list()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + MyAppReceivingData bean = container.select(MyAppReceivingData.class).get(); + await().until(() -> bean.results().size() >= 10); + assertThat(bean.results()) + .extracting(m -> m.getMetadata(IncomingJmsMessageMetadata.class).get()) + .allSatisfy(m -> assertThat(m.getStringProperty("traceparent")).isNotNull()) + .extracting(m -> Integer.parseInt(m.getBody(String.class))) + .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); CompletableResultCode completableResultCode = tracerProvider.forceFlush(); completableResultCode.join(10, TimeUnit.SECONDS); List spans = spanExporter.getFinishedSpanItems(); assertEquals(20, spans.size()); - assertEquals(20, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size()); + assertEquals(10, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size()); SpanData span = spans.get(0); assertEquals(SpanKind.PRODUCER, span.getKind()); @@ -316,15 +321,15 @@ public List list() { @ApplicationScoped public static class MyAppReceivingData { - private final List results = new CopyOnWriteArrayList<>(); + private final List> results = new CopyOnWriteArrayList<>(); @Incoming("stuff") public CompletionStage consume(Message input) { - results.add(input.getPayload()); + results.add(input); return input.ack(); } - public List results() { + public List> results() { return results; } } From e223309dc5ef03c17293e6878f401cacbfbcab1b Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 19 Sep 2024 13:58:40 +0200 Subject: [PATCH 8/8] JMS message context support --- .../messaging/jms/IncomingJmsMessage.java | 7 +- .../reactive/messaging/jms/JmsConnector.java | 6 +- .../reactive/messaging/jms/JmsSource.java | 8 +- .../reactive/messaging/jms/JmsSourceTest.java | 7 +- .../messaging/jms/LocalPropagationTest.java | 652 ++++++++++++++++++ 5 files changed, 674 insertions(+), 6 deletions(-) create mode 100644 smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationTest.java diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java index c6abaea919..a4f774d288 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/IncomingJmsMessage.java @@ -1,6 +1,7 @@ package io.smallrye.reactive.messaging.jms; import static io.smallrye.reactive.messaging.jms.i18n.JmsExceptions.ex; +import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; @@ -14,8 +15,9 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.json.JsonMapping; import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; +import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; -public class IncomingJmsMessage implements MetadataInjectableMessage { +public class IncomingJmsMessage implements ContextAwareMessage, MetadataInjectableMessage { private final Message delegate; private final Executor executor; private final Class clazz; @@ -43,7 +45,7 @@ public class IncomingJmsMessage implements MetadataInjectableMessage { } this.jmsMetadata = new IncomingJmsMessageMetadata(message); - this.metadata = Metadata.of(this.jmsMetadata); + this.metadata = captureContextMetadata(this.jmsMetadata); } @SuppressWarnings("unchecked") @@ -120,6 +122,7 @@ public CompletionStage ack(Metadata metadata) { } }) .runSubscriptionOn(executor) + .emitOn(this::runOnMessageContext) .subscribeAsCompletionStage(); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java index 8d3a091cac..8a7d201a79 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsConnector.java @@ -35,6 +35,7 @@ import io.smallrye.reactive.messaging.connector.InboundConnector; import io.smallrye.reactive.messaging.connector.OutboundConnector; import io.smallrye.reactive.messaging.json.JsonMapping; +import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging; @ApplicationScoped @@ -94,6 +95,9 @@ public class JmsConnector implements InboundConnector, OutboundConnector { @Inject Instance jsonMapper; + @Inject + ExecutionHolder executionHolders; + @Inject @ConfigProperty(name = "smallrye.jms.threads.max-pool-size", defaultValue = DEFAULT_MAX_POOL_SIZE) int maxPoolSize; @@ -139,7 +143,7 @@ public Flow.Publisher> getPublisher(Config config) { JmsConnectorIncomingConfiguration ic = new JmsConnectorIncomingConfiguration(config); JmsResourceHolder holder = new JmsResourceHolder<>(ic.getChannel(), () -> createJmsContext(ic)); contexts.add(holder); - JmsSource source = new JmsSource(holder, ic, openTelemetryInstance, jsonMapping, executor); + JmsSource source = new JmsSource(executionHolders.vertx(), holder, ic, openTelemetryInstance, jsonMapping, executor); sources.add(source); return source.getSource(); } diff --git a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java index 9d285e970e..c6f69e5ded 100644 --- a/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java +++ b/smallrye-reactive-messaging-jms/src/main/java/io/smallrye/reactive/messaging/jms/JmsSource.java @@ -31,6 +31,9 @@ import io.smallrye.reactive.messaging.jms.tracing.JmsOpenTelemetryInstrumenter; import io.smallrye.reactive.messaging.jms.tracing.JmsTrace; import io.smallrye.reactive.messaging.json.JsonMapping; +import io.vertx.core.impl.VertxInternal; +import io.vertx.mutiny.core.Context; +import io.vertx.mutiny.core.Vertx; class JmsSource { @@ -40,8 +43,9 @@ class JmsSource { private final JmsPublisher publisher; private final boolean isTracingEnabled; private final JmsOpenTelemetryInstrumenter jmsInstrumenter; + private final Context context; - JmsSource(JmsResourceHolder resourceHolder, JmsConnectorIncomingConfiguration config, + JmsSource(Vertx vertx, JmsResourceHolder resourceHolder, JmsConnectorIncomingConfiguration config, Instance openTelemetryInstance, JsonMapping jsonMapping, Executor executor) { this.isTracingEnabled = config.getTracingEnabled(); @@ -72,7 +76,9 @@ class JmsSource { } this.publisher = new JmsPublisher(resourceHolder); + this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); source = Multi.createFrom().publisher(publisher) + .emitOn(context::runOnContext) .> map(m -> new IncomingJmsMessage<>(m, executor, jsonMapping)) .onItem().invoke(this::incomingTrace) .onFailure(t -> { diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java index 174a1de083..e711e1765b 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSourceTest.java @@ -27,15 +27,18 @@ import io.smallrye.mutiny.Multi; import io.smallrye.reactive.messaging.support.JmsTestBase; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.core.Vertx; @SuppressWarnings("ReactiveStreamsSubscriberImplementation") public class JmsSourceTest extends JmsTestBase { private JMSContext jms; private ActiveMQJMSConnectionFactory factory; + private Vertx vertx; @BeforeEach public void init() { + vertx = Vertx.vertx(); factory = new ActiveMQJMSConnectionFactory( "tcp://localhost:61616", null, null); @@ -185,7 +188,7 @@ public void testReceptionOfMultipleMessages() { @Test public void testMultipleRequests() { - JmsSource source = new JmsSource(getResourceHolder("queue"), + JmsSource source = new JmsSource(vertx, getResourceHolder("queue"), new JmsConnectorIncomingConfiguration(new MapBasedConfig().put("channel-name", "queue")), UnsatisfiedInstance.instance(), null, null); Publisher> publisher = source.getSource(); @@ -238,7 +241,7 @@ public void onComplete() { @Test public void testBroadcast() { - JmsSource source = new JmsSource(getResourceHolder("queue"), + JmsSource source = new JmsSource(vertx, getResourceHolder("queue"), new JmsConnectorIncomingConfiguration(new MapBasedConfig() .with("channel-name", "queue").with("broadcast", true)), UnsatisfiedInstance.instance(), null, null); diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationTest.java new file mode 100644 index 0000000000..71d8c481dd --- /dev/null +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/LocalPropagationTest.java @@ -0,0 +1,652 @@ +package io.smallrye.reactive.messaging.jms; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSProducer; +import jakarta.jms.ObjectMessage; +import jakarta.jms.Queue; + +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; +import org.eclipse.microprofile.reactive.messaging.Acknowledgment; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.common.vertx.ContextLocals; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.reactive.messaging.annotations.Blocking; +import io.smallrye.reactive.messaging.annotations.Broadcast; +import io.smallrye.reactive.messaging.annotations.Merge; +import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; +import io.smallrye.reactive.messaging.support.JmsTestBase; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.core.impl.ConcurrentHashSet; +import io.vertx.mutiny.core.Vertx; + +public class LocalPropagationTest extends JmsTestBase { + + private JMSContext jms; + private ActiveMQJMSConnectionFactory factory; + private ExecutorService executor; + private Queue queue; + + @BeforeEach + public void init() { + factory = new ActiveMQJMSConnectionFactory( + "tcp://localhost:61616", + null, null); + jms = factory.createContext(); + executor = Executors.newFixedThreadPool(3); + queue = jms.createQueue("queue-one"); + } + + @AfterEach + public void close() { + jms.close(); + factory.close(); + executor.shutdown(); + } + + private MapBasedConfig dataconfig() { + return new MapBasedConfig() + .with("mp.messaging.incoming.data.connector", JmsConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.data.destination", "queue-one") + .with("mp.messaging.incoming.data.tracing.enabled", false); + } + + private void produceIntegers() { + JMSProducer producer = jms.createProducer(); + for (int i = 1; i < 6; i++) { + ObjectMessage message = jms.createObjectMessage(i); + producer.send(queue, message); + } + } + + private T runApplication(MapBasedConfig config, Class beanClass) { + addConfig(config); + WeldContainer container = deploy(beanClass); + return container.getBeanManager().createInstance().select(beanClass).get(); + } + + @Test + public void testLinearPipeline() { + LinearPipeline bean = runApplication(dataconfig(), LinearPipeline.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6); + } + + @Test + public void testPipelineWithABlockingStage() { + PipelineWithABlockingStage bean = runApplication(dataconfig(), PipelineWithABlockingStage.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6); + + } + + @Test + public void testPipelineWithAnUnorderedBlockingStage() { + PipelineWithAnUnorderedBlockingStage bean = runApplication(dataconfig(), PipelineWithAnUnorderedBlockingStage.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactlyInAnyOrder(2, 3, 4, 5, 6); + + } + + @Test + public void testPipelineWithMultipleBlockingStages() { + PipelineWithMultipleBlockingStages bean = runApplication(dataconfig(), PipelineWithMultipleBlockingStages.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactlyInAnyOrder(2, 3, 4, 5, 6); + } + + @Test + public void testPipelineWithBroadcastAndMerge() { + PipelineWithBroadcastAndMerge bean = runApplication(dataconfig(), PipelineWithBroadcastAndMerge.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 10; + }); + assertThat(bean.getResults()).hasSize(10).contains(2, 3, 4, 5, 6); + } + + @Test + public void testLinearPipelineWithAckOnCustomThread() { + LinearPipelineWithAckOnCustomThread bean = runApplication(dataconfig(), LinearPipelineWithAckOnCustomThread.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6); + + } + + @Test + public void testPipelineWithAnAsyncStage() { + PipelineWithAnAsyncStage bean = runApplication(dataconfig(), PipelineWithAnAsyncStage.class); + produceIntegers(); + + await().atMost(20, TimeUnit.SECONDS).until(() -> { + List results = bean.getResults(); + System.out.println(results); + return results.size() >= 5; + }); + assertThat(bean.getResults()).containsExactly(2, 3, 4, 5, 6); + + } + + @ApplicationScoped + public static class LinearPipeline { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + return input.withPayload(input.getPayload() + 1); + } + + @Incoming("process") + @Outgoing("after-process") + public Integer handle(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class LinearPipelineWithAckOnCustomThread { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + private final Executor executor = Executors.newFixedThreadPool(4); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + return input.withPayload(input.getPayload() + 1) + .withAck(() -> { + CompletableFuture cf = new CompletableFuture<>(); + executor.execute(() -> { + cf.complete(null); + }); + return cf; + }); + } + + @Incoming("process") + @Outgoing("after-process") + public Integer handle(int payload) { + try { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + } catch (Exception e) { + e.printStackTrace(); + } + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + try { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + } catch (Exception e) { + e.printStackTrace(); + } + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithABlockingStage { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + @Incoming("process") + @Outgoing("after-process") + @Blocking + public Integer handle(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithAnUnorderedBlockingStage { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + private final Random random = new Random(); + + @Incoming("process") + @Outgoing("after-process") + @Blocking(ordered = false) + public Integer handle(int payload) throws InterruptedException { + Thread.sleep(random.nextInt(10)); + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithMultipleBlockingStages { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + System.out.println("Processing " + input.getPayload()); + String value = UUID.randomUUID().toString(); + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + private final Random random = new Random(); + + @Incoming("process") + @Outgoing("second-blocking") + @Blocking(ordered = false) + public Integer handle(int payload) throws InterruptedException { + Thread.sleep(random.nextInt(10)); + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + assertThat(uuids.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("second-blocking") + @Outgoing("after-process") + @Blocking + public Integer handle2(int payload) throws InterruptedException { + Thread.sleep(random.nextInt(10)); + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithBroadcastAndMerge { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set branch1 = new ConcurrentHashSet<>(); + private final Set branch2 = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + @Broadcast(2) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + + assertThat((String) Vertx.currentContext().getLocal("uuid")).isNull(); + Vertx.currentContext().putLocal("uuid", value); + Vertx.currentContext().putLocal("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + private final Random random = new Random(); + + @Incoming("process") + @Outgoing("after-process") + public Integer branch1(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + assertThat(branch1.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("process") + @Outgoing("after-process") + public Integer branch2(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + assertThat(branch2.add(uuid)).isTrue(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("after-process") + @Outgoing("sink") + @Merge + public Integer afterProcess(int payload) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = Vertx.currentContext().getLocal("uuid"); + assertThat(uuid).isNotNull(); + + int p = Vertx.currentContext().getLocal("input"); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + + @ApplicationScoped + public static class PipelineWithAnAsyncStage { + + private final List list = new CopyOnWriteArrayList<>(); + private final Set uuids = new ConcurrentHashSet<>(); + + @Incoming("data") + @Outgoing("process") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Message process(Message input) { + String value = UUID.randomUUID().toString(); + assertThat((String) ContextLocals.get("uuid", null)).isNull(); + ContextLocals.put("uuid", value); + ContextLocals.put("input", input.getPayload()); + + assertThat(input.getMetadata(LocalContextMetadata.class)).isPresent(); + + return input.withPayload(input.getPayload() + 1); + } + + @Incoming("process") + @Outgoing("after-process") + public Uni handle(int payload) { + String uuid = ContextLocals.get("uuid", null); + assertThat(uuid).isNotNull(); + + assertThat(uuids.add(uuid)).isTrue(); + + int p = ContextLocals.get("input", null); + assertThat(p + 1).isEqualTo(payload); + return Uni.createFrom().item(() -> payload) + .runSubscriptionOn(Infrastructure.getDefaultExecutor()); + } + + @Incoming("after-process") + @Outgoing("sink") + public Integer afterProcess(int payload) { + String uuid = ContextLocals.get("uuid", null); + assertThat(uuid).isNotNull(); + + int p = ContextLocals.get("input", null); + assertThat(p + 1).isEqualTo(payload); + return payload; + } + + @Incoming("sink") + public void sink(int val) { + String uuid = ContextLocals.get("uuid", null); + assertThat(uuid).isNotNull(); + + int p = ContextLocals.get("input", null); + assertThat(p + 1).isEqualTo(val); + list.add(val); + } + + public List getResults() { + return list; + } + } + +}