Skip to content

Commit

Permalink
Merge pull request #3 from hamzei/master
Browse files Browse the repository at this point in the history
Add specific kafka broker configuration support
  • Loading branch information
bozorgzadeh authored Feb 19, 2019
2 parents 0ecc2c8 + b85eef1 commit 44eed8c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>ir.sahab</groupId>
<artifactId>kafka-rule</artifactId>
<version>1.2</version>
<version>1.3</version>

<repositories>
<repository>
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/ir/sahab/kafkarule/KafkaRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class KafkaRule extends ExternalResource {
private KafkaServerStartable broker;
private File logDir;

private Properties additionalBrokerConfigs;

private boolean selfManagedZooKeeper = false;
private EmbeddedZkServer zkServer;

Expand Down Expand Up @@ -72,6 +74,30 @@ public KafkaRule(String zkAddress) {
initAddresses(zkAddress);
}

/**
* Creates a rule to setup an embedded Kafka server in the case that Kafka rule should setup its
* own embedded ZooKeeper server.
*
* @param kafkaBrokerConfig additional kafka broker config.
*/
public KafkaRule(Properties kafkaBrokerConfig){
this();
additionalBrokerConfigs = kafkaBrokerConfig;
}

/**
* Creates a rule to setup an embedded Kafka server in the case that there is a ZooKeeper server
* available and the Kafka broker is supposed to use that ZooKeeper.
*
* @param zkAddress the address of embedded ZooKeeper. It should be in format of "IP:PORT" and
* the IP should be one of the IPs of the local system.
* @param kafkaBrokerConfigs additional kafka broker configs.
*/
public KafkaRule(String zkAddress, Properties kafkaBrokerConfigs){
this(zkAddress);
additionalBrokerConfigs = kafkaBrokerConfigs;
}

@Override
protected void before() throws Throwable {
if (selfManagedZooKeeper) {
Expand All @@ -90,6 +116,9 @@ protected void before() throws Throwable {
kafkaBrokerConfig.setProperty("log.flush.interval.messages", "1");
kafkaBrokerConfig.setProperty("auto.create.topics.enable", "true");
kafkaBrokerConfig.setProperty("offsets.topic.replication.factor", "1");
if (additionalBrokerConfigs != null) {
kafkaBrokerConfig.putAll(additionalBrokerConfigs);
}
broker = new KafkaServerStartable(new KafkaConfig(kafkaBrokerConfig));
broker.startup();
}
Expand Down

0 comments on commit 44eed8c

Please sign in to comment.