Skip to content

Commit

Permalink
[CSB-2644]: rework amqp demo to use connection pooling (#123)
Browse files Browse the repository at this point in the history
* Adding camel-activemq using openwire example. Excludes Spring Boot's ActiveMQAutoConfiguration in order to configure camel-activemq entirely using camel.component.activemq configuration and not via spring.activemq configuration.

* org.messaginghub:pooled-jms is managed by BOM

* CSB-2644: rework amqp demo to use connection pooling and configure camel-amqp as much as possible via camel.component.amqp in application.properties rather than Java code

* [CSB-2644]: rework amqp demo to use connection pooling and configure camel-amqp as much as possible via camel.component.amqp in application.properties rather than Java code
  • Loading branch information
tmielke authored Jan 31, 2024
1 parent dda45b7 commit 8e2dbdc
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 66 deletions.
5 changes: 5 additions & 0 deletions amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-amqp-starter</artifactId>
</dependency>
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>


<!-- test -->
<dependency>
Expand Down
39 changes: 11 additions & 28 deletions amqp/src/main/data/ReadMe.txt
Original file line number Diff line number Diff line change
@@ -1,43 +1,26 @@
example project which connects to A-MQ 7 from Fuse 7, using remote A-MQ address
Example project which connects to A-MQ 7, using the standard AMQP protocol.

There is the code, from that project, which instantiates component, and sends message

public class CamelRoute extends RouteBuilder {

@Override
public void configure() throws Exception {
JmsComponent component = createArtemisComponent();
getContext().addComponent("artemis", component);

from("timer://foo?fixedRate=true&period=60000&repeatCount=2")
.setBody().constant("HELLO")
.to("artemis:queue:test")
.log("Sent --> ${body}")
;
}

private JmsComponent createArtemisComponent() {

ActiveMQJMSConnectionFactory connectionFactory= new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
connectionFactory.setUser("admin");
connectionFactory.setPassword("admin");

JmsComponent component = new JmsComponent();
component.setConnectionFactory(connectionFactory);

return component;
from("timer:bar")
.id("timer-consumer-route")
.setBody(constant("Hello from Camel"))
.to("amqp:queue:SCIENCEQUEUE")
.log("Message sent from route ${routeId} to SCIENCEQUEUE");
}
}

Please see pom file, I don't specify pom versions, because they come in the BOM
Please also see the pom file, no need to specify pom versions,
because they come from the imported BOM.

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jms</artifactId>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-amqp-starter</artifactId>
</dependency>



41 changes: 19 additions & 22 deletions amqp/src/main/java/sample/camel/AmqpConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,24 @@
*/
package sample.camel;

import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpConfig {

@Value("${AMQP_HOST}")
private String amqpHost;
@Value("${AMQP_SERVICE_PORT}")
private String amqpPort;
// configuration of the AMQP connection factory.
@Value("${AMQP_SERVICE_USERNAME}")
private String userName;

@Value("${AMQP_SERVICE_PASSWORD}")
private String pass;

@Value("${AMQP_REMOTE_URI}")
private String remoteUri;

public String getAmqpHost() {
return amqpHost;
}

public void setAmqpHost(String amqpHost) {
this.amqpHost = amqpHost;
}

public String getAmqpPort() {
return amqpPort;
}

public void setAmqpPort(String amqpPort) {
this.amqpPort = amqpPort;
}

public String getUserName() {
return userName;
Expand All @@ -69,12 +54,12 @@ public void setPass(String pass) {
public String getRemoteUri() {
return remoteUri;
}

public void setRemoteUri(String remoteUri) {
this.remoteUri = remoteUri;
}

@Bean
// @Bean
public org.apache.qpid.jms.JmsConnectionFactory amqpConnectionFactory() {
org.apache.qpid.jms.JmsConnectionFactory jmsConnectionFactory = new org.apache.qpid.jms.JmsConnectionFactory();
jmsConnectionFactory.setRemoteURI(remoteUri);
Expand All @@ -83,4 +68,16 @@ public org.apache.qpid.jms.JmsConnectionFactory amqpConnectionFactory() {
return jmsConnectionFactory;
}

/* Recommendation is to use connection pooling.
By using a named bean we could directly reference the connection factory
in camel.component.amqp.connection-factory = #connectionPoolFactory
but its technically not needed if there is only one connectionFactory registered in
the Spring Boot registry.
*/
@Bean(name = "connectionPoolFactory", initMethod = "start", destroyMethod = "stop")
public JmsPoolConnectionFactory jmsPoolConnectionFactory() {
JmsPoolConnectionFactory jmsPoolConnectionFactory = new JmsPoolConnectionFactory();
jmsPoolConnectionFactory.setConnectionFactory(amqpConnectionFactory());
return jmsPoolConnectionFactory;
}
}
22 changes: 10 additions & 12 deletions amqp/src/main/java/sample/camel/SampleAutowiredAmqpRoute.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,28 @@
package sample.camel;

import org.apache.camel.builder.RouteBuilder;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class SampleAutowiredAmqpRoute extends RouteBuilder {

@Autowired JmsConnectionFactory amqpConnectionFactory;
@Bean
public org.apache.camel.component.amqp.AMQPComponent amqpConnection() {
org.apache.camel.component.amqp.AMQPComponent amqp = new org.apache.camel.component.amqp.AMQPComponent();
amqp.setConnectionFactory(amqpConnectionFactory);
return amqp;
}

@Override
public void configure() throws Exception {
from("file:src/main/data?noop=true")
.id("file-consumer-route")
.to("amqp:queue:SCIENCEQUEUE");

/*from("timer:bar")
from("timer:bar")
.id("timer-consumer-route")
.setBody(constant("Hello from Camel"))
.to("amqp:queue:SCIENCEQUEUE");*/
.to("amqp:queue:SCIENCEQUEUE")
.log("Message sent from route ${routeId} to SCIENCEQUEUE");

from("amqp:queue:SCIENCEQUEUE?receiveTimeout=10000")
.id("amqp-consumer-route")
.id("consumer-route")
.to("log:MyLogger?showBody=true");
}

}
26 changes: 22 additions & 4 deletions amqp/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,29 @@
## limitations under the License.
## ---------------------------------------------------------------------------

#You can use this property to override the default autowired broker-url

camel.springboot.main-run-controller = true

# JMS ConnectionFactory configuration
AMQP_REMOTE_URI=amqp://localhost:5672
AMQP_HOST=localhost
AMQP_SERVICE_PORT=5672
AMQP_SERVICE_USERNAME=admin
AMQP_SERVICE_PASSWORD=admin

# Camel AMQP config
# username and password configured here will overwrite the ones
# configured directly on the JMS ConnectionFactory
camel.component.amqp.password=admin
camel.component.amqp.username=admin
camel.component.amqp.cache-level-name = CACHE_CONSUMER
camel.component.amqp.connection-pooling=true
camel.component.amqp.jms-max-connections=1
camel.component.amqp.jms-maximum-active-session-per-connection=10
# optional but useful in case there are multiple connection factories
# camel.component.amqp.connection-factory = #connectionPoolFactory
camel.component.amqp.transacted = true
camel.component.amqp.lazy-create-transaction-manager = true
camel.component.amqp.delivery-persistent = true

# Optional logging configuration, useful to see transactions in action
# logging.level.org.apache.camel.component.jms=DEBUG
# logging.level.org.springframework.jms.connection.JmsTransactionManager=DEBUG
# logging.level.org.apache.qpid.jms.JmsLocalTransactionContext=DEBUG

0 comments on commit 8e2dbdc

Please sign in to comment.