
Commit

Added access to individual messages from incoming batch metadata for Kafka and Pulsar

ozangunalp committed Sep 17, 2024
1 parent 8ad49bc commit 1237798
Showing 16 changed files with 210 additions and 39 deletions.
5 changes: 2 additions & 3 deletions documentation/src/main/docs/jms/receiving-jms-messages.md
@@ -60,9 +60,8 @@
If the target type is a primitive type or `String`, the resulting
message contains the mapped payload.

If the target type is a class, the object is built using included JSON
- deserializer (JSON-B and Jackson provided OOB, for more details see
- [Serde](serde)), from the `JMSType`. If not, the default behavior is
- used (Java deserialization).
+ deserializer (JSON-B and Jackson provided OOB) from the `JMSType`.
+ If not, the default behavior is used (Java deserialization).
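
For instance, a consuming method can declare the mapped type directly. A minimal sketch (the `Person` class and the `queue` channel name are illustrative, not part of this commit):

``` java
@Incoming("queue")
public void consume(Person person) {
    // the JSON body of the incoming message has been deserialized
    // into a Person instance, based on the JMSType
}
```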

## Inbound Metadata

8 changes: 4 additions & 4 deletions documentation/src/main/docs/jms/sending-jms-messages.md
@@ -49,10 +49,10 @@
as `String` and the `JMSType` is set to the target class. The
If the payload is a `byte[]`, it’s passed as `byte[]` in a JMS
`BytesMessage`.

- Otherwise, the payload is encoded using included JSON serializer (JSON-B
- and Jackson provided OOB, for more details see [Serde](serde)). The
- `JMSType` is set to the target class. The `_classname` property is also
- set. The JMS Message is a `TextMessage`.
+ Otherwise, the payload is encoded using the included JSON serializer (JSON-B and Jackson provided OOB).
+ The `JMSType` is set to the target class.
+ The `_classname` property is also set.
+ The JMS Message is a `TextMessage`.

For example, the following code serializes the produced `Person` using
JSON-B.
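
A minimal sketch of such a producer (the `Person` constructor and the channel name are illustrative):

``` java
@Outgoing("queue")
public Person produce() {
    // encoded to JSON by the JSON-B serializer and sent as a TextMessage,
    // with JMSType and _classname referencing the Person class
    return new Person("mark", 30);
}
```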
13 changes: 11 additions & 2 deletions documentation/src/main/docs/kafka/receiving-kafka-records.md
@@ -602,8 +602,7 @@
container type to receive all the data:
{{ insert('kafka/inbound/KafkaRecordBatchPayloadExample.java', 'code') }}
```

- The incoming method can also receive `Message<List<Payload>`,
- `KafkaBatchRecords<Payload>` `ConsumerRecords<Key, Payload>` types, They
+ The incoming method can also receive `Message<List<Payload>>` or `ConsumerRecords<Key, Payload>` types. They
give access to record details such as offset or timestamp:

``` java
@@ -618,6 +617,16 @@
Conversely, if the processing throws an exception, all messages are
*nacked*, applying the failure strategy for all the records inside the
batch.
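
The failure strategy itself is configured per channel. For example, to route failed records to a dead-letter topic (the `prices` channel name follows the surrounding examples; `failure-strategy` is the Kafka connector attribute):

``` properties
mp.messaging.incoming.prices.failure-strategy=dead-letter-queue
```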

### Accessing metadata of batch records

When receiving records in batch mode, the metadata of each record is accessible through `IncomingKafkaRecordBatchMetadata`:

``` java
{{ insert('kafka/inbound/KafkaRecordBatchExample.java', 'batch') }}
```

As in this example, this can be useful for propagating the tracing information of each record.
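
Concretely, such a consumer can look like the following sketch, based on the `KafkaRecordBatchExample` added in this commit (the `prices` channel name follows the surrounding examples):

``` java
@Incoming("prices")
public void consumeRecords(ConsumerRecords<String, Double> records,
        IncomingKafkaRecordBatchMetadata<String, Double> metadata) {
    for (ConsumerRecord<String, Double> record : records) {
        // look up the metadata captured for this individual record
        TracingMetadata tracing = metadata.getMetadataForRecord(record, TracingMetadata.class);
        if (tracing != null) {
            // restore the record's tracing context before processing
            tracing.getCurrentContext().makeCurrent();
        }
        //... process record
    }
}
```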

## Manual topic-partition assignment

The default behavior of Kafka incoming channels is to subscribe to one or more topics in order to receive records from the Kafka broker.
14 changes: 12 additions & 2 deletions documentation/src/main/docs/pulsar/receiving-pulsar-messages.md
@@ -142,16 +142,26 @@
mp.messaging.incoming.data.subscriptionType=Shared
By default, incoming methods receive each Pulsar message individually.
You can enable batch mode using the `batchReceive=true` property, or by setting a `batchReceivePolicy` in the consumer configuration.
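
For example, with the `data` channel used above, batch mode can be enabled purely through configuration:

``` properties
mp.messaging.incoming.data.batchReceive=true
```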

- ```java
+ ``` java
{{ insert('pulsar/inbound/PulsarMessageBatchExample.java', 'code') }}
```

Or you can directly receive the list of payloads in the consuming method:

- ```java
+ ``` java
{{ insert('pulsar/inbound/PulsarMessageBatchPayloadExample.java', 'code') }}
```

### Accessing metadata of batch records

When receiving records in batch mode, the metadata of each record is accessible through `PulsarIncomingBatchMessageMetadata`:

``` java
{{ insert('pulsar/inbound/PulsarMessageBatchExample.java', 'batch') }}
```

As in this example, this can be useful for propagating the tracing information of each record.
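
Concretely, such a consumer can look like the following sketch, based on the `PulsarMessageBatchExample` added in this commit:

``` java
@Incoming("prices")
public void consumeMessages(org.apache.pulsar.client.api.Messages<Double> messages,
        PulsarIncomingBatchMessageMetadata metadata) {
    for (org.apache.pulsar.client.api.Message<Double> message : messages) {
        // look up the metadata captured for this individual message
        TracingMetadata tracing = metadata.getMetadataForMessage(message, TracingMetadata.class);
        if (tracing != null) {
            // restore the message's tracing context before processing
            tracing.getCurrentContext().makeCurrent();
        }
        //... process message
    }
}
```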

## Configuration Reference

{{ insert('../../../target/connectors/smallrye-pulsar-incoming.md') }}
kafka/inbound/KafkaRecordBatchExample.java
@@ -1,6 +1,6 @@
package kafka.inbound;

- import java.time.Instant;
+ import java.util.List;
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;
@@ -9,27 +9,25 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.eclipse.microprofile.reactive.messaging.Incoming;
+ import org.eclipse.microprofile.reactive.messaging.Message;

- import io.smallrye.reactive.messaging.kafka.KafkaRecord;
- import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch;
- import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
+ import io.smallrye.reactive.messaging.TracingMetadata;
+ import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata;

@ApplicationScoped
public class KafkaRecordBatchExample {

// <code>
@Incoming("prices")
-    public CompletionStage<Void> consumeMessage(KafkaRecordBatch<String, Double> records) {
-        for (KafkaRecord<String, Double> record : records) {
-            record.getMetadata(IncomingKafkaRecordMetadata.class).ifPresent(metadata -> {
-                int partition = metadata.getPartition();
-                long offset = metadata.getOffset();
-                Instant timestamp = metadata.getTimestamp();
-                //... process messages
-            });
+    public CompletionStage<Void> consumeMessage(Message<List<Double>> msg) {
+        IncomingKafkaRecordBatchMetadata<String, Double> batchMetadata = msg.getMetadata(IncomingKafkaRecordBatchMetadata.class).get();
+        for (ConsumerRecord<String, Double> record : batchMetadata.getRecords()) {
+            int partition = record.partition();
+            long offset = record.offset();
+            long timestamp = record.timestamp();
        }
        // ack will commit the latest offsets (per partition) of the batch.
-        return records.ack();
+        return msg.ack();
    }

@Incoming("prices")
@@ -42,4 +40,19 @@ public void consumeRecords(ConsumerRecords<String, Double> records) {
}
// </code>

// <batch>
@Incoming("prices")
public void consumeRecords(ConsumerRecords<String, Double> records, IncomingKafkaRecordBatchMetadata<String, Double> metadata) {
for (TopicPartition partition : records.partitions()) {
for (ConsumerRecord<String, Double> record : records.records(partition)) {
TracingMetadata tracing = metadata.getMetadataForRecord(record, TracingMetadata.class);
if (tracing != null) {
tracing.getCurrentContext().makeCurrent();
}
//... process messages
}
}
}
// </batch>

}
pulsar/inbound/PulsarMessageBatchExample.java
@@ -8,6 +8,7 @@
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

+ import io.smallrye.reactive.messaging.TracingMetadata;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessageMetadata;

@ApplicationScoped
@@ -29,11 +30,26 @@ public CompletionStage<Void> consumeMessage(Message<List<Double>> messages) {
}

@Incoming("prices")
-    public void consumeRecords(org.apache.pulsar.client.api.Messages<Double> messages) {
+    public void consumeMessages(org.apache.pulsar.client.api.Messages<Double> messages) {
for (org.apache.pulsar.client.api.Message<Double> msg : messages) {
//... process messages
}
}
// </code>


// <batch>
@Incoming("prices")
public void consumeMessages(org.apache.pulsar.client.api.Messages<Double> messages,
PulsarIncomingBatchMessageMetadata metadata) {
for (org.apache.pulsar.client.api.Message<Double> message : messages) {
TracingMetadata tracing = metadata.getMetadataForMessage(message, TracingMetadata.class);
if (tracing != null) {
tracing.getCurrentContext().makeCurrent();
}
//... process messages
}
}
// </batch>

}
…/io/smallrye/reactive/messaging/kafka/api/IncomingKafkaRecordBatchMetadata.java
@@ -1,12 +1,15 @@
package io.smallrye.reactive.messaging.kafka.api;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.eclipse.microprofile.reactive.messaging.Message;

/**
* Contains information about the batch of messages received from a channel backed by Kafka.
@@ -24,16 +27,24 @@ public class IncomingKafkaRecordBatchMetadata<K, T> {
private final int index;
private final Map<TopicPartition, OffsetAndMetadata> offsets;
private final int consumerGroupGenerationId;
+    private final List<Message<?>> batchedMessages;

-    public IncomingKafkaRecordBatchMetadata(ConsumerRecords<K, T> records, String channel, int index,
-            Map<TopicPartition, OffsetAndMetadata> offsets, int consumerGroupGenerationId) {
+    public IncomingKafkaRecordBatchMetadata(ConsumerRecords<K, T> records, List<Message<?>> batchedMessages,
+            String channel, int index, Map<TopicPartition, OffsetAndMetadata> offsets,
+            int consumerGroupGenerationId) {
this.records = records;
+        this.batchedMessages = batchedMessages;
this.channel = channel;
this.index = index;
this.offsets = Collections.unmodifiableMap(offsets);
this.consumerGroupGenerationId = consumerGroupGenerationId;
}

public IncomingKafkaRecordBatchMetadata(ConsumerRecords<K, T> records, String channel, int index,
Map<TopicPartition, OffsetAndMetadata> offsets, int consumerGroupGenerationId) {
this(records, Collections.emptyList(), channel, index, offsets, consumerGroupGenerationId);
}

public IncomingKafkaRecordBatchMetadata(ConsumerRecords<K, T> records, String channel,
Map<TopicPartition, OffsetAndMetadata> offsets) {
this(records, channel, -1, offsets, -1);
Expand Down Expand Up @@ -81,4 +92,29 @@ public int getConsumerIndex() {
public int getConsumerGroupGenerationId() {
return consumerGroupGenerationId;
}

/**
* @return batched messages
*/
public List<Message<?>> getBatchedMessages() {
return batchedMessages;
}

/**
* Get metadata object for the given record.
* This method is useful when you need to access metadata for a specific record in the batch.
*
* @param rec Kafka consumer record
* @param metadata metadata type class
* @return metadata object for the given record
* @param <M> metadata type
*/
public <M> M getMetadataForRecord(ConsumerRecord<K, T> rec, Class<M> metadata) {
for (Message<?> record : batchedMessages) {
if (record.getMetadata().get(IncomingKafkaRecordMetadata.class).orElseThrow().getRecord().equals(rec)) {
return record.getMetadata(metadata).orElse(null);
}
}
return null;
}
}
…/IncomingKafkaRecordBatch.java
@@ -17,6 +17,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+ import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.mutiny.Multi;
@@ -50,8 +51,10 @@ public IncomingKafkaRecordBatch(ConsumerRecords<K, T> records, String channel, i
generationId = entry.getValue().getConsumerGroupGenerationId();
offsets.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().getOffset()));
}
+        // This cast is safe because each IncomingKafkaRecord is a Message
+        List<Message<?>> batchedRecords = (List<Message<?>>) (List) this.incomingRecords;
this.metadata = captureContextMetadata(
-                new IncomingKafkaRecordBatchMetadata<>(records, channel, index, offsets, generationId));
+                new IncomingKafkaRecordBatchMetadata<>(records, batchedRecords, channel, index, offsets, generationId));
}

@Override
… (Kafka batch mode consumption test)
@@ -22,6 +22,7 @@
import org.junit.jupiter.api.Test;

import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordBatchMetadata;
+ import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
import io.smallrye.reactive.messaging.kafka.base.KafkaMapBasedConfig;

@@ -93,6 +94,13 @@ void testIncomingConsumingMessageWithMetadata() {
assertThat(r.value()).startsWith("v-");
assertThat(r.key()).startsWith("k");
});
assertThat(bean.metadata()).allSatisfy(m -> {
assertThat(m.getBatchedMessages()).isNotEmpty();
assertThat(m.getRecords()).allSatisfy(r -> {
assertThat(m.getMetadataForRecord(r, IncomingKafkaRecordMetadata.class)).isNotNull()
.extracting(IncomingKafkaRecordMetadata::getRecord).isEqualTo(r);
});
});
}

@ApplicationScoped
… (provider mediator: post-invocation message handling)
@@ -26,7 +26,6 @@
import io.smallrye.reactive.messaging.providers.helpers.AcknowledgementCoordinator;
import io.smallrye.reactive.messaging.providers.helpers.ClassUtils;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
- import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging;
import mutiny.zero.flow.adapters.AdaptersToFlow;

@SuppressWarnings("ReactiveStreamsUnusedPublisher")
@@ -494,8 +493,9 @@ private Uni<? extends Message<Object>> handlePostInvocationWithMessage(Message<?
} else if (res != null) {
if (isPostAck()) {
// Here we chain the outgoing message to the incoming, but maybe the message has already been (n)acked
-            return Uni.createFrom().item((Message<Object>) res.withAckWithMetadata(m -> res.ack(m).thenCompose(x -> in.ack(m)))
-                    .withNackWithMetadata((t, m) -> res.nack(t, m).thenCompose(x -> in.nack(t, m))));
+            return Uni.createFrom()
+                    .item((Message<Object>) res.withAckWithMetadata(m -> res.ack(m).thenCompose(x -> in.ack(m)))
+                            .withNackWithMetadata((t, m) -> res.nack(t, m).thenCompose(x -> in.nack(t, m))));
}

return Uni.createFrom().item((Message<Object>) res);
… (payload processor acknowledgement test)
@@ -43,7 +43,8 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayload() throws
}

@Test
-    public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayloadReturningCompletionStageOfMessage() throws InterruptedException {
+    public void testThatMessagesAreAckedAfterSuccessfulProcessingOfPayloadReturningCompletionStageOfMessage()
+            throws InterruptedException {
addBeanClass(SuccessfulPayloadProcessorCompletionStageOfMessage.class);
initialize();
Emitter<String> emitter = get(EmitterBean.class).emitter();
@@ -195,7 +196,6 @@ public Uni<String> process(String s) {

}


@ApplicationScoped
public static class SuccessfulPayloadProcessorCompletionStageOfMessage {

Expand Down
… (message processor acknowledgement test)
@@ -83,7 +83,6 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessage() throws
assertThat(nacked).hasSize(0);
}


@Test
public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessage() throws InterruptedException {
addBeanClass(SuccessfulMessageToMessageProcessor.class);
@@ -102,7 +101,8 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningM
}

@Test
-    public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessagePostProcessing() throws InterruptedException {
+    public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessagePostProcessing()
+            throws InterruptedException {
addBeanClass(SuccessfulMessageToMessageProcessorPostProcessing.class);
initialize();
Emitter<String> emitter = get(EmitterBean.class).emitter();
@@ -118,9 +118,9 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningM
assertThat(nacked).hasSize(0);
}


@Test
-    public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessagePostProcessingDuplicate() throws InterruptedException {
+    public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningMessagePostProcessingDuplicate()
+            throws InterruptedException {
addBeanClass(SuccessfulMessageToMessageProcessorPostProcessingDuplicate.class);
initialize();
Emitter<String> emitter = get(EmitterBean.class).emitter();
@@ -136,7 +136,6 @@ public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessageReturningM
assertThat(nacked).hasSize(0);
}


@Test
public void testThatMessagesAreNackedAfterFailingProcessingOfMessageReturningMessage() throws InterruptedException {
addBeanClass(FailingMessageToMessageProcessor.class);
…/PulsarIncomingBatchMessage.java
@@ -35,7 +35,8 @@ public PulsarIncomingBatchMessage(Messages<T> messages, PulsarAckHandler ackHand
}
this.incomingMessages = Collections.unmodifiableList(incomings);
this.payload = Collections.unmodifiableList(payloads);
-        this.metadata = captureContextMetadata(new PulsarIncomingBatchMessageMetadata(messages));
+        List<PulsarIncomingMessage<?>> batchedMessages = (List<PulsarIncomingMessage<?>>) (List) incomings;
+        this.metadata = captureContextMetadata(new PulsarIncomingBatchMessageMetadata(messages, batchedMessages));
}

@Override