Skip to content

Commit

Permalink
Upgrade Kafka version from 1.0.0 to 1.1.1 (#4)
Browse files Browse the repository at this point in the history
Upgrade kafka version from 1.0.0 to 1.1.1
  • Loading branch information
Borjianamin98 authored and bozorgzadeh committed Nov 30, 2019
1 parent ae479f2 commit bd0097b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 28 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ JUnit rule which provides an embedded Kafka server. It can both setup its own ZK

## Sample Usage

```
```java
private static final String TOPIC_NAME = "test-topic";

@ClassRule
Expand Down Expand Up @@ -33,7 +33,7 @@ public void test() {
}
```
It is also possible to use a shared available ZK server by the Kafka rule:
```
```java
private static final String ZK_ADDRESS = "127.0.1.1:" + anOpenPort();

@ClassRule
Expand Down
10 changes: 7 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

<groupId>ir.sahab</groupId>
<artifactId>kafka-rule</artifactId>
<version>1.3</version>
<version>1.4.0</version>

<properties>
<kafka.version>1.1.1</kafka.version>
</properties>

<repositories>
<repository>
Expand All @@ -23,8 +27,8 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
60 changes: 37 additions & 23 deletions src/main/java/ir/sahab/kafkarule/KafkaRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Properties;
import kafka.admin.TopicCommand;
import kafka.admin.TopicCommand.TopicCommandOptions;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode.Disabled$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -107,15 +109,20 @@ protected void before() throws Throwable {
logDir = Files.createTempDirectory("kafka").toFile();
Properties kafkaBrokerConfig = new Properties();

kafkaBrokerConfig.setProperty("zookeeper.connect", zkAddress);
kafkaBrokerConfig.setProperty("broker.id", "1");
kafkaBrokerConfig.setProperty("host.name", localIp);
kafkaBrokerConfig.setProperty("advertised.host.name", localIp);
kafkaBrokerConfig.setProperty("port", Integer.toString(brokerPort));
kafkaBrokerConfig.setProperty("log.dir", logDir.getAbsolutePath());
kafkaBrokerConfig.setProperty("log.flush.interval.messages", "1");
kafkaBrokerConfig.setProperty("auto.create.topics.enable", "true");
kafkaBrokerConfig.setProperty("offsets.topic.replication.factor", "1");
kafkaBrokerConfig.setProperty(KafkaConfig.ZkConnectProp(), zkAddress);
kafkaBrokerConfig.setProperty(KafkaConfig.BrokerIdProp(), "1");
// Configs 'host.name' and 'advertised.host.name' are deprecated since Kafka version 1.1.
// Use 'listeners' and 'advertised.listeners' instead of them. See this:
// https://kafka.apache.org/11/documentation.html#configuration
kafkaBrokerConfig.setProperty(KafkaConfig.ListenersProp(),
String.format("PLAINTEXT://%s:%s", localIp, brokerPort));
kafkaBrokerConfig.setProperty(KafkaConfig.AdvertisedListenersProp(),
String.format("PLAINTEXT://%s:%s", localIp, brokerPort));
kafkaBrokerConfig.setProperty(KafkaConfig.PortProp(), Integer.toString(brokerPort));
kafkaBrokerConfig.setProperty(KafkaConfig.LogDirProp(), logDir.getAbsolutePath());
kafkaBrokerConfig.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
kafkaBrokerConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "true");
kafkaBrokerConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
if (additionalBrokerConfigs != null) {
kafkaBrokerConfig.putAll(additionalBrokerConfigs);
}
Expand Down Expand Up @@ -167,16 +174,23 @@ public void createTopic(String topicName) {
createTopic(topicName, 1);
}

public void createTopic(String topicName, Integer numPartitions) {
String[] arguments = new String[]{"--create", "--zookeeper", zkAddress,
"--replication-factor", "1",
"--partitions", "" + numPartitions,
"--topic", topicName};
TopicCommandOptions opts = new TopicCommandOptions(arguments);
ZkUtils zkUtils = ZkUtils.apply(opts.options().valueOf(opts.zkConnectOpt()),
30000, 30000, JaasUtils.isZkSecurityEnabled());
logger.info("Executing: CreateTopic " + Arrays.toString(arguments));
TopicCommand.createTopic(zkUtils, opts);
public void createTopic(String topicName, int numPartitions) {
ZkClient zkClient = null;
ZkUtils zkUtils;
try {
// We have to pass the serializer class to ZkClient constructor Otherwise createTopic will return
// without error. The topic will exist in zookeeper and be returned when listing topics, but Kafka
// itself does not create the topic.
zkClient = new ZkClient(zkAddress, 30000, 30000, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zkAddress), JaasUtils.isZkSecurityEnabled());
logger.info("Executing create Topic: " + topicName + ", partitions: " + numPartitions
+ ", replication-factor: 1.");
AdminUtils.createTopic(zkUtils, topicName, numPartitions, 1, new Properties(), Disabled$.MODULE$);
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}

/**
Expand Down Expand Up @@ -272,7 +286,7 @@ public String getBrokerAddress() {
return localIp + ":" + brokerPort;
}

static Integer anOpenPort() {
static int anOpenPort() {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
} catch (IOException e) {
Expand Down

0 comments on commit bd0097b

Please sign in to comment.