Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #227 from BatyaPinski/support_bind_mutliple_queues…
Browse files Browse the repository at this point in the history
…_to_exchange

Support for binding multiple queues to the same exchanges
  • Loading branch information
v1r3n authored Jun 21, 2023
2 parents 65c0040 + 2cebe69 commit c0fe98c
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName())
/*
* Create queue if not present based on the settings provided in the queue URI
* or configuration properties. Sample URI format:
* amqp-exchange:myExchange?exchangeType=topic&routingKey=myRoutingKey&exclusive
* amqp_exchange:myExchange?bindQueueName=myQueue&exchangeType=topic&routingKey=myRoutingKey&exclusive
* =false&autoDelete=false&durable=true Default settings if not provided in the
* queue URI or properties: isDurable: true, autoDelete: false, isExclusive:
* false The same settings are currently used during creation of exchange as
Expand All @@ -776,7 +776,7 @@ ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName())
final AMQP.Queue.DeclareOk declareOk =
getOrCreateQueue(
ConnectionType.SUBSCRIBER,
String.format("bound_to_%s", settings.getQueueOrExchangeName()),
settings.getExchangeBoundQueueName(),
settings.isDurable(),
settings.isExclusive(),
settings.autoDelete(),
Expand Down Expand Up @@ -816,7 +816,7 @@ ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName())
/*
* Create queue if not present based on the settings provided in the queue URI
* or configuration properties. Sample URI format:
* amqp-exchange:myExchange?exchangeType=topic&routingKey=myRoutingKey&exclusive
* amqp_exchange:myExchange?bindQueueName=myQueue&exchangeType=topic&routingKey=myRoutingKey&exclusive
* =false&autoDelete=false&durable=true Default settings if not provided in the
* queue URI or properties: isDurable: true, autoDelete: false, isExclusive:
* false The same settings are currently used during creation of exchange as
Expand All @@ -826,7 +826,7 @@ ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName())
final AMQP.Queue.DeclareOk declareOk =
getOrCreateQueue(
ConnectionType.SUBSCRIBER,
String.format("bound_to_%s", settings.getQueueOrExchangeName()),
settings.getExchangeBoundQueueName(),
settings.isDurable(),
settings.isExclusive(),
settings.autoDelete(),
Expand All @@ -835,7 +835,7 @@ ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName())
queueName = declareOk.getQueue();
amqpConnection
.getOrCreateChannel(
ConnectionType.SUBSCRIBER, getSettings().getQueueOrExchangeName())
ConnectionType.SUBSCRIBER, settings.getQueueOrExchangeName())
.queueBind(
queueName,
settings.getQueueOrExchangeName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public enum AMQPConfigurations {

// queue exchange settings
PARAM_EXCHANGE_TYPE("exchangeType"),
PARAM_QUEUE_NAME("bindQueueName"),
PARAM_ROUTING_KEY("routingKey"),
PARAM_DELIVERY_MODE("deliveryMode"),
PARAM_DURABLE("durable"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,7 @@

import com.netflix.conductor.contribs.queue.amqp.config.AMQPEventQueueProperties;

import static com.netflix.conductor.contribs.queue.amqp.util.AMQPConfigurations.PARAM_AUTO_DELETE;
import static com.netflix.conductor.contribs.queue.amqp.util.AMQPConfigurations.PARAM_DELIVERY_MODE;
import static com.netflix.conductor.contribs.queue.amqp.util.AMQPConfigurations.PARAM_DURABLE;
import static com.netflix.conductor.contribs.queue.amqp.util.AMQPConfigurations.PARAM_EXCHANGE_TYPE;
import static com.netflix.conductor.contribs.queue.amqp.util.AMQPConfigurations.PARAM_EXCLUSIVE;
import static com.netflix.conductor.contribs.queue.amqp.util.AMQPConfigurations.PARAM_MAX_PRIORITY;
import static com.netflix.conductor.contribs.queue.amqp.util.AMQPConfigurations.PARAM_ROUTING_KEY;
import static com.netflix.conductor.contribs.queue.amqp.util.AMQPConfigurations.*;

/**
* @author Ritu Parathody
Expand All @@ -45,6 +39,7 @@ public class AMQPSettings {
private String queueOrExchangeName;
private String eventName;
private String exchangeType;
private String exchangeBoundQueueName;
private String queueType;
private String routingKey;
private final String contentEncoding;
Expand Down Expand Up @@ -109,6 +104,14 @@ public String getQueueOrExchangeName() {
return queueOrExchangeName;
}

public String getExchangeBoundQueueName() {
if (StringUtils.isEmpty(exchangeBoundQueueName)) {
return String.format("bound_to_%s", queueOrExchangeName);
}
return exchangeBoundQueueName;
}


public String getExchangeType() {
return exchangeType;
}
Expand Down Expand Up @@ -139,13 +142,13 @@ public String getContentType() {
* <p><u>Example for queue:</u>
*
* <pre>
* amqp-queue:myQueue?deliveryMode=1&autoDelete=true&exclusive=true
* amqp_queue:myQueue?deliveryMode=1&autoDelete=true&exclusive=true
* </pre>
*
* <u>Example for exchange:</u>
*
* <pre>
* amqp-exchange:myExchange?exchangeType=topic&routingKey=myRoutingKey&exclusive=true
* amqp_exchange:myExchange?bindQueueName=myQueue&exchangeType=topic&routingKey=myRoutingKey&exclusive=true
* </pre>
*
* @param queueURI
Expand Down Expand Up @@ -179,6 +182,10 @@ public final AMQPSettings fromURI(final String queueURI) {
}
exchangeType = value;
}
if (kv[0].equalsIgnoreCase(
(String.valueOf(PARAM_QUEUE_NAME)))) {
exchangeBoundQueueName = kv[1];
}
if (kv[0].equalsIgnoreCase(
(String.valueOf(PARAM_ROUTING_KEY)))) {
String value = kv[1];
Expand Down Expand Up @@ -230,6 +237,7 @@ public boolean equals(Object obj) {
&& Objects.equals(exchangeType, other.exchangeType)
&& exclusive == other.exclusive
&& Objects.equals(queueOrExchangeName, other.queueOrExchangeName)
&& Objects.equals(exchangeBoundQueueName, other.exchangeBoundQueueName)
&& Objects.equals(queueType, other.queueType)
&& Objects.equals(routingKey, other.routingKey)
&& sequentialProcessing == other.sequentialProcessing;
Expand All @@ -248,6 +256,7 @@ public int hashCode() {
exchangeType,
exclusive,
queueOrExchangeName,
exchangeBoundQueueName,
queueType,
routingKey,
sequentialProcessing);
Expand All @@ -261,6 +270,8 @@ public String toString() {
+ eventName
+ ", exchangeType="
+ exchangeType
+ ", exchangeQueueName="
+ exchangeBoundQueueName
+ ", queueType="
+ queueType
+ ", routingKey="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,15 @@ void runObserve(
channel, connection, true, true, true, true, true);
}

@Test
public void testGetMessagesFromExistingExchangeWithDefaultConfiguration() throws IOException, TimeoutException {
// Mock channel and connection
Channel channel = mockBaseChannel();
Connection connection = mockGoodConnection(channel);
testGetMessagesFromExchangeAndDefaultConfiguration(
channel, connection, true, true);
}

@Test
public void testPublishMessagesToNotExistingExchangeAndDefaultConfiguration()
throws IOException, TimeoutException {
Expand Down Expand Up @@ -408,6 +417,7 @@ private void testGetMessagesFromExchangeAndDefaultConfiguration(
assertEquals(name, settings.getQueueOrExchangeName());
assertEquals(type, settings.getExchangeType());
assertEquals(routingKey, settings.getRoutingKey());
assertEquals(queueName, settings.getExchangeBoundQueueName());

List<GetResponse> queue = buildQueue(random, batchSize);
channel =
Expand Down Expand Up @@ -493,6 +503,8 @@ private void testGetMessagesFromExchangeAndCustomConfigurationFromURI(
+ name
+ "?exchangeType="
+ type
+ "&bindQueueName="
+ queueName
+ "&routingKey="
+ routingKey
+ "&deliveryMode=2"
Expand All @@ -508,6 +520,7 @@ private void testGetMessagesFromExchangeAndCustomConfigurationFromURI(
assertEquals(2, settings.getDeliveryMode());
assertEquals(name, settings.getQueueOrExchangeName());
assertEquals(type, settings.getExchangeType());
assertEquals(queueName, settings.getExchangeBoundQueueName());
assertEquals(routingKey, settings.getRoutingKey());

List<GetResponse> queue = buildQueue(random, batchSize);
Expand Down Expand Up @@ -540,6 +553,8 @@ private void testGetMessagesFromExchangeAndCustomConfigurationFromURI(
+ name
+ "?exchangeType="
+ type
+ "&bindQueueName="
+ queueName
+ "&routingKey="
+ routingKey
+ "&deliveryMode=2"
Expand Down

0 comments on commit c0fe98c

Please sign in to comment.