Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.reactivecommons.async.rabbit;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import java.util.UUID;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class InstanceIdentifier {
private static final String INSTANCE_ID = UUID.randomUUID().toString().replace("-", "");

public static String getInstanceId(String kind) {
return getInstanceId(kind, INSTANCE_ID);
}

public static String getInstanceId(String kind, String defaultHost) {
String host = System.getenv("HOSTNAME");
if (host == null || host.isEmpty()) {
return defaultHost + "-" + kind;
}
return host + "-" + kind;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@

@Log
/*
Direct use of channel is temporal, remove when https://github.com/reactor/reactor-rabbitmq/issues/37 is fixed in 1.0.0.RC2
Direct use of channel is temporal, remove when https://github.com/reactor/reactor-rabbitmq/issues/37 is fixed in 1.0
.0.RC2
*/
public class TopologyCreator {

private final Sender sender;
private final String queueType;

public TopologyCreator(Sender sender) {
public TopologyCreator(Sender sender, String queueType) {
this.sender = sender;
this.queueType = queueType != null ? queueType : "classic";
}

public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification exchange) {
Expand All @@ -30,7 +33,7 @@ public Mono<AMQP.Exchange.DeclareOk> declare(ExchangeSpecification exchange) {
}

public Mono<AMQP.Queue.DeclareOk> declare(QueueSpecification queue) {
return sender.declare(queue)
return sender.declare(fillQueueType(queue))
.onErrorMap(TopologyDefException::new);
}

Expand Down Expand Up @@ -85,6 +88,20 @@ public Mono<AMQP.Queue.DeclareOk> declareQueue(String name, Optional<Integer> ma
return declare(specification);
}

protected QueueSpecification fillQueueType(QueueSpecification specification) {
String resolvedQueueType = this.queueType;
if (specification.isAutoDelete() || specification.isExclusive()) {
resolvedQueueType = "classic";
}
Map<String, Object> args = specification.getArguments();
if (args == null) {
args = new HashMap<>();
}
args.put("x-queue-type", resolvedQueueType);
specification.arguments(args);
return specification;
}

public static class TopologyDefException extends RuntimeException {
public TopologyDefException(Throwable cause) {
super(cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ private <T, D> Function<Message, Object> resolveConverter(RegisteredCommandHandl
throw new RuntimeException("Unknown handler type");
}

@Override
protected String getKind() {
return "commands";
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ private <T, D> Function<Message, Object> resolveConverter(RegisteredEventListene
}
throw new RuntimeException("Unknown handler type");
}

@Override
protected String getKind() {
return "events";
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,8 @@ private <T, D> Function<Message, Object> resolveConverter(RegisteredEventListene
throw new RuntimeException("Unknown handler type");
}

@Override
protected String getKind() {
return "notifications";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message msg) {
protected Object parseMessageForReporter(Message msj) {
return converter.readAsyncQueryStructure(msj);
}

@Override
protected String getKind() {
return "queries";
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import lombok.extern.java.Log;
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
import org.reactivecommons.async.commons.utils.LoggerSubscriber;
import org.reactivecommons.async.rabbit.InstanceIdentifier;
import org.reactivecommons.async.rabbit.RabbitMessage;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;

import java.util.logging.Level;
Expand Down Expand Up @@ -46,10 +48,12 @@ public void startListening(String routeKey) {
if (createTopology) {
flow = creator.declare(exchange(exchangeName).type("topic").durable(true)).then();
}
ConsumeOptions consumeOptions = new ConsumeOptions();
consumeOptions.consumerTag(InstanceIdentifier.getInstanceId("replies"));
deliveryFlux = flow
.then(creator.declare(queue(queueName).durable(false).autoDelete(true).exclusive(true)))
.then(creator.bind(binding(exchangeName, routeKey, queueName)))
.thenMany(receiver.consumeAutoAck(queueName).doOnNext(delivery -> {
.thenMany(receiver.consumeAutoAck(queueName, consumeOptions).doOnNext(delivery -> {
try {
final String correlationID = delivery.getProperties().getHeaders().get(CORRELATION_ID).toString();
final boolean isEmpty = delivery.getProperties().getHeaders().get(COMPLETION_ONLY_SIGNAL) != null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package org.reactivecommons.async.rabbit.listeners;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.extern.java.Log;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.FallbackStrategy;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.commons.utils.LoggerSubscriber;
import org.reactivecommons.async.rabbit.InstanceIdentifier;
import org.reactivecommons.async.rabbit.RabbitMessage;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
Expand Down Expand Up @@ -94,6 +96,7 @@ public void startListener() {

ConsumeOptions consumeOptions = new ConsumeOptions();
consumeOptions.qos(messageListener.getPrefetchCount());
consumeOptions.consumerTag(InstanceIdentifier.getInstanceId(getKind()));

if (createTopology) {
this.messageFlux = setUpBindings(messageListener.getTopologyCreator())
Expand Down Expand Up @@ -138,7 +141,8 @@ protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instan
return flow.doOnSuccess(o -> logExecution(executorPath, initTime, true))
.subscribeOn(scheduler).thenReturn(msj);
} catch (Exception e) {
log.log(Level.SEVERE, format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! ", msj.getProperties().getMessageId()));
log.log(Level.SEVERE, format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! " +
"Severe Warning! ", msj.getProperties().getMessageId()));
return Mono.error(e);
}
}
Expand Down Expand Up @@ -247,6 +251,8 @@ private Long getRetryNumber(AcknowledgableDelivery delivery) {
}

protected abstract Object parseMessageForReporter(Message msj);

protected abstract String getKind();
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.reactivecommons.async.rabbit;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class InstanceIdentifierTest {

@Test
void shouldGetInstanceIdFromUuid() {
String instanceId = InstanceIdentifier.getInstanceId("events");
var expectedLength = 39;
assertThat(instanceId).endsWith("-events").hasSize(expectedLength);
}

@Test
void shouldGetInstanceIdFromEnv() {
String instanceId = InstanceIdentifier.getInstanceId("events", "host123");
assertThat(instanceId).isEqualTo("host123-events");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.reactivecommons.async.rabbit.communications;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Sender;

import static org.assertj.core.api.Assertions.assertThat;

@ExtendWith(MockitoExtension.class)
class TopologyCreatorTest {
@Mock
private Sender sender;

private TopologyCreator creator;

@BeforeEach
void setUp() {
creator = new TopologyCreator(sender, "quorum");
}

@Test
void shouldInjectQueueType() {
QueueSpecification spec = creator.fillQueueType(QueueSpecification.queue("durable"));
assertThat(spec.getArguments()).containsEntry("x-queue-type", "quorum");
}

@Test
void shouldForceClassicQueueTypeWhenAutodelete() {
QueueSpecification spec = creator.fillQueueType(QueueSpecification.queue("autodelete").autoDelete(true));
assertThat(spec.getArguments()).containsEntry("x-queue-type", "classic");
}

@Test
void shouldForceClassicQueueTypeWhenExclusive() {
QueueSpecification spec = creator.fillQueueType(QueueSpecification.queue("exclusive").exclusive(true));
assertThat(spec.getArguments()).containsEntry("x-queue-type", "classic");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Receiver;
Expand All @@ -29,6 +30,8 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.reactivecommons.async.commons.Headers.COMPLETION_ONLY_SIGNAL;
Expand Down Expand Up @@ -165,7 +168,7 @@ private Delivery buildDeliveryWithHeaders(Map<String, Object> headers) {
private void instructConsumerMock(Flux<Delivery> initialSource, Flux<Delivery> newSource) {
AtomicReference<Flux<Delivery>> sourceReference = new AtomicReference<>(initialSource);

when(receiver.consumeAutoAck(REPLY_QUEUE))
when(receiver.consumeAutoAck(anyString(), any(ConsumeOptions.class)))
.thenAnswer(invocation -> Flux.defer(() -> sourceReference.getAndSet(newSource)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ protected String getExecutorPath(AcknowledgableDelivery msj) {
protected Object parseMessageForReporter(Message msj) {
return null;
}

@Override
protected String getKind() {
return "stub";
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion docs/.nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v20.11.1
v22.15.0
1 change: 1 addition & 0 deletions docs/docs/reactive-commons/9-configuration-properties.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ app:
enabled: true # if you want to disable this domain you can set it to false
mandatory: false # if you want to enable mandatory messages, you can set it to true, this will throw an exception if the message cannot be routed to any queue
brokerType: "rabbitmq" # please don't change this value
queueType: classic # you can change the queue type to 'quorum' if your RabbitMQ cluster supports it
flux:
maxConcurrency: 250 # max concurrency of listener flow
domain:
Expand Down
Loading