From 449d4a6900ba09a07c7aa87018f63ab843c57407 Mon Sep 17 00:00:00 2001 From: hamzei Date: Tue, 19 Feb 2019 12:33:50 +0330 Subject: [PATCH 1/3] Add specific kafka broker configuration support --- .../java/ir/sahab/kafkarule/KafkaRule.java | 29 +++++++++++++++++++ .../ir/sahab/kafkarule/KafkaRuleTest.java | 7 +++++ 2 files changed, 36 insertions(+) diff --git a/src/main/java/ir/sahab/kafkarule/KafkaRule.java b/src/main/java/ir/sahab/kafkarule/KafkaRule.java index a86007a..0005a7a 100644 --- a/src/main/java/ir/sahab/kafkarule/KafkaRule.java +++ b/src/main/java/ir/sahab/kafkarule/KafkaRule.java @@ -44,6 +44,8 @@ public class KafkaRule extends ExternalResource { private KafkaServerStartable broker; private File logDir; + private Properties additionalBrokerConfigs; + private boolean selfManagedZooKeeper = false; private EmbeddedZkServer zkServer; @@ -72,6 +74,30 @@ public KafkaRule(String zkAddress) { initAddresses(zkAddress); } + /** + * Creates a rule to setup an embedded Kafka server in the case that Kafka rule should setup its + * own embedded ZooKeeper server. + * + * @param kafkaBrokerConfig additional kafka broker config. + */ + public KafkaRule(Properties kafkaBrokerConfig){ + this(); + additionalBrokerConfigs = kafkaBrokerConfig; + } + + /** + * Creates a rule to setup an embedded Kafka server in the case that there is a ZooKeeper server + * available and the Kafka broker is supposed to use that ZooKeeper. + * + * @param zkAddress the address of embedded ZooKeeper. It should be in format of "IP:PORT" and + * the IP should be one of the IPs of the local system. + * @param kafkaBrokerConfigs additional kafka broker configs. + */ + public KafkaRule(String zkAddress, Properties kafkaBrokerConfigs){ + this(zkAddress); + additionalBrokerConfigs = kafkaBrokerConfigs; + } + @Override protected void before() throws Throwable { if (selfManagedZooKeeper) { @@ -90,6 +116,9 @@ protected void before() throws Throwable { kafkaBrokerConfig.setProperty("log.flush.interval.messages", "1"); kafkaBrokerConfig.setProperty("auto.create.topics.enable", "true"); kafkaBrokerConfig.setProperty("offsets.topic.replication.factor", "1"); + if (additionalBrokerConfigs != null) { + kafkaBrokerConfig.putAll(additionalBrokerConfigs); + } broker = new KafkaServerStartable(new KafkaConfig(kafkaBrokerConfig)); broker.startup(); } diff --git a/src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java b/src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java index dfa177d..2ce429e 100644 --- a/src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java +++ b/src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java @@ -31,6 +31,13 @@ public class KafkaRuleTest { @ClassRule public static KafkaRule kafkaRuleWithSelfManagedZk = new KafkaRule(); + private static Properties properties = new Properties(); + static { + properties.setProperty("a","b"); + } + @ClassRule + public static KafkaRule kafkaRuleWithSpecificProperties = new KafkaRule(properties); + @BeforeClass public static void before() { kafkaRuleWithSelfManagedZk.createTopic(TOPIC_NAME, 1); From d9c5419cdc330da5eecd15f585f29bde9cedbe79 Mon Sep 17 00:00:00 2001 From: hamzei Date: Tue, 19 Feb 2019 12:38:33 +0330 Subject: [PATCH 2/3] Add specific kafka broker configuration support --- src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java b/src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java index 2ce429e..263bb0b 100644 --- a/src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java +++ b/src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java @@ -33,7 +33,7 @@ public class KafkaRuleTest { private static Properties properties = new Properties(); static { - properties.setProperty("a","b"); + properties.setProperty("message.max.bytes","5000000"); } @ClassRule public static KafkaRule kafkaRuleWithSpecificProperties = new KafkaRule(properties); From cb8c2e7632f88c01b9f7d0ffb2c4b860b71f5d4c Mon Sep 17 00:00:00 2001 From: hamzei Date: Tue, 19 Feb 2019 12:38:33 +0330 Subject: [PATCH 3/3] Add specific kafka broker configuration support --- pom.xml | 2 +- src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 6745c95..98f8c2f 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ ir.sahab kafka-rule - 1.2 + 1.3 diff --git a/src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java b/src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java index 2ce429e..dfa177d 100644 --- a/src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java +++ b/src/test/java/ir/sahab/kafkarule/KafkaRuleTest.java @@ -31,13 +31,6 @@ public class KafkaRuleTest { @ClassRule public static KafkaRule kafkaRuleWithSelfManagedZk = new KafkaRule(); - private static Properties properties = new Properties(); - static { - properties.setProperty("a","b"); - } - @ClassRule - public static KafkaRule kafkaRuleWithSpecificProperties = new KafkaRule(properties); - @BeforeClass public static void before() { kafkaRuleWithSelfManagedZk.createTopic(TOPIC_NAME, 1);