Skip to content

Commit

Permalink
STORM-4004 - Upgrade Kafka Clients to 3.6.0 (#3604)
Browse files Browse the repository at this point in the history
* STORM-4004 - Upgrade Kafka Clients to 3.6.0

* STORM-4004 - Upgrade Kafka Clients to 3.6.0

* STORM-4004 - Fix license
  • Loading branch information
rzo1 authored Dec 4, 2023
1 parent 2785507 commit 8635505
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 13 deletions.
7 changes: 5 additions & 2 deletions DEPENDENCY-LICENSES
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ List of third-party dependencies grouped by their license type.
* Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.16 - http://hc.apache.org/httpcomponents-core-ga)
* Apache HttpCore NIO (org.apache.httpcomponents:httpcore-nio:4.4.15 - http://hc.apache.org/httpcomponents-core-ga)
* Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/)
* Apache Kafka (org.apache.kafka:kafka-clients:0.11.0.3 - http://kafka.apache.org)
* Apache Kafka (org.apache.kafka:kafka-clients:3.6.0 - https://kafka.apache.org)
* Apache Maven Artifact Transfer (org.apache.maven.shared:maven-artifact-transfer:0.9.1 - https://maven.apache.org/shared/maven-artifact-transfer/)
* Apache Maven Common Artifact Filters (org.apache.maven.shared:maven-common-artifact-filters:3.0.1 - https://maven.apache.org/shared/maven-common-artifact-filters/)
* Apache Maven Dependency Tree (org.apache.maven.shared:maven-dependency-tree:2.2 - http://maven.apache.org/shared/maven-dependency-tree/)
Expand Down Expand Up @@ -319,7 +319,6 @@ List of third-party dependencies grouped by their license type.
* Lucene Sandbox (org.apache.lucene:lucene-sandbox:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-sandbox)
* Lucene Spatial 3D (org.apache.lucene:lucene-spatial3d:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-spatial3d)
* Lucene Suggest (org.apache.lucene:lucene-suggest:8.11.1 - https://lucene.apache.org/lucene-parent/lucene-suggest)
* LZ4 and xxHash (net.jpountz.lz4:lz4:1.3.0 - https://github.com/jpountz/lz4-java)
* LZ4 and xxHash (org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java)
* Maven Aether Provider (org.apache.maven:maven-aether-provider:3.0 - http://maven.apache.org/maven-aether-provider/)
* Maven Artifact (org.apache.maven:maven-artifact:3.0 - http://maven.apache.org/maven-artifact/)
Expand Down Expand Up @@ -516,6 +515,10 @@ List of third-party dependencies grouped by their license type.

* dnsjava (dnsjava:dnsjava:2.1.7 - http://www.dnsjava.org)

BSD 2-Clause License

* zstd-jni (com.github.luben:zstd-jni:1.5.5-1 - https://github.com/luben/zstd-jni)

BSD-3-Clause

* asm (org.ow2.asm:asm:9.3 - http://asm.ow2.io/)
Expand Down
5 changes: 3 additions & 2 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ The license texts of these dependencies can be found in the licenses directory.
* Apache HttpClient (org.apache.httpcomponents:httpclient:4.5.14 - http://hc.apache.org/httpcomponents-client-ga)
* Apache HttpCore (org.apache.httpcomponents:httpcore:4.4.16 - http://hc.apache.org/httpcomponents-core-ga)
* Apache Ivy (org.apache.ivy:ivy:2.4.0 - http://ant.apache.org/ivy/)
* Apache Kafka (org.apache.kafka:kafka-clients:0.11.0.3 - http://kafka.apache.org)
* Apache Kafka (org.apache.kafka:kafka-clients:3.6.0 - https://kafka.apache.org)
* Apache Log4j 1.x Compatibility API (org.apache.logging.log4j:log4j-1.2-api:2.21.1 - https://logging.apache.org/log4j/2.x/log4j-1.2-api/)
* Apache Log4j API (org.apache.logging.log4j:log4j-api:2.21.1 - https://logging.apache.org/log4j/2.x/log4j-api/)
* Apache Log4j Core (org.apache.logging.log4j:log4j-core:2.21.1 - https://logging.apache.org/log4j/2.x/log4j-core/)
Expand Down Expand Up @@ -886,7 +886,7 @@ The license texts of these dependencies can be found in the licenses directory.
* Kerby PKIX Project (org.apache.kerby:kerby-pkix:1.0.1 - http://directory.apache.org/kerby/kerby-pkix)
* Kerby Util (org.apache.kerby:kerby-util:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-util)
* Kerby XDR Project (org.apache.kerby:kerby-xdr:1.0.1 - http://directory.apache.org/kerby/kerby-common/kerby-xdr)
* LZ4 and xxHash (net.jpountz.lz4:lz4:1.3.0 - https://github.com/jpountz/lz4-java)
* LZ4 and xxHash (org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java)
* Maven Artifact (org.apache.maven:maven-artifact:3.6.0 - https://maven.apache.org/ref/3.6.0/maven-artifact/)
* Maven Artifact Resolver API (org.apache.maven.resolver:maven-resolver-api:1.3.3 - https://maven.apache.org/resolver/maven-resolver-api/)
* Maven Artifact Resolver Connector Basic (org.apache.maven.resolver:maven-resolver-connector-basic:1.3.3 - https://maven.apache.org/resolver/maven-resolver-connector-basic/)
Expand Down Expand Up @@ -1030,6 +1030,7 @@ The license texts of these dependencies can be found in the licenses directory.
BSD 2-Clause license

* dnsjava (dnsjava:dnsjava:2.1.7 - http://www.dnsjava.org)
* zstd-jni (com.github.luben:zstd-jni:1.5.5-1 - https://github.com/luben/zstd-jni)

BSD-3-Clause

Expand Down
4 changes: 4 additions & 0 deletions examples/storm-kafka-client-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
<artifactId>storm-kafka-client-examples</artifactId>

<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-client</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions external/storm-kafka-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<artifactId>kafka_2.13</artifactId>
<version>${storm.kafka.client.version}</version>
<classifier>test</classifier>
<scope>test</scope>
Expand All @@ -130,7 +130,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<artifactId>kafka_2.13</artifactId>
<version>${storm.kafka.client.version}</version>
<scope>test</scope>
<exclusions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public KafkaSpoutConfig(Builder<K, V> builder) {
this.processingGuarantee = builder.processingGuarantee;
this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
this.metricsTimeBucketSizeInSecs = builder.metricsTimeBucketSizeInSecs;
this.setConsumerGroupId(builder.groupId);
}

/**
Expand Down Expand Up @@ -122,6 +123,7 @@ public static class Builder<K, V> extends CommonKafkaSpoutConfig.Builder<K, V, B
private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
private boolean tupleTrackingEnforced = false;
private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;
private String groupId;

public Builder(String bootstrapServers, String... topics) {
super(bootstrapServers, topics);
Expand Down Expand Up @@ -160,6 +162,15 @@ public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
return this;
}

/**
* Specifies the group id.
* @param groupId the group id
*/
public Builder<K, V> setGroupId(String groupId) {
this.groupId = groupId;
return this;
}

/**
* Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
* Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
Expand Down Expand Up @@ -348,6 +359,12 @@ public String getConsumerGroupId() {
return (String) getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG);
}

public void setConsumerGroupId(String groupId) {
if (groupId != null) {
getKafkaProps().put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
}

public int getMaxUncommittedOffsets() {
return maxUncommittedOffsets;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
import java.util.concurrent.TimeoutException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.MockTime;
import org.apache.storm.testing.TmpPath;

public class KafkaUnit {
Expand Down Expand Up @@ -64,8 +64,7 @@ public void setUp() throws Exception {
brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
brokerProps.setProperty("offsets.topic.replication.factor", "1");
KafkaConfig config = new KafkaConfig(brokerProps);
MockTime mock = new MockTime();
kafkaServer = TestUtils.createServer(config, mock);
kafkaServer = TestUtils.createServer(config, new MockTime());

// setup default Producer
createProducer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.storm.Testing;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
Expand Down Expand Up @@ -66,7 +67,9 @@ private Tuple createTestTuple(String... values) {

@Test
public void testSimple() {
MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null);
MockProducer<String, String> producer = new MockProducer<>(
Cluster.empty(), false,
null, new StringSerializer(), new StringSerializer());
KafkaBolt<String, String> bolt = makeBolt(producer);

OutputCollector collector = mock(OutputCollector.class);
Expand Down Expand Up @@ -95,7 +98,9 @@ public void testSimple() {

@Test
public void testSimpleWithError() {
MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null);
MockProducer<String, String> producer = new MockProducer<>(
Cluster.empty(), false,
null, new StringSerializer(), new StringSerializer());
KafkaBolt<String, String> bolt = makeBolt(producer);

OutputCollector collector = mock(OutputCollector.class);
Expand Down Expand Up @@ -126,7 +131,9 @@ public void testSimpleWithError() {

@Test
public void testCustomCallbackIsWrappedByDefaultCallbackBehavior() {
MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null);
MockProducer<String, String> producer = new MockProducer<>(
Cluster.empty(), false,
null, new StringSerializer(), new StringSerializer());
KafkaBolt<String, String> bolt = makeBolt(producer);

PreparableCallback customCallback = mock(PreparableCallback.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public KafkaSpoutNullTupleTest() {
KafkaSpoutConfig<String, String> createSpoutConfig() {
return KafkaSpoutConfig.builder("127.0.0.1:" + kafkaUnitExtension.getKafkaUnit().getKafkaPort(),
Pattern.compile(SingleTopicKafkaSpoutConfiguration.TOPIC))
.setGroupId("test")
.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
.setRecordTranslator(new NullRecordTranslator<>())
.build();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@

<jackson.version>2.15.2</jackson.version>
<jackson.databind.version>2.15.2</jackson.databind.version>
<storm.kafka.client.version>0.11.0.3</storm.kafka.client.version>
<storm.kafka.client.version>3.6.0</storm.kafka.client.version>
<testcontainers.version>1.19.1</testcontainers.version>

<!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
Expand Down

0 comments on commit 8635505

Please sign in to comment.