Replies: 2 comments
-
Also posted on Stack Overflow: The AMQP clients do not provide any parallelism by message group id out of the box. You therefore need to manually dispatch event processing to a worker thread pool to process events in parallel. The Mutiny APIs support this kind of operation. You can combine that with the KeyedMulti support to group messages by group id (available since Reactive Messaging 4.6.0). Here is some sample code:
@Incoming("data")
@Outgoing("sink")
Multi<Integer> process(KeyedMulti<String, Integer> group) {
    return group
            // move processing of this group off the caller thread onto the worker pool
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .map(i -> {
                // the group id shared by every item of this KeyedMulti
                String id = group.key();
                try {
                    // simulate work whose duration depends on the payload
                    Thread.sleep(20L * i);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return i + 1;
            });
}
@ApplicationScoped
static class AmqpKeyValueExtractor implements KeyValueExtractor {
    @Override
    public boolean canExtract(Message<?> first, Type keyType, Type valueType) {
        // only handle messages that carry AMQP metadata
        return first.getMetadata(IncomingAmqpMetadata.class).isPresent();
    }
    @Override
    public Object extractKey(Message<?> message, Type keyType) {
        // the AMQP group id becomes the key; null if no AMQP metadata is attached
        return message.getMetadata(IncomingAmqpMetadata.class)
                .map(IncomingAmqpMetadata::getGroupId)
                .orElse(null);
    }
    @Override
    public Object extractValue(Message<?> message, Type valueType) {
        return message.getPayload();
    }
}
The process method will be called for each new group id, and the method will use [...]. You'll notice that while message order is preserved inside a group, processing happens concurrently across all groups.
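To make the "ordered inside a group, concurrent across groups" behaviour concrete, here is a minimal sketch in plain java.util.concurrent. It is not SmallRye or Mutiny code: GroupOrderedDispatcher, dispatch and demo are hypothetical names made up for illustration, and the sketch assumes a small fixed worker pool and tasks that do not throw.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper (not part of SmallRye or Mutiny): chains tasks per group id.
final class GroupOrderedDispatcher {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    // Last scheduled task of each group. Entries are never evicted in this sketch.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    // Runs the task after the previous task of the same group has completed.
    // Tasks of other groups are not waited for. A task that throws would fail
    // the rest of its group's chain; the sketch does not handle that case.
    CompletableFuture<Void> dispatch(String groupId, Runnable task) {
        return tails.compute(groupId, (id, tail) ->
                (tail == null ? CompletableFuture.<Void>completedFuture(null) : tail)
                        .thenRunAsync(task, pool));
    }

    void shutdown() {
        pool.shutdown();
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Sends values 0..8 round-robin to three groups and records, per group,
    // the order in which the handlers finished.
    static Map<String, List<Integer>> demo() {
        GroupOrderedDispatcher dispatcher = new GroupOrderedDispatcher();
        Map<String, List<Integer>> seen = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> all = new ArrayList<>();
        for (int i = 0; i < 9; i++) {
            String groupId = "group-" + (i % 3);
            int value = i;
            all.add(dispatcher.dispatch(groupId, () -> {
                sleep(5L * (9 - value)); // earlier values take longer to handle
                seen.computeIfAbsent(groupId,
                        k -> Collections.synchronizedList(new ArrayList<>())).add(value);
            }));
        }
        CompletableFuture.allOf(all.toArray(new CompletableFuture[0])).join();
        dispatcher.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Even though earlier values sleep longer, each group's list keeps arrival order, because a task only starts once its predecessor in the same group has finished. Different groups still run side by side on the pool.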
-
@ozangunalp, hello! For the sake of knowledge transfer, I discovered another approach: using transformToUniAndConcatenate() and multiple subscriptions for
-
Hello! I want to handle incoming events using reactive-messaging-amqp and Quarkus in the following way:
But despite setting groupId on the producer side, my consumer does not preserve the order.
I have prepared an example project to demonstrate this:
https://github.com/crionuke/reactive-amqp-with-groups
Can you explain whether this is a bug, or whether I have missed something or done something wrong?
Producer
Consumer
I use transformToUniAndMerge instead of transformToUniAndConcatenate because I need concurrent handlers.
Output.
You can see that my subscriber gets all the events at once and handles them in arbitrary order.
But in my opinion, the next message in a message group should reach the subscriber only after the previous one has been acknowledged.
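The arbitrary order follows from how a merge-style operator works: every handler starts as soon as its item arrives, so results come out in completion order, whereas a concatenate-style operator starts a handler only after the previous one has finished. The sketch below imitates both behaviours with plain java.util.concurrent; it is an analogy, not Mutiny code, and MergeVersusConcatenate, mergeLike, concatenateLike and the simulated delays are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative analogy only: plain executors standing in for the two operators.
final class MergeVersusConcatenate {

    // Simulated handler: item i takes (4 - i) * 100 ms, so later items finish sooner.
    static void handle(int item, List<Integer> completed) {
        try {
            Thread.sleep((4 - item) * 100L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        completed.add(item);
    }

    // Merge-like: all handlers start immediately and finish in completion order.
    static List<Integer> mergeLike(List<Integer> items) {
        ExecutorService pool = Executors.newFixedThreadPool(items.size());
        List<Integer> completed = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture<?>[] running = items.stream()
                    .map(i -> CompletableFuture.runAsync(() -> handle(i, completed), pool))
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(running).join();
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(completed);
    }

    // Concatenate-like: each handler starts only after the previous one has finished.
    static List<Integer> concatenateLike(List<Integer> items) {
        ExecutorService pool = Executors.newFixedThreadPool(items.size());
        List<Integer> completed = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
            for (int item : items) {
                chain = chain.thenRunAsync(() -> handle(item, completed), pool);
            }
            chain.join();
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(completed);
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3);
        System.out.println("merge-like:       " + mergeLike(items));
        System.out.println("concatenate-like: " + concatenateLike(items));
    }
}
```

With these delays the merge-like run typically reports the items in reverse arrival order, while the concatenate-like run always reports them in arrival order at the cost of handling one item at a time.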