From bd0097baadce3b5b291d3ad94d42c90170636400 Mon Sep 17 00:00:00 2001 From: Amin Borjian Date: Sat, 30 Nov 2019 16:14:46 +0330 Subject: [PATCH] Upgrade Kafka version from 1.0.0 to 1.1.1 (#4) Upgrade kafka version from 1.0.0 to 1.1.1 --- README.md | 4 +- pom.xml | 10 +++- .../java/ir/sahab/kafkarule/KafkaRule.java | 60 ++++++++++++------- 3 files changed, 46 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index e2efcc7..d19ebbd 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ JUnit rule which provides an embedded Kafka server. It can both setup its own ZK ## Sample Usage -``` +```java private static final String TOPIC_NAME = "test-topic"; @ClassRule @@ -33,7 +33,7 @@ public void test() { } ``` It is also possible to use a shared available ZK server by the Kafka rule: -``` +```java private static final String ZK_ADDRESS = "127.0.1.1:" + anOpenPort(); @ClassRule diff --git a/pom.xml b/pom.xml index 98f8c2f..20577cd 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,11 @@ ir.sahab kafka-rule - 1.3 + 1.4.0 + + + 1.1.1 + @@ -23,8 +27,8 @@ org.apache.kafka - kafka_2.11 - 1.0.0 + kafka_2.12 + ${kafka.version} org.slf4j diff --git a/src/main/java/ir/sahab/kafkarule/KafkaRule.java b/src/main/java/ir/sahab/kafkarule/KafkaRule.java index 0005a7a..004846b 100644 --- a/src/main/java/ir/sahab/kafkarule/KafkaRule.java +++ b/src/main/java/ir/sahab/kafkarule/KafkaRule.java @@ -9,14 +9,16 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Arrays; import java.util.Comparator; import java.util.Properties; -import kafka.admin.TopicCommand; -import kafka.admin.TopicCommand.TopicCommandOptions; +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode.Disabled$; import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; +import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -107,15 +109,20 @@ protected void before() throws Throwable { logDir = Files.createTempDirectory("kafka").toFile(); Properties kafkaBrokerConfig = new Properties(); - kafkaBrokerConfig.setProperty("zookeeper.connect", zkAddress); - kafkaBrokerConfig.setProperty("broker.id", "1"); - kafkaBrokerConfig.setProperty("host.name", localIp); - kafkaBrokerConfig.setProperty("advertised.host.name", localIp); - kafkaBrokerConfig.setProperty("port", Integer.toString(brokerPort)); - kafkaBrokerConfig.setProperty("log.dir", logDir.getAbsolutePath()); - kafkaBrokerConfig.setProperty("log.flush.interval.messages", "1"); - kafkaBrokerConfig.setProperty("auto.create.topics.enable", "true"); - kafkaBrokerConfig.setProperty("offsets.topic.replication.factor", "1"); + kafkaBrokerConfig.setProperty(KafkaConfig.ZkConnectProp(), zkAddress); + kafkaBrokerConfig.setProperty(KafkaConfig.BrokerIdProp(), "1"); + // Configs 'host.name' and 'advertised.host.name' are deprecated since Kafka version 1.1. + // Use 'listeners' and 'advertised.listeners' instead of them. See this: + // https://kafka.apache.org/11/documentation.html#configuration + kafkaBrokerConfig.setProperty(KafkaConfig.ListenersProp(), + String.format("PLAINTEXT://%s:%s", localIp, brokerPort)); + kafkaBrokerConfig.setProperty(KafkaConfig.AdvertisedListenersProp(), + String.format("PLAINTEXT://%s:%s", localIp, brokerPort)); + kafkaBrokerConfig.setProperty(KafkaConfig.PortProp(), Integer.toString(brokerPort)); + kafkaBrokerConfig.setProperty(KafkaConfig.LogDirProp(), logDir.getAbsolutePath()); + kafkaBrokerConfig.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1"); + kafkaBrokerConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "true"); + kafkaBrokerConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1"); if (additionalBrokerConfigs != null) { kafkaBrokerConfig.putAll(additionalBrokerConfigs); } @@ -167,16 +174,23 @@ public void createTopic(String topicName) { createTopic(topicName, 1); } - public void createTopic(String topicName, Integer numPartitions) { - String[] arguments = new String[]{"--create", "--zookeeper", zkAddress, - "--replication-factor", "1", - "--partitions", "" + numPartitions, - "--topic", topicName}; - TopicCommandOptions opts = new TopicCommandOptions(arguments); - ZkUtils zkUtils = ZkUtils.apply(opts.options().valueOf(opts.zkConnectOpt()), - 30000, 30000, JaasUtils.isZkSecurityEnabled()); - logger.info("Executing: CreateTopic " + Arrays.toString(arguments)); - TopicCommand.createTopic(zkUtils, opts); + public void createTopic(String topicName, int numPartitions) { + ZkClient zkClient = null; + ZkUtils zkUtils; + try { + // We have to pass the serializer class to ZkClient constructor Otherwise createTopic will return + // without error. The topic will exist in zookeeper and be returned when listing topics, but Kafka + // itself does not create the topic. + zkClient = new ZkClient(zkAddress, 30000, 30000, ZKStringSerializer$.MODULE$); + zkUtils = new ZkUtils(zkClient, new ZkConnection(zkAddress), JaasUtils.isZkSecurityEnabled()); + logger.info("Executing create Topic: " + topicName + ", partitions: " + numPartitions + + ", replication-factor: 1."); + AdminUtils.createTopic(zkUtils, topicName, numPartitions, 1, new Properties(), Disabled$.MODULE$); + } finally { + if (zkClient != null) { + zkClient.close(); + } + } } /** @@ -272,7 +286,7 @@ public String getBrokerAddress() { return localIp + ":" + brokerPort; } - static Integer anOpenPort() { + static int anOpenPort() { try (ServerSocket socket = new ServerSocket(0)) { return socket.getLocalPort(); } catch (IOException e) {