diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/DiscardNotifier.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/DiscardNotifier.java index f698809b..4cf50eb9 100644 --- a/async/async-commons/src/main/java/org/reactivecommons/async/commons/DiscardNotifier.java +++ b/async/async-commons/src/main/java/org/reactivecommons/async/commons/DiscardNotifier.java @@ -3,6 +3,7 @@ import org.reactivecommons.async.commons.communications.Message; import reactor.core.publisher.Mono; +@FunctionalInterface public interface DiscardNotifier { Mono notifyDiscard(Message message); } diff --git a/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/matcher/Matcher.java b/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/matcher/Matcher.java index d7e5e1f4..c0795a9c 100644 --- a/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/matcher/Matcher.java +++ b/async/async-commons/src/main/java/org/reactivecommons/async/commons/utils/matcher/Matcher.java @@ -2,6 +2,7 @@ import java.util.Set; +@FunctionalInterface public interface Matcher { String match(Set sources, String target); } diff --git a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/communications/UnroutableMessageNotifierTest.java b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/communications/UnroutableMessageNotifierTest.java index b370e6bf..9600529b 100644 --- a/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/communications/UnroutableMessageNotifierTest.java +++ b/async/async-rabbit/src/test/java/org/reactivecommons/async/rabbit/communications/UnroutableMessageNotifierTest.java @@ -44,9 +44,9 @@ class UnroutableMessageNotifierTest { @BeforeEach void setUp() { - // Usar el constructor por defecto y espiar el sink interno + // Use the default constructor and spy on the internal sink unroutableMessageNotifier = new UnroutableMessageNotifier(); - // Inyectar el mock del sink usando un spy para poder verificarlo + // Inject the sink mock using a spy to verify it try { java.lang.reflect.Field sinkField = UnroutableMessageNotifier.class.getDeclaredField("sink"); sinkField.setAccessible(true); diff --git a/build.gradle b/build.gradle index e504f49c..9504177c 100644 --- a/build.gradle +++ b/build.gradle @@ -13,9 +13,9 @@ buildscript { plugins { id 'jacoco' id 'org.sonarqube' version '6.3.1.5724' - id 'org.springframework.boot' version '3.5.5' apply false + id 'org.springframework.boot' version '3.5.6' apply false id 'io.github.gradle-nexus.publish-plugin' version '2.0.0' - id 'co.com.bancolombia.cleanArchitecture' version '3.25.0' + id 'co.com.bancolombia.cleanArchitecture' version '3.26.1' } repositories { diff --git a/docs/docs/migration-guides.md b/docs/docs/migration-guides.md new file mode 100644 index 00000000..acdcbee8 --- /dev/null +++ b/docs/docs/migration-guides.md @@ -0,0 +1,229 @@ +--- +sidebar_position: 4 +--- + +# Migration + +## From 5.x.x to 6.x.x + +### New Features + +- **Connection customization:** You can now customize the RabbitMQ connection by defining a + `ConnectionFactoryCustomizer` bean. For more details, + see [Customizing the connection](/reactive-commons-java/docs/reactive-commons/configuration_properties/rabbitmq#customizing-the-connection). + +```java title="Programmatic configuration" + +@Bean +public ConnectionFactoryCustomizer connectionFactoryCustomizer() { + return (ConnectionFactoryCustomizer) (asyncProps, connectionFactory) -> { + connectionFactory.setExceptionHandler(new MyCustomExceptionHandler()); // Optional custom exception handler + connectionFactory.setCredentialsProvider(new MyCustomCredentialsProvider()); // Optional custom credentials provider + return connectionFactory; + }; +} +``` + +### Change notes + +- The configuration property `listenReplies` for RabbitMQ now defaults to `null`. Previously, it was `true`, causing all + applications to subscribe to a reply queue even when not needed. +- The domain `app` is now **required**. If not defined, the application will fail to start. + +### Actions + +- If your application uses the ReqReply pattern, you must explicitly set `app.async.app.listenReplies` to `true`. + Otherwise, it should be `false` to avoid unnecessary resource usage: + +```yaml title="application.yaml" +app: + async: + app: + listenReplies: true # set to true if ReqReply is required, false if not +``` + +```java title="Programmatic configuration" +@Configuration +public class MyDomainConfig { + + @Bean + @Primary + public AsyncRabbitPropsDomainProperties customDomainProperties() { + RabbitProperties propertiesApp = new RabbitProperties(); + // Additional connection configuration goes here... + return AsyncRabbitPropsDomainProperties.builder() + .withDomain("app", AsyncProps.builder() + .connectionProperties(propertiesApp) + .listenReplies(Boolean.TRUE) // set to true if ReqReply is required, false if not + .build()) + .build(); + } +} +``` + +--- + +- The domain `app` must be defined in your configuration. Otherwise, the application will throw an exception at startup: + +```yaml title="application.yaml" +app: + async: + app: # Configure the 'app' domain + # domain configuration goes here +``` + +```java title="Programmatic configuration" +@Configuration +public class MyDomainConfig { + + @Bean + @Primary + public AsyncRabbitPropsDomainProperties customDomainProperties() { + RabbitProperties propertiesApp = new RabbitProperties(); + // Additional connection configuration goes here... + return AsyncRabbitPropsDomainProperties.builder() + .withDomain("app", AsyncProps.builder() // Configure the 'app' domain + .connectionProperties(propertiesApp) + .build()) + .build(); + } +} +``` + +## From 4.x.x to 5.x.x + +### New Features + +- **Support for multiple brokers:** It is now possible to configure and connect to up to two brokers simultaneously, + using + independent domains in the configuration. + +### Change notes + +- Configuration properties are now defined per domain, allowing each to have its own properties and connection + settings. +- The broker connection is no longer manually defined in the code. It is now automatically managed based on the + configuration declared in the `application.yaml` file or through programmatic configuration. + +### Actions + +- The `app` domain needs to be defined to specify the configuration properties. + +Before: + +```yaml title="application.yaml" +app: + async: + withDLQRetry: true + maxRetries: 1 + retryDelay: 1000 +``` + +Now: + +```yaml title="application.yaml" +app: + async: + app: # this is the name of the default domain + withDLQRetry: true + maxRetries: 1 + retryDelay: 1000 +``` + +- Migrate the connection configuration: + +Before: the connection was defined manually in a Java class, as shown below: + +```java +@Log4j2 +@Configuration +@RequiredArgsConstructor +public class MyDomainConfig { + + private final RabbitMQConnectionProperties properties; + private static final String TLS = "TLSv1.2"; + private static final String FAIL_MSG = "Error creating ConnectionFactoryProvider in enroll"; + + @Primary + @Bean + public ConnectionFactoryProvider getConnectionFactoryProvider() { + final var factory = new ConnectionFactory(); + var map = PropertyMapper.get(); + map.from(properties::hostname).whenNonNull().to(factory::setHost); + map.from(properties::port).to(factory::setPort); + map.from(properties::username).whenNonNull().to(factory::setUsername); + map.from(properties::password).whenNonNull().to(factory::setPassword); + map.from(properties::ssl).whenTrue().as(isSsl -> factory).to(this::configureSsl); + return () -> factory; + } + + private void configureSsl(ConnectionFactory factory) { + try { + var sslContext = SSLContext.getInstance(TLS); + var trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + trustManagerFactory.init((KeyStore) null); + sslContext.init(null, trustManagerFactory.getTrustManagers(), null); + factory.useSslProtocol(sslContext); + } catch (KeyManagementException | KeyStoreException | NoSuchAlgorithmException e) { + log.error("{}: {}", FAIL_MSG, e); + } + } +} +``` + +Now: the connection is configured directly in the `application.yaml` file per domain: + +```yaml title="application.yaml" +app: + async: + app: # this is the name of the default domain + connectionProperties: # you can override the connection properties of each domain + host: localhost + port: 5672 + username: guest + password: guest + virtual-host: / + # Another domain can be configured with same properties structure that app + accounts: # this is a second domain name and can have another independent setup + connectionProperties: # you can override the connection properties of each domain + host: localhost + port: 5672 + username: guest + password: guest + virtual-host: /accounts +``` + +Domains can also be configured programmatically: + +```java title="Programmatic configuration" +@Configuration +public class MyDomainConfig { + + @Bean + @Primary + public AsyncRabbitPropsDomainProperties customDomainProperties() { + RabbitProperties propertiesApp = new RabbitProperties(); + propertiesApp.setHost("localhost"); + propertiesApp.setPort(5672); + propertiesApp.setVirtualHost("/"); + propertiesApp.setUsername("guest"); + propertiesApp.setPassword("guest"); + + RabbitProperties propertiesAccounts = new RabbitProperties(); + propertiesAccounts.setHost("localhost"); + propertiesAccounts.setPort(5672); + propertiesAccounts.setVirtualHost("/accounts"); + propertiesAccounts.setUsername("guest"); + propertiesAccounts.setPassword("guest"); + + return AsyncRabbitPropsDomainProperties.builder() + .withDomain("app", AsyncProps.builder() + .connectionProperties(propertiesApp) + .build()) + .withDomain("accounts", AsyncProps.builder() + .connectionProperties(propertiesAccounts) + .build()) + .build(); + } +} +``` \ No newline at end of file diff --git a/docs/docs/reactive-commons/1-getting-started.md b/docs/docs/reactive-commons/1-getting-started.md index 4d400ca8..56efc5d7 100644 --- a/docs/docs/reactive-commons/1-getting-started.md +++ b/docs/docs/reactive-commons/1-getting-started.md @@ -17,7 +17,7 @@ Commons. You need Java JRE installed (Java 17 or later). -You also need to install RabbitMQ. Follow the [instructions from the website](https://www.rabbitmq.com/download.html) +You also need to install RabbitMQ. Follow the [instructions from the website](https://www.rabbitmq.com/download.html). ## Start RabbitMQ @@ -50,7 +50,8 @@ dependencies { } ``` -Note: If you will use Cloud Events, you should include the Cloud Events dependency: +:::tip +If you will use Cloud Events, you should include the Cloud Events dependency: ```groovy dependencies { @@ -58,7 +59,7 @@ dependencies { } ``` -```groovy +::: ### Configuration properties diff --git a/docs/docs/reactive-commons/3-sending-a-command.md b/docs/docs/reactive-commons/3-sending-a-command.md index 98cfc7ad..84e45599 100644 --- a/docs/docs/reactive-commons/3-sending-a-command.md +++ b/docs/docs/reactive-commons/3-sending-a-command.md @@ -1,5 +1,5 @@ --- -sidebar_position: 2 +sidebar_position: 3 --- # Sending a Command diff --git a/docs/docs/reactive-commons/_category_.json b/docs/docs/reactive-commons/_category_.json index ed74895f..4f42bc0a 100644 --- a/docs/docs/reactive-commons/_category_.json +++ b/docs/docs/reactive-commons/_category_.json @@ -1,8 +1,9 @@ { "label": "Reactive Commons", "position": 2, + "collapsed": false, "link": { "type": "generated-index", "description": "Learn how to build reactive systems using the Reactive Commons library." } -} +} \ No newline at end of file diff --git a/docs/docs/reactive-commons/9-configuration-properties.md b/docs/docs/reactive-commons/configuration_properties/1-rabbitmq.md similarity index 65% rename from docs/docs/reactive-commons/9-configuration-properties.md rename to docs/docs/reactive-commons/configuration_properties/1-rabbitmq.md index d9a122e7..af28a614 100644 --- a/docs/docs/reactive-commons/9-configuration-properties.md +++ b/docs/docs/reactive-commons/configuration_properties/1-rabbitmq.md @@ -1,16 +1,10 @@ --- -sidebar_position: 8 +sidebar_position: 1 --- -# Configuration Properties +# RabbitMQ Configuration -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; - - - - -You can customize some predefined variables of Reactive Commons +You can customize some predefined variables of Reactive Commons. This can be done by Spring Boot `application.yaml` or by overriding the [AsyncProps](https://github.com/reactive-commons/reactive-commons-java/blob/master/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java) @@ -23,10 +17,10 @@ app: withDLQRetry: false # if you want to have dlq queues with retries you can set it to true, you cannot change it after queues are created, because you will get an error, so you should delete topology before the change. maxRetries: -1 # -1 will be considered default value. When withDLQRetry is true, it will be retried 10 times. When withDLQRetry is false, it will be retried indefinitely. retryDelay: 1000 # interval for message retries, with and without DLQRetry - listenReplies: true # if you will not use ReqReply patter you can set it to false + listenReplies: null # Allows true or false values. If you're using the ReqReply pattern, set it to true. If you don't, set it to false. createTopology: true # if your organization have restrictions with automatic topology creation you can set it to false and create it manually or by your organization process. delayedCommands: false # Enable to send a delayed command to an external target - prefetchCount: 250 # is the maximum number of in flight messages you can reduce it to process less concurrent messages, this settings acts per instance of your service + prefetchCount: 250 # is the maximum number of in flight messages you can reduce it to process less concurrent messages, this setting acts per instance of your service useDiscardNotifierPerDomain: false # if true it uses a discard notifier for each domain,when false it uses a single discard notifier for all domains with default 'app' domain enabled: true # if you want to disable this domain you can set it to false mandatory: false # if you want to enable mandatory messages, you can set it to true, this will throw an exception if the message cannot be routed to any queue @@ -64,7 +58,12 @@ app: virtual-host: /accounts ``` -You can override this settings programmatically through a `AsyncRabbitPropsDomainProperties` bean. +You can override this settings programmatically through an `AsyncRabbitPropsDomainProperties` bean: + +:::caution[Mandatory `app` Domain Configuration] +To ensure a correct configuration, you should always override the properties of the `app` domain. If it is not +configured, an exception will be thrown. You can also add properties for additional custom domain if needed. +::: ```java package sample; @@ -169,7 +168,7 @@ import java.lang.reflect.GenericArrayType; @Log4j2 @Configuration -public class AsyncEventBusConfig { +public class RabbitMQConfig { // TODO: You should create the GenericManager bean as indicated in Secrets Manager library @Bean @@ -189,93 +188,146 @@ public class AsyncEventBusConfig { return genericManager.getSecret(secretName, RabbitConnectionProperties.class).toRabbitProperties(); } } - ``` - - - You can customize some predefined variables of Reactive Commons +## Customizing the connection -This can be done by Spring Boot `application.yaml` or by overriding -the [AsyncKafkaProps](https://github.com/reactive-commons/reactive-commons-java/blob/master/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/props/AsyncKafkaProps.java) -bean. +For advanced control over the RabbitMQ connection, you can define a `ConnectionFactoryCustomizer` bean. This allows you +to configure options that are not exposed through standard properties, such as custom timeouts, SSL/TLS settings, +or automatic recovery strategies: -```yaml -reactive: - commons: - kafka: - app: # this is the name of the default domain - withDLQRetry: false # if you want to have dlq queues with retries you can set it to true, you cannot change it after queues are created, because you will get an error, so you should delete topology before the change. - maxRetries: -1 # -1 will be considered default value. When withDLQRetry is true, it will be retried 10 times. When withDLQRetry is false, it will be retried indefinitely. - retryDelay: 1000 # interval for message retries, with and without DLQRetry - checkExistingTopics: true # if you don't want to verify topic existence before send a record you can set it to false - createTopology: true # if your organization have restrictions with automatic topology creation you can set it to false and create it manually or by your organization process. - useDiscardNotifierPerDomain: false # if true it uses a discard notifier for each domain,when false it uses a single discard notifier for all domains with default 'app' domain - enabled: true # if you want to disable this domain you can set it to false - brokerType: "kafka" # please don't change this value - domain: - ignoreThisListener: false # Allows you to disable event listener for this specific domain - connectionProperties: # you can override the connection properties of each domain - bootstrap-servers: localhost:9092 - # Another domain can be configured with same properties structure that app - accounts: # this is a second domain name and can have another independent setup - connectionProperties: # you can override the connection properties of each domain - bootstrap-servers: localhost:9093 +```java + +@Bean +public ConnectionFactoryCustomizer connectionFactoryCustomizer() { + return (ConnectionFactoryCustomizer) (asyncProps, connectionFactory) -> { + connectionFactory.setExceptionHandler(new MyCustomExceptionHandler()); // Optional custom exception handler + connectionFactory.setCredentialsProvider(new MyCustomCredentialsProvider()); // Optional custom credentials provider + return connectionFactory; + }; +} ``` -You can override this settings programmatically through a `AsyncKafkaPropsDomainProperties` bean. +## Connections and channels -```java -package sample; +Reactive Commons establishes **a single connection to the RabbitMQ broker**, which is reused for all messaging +operations, both sending and listening. However, the number of open **channels** within that connection varies depending +on the enabled annotations and the type of interaction (sending, listening, or both). Each scenario described below +shows how the number of channels changes according to the applied configuration. -import org.reactivecommons.async.kafka.config.KafkaProperties; -import org.reactivecommons.async.kafka.config.props.AsyncProps; -import org.reactivecommons.async.kafka.config.props.AsyncKafkaPropsDomainProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; +In the context of this documentation, a domain refers to a connection with a broker. The configuration supports up to +two brokers, which means the described scenarios are limited to a maximum of two domains. -@Configuration -public class MyDomainConfig { +### Annotations used in the tables - @Bean - @Primary - public AsyncKafkaPropsDomainProperties customKafkaDomainProperties() { - KafkaProperties propertiesApp = new KafkaProperties(); - propertiesApp.setBootstrapServers(List.of("localhost:9092")); +**[1] Annotations for sending messages:** - KafkaProperties propertiesAccounts = new KafkaProperties(); - propertiesAccounts.setBootstrapServers(List.of("localhost:9093")); +- `@EnableDomainEventBus` to send [domain events](/reactive-commons-java/docs/reactive-commons/sending-a-domain-event). +- `@EnableDirectAsyncGateway` to send [commands](/reactive-commons-java/docs/reactive-commons/sending-a-command) + and [asynchronous queries](/reactive-commons-java/docs/reactive-commons/making-an-async-query). - return AsyncKafkaPropsDomainProperties.builder() - .withDomain("app", AsyncProps.builder() - .connectionProperties(propertiesApp) - .build()) - .withDomain("accounts", AsyncProps.builder() - .connectionProperties(propertiesAccounts) - .build()) - .build(); - } -} -``` +**[2] Annotations for listening to messages:** -Additionally, if you want to set only connection properties you can use the `AsyncKafkaPropsDomain.KafkaSecretFiller` -class. +- `@EnableEventListeners` to + listen [domain events](/reactive-commons-java/docs/reactive-commons/handling-domain-events). +- `@EnableCommandListeners` to listen [commands](/reactive-commons-java/docs/reactive-commons/handling-commands). +- `@EnableQueryListeners` to serve [async queries](/reactive-commons-java/docs/reactive-commons/serving-async-queries). -```java +### 1. Sending messages (single domain) -@Bean -@Primary -public AsyncKafkaPropsDomain.KafkaSecretFiller customKafkaFiller() { - return (domain, asyncProps) -> { - // customize asyncProps here by domain - }; -} -``` +> In this scenario we only use annotations to enable message sending only, along with different configurations for the +`listenReplies` property: + +| Enabled annotations | listenReplies | Broker | Connections | Channels | +|----------------------------|---------------|------------|-------------|----------| +| One or all for sending [1] | true | Broker app | 1 | 13 | +| | false | Broker app | 1 | 11 | + +### 2. Sending messages (multiple domains) + +> In this scenario, we only send messages to two brokers, using one or all of the annotations and configurations for the +`listenReplies` property: + +| Enabled annotations | listenReplies | Broker | Connections | Channels | +|----------------------------|---------------|-------------------|-------------|----------| +| One or all for sending [1] | true | Broker app | 1 | 18 | +| | | Additional broker | 1 | 8 | +| One or all for sending [1] | false | Broker app | 1 | 16 | +| | | Additional broker | 1 | 6 | + +### 3. Listening for messages (single domain) + +> This scenario enables only listening for messages from a single broker, using one or all available annotations: + +| Enabled annotations | Broker | Connections | Channels | +|------------------------------|------------|-------------|----------| +| One or all for listening [2] | Broker app | 1 | 14 | + +### 4. Listening for messages (multiple domains) + +> In this scenario, messages are listened to from two brokers, with variations in the annotations enabled: + +| Enabled annotations | Broker | Connections | Channels | +|-----------------------|-------------------|-------------|----------| +| All for listening [2] | Broker app | 1 | 19 | +| | Additional broker | 1 | 8 | +| Two for listening [2] | Broker app | 1 | 18 | +| | Additional broker | 1 | 8 | +| One for listening [2] | Broker app | 1 | 17 | +| | Additional broker | 1 | 7 | + +### 5. Sending and listening for messages (single domain) + +> This scenario enables both sending and listening for messages on a single broker, with all annotations enabled: + +| Enabled annotations | Broker | Connections | Channels | +|-----------------------------------------------|------------|-------------|----------| +| All for sending [1] and all for listening [2] | Broker app | 1 | 16 | + +### 6. Sending and listening for messages (multiple domains) + +> In this scenario, messages are sent and listened from two brokers, with variations in the annotations enabled: + +| Enabled annotations | Broker | Connections | Channels | +|-----------------------------------------------|-------------------|-------------|----------| +| All for sending [1] and all for listening [2] | Broker app | 1 | 21 | +| | Additional broker | 1 | 10 | +| One for sending [1] and all for listening [2] | Broker app | 1 | 20 | +| | Additional broker | 1 | 9 | +| All for sending [1] and two for listening [2] | Broker app | 1 | 20 | +| | Additional broker | 1 | 10 | +| All for sending [1] and one for listening [2] | Broker app | 1 | 19 | +| | Additional broker | 1 | 8 | +| All for sending [1] | Broker app | 1 | 16 | +| | Additional broker | 1 | 6 | - - +### Recommendations -## Mandatory property in RabbitMQ +- **Resource Optimization:** If only sending commands and events is required, disabling the `listenReplies` property + reduces the number of open channels. +- **Selective Annotation Activation:** Enabling only the necessary annotations for the use case can improve performance + and simplify configuration. +- **Proper Use of Configuration Properties:** Adjusting configuration properties according to the specific use case + allows for resource optimization and avoids unnecessary configurations. + +### Connections in microservices with multiple replicas + +In a typical cloud production environment, such as AWS, microservices are deployed in containers orchestrated by +Kubernetes, using managed services like Amazon EKS. For the messaging broker, Amazon MQ for RabbitMQ is used, configured +in a 3-node cluster with a Multi-AZ deployment to ensure high availability and fault tolerance. + +When working with microservices that use multiple replicas (instances) and implement Reactive Commons, it is important +to understand how connections to the message broker are managed. Each replica of a microservice establishes **a single +connection** to the broker, which is used for both sending and listening to messages. + +The number of open channels within that single connection will depend on the configuration of the annotations used, as +described in the connection scenarios above. This allows each replica to manage its messaging operations independently, +distributing the workload efficiently. + +For example, if a microservice is deployed with **4 replicas**, each of them will establish its own connection to the +broker. As a result, the entire microservice deployment will have a total of **4 connections** to the broker. + +## Mandatory property The mandatory property is a message publishing parameter in RabbitMQ that determines the behavior when a message cannot be routed to any queue. This can happen if there are no queues bound to the exchange or if the routing key does not @@ -292,7 +344,7 @@ queues. If no queue receives the message, then: - The producer must have a ReturnListener or an equivalent handler to receive and process the returned message. If one is not defined, the message is lost. -#### Example +### Example Assuming we have: @@ -328,7 +380,7 @@ message). ### Implementation -To enable the `mandatory` property in Reactive Commons, you can configure it in your project's `application.yaml` file: +To enable the `mandatory` property, you can configure it in your project's `application.yaml` file: ```yaml app: @@ -467,7 +519,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; -@Configuration("rabbitMQConfiguration") +@Configuration public class RabbitMQConfig { private final RabbitMQConnectionProperties properties; @@ -534,3 +586,37 @@ public class RabbitMQConfig { } ``` + +## Troubleshooting + +### PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' + +This error occurs when there is a mismatch between the queue properties defined in your application and the properties +of the queue that already exists in the RabbitMQ broker. +It commonly happens when you try to: + +- Change the name of a domain. +- Enable or disable DLQ (Dead Letter Queue) functionality for a queue that has already been created. + +**Error log example:** + +```text +Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: +#method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' +for queue 'ms_example.query' in vhost '/': received none but current is the value 'directMessages.DLQ' +of type 'longstr', class-id=50, method-id=10) +``` + +**Cause:** + +RabbitMQ does not allow changing certain durable properties of a queue after it has been declared, such as the +`x-dead-letter-exchange` argument. +When your application starts, it tries to declare the queue with the new properties, but the broker rejects the +declaration because it conflicts +with the existing queue. + +**Solution:** + +To resolve this issue, you must manually delete the conflicting queues from the RabbitMQ broker. Once the queues are +deleted, +you can restart the microservice to recreate them with the correct, updated properties. diff --git a/docs/docs/reactive-commons/configuration_properties/2-kafka.md b/docs/docs/reactive-commons/configuration_properties/2-kafka.md new file mode 100644 index 00000000..81917b22 --- /dev/null +++ b/docs/docs/reactive-commons/configuration_properties/2-kafka.md @@ -0,0 +1,85 @@ +--- +sidebar_position: 2 +--- + +# Kafka Configuration + +You can customize some predefined variables of Reactive Commons. + +This can be done by Spring Boot `application.yaml` or by overriding +the [AsyncKafkaProps](https://github.com/reactive-commons/reactive-commons-java/blob/master/starters/async-kafka-starter/src/main/java/org/reactivecommons/async/kafka/config/props/AsyncKafkaProps.java) +bean. + +```yaml +reactive: + commons: + kafka: + app: # this is the name of the default domain + withDLQRetry: false # if you want to have dlq queues with retries you can set it to true, you cannot change it after queues are created, because you will get an error, so you should delete topology before the change. + maxRetries: -1 # -1 will be considered default value. When withDLQRetry is true, it will be retried 10 times. When withDLQRetry is false, it will be retried indefinitely. + retryDelay: 1000 # interval for message retries, with and without DLQRetry + checkExistingTopics: true # if you don't want to verify topic existence before send a record you can set it to false + createTopology: true # if your organization have restrictions with automatic topology creation you can set it to false and create it manually or by your organization process. + useDiscardNotifierPerDomain: false # if true it uses a discard notifier for each domain,when false it uses a single discard notifier for all domains with default 'app' domain + enabled: true # if you want to disable this domain you can set it to false + brokerType: "kafka" # please don't change this value + domain: + ignoreThisListener: false # Allows you to disable event listener for this specific domain + connectionProperties: # you can override the connection properties of each domain + bootstrap-servers: localhost:9092 + # Another domain can be configured with same properties structure that app + accounts: # this is a second domain name and can have another independent setup + connectionProperties: # you can override the connection properties of each domain + bootstrap-servers: localhost:9093 +``` + +You can override this settings programmatically through a `AsyncKafkaPropsDomainProperties` bean: + +```java +package sample; + +import org.reactivecommons.async.kafka.config.KafkaProperties; +import org.reactivecommons.async.kafka.config.props.AsyncProps; +import org.reactivecommons.async.kafka.config.props.AsyncKafkaPropsDomainProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; + +@Configuration +public class MyDomainConfig { + + @Bean + @Primary + public AsyncKafkaPropsDomainProperties customKafkaDomainProperties() { + KafkaProperties propertiesApp = new KafkaProperties(); + propertiesApp.setBootstrapServers(List.of("localhost:9092")); + + KafkaProperties propertiesAccounts = new KafkaProperties(); + propertiesAccounts.setBootstrapServers(List.of("localhost:9093")); + + return AsyncKafkaPropsDomainProperties.builder() + .withDomain("app", AsyncProps.builder() + .connectionProperties(propertiesApp) + .build()) + .withDomain("accounts", AsyncProps.builder() + .connectionProperties(propertiesAccounts) + .build()) + .build(); + } +} +``` + +## Loading properties from a secret + +Additionally, if you want to set only connection properties you can use the `AsyncKafkaPropsDomain.KafkaSecretFiller` +class. + +```java + +@Bean +@Primary +public AsyncKafkaPropsDomain.KafkaSecretFiller customKafkaFiller() { + return (domain, asyncProps) -> { + // customize asyncProps here by domain + }; +} +``` diff --git a/docs/docs/reactive-commons/configuration_properties/_category_.json b/docs/docs/reactive-commons/configuration_properties/_category_.json new file mode 100644 index 00000000..9074429b --- /dev/null +++ b/docs/docs/reactive-commons/configuration_properties/_category_.json @@ -0,0 +1,4 @@ +{ + "label": "Configuration Properties", + "position": 9 +} diff --git a/docs/package.json b/docs/package.json index ac45ffb8..4f028ff5 100644 --- a/docs/package.json +++ b/docs/package.json @@ -17,11 +17,11 @@ "@docusaurus/core": "^3.8.1", "@docusaurus/preset-classic": "^3.8.1", "@docusaurus/theme-common": "^3.8.1", - "@mdx-js/react": "^3.1.0", + "@mdx-js/react": "^3.1.1", "clsx": "^2.1.1", "prism-react-renderer": "^2.4.1", - "react": "^19.1.0", - "react-dom": "^19.1.0" + "react": "^19.1.1", + "react-dom": "^19.1.1" }, "devDependencies": { "@docusaurus/module-type-aliases": "^3.8.1", diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 1b33c55b..8bdaf60c 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index d4081da4..6a38a8ce 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip +distributionSha256Sum=a17ddd85a26b6a7f5ddb71ff8b05fc5104c0202c6e64782429790c933686c806 +distributionUrl=https\://services.gradle.org/distributions/gradle-9.1.0-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/gradlew b/gradlew index 23d15a93..adff685a 100755 --- a/gradlew +++ b/gradlew @@ -1,7 +1,7 @@ #!/bin/sh # -# Copyright © 2015-2021 the original authors. +# Copyright © 2015 the original authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -114,7 +114,6 @@ case "$( uname )" in #( NONSTOP* ) nonstop=true ;; esac -CLASSPATH="\\\"\\\"" # Determine the Java command to use to start the JVM. @@ -172,7 +171,6 @@ fi # For Cygwin or MSYS, switch paths to Windows format before running java if "$cygwin" || "$msys" ; then APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) - CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) JAVACMD=$( cygpath --unix "$JAVACMD" ) @@ -212,7 +210,6 @@ DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' set -- \ "-Dorg.gradle.appname=$APP_BASE_NAME" \ - -classpath "$CLASSPATH" \ -jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \ "$@" diff --git a/gradlew.bat b/gradlew.bat index 5eed7ee8..e509b2dd 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -70,11 +70,10 @@ goto fail :execute @rem Setup the command line -set CLASSPATH= @rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %* +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %* :end @rem End local scope for the variables with windows NT shell diff --git a/main.gradle b/main.gradle index 79545c89..c2dbd052 100644 --- a/main.gradle +++ b/main.gradle @@ -3,8 +3,9 @@ allprojects { apply plugin: 'jacoco' java { - sourceCompatibility = JavaVersion.VERSION_17 - targetCompatibility = JavaVersion.VERSION_17 + toolchain { + languageVersion = JavaLanguageVersion.of(17) + } } repositories { @@ -22,7 +23,7 @@ allprojects { property 'sonar.organization', 'reactive-commons' property 'sonar.host.url', 'https://sonarcloud.io' property "sonar.sources", "src/main" - property "sonar.test", "src/test" + property "sonar.tests", "src/test" property "sonar.java.binaries", "build/classes" property "sonar.junit.reportPaths", "build/test-results/test" property "sonar.java-coveragePlugin", "jacoco" @@ -86,7 +87,7 @@ subprojects { dependencyManagement { imports { - mavenBom 'org.springframework.boot:spring-boot-dependencies:3.5.4' + mavenBom 'org.springframework.boot:spring-boot-dependencies:3.5.6' } } @@ -182,5 +183,7 @@ tasks.register('generateMergedReport', JacocoReport) { } tasks.named('wrapper') { - gradleVersion = '8.14.3' + gradleVersion = '9.1.0' + validateDistributionUrl = true + distributionSha256Sum = "a17ddd85a26b6a7f5ddb71ff8b05fc5104c0202c6e64782429790c933686c806" } \ No newline at end of file diff --git a/samples/async/simpleConsumer/simple-consumer.gradle b/samples/async/simpleConsumer/simple-consumer.gradle index 65847422..77431149 100644 --- a/samples/async/simpleConsumer/simple-consumer.gradle +++ b/samples/async/simpleConsumer/simple-consumer.gradle @@ -1,3 +1,8 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter' +} + +test { + // Disable error when no tests are found (for example modules) + failOnNoDiscoveredTests = false } \ No newline at end of file diff --git a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomain.java b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomain.java index a6717026..31dd72a7 100644 --- a/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomain.java +++ b/starters/async-commons-starter/src/main/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomain.java @@ -32,11 +32,11 @@ public GenericAsyncPropsDomain(String defaultAppName, this.asyncPropsClass = asyncPropsClass; ObjectMapper mapper = new ObjectMapper(); mapper.registerModule(new JavaTimeModule()); - this.computeIfAbsent(DEFAULT_DOMAIN, k -> { - T defaultApp = AsyncPropsDomainBuilder.instantiate(asyncPropsClass); - defaultApp.setConnectionProperties(mapper.convertValue(defaultProperties, propsClass)); - return defaultApp; - }); + + if (!this.containsKey(DEFAULT_DOMAIN)) { + throw new InvalidConfigurationException("Required domain '" + DEFAULT_DOMAIN + "' is not configured."); + } + super.forEach((key, value) -> { // To ensure that each domain has an appName if (value.getAppName() == null) { if (defaultAppName == null || defaultAppName.isEmpty()) { diff --git a/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomainTest.java b/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomainTest.java index b69b8fde..9773b926 100644 --- a/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomainTest.java +++ b/starters/async-commons-starter/src/test/java/org/reactivecommons/async/starter/props/GenericAsyncPropsDomainTest.java @@ -25,6 +25,8 @@ void shouldCreateProps() { String defaultAppName = "sample"; MyBrokerConnProps defaultMyBrokerProps = new MyBrokerConnProps(); AsyncMyBrokerPropsDomainProperties configured = new AsyncMyBrokerPropsDomainProperties(); + configured.put(DEFAULT_DOMAIN, new MyBrokerAsyncProps()); + MyBrokerAsyncProps other = new MyBrokerAsyncProps(); other.setAppName(OTHER); configured.put(OTHER, other); diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/ConnectionFactoryCustomizer.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/ConnectionFactoryCustomizer.java new file mode 100644 index 00000000..1ecaf204 --- /dev/null +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/ConnectionFactoryCustomizer.java @@ -0,0 +1,14 @@ +package org.reactivecommons.async.rabbit; + +import com.rabbitmq.client.ConnectionFactory; +import org.reactivecommons.async.rabbit.config.props.AsyncProps; + +/** + * Interface for customizing the RabbitMQ ConnectionFactory. + */ +@FunctionalInterface +public interface ConnectionFactoryCustomizer { + + ConnectionFactory customize(AsyncProps asyncProps, ConnectionFactory connectionFactory); + +} diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java index 55e97ebc..fe61e342 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProvider.java @@ -141,7 +141,7 @@ public void listenQueries(HandlerResolver resolver) { @Override public void listenReplies() { - if (props.isListenReplies()) { + if (Boolean.TRUE.equals(props.getListenReplies())) { final ApplicationReplyListener replyListener = new ApplicationReplyListener(router, receiver, props.getBrokerConfigProps().getReplyQueue(), diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderFactory.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderFactory.java index a995e7e8..acc6fe73 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderFactory.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderFactory.java @@ -10,7 +10,6 @@ import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; import org.reactivecommons.async.rabbit.communications.UnroutableMessageNotifier; import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider; -import org.reactivecommons.async.rabbit.config.RabbitProperties; import org.reactivecommons.async.rabbit.config.props.AsyncProps; import org.reactivecommons.async.rabbit.converters.json.RabbitJacksonMessageConverter; import org.reactivecommons.async.rabbit.discard.RabbitMQDiscardProviderFactory; @@ -30,6 +29,7 @@ public class RabbitMQBrokerProviderFactory implements BrokerProviderFactory getProvider(String domain, AsyncProps props, DiscardProvider discardProvider) { - RabbitProperties properties = props.getConnectionProperties(); - ConnectionFactoryProvider provider = RabbitMQSetupUtils.connectionFactoryProvider(properties); + ConnectionFactoryProvider provider = RabbitMQSetupUtils.connectionFactoryProvider(props, cfCustomizer); RabbitReactiveHealthIndicator healthIndicator = new RabbitReactiveHealthIndicator(domain, provider.getConnectionFactory()); ReactiveMessageSender sender = RabbitMQSetupUtils.createMessageSender(provider, props, converter, diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQSetupUtils.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQSetupUtils.java index 00331b9e..923ef2e9 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQSetupUtils.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/RabbitMQSetupUtils.java @@ -4,7 +4,6 @@ import com.rabbitmq.client.ConnectionFactory; import lombok.AccessLevel; import lombok.NoArgsConstructor; -import lombok.SneakyThrows; import lombok.extern.java.Log; import org.reactivecommons.api.domain.DomainEventBus; import org.reactivecommons.async.commons.DLQDiscardNotifier; @@ -50,18 +49,21 @@ import java.security.cert.CertificateException; import java.time.Duration; import java.util.Arrays; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; @Log @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class RabbitMQSetupUtils { - private static final String LISTENER_TYPE = "listener"; - private static final String TOPOLOGY_TYPE = "topology"; - private static final String SENDER_TYPE = "sender"; + private static final String SHARED_TYPE = "shared"; private static final String DEFAULT_PROTOCOL; public static final int START_INTERVAL = 300; public static final int MAX_BACKOFF_INTERVAL = 3000; + private static final ConcurrentMap FACTORY_CACHE = new ConcurrentHashMap<>(); + private static final ConcurrentMap> CONNECTION_CACHE = new ConcurrentHashMap<>(); + static { String protocol = "TLSv1.1"; try { @@ -78,21 +80,28 @@ public final class RabbitMQSetupUtils { DEFAULT_PROTOCOL = protocol; } - @SneakyThrows - public static ConnectionFactoryProvider connectionFactoryProvider(RabbitProperties properties) { - final ConnectionFactory factory = new ConnectionFactory(); - PropertyMapper map = PropertyMapper.get(); - map.from(properties::determineHost).whenNonNull().to(factory::setHost); - map.from(properties::determinePort).to(factory::setPort); - map.from(properties::determineUsername).whenNonNull().to(factory::setUsername); - map.from(properties::determinePassword).whenNonNull().to(factory::setPassword); - map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost); - factory.useNio(); - setUpSSL(factory, properties); + public static ConnectionFactoryProvider connectionFactoryProvider(AsyncProps asyncProps, + ConnectionFactoryCustomizer cfCustomizer) { + final ConnectionFactory factory = FACTORY_CACHE.computeIfAbsent(asyncProps, props -> { + try { + RabbitProperties rabbitProperties = props.getConnectionProperties(); + ConnectionFactory newFactory = new ConnectionFactory(); + PropertyMapper map = PropertyMapper.get(); + map.from(rabbitProperties::determineHost).whenNonNull().to(newFactory::setHost); + map.from(rabbitProperties::determinePort).to(newFactory::setPort); + map.from(rabbitProperties::determineUsername).whenNonNull().to(newFactory::setUsername); + map.from(rabbitProperties::determinePassword).whenNonNull().to(newFactory::setPassword); + map.from(rabbitProperties::determineVirtualHost).whenNonNull().to(newFactory::setVirtualHost); + newFactory.useNio(); + setUpSSL(newFactory, rabbitProperties); + return cfCustomizer.customize(props, newFactory); + } catch (Exception e) { + throw new RuntimeException("Error creating ConnectionFactory: ", e); + } + }); return () -> factory; } - public static ReactiveMessageSender createMessageSender(ConnectionFactoryProvider provider, AsyncProps props, MessageConverter converter, @@ -107,7 +116,7 @@ public static ReactiveMessageSender createMessageSender(ConnectionFactoryProvide public static ReactiveMessageListener createMessageListener(ConnectionFactoryProvider provider, AsyncProps props) { final Mono connection = - createConnectionMono(provider.getConnectionFactory(), props.getAppName(), LISTENER_TYPE); + createConnectionMono(provider.getConnectionFactory(), props.getAppName()); final Receiver receiver = RabbitFlux.createReceiver(new ReceiverOptions().connectionMono(connection)); final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connection)); @@ -117,10 +126,9 @@ public static ReactiveMessageListener createMessageListener(ConnectionFactoryPro props.getPrefetchCount()); } - public static TopologyCreator createTopologyCreator(AsyncProps props) { - ConnectionFactoryProvider provider = connectionFactoryProvider(props.getConnectionProperties()); - final Mono connection = createConnectionMono(provider.getConnectionFactory(), - props.getAppName(), TOPOLOGY_TYPE); + public static TopologyCreator createTopologyCreator(AsyncProps props, ConnectionFactoryCustomizer cfCustomizer) { + ConnectionFactoryProvider provider = connectionFactoryProvider(props, cfCustomizer); + final Mono connection = createConnectionMono(provider.getConnectionFactory(), props.getAppName()); final Sender sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connection)); return new TopologyCreator(sender, props.getQueueType()); } @@ -134,8 +142,7 @@ public static DiscardNotifier createDiscardNotifier(ReactiveMessageSender sender private static SenderOptions reactiveCommonsSenderOptions(String appName, ConnectionFactoryProvider provider, RabbitProperties rabbitProperties) { - final Mono senderConnection = createConnectionMono(provider.getConnectionFactory(), appName, - SENDER_TYPE); + final Mono senderConnection = createConnectionMono(provider.getConnectionFactory(), appName); final ChannelPoolOptions channelPoolOptions = new ChannelPoolOptions(); final PropertyMapper map = PropertyMapper.get(); @@ -153,18 +160,20 @@ private static SenderOptions reactiveCommonsSenderOptions(String appName, Connec .transform(Utils::cache)); } - private static Mono createConnectionMono(ConnectionFactory factory, String connectionPrefix, - String connectionType) { - log.info("Creating connection mono to RabbitMQ Broker in host '" + factory.getHost() + "' with " + - "type: " + connectionType); - return Mono.fromCallable(() -> factory.newConnection(connectionPrefix + " " + connectionType)) - .doOnError(err -> - log.log(Level.SEVERE, "Error creating connection to RabbitMQ Broker in host '" + - factory.getHost() + "'. Starting retry process...", err) - ) - .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(START_INTERVAL)) - .maxBackoff(Duration.ofMillis(MAX_BACKOFF_INTERVAL))) - .cache(); + private static Mono createConnectionMono(ConnectionFactory factory, String appName) { + return CONNECTION_CACHE.computeIfAbsent(factory, f -> { + log.info("Creating connection mono to RabbitMQ Broker in host '" + f.getHost() + "'"); + return Mono.fromCallable(() -> f.newConnection( + appName + "-" + InstanceIdentifier.getInstanceId(SHARED_TYPE, "") + )) + .doOnError(err -> + log.log(Level.SEVERE, "Error creating connection to RabbitMQ Broker in host '" + + f.getHost() + "'. Starting retry process...", err) + ) + .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(START_INTERVAL)) + .maxBackoff(Duration.ofMillis(MAX_BACKOFF_INTERVAL))) + .cache(); + }); } // SSL based on RabbitConnectionFactoryBean diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java index d8370f16..03f2b717 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/config/props/AsyncProps.java @@ -54,7 +54,14 @@ public class AsyncProps extends GenericAsyncProps { private Integer retryDelay = 1000; @Builder.Default - private boolean listenReplies = true; + private Boolean listenReplies = null; + + public Boolean getListenReplies() { + if (listenReplies == null) { + throw new IllegalArgumentException("The 'listenReplies' property is required, please specify a 'true' or 'false' value."); + } + return listenReplies; + } @Builder.Default private Boolean withDLQRetry = false; diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/discard/RabbitMQDiscardProviderFactory.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/discard/RabbitMQDiscardProviderFactory.java index 62bf9fb3..000b4477 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/discard/RabbitMQDiscardProviderFactory.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/discard/RabbitMQDiscardProviderFactory.java @@ -2,9 +2,13 @@ import org.reactivecommons.async.commons.config.BrokerConfig; import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.rabbit.ConnectionFactoryCustomizer; import org.reactivecommons.async.rabbit.config.props.AsyncProps; import org.reactivecommons.async.starter.broker.DiscardProvider; +@FunctionalInterface public interface RabbitMQDiscardProviderFactory { - DiscardProvider build(AsyncProps props, BrokerConfig config, MessageConverter converter); + DiscardProvider build( + AsyncProps props, BrokerConfig config, MessageConverter converter, ConnectionFactoryCustomizer cfCustomizer + ); } diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/discard/RabbitMQDiscardProviderImpl.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/discard/RabbitMQDiscardProviderImpl.java index 9a71e21b..40e26d6b 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/discard/RabbitMQDiscardProviderImpl.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/rabbit/discard/RabbitMQDiscardProviderImpl.java @@ -4,10 +4,10 @@ import org.reactivecommons.async.commons.DiscardNotifier; import org.reactivecommons.async.commons.config.BrokerConfig; import org.reactivecommons.async.commons.converters.MessageConverter; +import org.reactivecommons.async.rabbit.ConnectionFactoryCustomizer; import org.reactivecommons.async.rabbit.RabbitMQSetupUtils; import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender; import org.reactivecommons.async.rabbit.config.ConnectionFactoryProvider; -import org.reactivecommons.async.rabbit.config.RabbitProperties; import org.reactivecommons.async.rabbit.config.props.AsyncProps; import org.reactivecommons.async.starter.broker.DiscardProvider; @@ -19,6 +19,7 @@ public class RabbitMQDiscardProviderImpl implements DiscardProvider { private final AsyncProps props; private final BrokerConfig config; private final MessageConverter converter; + private final ConnectionFactoryCustomizer cfCustomizer; private final Map discardNotifier = new ConcurrentHashMap<>(); @Override @@ -27,8 +28,7 @@ public DiscardNotifier get() { } private DiscardNotifier buildDiscardNotifier(boolean ignored) { - RabbitProperties properties = props.getConnectionProperties(); - ConnectionFactoryProvider provider = RabbitMQSetupUtils.connectionFactoryProvider(properties); + ConnectionFactoryProvider provider = RabbitMQSetupUtils.connectionFactoryProvider(props, cfCustomizer); ReactiveMessageSender sender = RabbitMQSetupUtils.createMessageSender(provider, props, converter, null); return RabbitMQSetupUtils.createDiscardNotifier(sender, props, config, converter); } diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/starter/impl/common/rabbit/RabbitMQConfig.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/starter/impl/common/rabbit/RabbitMQConfig.java index d0f88594..29ca59c1 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/starter/impl/common/rabbit/RabbitMQConfig.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/starter/impl/common/rabbit/RabbitMQConfig.java @@ -3,6 +3,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.java.Log; import org.reactivecommons.async.commons.converters.json.ObjectMapperSupplier; +import org.reactivecommons.async.rabbit.ConnectionFactoryCustomizer; import org.reactivecommons.async.rabbit.RabbitMQBrokerProviderFactory; import org.reactivecommons.async.rabbit.communications.UnroutableMessageNotifier; import org.reactivecommons.async.rabbit.communications.UnroutableMessageProcessor; @@ -58,4 +59,11 @@ UnroutableMessageProcessor defaultUnroutableMessageProcessor(UnroutableMessageNo notifier.listenToUnroutableMessages(factory); return factory; } + + @Bean + @ConditionalOnMissingBean(ConnectionFactoryCustomizer.class) + public ConnectionFactoryCustomizer defaultConnectionFactoryCustomizer() { + return ((asyncProps, connectionFactory) -> connectionFactory); + } + } diff --git a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/starter/impl/listener/rabbit/RabbitMQListenerOnlyConfig.java b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/starter/impl/listener/rabbit/RabbitMQListenerOnlyConfig.java index cebe207e..098cc414 100644 --- a/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/starter/impl/listener/rabbit/RabbitMQListenerOnlyConfig.java +++ b/starters/async-rabbit-starter/src/main/java/org/reactivecommons/async/starter/impl/listener/rabbit/RabbitMQListenerOnlyConfig.java @@ -2,6 +2,7 @@ import org.reactivecommons.async.api.DynamicRegistry; import org.reactivecommons.async.commons.config.IBrokerConfigProps; +import org.reactivecommons.async.rabbit.ConnectionFactoryCustomizer; import org.reactivecommons.async.rabbit.DynamicRegistryImp; import org.reactivecommons.async.rabbit.RabbitMQSetupUtils; import org.reactivecommons.async.rabbit.communications.TopologyCreator; @@ -20,9 +21,9 @@ public class RabbitMQListenerOnlyConfig { @Bean @ConditionalOnMissingBean(DynamicRegistry.class) - public DynamicRegistry dynamicRegistry(AsyncPropsDomain asyncPropsDomain, DomainHandlers handlers) { + public DynamicRegistry dynamicRegistry(AsyncPropsDomain asyncPropsDomain, DomainHandlers handlers, ConnectionFactoryCustomizer cfCustomizer) { AsyncProps props = asyncPropsDomain.getProps(DEFAULT_DOMAIN); - TopologyCreator topologyCreator = RabbitMQSetupUtils.createTopologyCreator(props); + TopologyCreator topologyCreator = RabbitMQSetupUtils.createTopologyCreator(props, cfCustomizer); IBrokerConfigProps brokerConfigProps = new BrokerConfigProps(asyncPropsDomain.getProps(DEFAULT_DOMAIN)); return new DynamicRegistryImp(handlers.get(DEFAULT_DOMAIN), topologyCreator, brokerConfigProps); } diff --git a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderFactoryTest.java b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderFactoryTest.java index 9e26aeb6..ff49c4b0 100644 --- a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderFactoryTest.java +++ b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderFactoryTest.java @@ -1,5 +1,6 @@ package org.reactivecommons.async.rabbit; +import com.rabbitmq.client.ConnectionFactory; import io.micrometer.core.instrument.MeterRegistry; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -22,6 +23,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) class RabbitMQBrokerProviderFactoryTest { @@ -35,13 +38,15 @@ class RabbitMQBrokerProviderFactoryTest { private CustomReporter errorReporter; @Mock private UnroutableMessageNotifier unroutableMessageNotifier; + @Mock + private ConnectionFactoryCustomizer cfCustomizer; private BrokerProviderFactory providerFactory; @BeforeEach void setUp() { providerFactory = new RabbitMQBrokerProviderFactory(config, router, converter, meterRegistry, errorReporter, - RabbitMQDiscardProviderImpl::new, unroutableMessageNotifier); + RabbitMQDiscardProviderImpl::new, unroutableMessageNotifier, cfCustomizer); } @Test @@ -65,6 +70,9 @@ void shouldReturnCreateDiscardProvider() { @Test void shouldReturnBrokerProvider() { + when(cfCustomizer.customize(any(AsyncProps.class), any(ConnectionFactory.class))) + .thenAnswer(invocation -> invocation.getArgument(1)); + // Arrange AsyncProps props = new AsyncProps(); props.setConnectionProperties(new RabbitProperties()); diff --git a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java index 7e9ee686..09dc8e28 100644 --- a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java +++ b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQBrokerProviderTest.java @@ -80,6 +80,7 @@ void init() { IBrokerConfigProps configProps = new BrokerConfigProps(props); props.setBrokerConfigProps(configProps); props.setAppName("test"); + props.setListenReplies(Boolean.TRUE); brokerProvider = new RabbitMQBrokerProvider(DEFAULT_DOMAIN, props, brokerConfig, diff --git a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQDiscardProviderImplTest.java b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQDiscardProviderImplTest.java index c2ce4338..cdfea8ad 100644 --- a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQDiscardProviderImplTest.java +++ b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/rabbit/RabbitMQDiscardProviderImplTest.java @@ -1,5 +1,7 @@ package org.reactivecommons.async.rabbit; +import com.rabbitmq.client.ConnectionFactory; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -15,12 +17,23 @@ import org.reactivecommons.async.rabbit.discard.RabbitMQDiscardProviderImpl; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) class RabbitMQDiscardProviderImplTest { @Mock private RabbitJacksonMessageConverter converter; + @Mock + private ConnectionFactoryCustomizer cfCustomizer; + + @BeforeEach + void setUp() { + when(cfCustomizer.customize(any(AsyncProps.class), any(ConnectionFactory.class))) + .thenAnswer(invocation -> invocation.getArgument(1)); + } + @Test void shouldCreateDiscardNotifier() { // Arrange @@ -29,7 +42,7 @@ void shouldCreateDiscardNotifier() { IBrokerConfigProps brokerConfigProps = new BrokerConfigProps(props); props.setBrokerConfigProps(brokerConfigProps); BrokerConfig brokerConfig = new BrokerConfig(); - RabbitMQDiscardProviderImpl discardProvider = new RabbitMQDiscardProviderImpl(props, brokerConfig, converter); + var discardProvider = new RabbitMQDiscardProviderImpl(props, brokerConfig, converter, cfCustomizer); // Act DiscardNotifier notifier = discardProvider.get(); // Assert diff --git a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/starter/impl/common/rabbit/RabbitMQConfigTest.java b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/starter/impl/common/rabbit/RabbitMQConfigTest.java index d11d48df..32986a06 100644 --- a/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/starter/impl/common/rabbit/RabbitMQConfigTest.java +++ b/starters/async-rabbit-starter/src/test/java/org/reactivecommons/async/starter/impl/common/rabbit/RabbitMQConfigTest.java @@ -1,14 +1,17 @@ package org.reactivecommons.async.starter.impl.common.rabbit; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.ConnectionFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; +import org.reactivecommons.async.rabbit.ConnectionFactoryCustomizer; import org.reactivecommons.async.rabbit.RabbitMQBrokerProviderFactory; import org.reactivecommons.async.rabbit.communications.MyOutboundMessage; import org.reactivecommons.async.rabbit.communications.UnroutableMessageHandler; import org.reactivecommons.async.rabbit.communications.UnroutableMessageNotifier; import org.reactivecommons.async.rabbit.communications.UnroutableMessageProcessor; +import org.reactivecommons.async.rabbit.config.props.AsyncProps; import org.reactivecommons.async.rabbit.config.props.AsyncPropsDomain; import org.reactivecommons.async.rabbit.converters.json.RabbitJacksonMessageConverter; import org.reactivecommons.async.starter.config.ConnectionManager; @@ -118,4 +121,22 @@ void shouldSubscribeProcessorToNotifierWhenNotifierIsProvided() { void shouldThrowNullPointerExceptionWhenNotifierIsNull() { assertThrows(NullPointerException.class, () -> rabbitMQConfig.defaultUnroutableMessageProcessor(null)); } + + @Test + void shouldReturnDefaultConnectionFactoryCustomizer() { + ConnectionFactoryCustomizer customizer = rabbitMQConfig.defaultConnectionFactoryCustomizer(); + + assertThat(customizer).isNotNull(); + } + + @Test + void shouldReturnSameConnectionFactoryWhenCustomizing() { + ConnectionFactoryCustomizer customizer = rabbitMQConfig.defaultConnectionFactoryCustomizer(); + ConnectionFactory originalFactory = new ConnectionFactory(); + AsyncProps asyncProps = new AsyncProps(); + + ConnectionFactory result = customizer.customize(asyncProps, originalFactory); + + assertThat(result).isSameAs(originalFactory); + } } diff --git a/starters/async-rabbit-starter/src/test/resources/application.properties b/starters/async-rabbit-starter/src/test/resources/application.properties deleted file mode 100644 index 3d8d7db0..00000000 --- a/starters/async-rabbit-starter/src/test/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ -spring.application.name=test-app \ No newline at end of file diff --git a/starters/async-rabbit-starter/src/test/resources/application.yaml b/starters/async-rabbit-starter/src/test/resources/application.yaml new file mode 100644 index 00000000..803654f3 --- /dev/null +++ b/starters/async-rabbit-starter/src/test/resources/application.yaml @@ -0,0 +1,10 @@ +# Default App Name +spring: + application: + name: test-app + +# Async Props +app: + async: + app: # this is the name of the default domain + listenReplies: true \ No newline at end of file