From 12377982b369c6434f1c39143d8020293ae972cf Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Tue, 10 Sep 2024 12:53:54 +0200 Subject: [PATCH] Added access to individual messages from incoming batch metadata for Kafka and Pulsar --- .../main/docs/jms/receiving-jms-messages.md | 5 +-- .../src/main/docs/jms/sending-jms-messages.md | 8 ++-- .../docs/kafka/receiving-kafka-records.md | 13 +++++- .../docs/pulsar/receiving-pulsar-messages.md | 14 +++++- .../inbound/KafkaRecordBatchExample.java | 39 ++++++++++------ .../inbound/PulsarMessageBatchExample.java | 18 +++++++- .../api/IncomingKafkaRecordBatchMetadata.java | 40 ++++++++++++++++- .../kafka/IncomingKafkaRecordBatch.java | 5 ++- .../messaging/kafka/BatchConsumerTest.java | 8 ++++ .../providers/ProcessorMediator.java | 6 +-- .../AsynchronousPayloadProcessorAckTest.java | 4 +- .../SynchronousPayloadProcessorAckTest.java | 9 ++-- .../pulsar/PulsarIncomingBatchMessage.java | 3 +- .../PulsarIncomingBatchMessageMetadata.java | 31 +++++++++++++ .../messaging/pulsar/base/WeldTestBase.java | 2 + .../pulsar/batch/PulsarBatchReceiveTest.java | 44 +++++++++++++++++++ 16 files changed, 210 insertions(+), 39 deletions(-) diff --git a/documentation/src/main/docs/jms/receiving-jms-messages.md b/documentation/src/main/docs/jms/receiving-jms-messages.md index c1426b5f9e..67283792a1 100644 --- a/documentation/src/main/docs/jms/receiving-jms-messages.md +++ b/documentation/src/main/docs/jms/receiving-jms-messages.md @@ -60,9 +60,8 @@ If the target type is a primitive type ort `String`, the resulting message contains the mapped payload. If the target type is a class, the object is built using included JSON -deserializer (JSON-B and Jackson provided OOB, for more details see -[Serde](serde)), from the `JMSType`. If not, the default behavior is -used (Java deserialization). +deserializer (JSON-B and Jackson provided OOB from the `JMSType`. +If not, the default behavior is used (Java deserialization). ## Inbound Metadata diff --git a/documentation/src/main/docs/jms/sending-jms-messages.md b/documentation/src/main/docs/jms/sending-jms-messages.md index c0d447deac..46c04c590c 100644 --- a/documentation/src/main/docs/jms/sending-jms-messages.md +++ b/documentation/src/main/docs/jms/sending-jms-messages.md @@ -49,10 +49,10 @@ as `String` and the `JMSType` is set to the target class. The If the payload is a `byte[]`, it’s passed as `byte[]` in a JMS `BytesMessage`. -Otherwise, the payload is encoded using included JSON serializer (JSON-B -and Jackson provided OOB, for more details see [Serde](serde)). The -`JMSType` is set to the target class. The `_classname` property is also -set. The JMS Message is a `TextMessage`. +Otherwise, the payload is encoded using included JSON serializer (JSON-B and Jackson provided OOB. +The `JMSType` is set to the target class. +The `_classname` property is also set. +The JMS Message is a `TextMessage`. For example, the following code serialize the produced `Person` using JSON-B. diff --git a/documentation/src/main/docs/kafka/receiving-kafka-records.md b/documentation/src/main/docs/kafka/receiving-kafka-records.md index df1008fb7b..8af345c199 100644 --- a/documentation/src/main/docs/kafka/receiving-kafka-records.md +++ b/documentation/src/main/docs/kafka/receiving-kafka-records.md @@ -602,8 +602,7 @@ container type to receive all the data: {{ insert('kafka/inbound/KafkaRecordBatchPayloadExample.java', 'code') }} ``` -The incoming method can also receive `Message`, -`KafkaBatchRecords` `ConsumerRecords` types, They +The incoming method can also receive `Message` or `ConsumerRecords` types, They give access to record details such as offset or timestamp : ``` java @@ -618,6 +617,16 @@ Conversely, if the processing throws an exception, all messages are *nacked*, applying the failure strategy for all the records inside the batch. +### Accessing metadata of batch records + +When receiving records in batch mode, the metadata of each record is accessible through the `IncomingKafkaRecordBatchMetadata` : + +``` java +{{ insert('kafka/inbound/KafkaRecordBatchExample.java', 'batch') }} +``` + +Like in this example, this can be useful to propagate the tracing information of each record. + ## Manual topic-partition assignment The default behavior of Kafka incoming channels is to subscribe to one or more topics in order to receive records from the Kafka broker. diff --git a/documentation/src/main/docs/pulsar/receiving-pulsar-messages.md b/documentation/src/main/docs/pulsar/receiving-pulsar-messages.md index 3c45cefb30..e433003378 100644 --- a/documentation/src/main/docs/pulsar/receiving-pulsar-messages.md +++ b/documentation/src/main/docs/pulsar/receiving-pulsar-messages.md @@ -142,16 +142,26 @@ mp.messaging.incoming.data.subscriptionType=Shared By default, incoming methods receive each Pulsar message individually. You can enable batch mode using `batchReceive=true` property, or setting a `batchReceivePolicy` in consumer configuration. -```java +``` java {{ insert('pulsar/inbound/PulsarMessageBatchExample.java', 'code') }} ``` Or you can directly receive the list of payloads to the consume method: -```java +``` java {{ insert('pulsar/inbound/PulsarMessageBatchPayloadExample.java', 'code') }} ``` +### Accessing metadata of batch records + +When receiving records in batch mode, the metadata of each record is accessible through the `PulsarIncomingBatchMessageMetadata` : + +``` java +{{ insert('pulsar/inbound/PulsarMessageBatchExample.java', 'batch') }} +``` + +Like in this example, this can be useful to propagate the tracing information of each record. + ## Configuration Reference {{ insert('../../../target/connectors/smallrye-pulsar-incoming.md') }} diff --git a/documentation/src/main/java/kafka/inbound/KafkaRecordBatchExample.java b/documentation/src/main/java/kafka/inbound/KafkaRecordBatchExample.java index 7689fc0fe8..0878eb1c15 100644 --- a/documentation/src/main/java/kafka/inbound/KafkaRecordBatchExample.java +++ b/documentation/src/main/java/kafka/inbound/KafkaRecordBatchExample.java @@ -1,6 +1,6 @@ package kafka.inbound; -import java.time.Instant; +import java.util.List; import java.util.concurrent.CompletionStage; import jakarta.enterprise.context.ApplicationScoped; @@ -9,27 +9,25 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; -import io.smallrye.reactive.messaging.kafka.KafkaRecord; -import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch; -import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata; +import io.smallrye.reactive.messaging.TracingMetadata; +import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata; @ApplicationScoped public class KafkaRecordBatchExample { // @Incoming("prices") - public CompletionStage consumeMessage(KafkaRecordBatch records) { - for (KafkaRecord record : records) { - record.getMetadata(IncomingKafkaRecordMetadata.class).ifPresent(metadata -> { - int partition = metadata.getPartition(); - long offset = metadata.getOffset(); - Instant timestamp = metadata.getTimestamp(); - //... process messages - }); + public CompletionStage consumeMessage(Message> msg) { + IncomingKafkaRecordBatchMetadata batchMetadata = msg.getMetadata(IncomingKafkaRecordBatchMetadata.class).get(); + for (ConsumerRecord record : batchMetadata.getRecords()) { + int partition = record.partition(); + long offset = record.offset(); + long timestamp = record.timestamp(); } // ack will commit the latest offsets (per partition) of the batch. - return records.ack(); + return msg.ack(); } @Incoming("prices") @@ -42,4 +40,19 @@ public void consumeRecords(ConsumerRecords records) { } // + // + @Incoming("prices") + public void consumeRecords(ConsumerRecords records, IncomingKafkaRecordBatchMetadata metadata) { + for (TopicPartition partition : records.partitions()) { + for (ConsumerRecord record : records.records(partition)) { + TracingMetadata tracing = metadata.getMetadataForRecord(record, TracingMetadata.class); + if (tracing != null) { + tracing.getCurrentContext().makeCurrent(); + } + //... process messages + } + } + } + // + } diff --git a/documentation/src/main/java/pulsar/inbound/PulsarMessageBatchExample.java b/documentation/src/main/java/pulsar/inbound/PulsarMessageBatchExample.java index 2265e4a465..42d96e6f8c 100644 --- a/documentation/src/main/java/pulsar/inbound/PulsarMessageBatchExample.java +++ b/documentation/src/main/java/pulsar/inbound/PulsarMessageBatchExample.java @@ -8,6 +8,7 @@ import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Message; +import io.smallrye.reactive.messaging.TracingMetadata; import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessageMetadata; @ApplicationScoped @@ -29,11 +30,26 @@ public CompletionStage consumeMessage(Message> messages) { } @Incoming("prices") - public void consumeRecords(org.apache.pulsar.client.api.Messages messages) { + public void consumeMessages(org.apache.pulsar.client.api.Messages messages) { for (org.apache.pulsar.client.api.Message msg : messages) { //... process messages } } // + + // + @Incoming("prices") + public void consumeMessages(org.apache.pulsar.client.api.Messages messages, + PulsarIncomingBatchMessageMetadata metadata) { + for (org.apache.pulsar.client.api.Message message : messages) { + TracingMetadata tracing = metadata.getMetadataForMessage(message, TracingMetadata.class); + if (tracing != null) { + tracing.getCurrentContext().makeCurrent(); + } + //... process messages + } + } + // + } diff --git a/smallrye-reactive-messaging-kafka-api/src/main/java/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordBatchMetadata.java b/smallrye-reactive-messaging-kafka-api/src/main/java/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordBatchMetadata.java index 4e46221252..a844f7a79d 100644 --- a/smallrye-reactive-messaging-kafka-api/src/main/java/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordBatchMetadata.java +++ b/smallrye-reactive-messaging-kafka-api/src/main/java/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordBatchMetadata.java @@ -1,12 +1,15 @@ package io.smallrye.reactive.messaging.kafka.api; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.eclipse.microprofile.reactive.messaging.Message; /** * Contains information about the batch of messages received from a channel backed by Kafka. @@ -24,16 +27,24 @@ public class IncomingKafkaRecordBatchMetadata { private final int index; private final Map offsets; private final int consumerGroupGenerationId; + private final List> batchedMessages; - public IncomingKafkaRecordBatchMetadata(ConsumerRecords records, String channel, int index, - Map offsets, int consumerGroupGenerationId) { + public IncomingKafkaRecordBatchMetadata(ConsumerRecords records, List> batchedMessages, + String channel, int index, Map offsets, + int consumerGroupGenerationId) { this.records = records; + this.batchedMessages = batchedMessages; this.channel = channel; this.index = index; this.offsets = Collections.unmodifiableMap(offsets); this.consumerGroupGenerationId = consumerGroupGenerationId; } + public IncomingKafkaRecordBatchMetadata(ConsumerRecords records, String channel, int index, + Map offsets, int consumerGroupGenerationId) { + this(records, Collections.emptyList(), channel, index, offsets, consumerGroupGenerationId); + } + public IncomingKafkaRecordBatchMetadata(ConsumerRecords records, String channel, Map offsets) { this(records, channel, -1, offsets, -1); @@ -81,4 +92,29 @@ public int getConsumerIndex() { public int getConsumerGroupGenerationId() { return consumerGroupGenerationId; } + + /** + * @return batched messages + */ + public List> getBatchedMessages() { + return batchedMessages; + } + + /** + * Get metadata object for the given record. + * This method is useful when you need to access metadata for a specific record in the batch. + * + * @param rec Kafka consumer record + * @param metadata metadata type class + * @return metadata object for the given record + * @param metadata type + */ + public M getMetadataForRecord(ConsumerRecord rec, Class metadata) { + for (Message record : batchedMessages) { + if (record.getMetadata().get(IncomingKafkaRecordMetadata.class).orElseThrow().getRecord().equals(rec)) { + return record.getMetadata(metadata).orElse(null); + } + } + return null; + } } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecordBatch.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecordBatch.java index b8794bf125..508ac9c262 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecordBatch.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecordBatch.java @@ -17,6 +17,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.Metadata; import io.smallrye.mutiny.Multi; @@ -50,8 +51,10 @@ public IncomingKafkaRecordBatch(ConsumerRecords records, String channel, i generationId = entry.getValue().getConsumerGroupGenerationId(); offsets.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().getOffset())); } + // This is safe because the IncomingKafkaRecord is Message + List> batchedRecords = (List>) (List) this.incomingRecords; this.metadata = captureContextMetadata( - new IncomingKafkaRecordBatchMetadata<>(records, channel, index, offsets, generationId)); + new IncomingKafkaRecordBatchMetadata<>(records, batchedRecords, channel, index, offsets, generationId)); } @Override diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/BatchConsumerTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/BatchConsumerTest.java index 0d637e5a5d..2c8003fc04 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/BatchConsumerTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/BatchConsumerTest.java @@ -22,6 +22,7 @@ import org.junit.jupiter.api.Test; import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata; +import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata; import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase; import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig; @@ -93,6 +94,13 @@ void testIncomingConsumingMessageWithMetadata() { assertThat(r.value()).startsWith("v-"); assertThat(r.key()).startsWith("k"); }); + assertThat(bean.metadata()).allSatisfy(m -> { + assertThat(m.getBatchedMessages()).isNotEmpty(); + assertThat(m.getRecords()).allSatisfy(r -> { + assertThat(m.getMetadataForRecord(r, IncomingKafkaRecordMetadata.class)).isNotNull() + .extracting(IncomingKafkaRecordMetadata::getRecord).isEqualTo(r); + }); + }); } @ApplicationScoped diff --git a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java index 34d02575be..943ab5b274 100644 --- a/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java +++ b/smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java @@ -26,7 +26,6 @@ import io.smallrye.reactive.messaging.providers.helpers.AcknowledgementCoordinator; import io.smallrye.reactive.messaging.providers.helpers.ClassUtils; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; -import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging; import mutiny.zero.flow.adapters.AdaptersToFlow; @SuppressWarnings("ReactiveStreamsUnusedPublisher") @@ -494,8 +493,9 @@ private Uni> handlePostInvocationWithMessage(Message) res.withAckWithMetadata(m -> res.ack(m).thenCompose(x -> in.ack(m))) - .withNackWithMetadata((t, m) -> res.nack(t, m).thenCompose(x -> in.nack(t, m)))); + return Uni.createFrom() + .item((Message) res.withAckWithMetadata(m -> res.ack(m).thenCompose(x -> in.ack(m))) + .withNackWithMetadata((t, m) -> res.nack(t, m).thenCompose(x -> in.nack(t, m)))); } return Uni.createFrom().item((Message) res); diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/AsynchronousPayloadProcessorAckTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/AsynchronousPayloadProcessorAckTest.java index 564b4f582a..accc85b702 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/AsynchronousPayloadProcessorAckTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/AsynchronousPayloadProcessorAckTest.java @@ -43,7 +43,8 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayload() throws } @Test - public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayloadReturningCompletionStageOfMessage() throws InterruptedException { + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayloadReturningCompletionStageOfMessage() + throws InterruptedException { addBeanClass(SuccessfulPayloadProcessorCompletionStageOfMessage.class); initialize(); Emitter emitter = get(EmitterBean.class).emitter(); @@ -195,7 +196,6 @@ public Uni process(String s) { } - @ApplicationScoped public static class SuccessfulPayloadProcessorCompletionStageOfMessage { diff --git a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/SynchronousPayloadProcessorAckTest.java b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/SynchronousPayloadProcessorAckTest.java index b991a4c528..32e02ed419 100644 --- a/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/SynchronousPayloadProcessorAckTest.java +++ b/smallrye-reactive-messaging-provider/src/test/java/io/smallrye/reactive/messaging/acknowledgement/SynchronousPayloadProcessorAckTest.java @@ -83,7 +83,6 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessage() throws assertThat(nacked).hasSize(0); } - @Test public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessage() throws InterruptedException { addBeanClass(SuccessfulMessageToMessageProcessor.class); @@ -102,7 +101,8 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningM } @Test - public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessagePostProcessing() throws InterruptedException { + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessagePostProcessing() + throws InterruptedException { addBeanClass(SuccessfulMessageToMessageProcessorPostProcessing.class); initialize(); Emitter emitter = get(EmitterBean.class).emitter(); @@ -118,9 +118,9 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningM assertThat(nacked).hasSize(0); } - @Test - public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessagePostProcessingDuplicate() throws InterruptedException { + public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessagePostProcessingDuplicate() + throws InterruptedException { addBeanClass(SuccessfulMessageToMessageProcessorPostProcessingDuplicate.class); initialize(); Emitter emitter = get(EmitterBean.class).emitter(); @@ -136,7 +136,6 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningM assertThat(nacked).hasSize(0); } - @Test public void testThatMessagesAreNackedAfterFailingProcessingOfMessageReturningMessage() throws InterruptedException { addBeanClass(FailingMessageToMessageProcessor.class); diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingBatchMessage.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingBatchMessage.java index 55278c42cc..5cd7390668 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingBatchMessage.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingBatchMessage.java @@ -35,7 +35,8 @@ public PulsarIncomingBatchMessage(Messages messages, PulsarAckHandler ackHand } this.incomingMessages = Collections.unmodifiableList(incomings); this.payload = Collections.unmodifiableList(payloads); - this.metadata = captureContextMetadata(new PulsarIncomingBatchMessageMetadata(messages)); + List> batchedMessages = (List>) (List) incomings; + this.metadata = captureContextMetadata(new PulsarIncomingBatchMessageMetadata(messages, batchedMessages)); } @Override diff --git a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingBatchMessageMetadata.java b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingBatchMessageMetadata.java index dd7f16d11b..22777cfe7f 100644 --- a/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingBatchMessageMetadata.java +++ b/smallrye-reactive-messaging-pulsar/src/main/java/io/smallrye/reactive/messaging/pulsar/PulsarIncomingBatchMessageMetadata.java @@ -2,19 +2,50 @@ import static io.smallrye.reactive.messaging.pulsar.i18n.PulsarMessages.msg; +import java.util.Collections; +import java.util.List; import java.util.Objects; import org.apache.pulsar.client.api.Messages; +import org.eclipse.microprofile.reactive.messaging.Message; public class PulsarIncomingBatchMessageMetadata { private final Messages delegate; + private final List> batchedMessages; public PulsarIncomingBatchMessageMetadata(Messages messages) { + this(messages, Collections.emptyList()); + } + + public PulsarIncomingBatchMessageMetadata(Messages messages, List> batchedMessages) { this.delegate = Objects.requireNonNull(messages, msg.isRequired("messages")); + this.batchedMessages = batchedMessages; } public Messages getMessages() { return (Messages) delegate; } + public List> getIncomingMessages() { + return batchedMessages; + } + + /** + * Get metadata object for the given Pulsar Message. + * This method is useful when you need to access metadata for a specific message in the batch. + * + * @param msg Pulsar message + * @param metadata metadata type class + * @param metadata type + * @return the metadata object for the given message + */ + public M getMetadataForMessage(org.apache.pulsar.client.api.Message msg, Class metadata) { + for (Message record : batchedMessages) { + if (record.getMetadata().get(PulsarIncomingMessageMetadata.class).orElseThrow().getMessage().equals(msg)) { + return record.getMetadata(metadata).orElse(null); + } + } + return null; + } + } diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/WeldTestBase.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/WeldTestBase.java index 978716ddc8..f25c09ac3e 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/WeldTestBase.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/base/WeldTestBase.java @@ -34,6 +34,7 @@ import io.smallrye.reactive.messaging.pulsar.ack.PulsarMessageAck; import io.smallrye.reactive.messaging.pulsar.converters.KeyValueFromPulsarMessageExtractor; import io.smallrye.reactive.messaging.pulsar.converters.KeyValueFromPulsarMessageKeyValueExtractor; +import io.smallrye.reactive.messaging.pulsar.converters.PulsarBatchMessageConverter; import io.smallrye.reactive.messaging.pulsar.converters.PulsarMessageConverter; import io.smallrye.reactive.messaging.pulsar.fault.PulsarNack; import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactionsFactory; @@ -79,6 +80,7 @@ public void initWeld() { weld.addBeanClass(PulsarMessageAck.Factory.class); weld.addBeanClass(PulsarNack.Factory.class); weld.addBeanClass(PulsarMessageConverter.class); + weld.addBeanClass(PulsarBatchMessageConverter.class); weld.addBeanClass(KeyValueFromPulsarMessageExtractor.class); weld.addBeanClass(KeyValueFromPulsarMessageKeyValueExtractor.class); weld.disableDiscovery(); diff --git a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/batch/PulsarBatchReceiveTest.java b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/batch/PulsarBatchReceiveTest.java index 413bd23042..2eb1a07d7f 100644 --- a/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/batch/PulsarBatchReceiveTest.java +++ b/smallrye-reactive-messaging-pulsar/src/test/java/io/smallrye/reactive/messaging/pulsar/batch/PulsarBatchReceiveTest.java @@ -14,6 +14,8 @@ import jakarta.enterprise.inject.Produces; import org.apache.pulsar.client.api.BatchReceivePolicy; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -25,6 +27,8 @@ import io.smallrye.common.annotation.Identifier; import io.smallrye.reactive.messaging.pulsar.PulsarConnector; import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage; +import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessageMetadata; +import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessageMetadata; import io.smallrye.reactive.messaging.pulsar.base.WeldTestBase; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @@ -62,6 +66,20 @@ void testBatchReceiveAppUsingPulsarConnector() { assertThat(app.getResults()).containsExactlyElementsOf(expected); } + @Test + void testBatchReceiveAppUsingPayloads() { + // Run app + MessagesConsumingApp app = runApplication(config(), MessagesConsumingApp.class); + long start = System.currentTimeMillis(); + + // Check for consumed messages in app + await().atMost(Duration.ofSeconds(30)).until(() -> app.getResults().size() == NUMBER_OF_MESSAGES); + long end = System.currentTimeMillis(); + + System.out.println("Ack - Estimate: " + (end - start) + " ms"); + assertThat(app.getResults()).containsExactlyElementsOf(expected); + } + @Test void testBatchReceiveAppWithCustomConfig() { addBeans(BatchConfig.class); @@ -119,4 +137,30 @@ public List getResults() { } } + @ApplicationScoped + public static class MessagesConsumingApp { + + private final List results = new CopyOnWriteArrayList<>(); + private final List allMetadata = new CopyOnWriteArrayList<>(); + + @Incoming("data") + public void consume(Messages batch, PulsarIncomingBatchMessageMetadata metadata) { + for (Message message : batch) { + PulsarIncomingMessageMetadata msgMetadata = metadata.getMetadataForMessage(message, + PulsarIncomingMessageMetadata.class); + assertThat(msgMetadata).isNotNull(); + allMetadata.add(msgMetadata); + results.add(message.getValue()); + } + } + + public List getResults() { + return results; + } + + public List getAllMetadata() { + return allMetadata; + } + } + }