From 981da27dc9efdf361fabf5e2576423aa52c875b0 Mon Sep 17 00:00:00 2001 From: zkxxdiv <46445939+zkxxdiv@users.noreply.github.com> Date: Tue, 4 Feb 2020 17:40:01 +0530 Subject: [PATCH] REMReM publish creating lot of AMQP channels (#188) Added new property channelsCount to customize the rabbitmq channels. --- CHANGELOG.md | 2 ++ .../eiffel/remrem/publish/cli/CliOptions.java | 8 +++++++ .../publish/config/PropertiesConfig.java | 1 + .../config/RabbitMqPropertiesConfig.java | 4 +++- .../publish/helper/RabbitMqProperties.java | 24 +++++++++++++++++-- .../main/resources/config.template.properties | 1 + .../publish/helper/RMQHelperUnitTest.java | 5 ++++ .../src/main/resources/application.properties | 4 ++-- wiki/markdown/configuration.md | 6 ++--- wiki/markdown/usage/cli.md | 11 +++++++++ wiki/markdown/usage/service.md | 5 ++-- 11 files changed, 61 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 423cbf8f..7fbd5904 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,6 @@ ## 2.0.13 +- Added new property channelCount in application.properties to customize the rabbitmq channels. +- Added documentation for the new property. - Uplifted eiffel-remrem-parent version from 2.0.2 to 2.0.4. - Uplifted eiffel-remrem-shared version from 2.0.2 to 2.0.4. - Uplifted eiffel-remrem-semantics version from 2.0.9 to 2.0.11. diff --git a/publish-cli/src/main/java/com/ericsson/eiffel/remrem/publish/cli/CliOptions.java b/publish-cli/src/main/java/com/ericsson/eiffel/remrem/publish/cli/CliOptions.java index e4fbb913..3af0fadf 100644 --- a/publish-cli/src/main/java/com/ericsson/eiffel/remrem/publish/cli/CliOptions.java +++ b/publish-cli/src/main/java/com/ericsson/eiffel/remrem/publish/cli/CliOptions.java @@ -78,6 +78,7 @@ public static void createCLIOptions() { options.addOption("tls", "tls", true, "tls version, specify a valid tls version: '1', '1.1, '1.2' or 'default'. It is required for RabbitMq secured port."); options.addOption("mp", "messaging_protocol", true, "name of messaging protocol to be used, e.g. eiffel3, eiffelsemantics, default is eiffelsemantics"); options.addOption("domain", "domainId", true, "identifies the domain that produces the event"); + options.addOption("cc", "channelsCount", true, "Number of channels connected to message bus, default is 1"); options.addOption("ud", "user_domain_suffix", true, "user domain suffix"); options.addOption("v", "lists the versions of publish and all loaded protocols"); options.addOption("tag", "tag", true, "tag to be used in routing key"); @@ -213,6 +214,11 @@ public static void handleMessageBusOptions() throws HandleMessageBusException { String key = PropertiesConfig.DOMAIN_ID; System.setProperty(key, domain); } + if (commandLine.hasOption("channelsCount")) { + String channelsCount = commandLine.getOptionValue("channelsCount"); + String key = PropertiesConfig.CHANNELS_COUNT; + System.setProperty(key, channelsCount); + } if (commandLine.hasOption("tls")) { String tls_ver = commandLine.getOptionValue("tls"); @@ -266,6 +272,8 @@ public static void clearSystemProperties() { System.clearProperty(key); key = PropertiesConfig.DOMAIN_ID; System.clearProperty(key); + key = PropertiesConfig.CHANNELS_COUNT; + System.clearProperty(key); } /** diff --git a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/config/PropertiesConfig.java b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/config/PropertiesConfig.java index 059fb5f8..6031b32b 100644 --- a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/config/PropertiesConfig.java +++ b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/config/PropertiesConfig.java @@ -17,6 +17,7 @@ public class PropertiesConfig { public static final String MESSAGE_BUS_HOST = "com.ericsson.eiffel.remrem.publish.messagebus.host"; public static final String MESSAGE_BUS_PORT = "com.ericsson.eiffel.remrem.publish.messagebus.port"; + public static final String CHANNELS_COUNT = "com.ericsson.eiffel.remrem.publish.messagebus.channels"; public static final String TLS = "com.ericsson.eiffel.remrem.publish.messagebus.tls"; public static final String EXCHANGE_NAME = "com.ericsson.eiffel.remrem.publish.exchange.name"; public static final String USE_PERSISTENCE = "com.ericsson.eiffel.remrem.publish.use.persistence"; diff --git a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/config/RabbitMqPropertiesConfig.java b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/config/RabbitMqPropertiesConfig.java index 789ae0e2..8fd52c95 100644 --- a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/config/RabbitMqPropertiesConfig.java +++ b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/config/RabbitMqPropertiesConfig.java @@ -94,7 +94,9 @@ private void readSpringProperties() { rabbitMqProperties.setExchangeName(rabbitmqInstanceObject.get("exchangeName").asText()); rabbitMqProperties.setCreateExchangeIfNotExisting(rabbitmqInstanceObject.get("createExchangeIfNotExisting").asBoolean()); rabbitMqProperties.setDomainId(rabbitmqInstanceObject.get("domainId").asText()); - + if((rabbitmqInstanceObject.get("channelsCount") != null) ) { + rabbitMqProperties.setChannelsCount(Integer.parseInt(rabbitmqInstanceObject.get("channelsCount").asText())); + } rabbitMqPropertiesMap.put(protocol, rabbitMqProperties); } } catch (Exception e) { diff --git a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RabbitMqProperties.java b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RabbitMqProperties.java index 283d1698..7cf5c4f9 100644 --- a/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RabbitMqProperties.java +++ b/publish-common/src/main/java/com/ericsson/eiffel/remrem/publish/helper/RabbitMqProperties.java @@ -38,7 +38,6 @@ public class RabbitMqProperties { private RMQBeanConnectionFactory factory = new RMQBeanConnectionFactory(); - private static final int CHANNEL_COUNT = 100; private static final Random random = new Random(); private boolean usePersitance = true; @@ -49,6 +48,7 @@ public class RabbitMqProperties { private String username; private String password; private String domainId; + private Integer channelsCount; private boolean createExchangeIfNotExisting; private Connection rabbitConnection; @@ -122,6 +122,14 @@ public void setCreateExchangeIfNotExisting(boolean createExchangeIfNotExisting) this.createExchangeIfNotExisting = createExchangeIfNotExisting; } + public Integer getChannelsCount() { + return channelsCount; + } + + public void setChannelsCount(Integer channelsCount) { + this.channelsCount = channelsCount; + } + public RMQBeanConnectionFactory getFactory() { return factory; } @@ -171,6 +179,9 @@ public void init() throws RemRemPublishException { factory.setUsername(username); factory.setPassword(password); } + + + if (tlsVer != null && !tlsVer.isEmpty()) { if (tlsVer.contains("default")) { @@ -202,7 +213,10 @@ public void createRabbitMqConnection() { rabbitConnection = factory.newConnection(); log.info("Connected to RabbitMQ."); rabbitChannels = new ArrayList<>(); - for (int i = 0; i < CHANNEL_COUNT; i++) { + if(channelsCount == null || channelsCount == 0 ) { + channelsCount = 1; + } + for (int i = 0; i < channelsCount; i++) { rabbitChannels.add(rabbitConnection.createChannel()); } } catch (IOException | TimeoutException e) { @@ -243,12 +257,17 @@ private void initService() { password = getValuesFromSystemProperties(protocol + ".rabbitmq.password"); } + if (channelsCount == null ) { + channelsCount = Integer.getInteger(getValuesFromSystemProperties(protocol + ".rabbitmq.channelsCount")); + } } + private void setValues() { host = getValuesFromSystemProperties(PropertiesConfig.MESSAGE_BUS_HOST); port = Integer.getInteger(PropertiesConfig.MESSAGE_BUS_PORT); domainId = getValuesFromSystemProperties(PropertiesConfig.DOMAIN_ID); + channelsCount = Integer.getInteger(PropertiesConfig.CHANNELS_COUNT); tlsVer = getValuesFromSystemProperties(PropertiesConfig.TLS); exchangeName = getValuesFromSystemProperties(PropertiesConfig.EXCHANGE_NAME); usePersitance = Boolean.getBoolean(PropertiesConfig.USE_PERSISTENCE); @@ -403,4 +422,5 @@ private Channel giveMeRandomChannel() { } return rabbitChannels.get(random.nextInt(rabbitChannels.size())); } + } diff --git a/publish-common/src/main/resources/config.template.properties b/publish-common/src/main/resources/config.template.properties index afd60557..60778f80 100644 --- a/publish-common/src/main/resources/config.template.properties +++ b/publish-common/src/main/resources/config.template.properties @@ -24,6 +24,7 @@ eiffelsemantics.rabbitmq.tls: eiffelsemantics.rabbitmq.exchangeName: amq.direct eiffelsemantics.rabbitmq.domainId: eiffelxxx + eiffelsemantics.rabbitmq.channelsCount: 1 # properties for server used to generate messages generate.server.uri: http://127.0.0.1:8080 diff --git a/publish-common/src/test/java/com/ericsson/eiffel/remrem/publish/helper/RMQHelperUnitTest.java b/publish-common/src/test/java/com/ericsson/eiffel/remrem/publish/helper/RMQHelperUnitTest.java index cb1a203a..57193279 100644 --- a/publish-common/src/test/java/com/ericsson/eiffel/remrem/publish/helper/RMQHelperUnitTest.java +++ b/publish-common/src/test/java/com/ericsson/eiffel/remrem/publish/helper/RMQHelperUnitTest.java @@ -51,6 +51,7 @@ public class RMQHelperUnitTest { private static final String tlsVer= "1.2"; private static final String usePersistence= "1.2"; private static final String domainId= "eiffelxxx"; + private static final Integer channelsCount= 1; private String protocol = "eiffelsemantics"; private String createExchange = "true"; @@ -100,6 +101,8 @@ private void initProperties() { System.setProperty(key, createExchange); key = PropertiesConfig.DOMAIN_ID; System.setProperty(key, domainId); + key = PropertiesConfig.CHANNELS_COUNT; + System.setProperty(key, Integer.toString(channelsCount)); } private void cleanProperties() { @@ -121,6 +124,8 @@ private void cleanProperties() { System.setProperty(key, createExchange); key = PropertiesConfig.DOMAIN_ID; System.clearProperty(key); + key = PropertiesConfig.CHANNELS_COUNT; + System.clearProperty(key); } @Test public void getHostTest() { diff --git a/publish-service/src/main/resources/application.properties b/publish-service/src/main/resources/application.properties index 1b9507cc..93c0cdee 100644 --- a/publish-service/src/main/resources/application.properties +++ b/publish-service/src/main/resources/application.properties @@ -17,8 +17,8 @@ jasypt.encryptor.password: Identifies the domain that produces the event. + -cc,--channelsCount Number of channels connected to message + bus, default is 1 + -en,--exchange_name Exchange name. -ce,--create_exchange option to denote if we need to create an exchange @@ -102,6 +105,14 @@ java -jar publish-cli.jar -f publishMessages.json -en mb-exchange -mb hostname - java -jar publish-cli.jar -f publishMessages.json -en mb-exchange -mb hostname -domain publish-domain -mp eiffelsemantics -rk myroutingkey ``` +**If you want to change the number of channels connected to RabbitMQ to publish messages:** +channelsCount default value is 1. +If the number of channels increases then the CPU load and memory usage on the RabbitMq increases. + +``` +java -jar publish-cli.jar -f publishMessages.json -en mb-exchange -mb hostname -domain publish-domain -mp eiffelsemantics -cc numberof-channels +``` + **If you want to have the message publishing on RabbitMQ with given tag:** ``` diff --git a/wiki/markdown/usage/service.md b/wiki/markdown/usage/service.md index 1a781b0f..021cfbfd 100644 --- a/wiki/markdown/usage/service.md +++ b/wiki/markdown/usage/service.md @@ -141,6 +141,7 @@ These parameters are related to RabbitMQ Server, which will be used for publishi .rabbitmq.exchangeName: .rabbitmq.createExchangeIfNotExisting: .rabbitmq.domainId: +.rabbitmq.channelsCount: ``` ``` @@ -151,8 +152,8 @@ These parameters are related to RabbitMQ Server, which will be used for publishi # must exist #rabbitmq.exchange.name=eiffel.poc -rabbitmq.instances.jsonlist=[{ "mp": "eiffelsemantics", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx","createExchangeIfNotExisting":true }, \ -{ "mp": "eiffel3", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx","createExchangeIfNotExisting":true }] +rabbitmq.instances.jsonlist=[{ "mp": "eiffelsemantics", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx", "channelsCount": "1" ,"createExchangeIfNotExisting":true }, \ +{ "mp": "eiffel3", "host": "127.0.0.1", "port": "5672", "username": "guest", "password": "guest", "tls": "", "exchangeName": "amq.direct", "domainId": "eiffelxxx", "channelsCount": "1" ,"createExchangeIfNotExisting":true }] ``` Application launch is terminated if exchange is unavailable and createExchangeIfNotExisting is set to false. Application can create the exchange by adding the below property in config.properties file