Skip to content

Commit

Permalink
REMReM publish creating lot of AMQP channels (#188)
Browse files Browse the repository at this point in the history
Added new property channelsCount to customize the rabbitmq channels.
  • Loading branch information
Divya-Kuppula authored Feb 4, 2020
1 parent 906aa0d commit 981da27
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
## 2.0.13
- Added new property channelCount in application.properties to customize the rabbitmq channels.
- Added documentation for the new property.
- Uplifted eiffel-remrem-parent version from 2.0.2 to 2.0.4.
- Uplifted eiffel-remrem-shared version from 2.0.2 to 2.0.4.
- Uplifted eiffel-remrem-semantics version from 2.0.9 to 2.0.11.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public static void createCLIOptions() {
options.addOption("tls", "tls", true, "tls version, specify a valid tls version: '1', '1.1, '1.2' or 'default'. It is required for RabbitMq secured port.");
options.addOption("mp", "messaging_protocol", true, "name of messaging protocol to be used, e.g. eiffel3, eiffelsemantics, default is eiffelsemantics");
options.addOption("domain", "domainId", true, "identifies the domain that produces the event");
options.addOption("cc", "channelsCount", true, "Number of channels connected to message bus, default is 1");
options.addOption("ud", "user_domain_suffix", true, "user domain suffix");
options.addOption("v", "lists the versions of publish and all loaded protocols");
options.addOption("tag", "tag", true, "tag to be used in routing key");
Expand Down Expand Up @@ -213,6 +214,11 @@ public static void handleMessageBusOptions() throws HandleMessageBusException {
String key = PropertiesConfig.DOMAIN_ID;
System.setProperty(key, domain);
}
if (commandLine.hasOption("channelsCount")) {
String channelsCount = commandLine.getOptionValue("channelsCount");
String key = PropertiesConfig.CHANNELS_COUNT;
System.setProperty(key, channelsCount);
}

if (commandLine.hasOption("tls")) {
String tls_ver = commandLine.getOptionValue("tls");
Expand Down Expand Up @@ -266,6 +272,8 @@ public static void clearSystemProperties() {
System.clearProperty(key);
key = PropertiesConfig.DOMAIN_ID;
System.clearProperty(key);
key = PropertiesConfig.CHANNELS_COUNT;
System.clearProperty(key);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
public class PropertiesConfig {
public static final String MESSAGE_BUS_HOST = "com.ericsson.eiffel.remrem.publish.messagebus.host";
public static final String MESSAGE_BUS_PORT = "com.ericsson.eiffel.remrem.publish.messagebus.port";
public static final String CHANNELS_COUNT = "com.ericsson.eiffel.remrem.publish.messagebus.channels";
public static final String TLS = "com.ericsson.eiffel.remrem.publish.messagebus.tls";
public static final String EXCHANGE_NAME = "com.ericsson.eiffel.remrem.publish.exchange.name";
public static final String USE_PERSISTENCE = "com.ericsson.eiffel.remrem.publish.use.persistence";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ private void readSpringProperties() {
rabbitMqProperties.setExchangeName(rabbitmqInstanceObject.get("exchangeName").asText());
rabbitMqProperties.setCreateExchangeIfNotExisting(rabbitmqInstanceObject.get("createExchangeIfNotExisting").asBoolean());
rabbitMqProperties.setDomainId(rabbitmqInstanceObject.get("domainId").asText());

if((rabbitmqInstanceObject.get("channelsCount") != null) ) {
rabbitMqProperties.setChannelsCount(Integer.parseInt(rabbitmqInstanceObject.get("channelsCount").asText()));
}
rabbitMqPropertiesMap.put(protocol, rabbitMqProperties);
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
public class RabbitMqProperties {

private RMQBeanConnectionFactory factory = new RMQBeanConnectionFactory();
private static final int CHANNEL_COUNT = 100;
private static final Random random = new Random();
private boolean usePersitance = true;

Expand All @@ -49,6 +48,7 @@ public class RabbitMqProperties {
private String username;
private String password;
private String domainId;
private Integer channelsCount;
private boolean createExchangeIfNotExisting;

private Connection rabbitConnection;
Expand Down Expand Up @@ -122,6 +122,14 @@ public void setCreateExchangeIfNotExisting(boolean createExchangeIfNotExisting)
this.createExchangeIfNotExisting = createExchangeIfNotExisting;
}

public Integer getChannelsCount() {
return channelsCount;
}

public void setChannelsCount(Integer channelsCount) {
this.channelsCount = channelsCount;
}

public RMQBeanConnectionFactory getFactory() {
return factory;
}
Expand Down Expand Up @@ -171,6 +179,9 @@ public void init() throws RemRemPublishException {
factory.setUsername(username);
factory.setPassword(password);
}




if (tlsVer != null && !tlsVer.isEmpty()) {
if (tlsVer.contains("default")) {
Expand Down Expand Up @@ -202,7 +213,10 @@ public void createRabbitMqConnection() {
rabbitConnection = factory.newConnection();
log.info("Connected to RabbitMQ.");
rabbitChannels = new ArrayList<>();
for (int i = 0; i < CHANNEL_COUNT; i++) {
if(channelsCount == null || channelsCount == 0 ) {
channelsCount = 1;
}
for (int i = 0; i < channelsCount; i++) {
rabbitChannels.add(rabbitConnection.createChannel());
}
} catch (IOException | TimeoutException e) {
Expand Down Expand Up @@ -243,12 +257,17 @@ private void initService() {
password = getValuesFromSystemProperties(protocol + ".rabbitmq.password");
}

if (channelsCount == null ) {
channelsCount = Integer.getInteger(getValuesFromSystemProperties(protocol + ".rabbitmq.channelsCount"));
}
}


private void setValues() {
host = getValuesFromSystemProperties(PropertiesConfig.MESSAGE_BUS_HOST);
port = Integer.getInteger(PropertiesConfig.MESSAGE_BUS_PORT);
domainId = getValuesFromSystemProperties(PropertiesConfig.DOMAIN_ID);
channelsCount = Integer.getInteger(PropertiesConfig.CHANNELS_COUNT);
tlsVer = getValuesFromSystemProperties(PropertiesConfig.TLS);
exchangeName = getValuesFromSystemProperties(PropertiesConfig.EXCHANGE_NAME);
usePersitance = Boolean.getBoolean(PropertiesConfig.USE_PERSISTENCE);
Expand Down Expand Up @@ -403,4 +422,5 @@ private Channel giveMeRandomChannel() {
}
return rabbitChannels.get(random.nextInt(rabbitChannels.size()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
eiffelsemantics.rabbitmq.tls:
eiffelsemantics.rabbitmq.exchangeName: amq.direct
eiffelsemantics.rabbitmq.domainId: eiffelxxx
eiffelsemantics.rabbitmq.channelsCount: 1

# properties for server used to generate messages
generate.server.uri: http://127.0.0.1:8080
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class RMQHelperUnitTest {
private static final String tlsVer= "1.2";
private static final String usePersistence= "1.2";
private static final String domainId= "eiffelxxx";
private static final Integer channelsCount= 1;
private String protocol = "eiffelsemantics";
private String createExchange = "true";

Expand Down Expand Up @@ -100,6 +101,8 @@ private void initProperties() {
System.setProperty(key, createExchange);
key = PropertiesConfig.DOMAIN_ID;
System.setProperty(key, domainId);
key = PropertiesConfig.CHANNELS_COUNT;
System.setProperty(key, Integer.toString(channelsCount));
}

private void cleanProperties() {
Expand All @@ -121,6 +124,8 @@ private void cleanProperties() {
System.setProperty(key, createExchange);
key = PropertiesConfig.DOMAIN_ID;
System.clearProperty(key);
key = PropertiesConfig.CHANNELS_COUNT;
System.clearProperty(key);
}

@Test public void getHostTest() {
Expand Down
4 changes: 2 additions & 2 deletions publish-service/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ jasypt.encryptor.password:<Any value which was used at the time of encrypting th
#rabbitmq.host=mb101-eiffel010.lmera.ericsson.se
# must exist
#rabbitmq.exchange.name=eiffel.poc
rabbitmq.instances.jsonlist=[{ "mp": "eiffelsemantics", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx","createExchangeIfNotExisting":true }, \
{ "mp": "eiffel3", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx","createExchangeIfNotExisting":true }]
rabbitmq.instances.jsonlist=[{ "mp": "eiffelsemantics", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "channelsCount": "1", "domainId": "eiffelxxx", "createExchangeIfNotExisting":true }, \
{ "mp": "eiffel3", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx", "channelsCount": "1", "createExchangeIfNotExisting":true }]



Expand Down
6 changes: 3 additions & 3 deletions wiki/markdown/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ If RemRem-Publish is executed on same host as user accesses the RemRem-Publish s

MessageBus(RabbitMq) connections is configured via property "rabbitmq.instances.jsonlist":

rabbitmq.instances.jsonlist=[{ "mp": "eiffelsemantics", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx","createExchangeIfNotExisting":true }, \
{ "mp": "eiffel3", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx","createExchangeIfNotExisting":true }]
rabbitmq.instances.jsonlist=[{ "mp": "eiffelsemantics", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx", "channelsCount": "1", "createExchangeIfNotExisting":true }, \
{ "mp": "eiffel3", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx", "channelsCount": "1", "createExchangeIfNotExisting":true }]

"rabbitmq.instances.jsonlist" property can be configured with one or several Messagebus per Eiffel protocol/version, "mp" key in json list.
Most common is that user have only one MessageBus which uses one specific Eiffel protocol version and the property should be configured with only one RabbitMq connection:

rabbitmq.instances.jsonlist=[{ "mp": "eiffelsemantics", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx","createExchangeIfNotExisting":true }]
rabbitmq.instances.jsonlist=[{ "mp": "eiffelsemantics", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx", "channelsCount": "1","createExchangeIfNotExisting":true }]

## RemRem-Generate configuration

Expand Down
11 changes: 11 additions & 0 deletions wiki/markdown/usage/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ usage: java -jar
-domain,--domainId <arg> Identifies the domain that produces the event.
-cc,--channelsCount <arg> Number of channels connected to message
bus, default is 1
-en,--exchange_name <arg> Exchange name.
-ce,--create_exchange <arg> option to denote if we need to create an exchange
Expand Down Expand Up @@ -102,6 +105,14 @@ java -jar publish-cli.jar -f publishMessages.json -en mb-exchange -mb hostname -
java -jar publish-cli.jar -f publishMessages.json -en mb-exchange -mb hostname -domain publish-domain -mp eiffelsemantics -rk myroutingkey
```

**If you want to change the number of channels connected to RabbitMQ to publish messages:**
channelsCount default value is 1.
If the number of channels increases then the CPU load and memory usage on the RabbitMq increases.

```
java -jar publish-cli.jar -f publishMessages.json -en mb-exchange -mb hostname -domain publish-domain -mp eiffelsemantics -cc numberof-channels
```

**If you want to have the message publishing on RabbitMQ with given tag:**

```
Expand Down
5 changes: 3 additions & 2 deletions wiki/markdown/usage/service.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ These parameters are related to RabbitMQ Server, which will be used for publishi
<protocol>.rabbitmq.exchangeName: <exchange name, exchange should be already created on RabbitMQ Server>
<protocol>.rabbitmq.createExchangeIfNotExisting: <create Exchange if not present on RabbitMQ Server>
<protocol>.rabbitmq.domainId: <domain id, any string>
<protocol>.rabbitmq.channelsCount: <channels count, eg: 1 (default value is 1)>
```

```
Expand All @@ -151,8 +152,8 @@ These parameters are related to RabbitMQ Server, which will be used for publishi
# must exist
#rabbitmq.exchange.name=eiffel.poc
rabbitmq.instances.jsonlist=[{ "mp": "eiffelsemantics", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx","createExchangeIfNotExisting":true }, \
{ "mp": "eiffel3", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx","createExchangeIfNotExisting":true }]
rabbitmq.instances.jsonlist=[{ "mp": "eiffelsemantics", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx", "channelsCount": "1" ,"createExchangeIfNotExisting":true }, \
{ "mp": "eiffel3", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx", "channelsCount": "1" ,"createExchangeIfNotExisting":true }]
```

Application launch is terminated if exchange is unavailable and createExchangeIfNotExisting is set to false. Application can create the exchange by adding the below property in config.properties file
Expand Down

0 comments on commit 981da27

Please sign in to comment.