Skip to content

Commit

Permalink
ARTEMIS-4703 Add example of Queue federation with multiple brokers
Browse files Browse the repository at this point in the history
Adds an example that demonstrates moving messages through an intermediary
broker which has no consumer demand.
  • Loading branch information
tabish121 authored and gemmellr committed Apr 1, 2024
1 parent cf62334 commit 374f543
Show file tree
Hide file tree
Showing 7 changed files with 615 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.activemq.examples.broker-connection</groupId>
<artifactId>broker-connections</artifactId>
<version>2.34.0-SNAPSHOT</version>
</parent>

<artifactId>amqp-federation-queue-multiple-brokers</artifactId>
<packaging>jar</packaging>
<name>amqp-federation Queue Multiple Brokers</name>

<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>createA</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/serverA</instance>
<allowAnonymous>true</allowAnonymous>
<configuration>${basedir}/target/classes/activemq/serverA</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>createB</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/serverB</instance>
<allowAnonymous>true</allowAnonymous>
<configuration>${basedir}/target/classes/activemq/serverB</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>createC</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/serverC</instance>
<allowAnonymous>true</allowAnonymous>
<configuration>${basedir}/target/classes/activemq/serverC</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<!-- we first start broker A, then B, then C to avoid reconnecting statements -->
<execution>
<id>startA</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/serverA</location>
<testURI>tcp://localhost:5670</testURI>
<args>
<param>run</param>
</args>
<name>serverA</name>
</configuration>
</execution>
<execution>
<id>startB</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<spawn>true</spawn>
<ignore>${noServer}</ignore>
<location>${basedir}/target/serverB</location>
<testURI>tcp://localhost:5671</testURI>
<args>
<param>run</param>
</args>
<name>serverB</name>
</configuration>
</execution>
<execution>
<id>startC</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<spawn>true</spawn>
<ignore>${noServer}</ignore>
<location>${basedir}/target/serverC</location>
<testURI>tcp://localhost:5672</testURI>
<args>
<param>run</param>
</args>
<name>serverC</name>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<!-- you may have to set export MAVEN_OPTS="-Djava.net.preferIPv4Stack=true"
if you are on MacOS for instance -->
<clientClass>org.apache.activemq.artemis.jms.example.BrokerFederationExample</clientClass>
</configuration>
</execution>
<execution>
<id>stopC</id>
<goals>
<goal>stop</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/serverC</location>
</configuration>
</execution>
<execution>
<id>stopB</id>
<goals>
<goal>stop</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/serverB</location>
</configuration>
</execution>
<execution>
<id>stopA</id>
<goals>
<goal>stop</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/serverA</location>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.broker-connection</groupId>
<artifactId>amqp-federation-queue-multiple-brokers</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# AMQP Broker Connection with Queue federation across three brokers

To run the example, simply type **mvn verify** from this directory, or **mvn -PnoServer verify** if you want to create and start the broker manually.

This example demonstrates the configuration of Queue federation across a set of three brokers where the message
flow is arranged as follows

```
Producer -> A -> B -> C -> Consumer
```

A consumer on broker 'C' should receive messages sent by a producer on broker 'A'
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.qpid.jms.JmsConnectionFactory;

/**
* This example is demonstrating how queued messages are federated when a series of brokers are
* configured as follows:
*
* Producer -> A -> B -> C -> Consumer
*
* The intermediary broker B must be configured to treat a federation consumer as local demand so
* that it will establish federation with A when broker C establishes federation with it due to it
* having local demand from a real consumer.
*/
public class BrokerFederationExample {

public static void main(final String[] args) throws Exception {
{
final ConnectionFactory connectionFactoryServerA = new JmsConnectionFactory("amqp://localhost:5670");

try (Connection connectionOnServerA = connectionFactoryServerA.createConnection()) {
final Session sessionOnServerA = connectionOnServerA.createSession(Session.AUTO_ACKNOWLEDGE);
final Queue applicationQueue = sessionOnServerA.createQueue("applicationQueue");
final MessageProducer producerOnA = sessionOnServerA.createProducer(applicationQueue);

producerOnA.send(sessionOnServerA.createTextMessage("message #1"));
producerOnA.send(sessionOnServerA.createTextMessage("message #2"));
producerOnA.send(sessionOnServerA.createTextMessage("message #3"));
}
}

// Consumer created on server C should receive message from producer on Server A
final ConnectionFactory connectionFactoryServerC = new JmsConnectionFactory("amqp://localhost:5672");
final Connection connectionOnServerC = connectionFactoryServerC.createConnection();
final Session sessionOnServerC = connectionOnServerC.createSession(Session.AUTO_ACKNOWLEDGE);
final Queue applicationQueue = sessionOnServerC.createQueue("applicationQueue");
final MessageConsumer consumerOnC = sessionOnServerC.createConsumer(applicationQueue);

connectionOnServerC.start();

final TextMessage received1FromA = (TextMessage) consumerOnC.receive(10_000);
final TextMessage received2FromA = (TextMessage) consumerOnC.receive(10_000);
final TextMessage received3FromA = (TextMessage) consumerOnC.receive(10_000);

System.out.println("Consumer on server C received message: " + received1FromA.getText());
System.out.println("Consumer on server C received message: " + received2FromA.getText());
System.out.println("Consumer on server C received message: " + received3FromA.getText());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">

<name>ServerA</name>

<persistence-enabled>false</persistence-enabled>

<journal-type>NIO</journal-type>

<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>false</critical-analyzer>

<critical-analyzer-timeout>120000</critical-analyzer-timeout>

<critical-analyzer-check-period>60000</critical-analyzer-check-period>

<critical-analyzer-policy>HALT</critical-analyzer-policy>

<page-sync-timeout>44000</page-sync-timeout>

<acceptors>
<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:5670?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
</acceptors>

<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createAddress" roles="guest"/>
<permission type="deleteAddress" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="browse" roles="guest"/>
<permission type="send" roles="guest"/>
<permission type="manage" roles="guest"/>
</security-setting>
</security-settings>

<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
</address-settings>

<addresses>
<address name="applicationQueue">
<anycast>
<queue name="applicationQueue" />
</anycast>
</address>
</addresses>

</core>
</configuration>
Loading

0 comments on commit 374f543

Please sign in to comment.