Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class TopologyCreator {

public TopologyCreator(Sender sender, String queueType) {
this.sender = sender;
this.queueType = queueType != null ? queueType : "classic";
this.queueType = queueType;
}

public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification exchange) {
Expand Down Expand Up @@ -90,14 +90,17 @@ public Mono<AMQP.Queue.DeclareOk> declareQueue(String name, Optional<Integer> ma

protected QueueSpecification fillQueueType(QueueSpecification specification) {
String resolvedQueueType = this.queueType;
if (specification.isAutoDelete() || specification.isExclusive()) {
if ("quorum".equals(resolvedQueueType)
&& (specification.isAutoDelete() || specification.isExclusive())) {
resolvedQueueType = "classic";
}
Map<String, Object> args = specification.getArguments();
if (args == null) {
args = new HashMap<>();
}
args.put("x-queue-type", resolvedQueueType);
if (resolvedQueueType != null) {
args.put("x-queue-type", resolvedQueueType);
}
specification.arguments(args);
return specification;
}
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/reactive-commons/9-configuration-properties.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ app:
enabled: true # if you want to disable this domain you can set it to false
mandatory: false # if you want to enable mandatory messages, you can set it to true, this will throw an exception if the message cannot be routed to any queue
brokerType: "rabbitmq" # please don't change this value
queueType: classic # you can change the queue type to 'quorum' if your RabbitMQ cluster supports it
queueType: null # you can set to 'classic' or to 'quorum' if your RabbitMQ cluster supports it, by default it will take the virtual host default queue type
flux:
maxConcurrency: 250 # max concurrency of listener flow
domain:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ public class AsyncProps extends GenericAsyncProps<RabbitProperties> {
private String brokerType = "rabbitmq";

@Builder.Default
private String queueType = "classic"; // or "quorum"
private String queueType = null; // "classic" or "quorum"

}