From d62c1fce495f0f68a3d3317906cdf5e07c1e0e6a Mon Sep 17 00:00:00 2001
From: ramitg254
Date: Wed, 22 Oct 2025 10:55:46 +0530
Subject: [PATCH] HIVE-29238: upgrade kafka version to fix CVE-2024-31141 and
 CVE-2021-38153

---
 itests/qtest-druid/pom.xml                    |  7 +++++-
 .../hive/kafka/SingleNodeKafkaCluster.java    | 12 ++++++----
 .../hadoop/hive/kafka/HiveKafkaProducer.java  | 24 ++++++++++++++-----
 .../hive/kafka/KafkaRecordIterator.java       |  2 +-
 .../hive/kafka/HiveKafkaProducerTest.java     | 13 +++++++---
 .../hive/kafka/KafkaBrokerResource.java       | 10 ++++----
 .../hive/kafka/KafkaRecordIteratorTest.java   |  3 ++-
 pom.xml                                       |  2 +-
 8 files changed, 51 insertions(+), 22 deletions(-)

diff --git a/itests/qtest-druid/pom.xml b/itests/qtest-druid/pom.xml
index 2e3b84cbf31a..4c1036b03595 100644
--- a/itests/qtest-druid/pom.xml
+++ b/itests/qtest-druid/pom.xml
@@ -36,7 +36,7 @@
     10.11.1.1
     16.0.1
     4.1.0
-    <kafka.test.version>2.5.0</kafka.test.version>
+    <kafka.test.version>3.9.1</kafka.test.version>
     4.1.0
     1.7.30
 
@@ -226,6 +226,11 @@
       <artifactId>kafka-clients</artifactId>
       <version>${kafka.test.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-server</artifactId>
+      <version>${kafka.test.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
diff --git a/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
index 746830a9a6b8..528b8a6649c2 100644
--- a/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
+++ b/itests/qtest-druid/src/main/java/org/apache/hive/kafka/SingleNodeKafkaCluster.java
@@ -19,7 +19,7 @@
 package org.apache.hive.kafka;
 
 import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
+import kafka.server.KafkaServer;
 import org.apache.commons.io.FileUtils;
 
 import org.apache.hadoop.service.AbstractService;
@@ -29,6 +29,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
 
 import com.google.common.base.Throwables;
 import com.google.common.io.Files;
@@ -43,6 +44,7 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.stream.IntStream;
+import scala.Option;
 
 /**
  * This class has the hooks to start and stop single node kafka cluster.
@@ -54,7 +56,7 @@ public class SingleNodeKafkaCluster extends AbstractService {
 
   private static final String LOCALHOST = "localhost";
 
-  private final KafkaServerStartable serverStartable;
+  private final KafkaServer server;
   private final int brokerPort;
   private final String kafkaServer;
 
@@ -94,13 +96,13 @@ public SingleNodeKafkaCluster(String name, String logDir, Integer zkPort, Intege
     properties.setProperty("transaction.state.log.min.isr", String.valueOf(1));
     properties.setProperty("log.cleaner.dedupe.buffer.size", "1048577");
 
-    this.serverStartable = new KafkaServerStartable(KafkaConfig.fromProps(properties));
+    this.server = new KafkaServer(KafkaConfig.fromProps(properties), Time.SYSTEM, Option.empty(), false);
   }
 
 
   @Override
   protected void serviceStart() throws Exception {
-    serverStartable.startup();
+    server.startup();
     log.info("Kafka Server Started on port {}", brokerPort);
   }
 
@@ -108,7 +110,7 @@ protected void serviceStart() throws Exception {
   @Override
   protected void serviceStop() throws Exception {
     log.info("Stopping Kafka Server");
-    serverStartable.shutdown();
+    server.shutdown();
     log.info("Kafka Server Stopped");
   }
 
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
index 7a7d0360a015..72eeabbf1f8f 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java
@@ -33,6 +33,7 @@
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.Uuid;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +45,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Future;
 
 /**
@@ -67,6 +69,11 @@ class HiveKafkaProducer<K, V> implements Producer<K, V> {
     kafkaProducer = new KafkaProducer<>(properties);
   }
 
+  @Override
+  public Uuid clientInstanceId(Duration timeout) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override public void initTransactions() {
     kafkaProducer.initTransactions();
   }
@@ -138,11 +145,11 @@ synchronized void resumeTransaction(long producerId, short epoch) {
     Object transactionManager = getValue(kafkaProducer, "transactionManager");
-    Object topicPartitionBookkeeper = getValue(transactionManager, "topicPartitionBookkeeper");
+    Object txnPartitionMap = getValue(transactionManager, "txnPartitionMap");
 
     invoke(transactionManager,
         "transitionTo",
         getEnum("org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-    invoke(topicPartitionBookkeeper, "reset");
+    invoke(txnPartitionMap, "reset");
 
     Object producerIdAndEpoch = getValue(transactionManager, "producerIdAndEpoch");
     setValue(producerIdAndEpoch, "producerId", producerId);
     setValue(producerIdAndEpoch, "epoch", epoch);
@@ -181,10 +188,15 @@ short getEpoch() {
    */
   private void flushNewPartitions() {
     LOG.info("Flushing new partitions");
-    TransactionalRequestResult result = enqueueNewPartitions();
-    Object sender = getValue(kafkaProducer, "sender");
-    invoke(sender, "wakeup");
-    result.await();
+    Object transactionManager = getValue(kafkaProducer, "transactionManager");
+    Set<TopicPartition> newPartitionsInTransaction =
+        (Set<TopicPartition>) getValue(transactionManager, "newPartitionsInTransaction");
+    if (!newPartitionsInTransaction.isEmpty()) {
+      TransactionalRequestResult result = enqueueNewPartitions();
+      Object sender = getValue(kafkaProducer, "sender");
+      invoke(sender, "wakeup");
+      result.await();
+    }
   }
 
   private synchronized TransactionalRequestResult enqueueNewPartitions() {
diff --git a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
index 74614dea9168..66784d28cbb1 100644
--- a/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
+++ b/kafka-handler/src/java/org/apache/hadoop/hive/kafka/KafkaRecordIterator.java
@@ -140,7 +140,7 @@ class KafkaRecordIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
       }
     } else {
       // case seek to beginning of stream
-      consumer.seekToBeginning(Collections.singleton(topicPartition));
+      consumer.seekToBeginning(topicPartitionList);
       // seekToBeginning is lazy thus need to call position() or poll(0)
       this.startOffset = consumer.position(topicPartition);
       LOG.info("Consumer at beginning of topic partition [{}], current start offset [{}]",
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
index 8c9ed5f99b19..934e8eb30fba 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/HiveKafkaProducerTest.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -158,7 +159,9 @@
   @Test(expected = org.apache.kafka.common.KafkaException.class)
   public void testWrongEpochAndId() {
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(3434L, (short) 12);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+        new TopicPartition("dummy_topic", 0),
+        new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
@@ -169,7 +172,9 @@
     producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(pid, (short) 12);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+        new TopicPartition("dummy_topic", 0),
+        new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
@@ -180,7 +185,9 @@
     producer.close(Duration.ZERO);
     HiveKafkaProducer secondProducer = new HiveKafkaProducer(producerProperties);
     secondProducer.resumeTransaction(45L, epoch);
-    secondProducer.sendOffsetsToTransaction(ImmutableMap.of(), "__dummy_consumer_group");
+    secondProducer.sendOffsetsToTransaction(Collections.singletonMap(
+        new TopicPartition("dummy_topic", 0),
+        new OffsetAndMetadata(0L)), "__dummy_consumer_group");
     secondProducer.close(Duration.ZERO);
   }
 }
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
index e2f8bbafe016..84a79edeca07 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaBrokerResource.java
@@ -26,7 +26,7 @@
 import kafka.zk.EmbeddedZookeeper;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hive.common.IPStackUtils;
-import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.network.ConnectionMode;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestSslUtils;
 import org.junit.rules.ExternalResource;
@@ -41,6 +41,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
+import scala.Option;
 
 /**
  * Test Helper Class to start and stop a kafka broker.
@@ -106,7 +107,8 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
       brokerProps.setProperty("listener.name.l2.gssapi.sasl.jaas.config", jaasConfig);
       brokerProps.setProperty("listener.name.l3.gssapi.sasl.jaas.config", jaasConfig);
       truststoreFile = File.createTempFile("kafka_truststore", "jks");
-      brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(Mode.SERVER).createNewTrustStore(truststoreFile).build());
+      brokerProps.putAll(new TestSslUtils.SslConfigsBuilder(ConnectionMode.SERVER)
+          .createNewTrustStore(truststoreFile).build());
       brokerProps.setProperty("delegation.token.master.key", "AnyValueShouldDoHereItDoesntMatter");
     }
     brokerProps.setProperty("offsets.topic.replication.factor", "1");
@@ -116,9 +118,9 @@ KafkaBrokerResource enableSASL(String principal, String keytab) {
     kafkaServer = TestUtils.createServer(config, Time.SYSTEM);
     kafkaServer.startup();
     kafkaServer.zkClient();
-    adminZkClient = new AdminZkClient(kafkaServer.zkClient());
+    adminZkClient = new AdminZkClient(kafkaServer.zkClient(), Option.empty());
     LOG.info("Creating kafka TOPIC [{}]", TOPIC);
-    adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+    adminZkClient.createTopic(TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$, false);
   }
 
   /**
diff --git a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
index b2dbf12817e5..3df2c8c4231a 100644
--- a/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
+++ b/kafka-handler/src/test/org/apache/hadoop/hive/kafka/KafkaRecordIteratorTest.java
@@ -44,6 +44,7 @@
 import javax.annotation.Nullable;
 
 import java.nio.charset.Charset;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -304,7 +305,7 @@ private static void sendData(List<ConsumerRecord<byte[], byte[]>> recordList, @N
   @After public void tearDown() {
     this.kafkaRecordIterator = null;
     if (this.consumer != null) {
-      this.consumer.close();
+      this.consumer.close(Duration.ZERO);
     }
   }
 }
diff --git a/pom.xml b/pom.xml
index 0b6e318a1dd3..e4a248b83483 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,7 +172,7 @@
     4.13.2
     5.13.3
     5.13.3
-    <kafka.version>2.5.0</kafka.version>
+    <kafka.version>3.9.1</kafka.version>
     5.5.0
     1.11.9
     1.17.0
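
--
Reviewer notes (not part of the patch):

HiveKafkaProducer.resumeTransaction() exists so that a transaction opened by
one producer instance can be committed by another; the reflection it performs
only re-attaches a (producerId, epoch) pair to a fresh producer. A minimal
sketch of that hand-off, assuming same-package access (HiveKafkaProducer is
package-private), a getProducerId() accessor alongside the getEpoch() visible
in the diff, and properties that carry transactional.id plus byte[]
serializers:

  import java.util.Properties;
  import org.apache.kafka.clients.producer.ProducerRecord;

  // Sketch only: the two-step transactional hand-off resumeTransaction() enables.
  class ResumeHandOffSketch {
    static void handOff(Properties props) {
      HiveKafkaProducer<byte[], byte[]> writer = new HiveKafkaProducer<>(props);
      writer.initTransactions();
      writer.beginTransaction();
      writer.send(new ProducerRecord<>("hive_topic", "row".getBytes()));
      writer.flush();                                  // records on the broker, txn still open
      long producerId = writer.getProducerId();        // transaction identity captured by the task
      short epoch = writer.getEpoch();

      // ...later, typically in the committer's JVM:
      HiveKafkaProducer<byte[], byte[]> committer = new HiveKafkaProducer<>(props);
      committer.resumeTransaction(producerId, epoch);  // re-attach via the reflection above
      committer.commitTransaction();                   // commits what writer sent
    }
  }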
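The rename that forced the reflection changes (topicPartitionBookkeeper ->
txnPartitionMap) is exactly the kind of silent breakage that private-field
access invites on every Kafka bump. A guard test along these lines (a sketch,
using JUnit 4 as the existing tests do and only the field names the diff
already relies on) would fail fast on the next rename:

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.internals.TransactionManager;
  import org.junit.Test;

  // Sketch only: fail fast if a future Kafka upgrade renames the internals
  // that HiveKafkaProducer reads reflectively.
  public class KafkaInternalsGuardTest {
    @Test public void reflectiveMembersStillExist() throws Exception {
      // Members accessed via getValue()/setValue() in HiveKafkaProducer;
      // getDeclaredField() throws NoSuchFieldException on a rename.
      KafkaProducer.class.getDeclaredField("transactionManager");
      KafkaProducer.class.getDeclaredField("sender");
      TransactionManager.class.getDeclaredField("txnPartitionMap");
      TransactionManager.class.getDeclaredField("producerIdAndEpoch");
      TransactionManager.class.getDeclaredField("newPartitionsInTransaction");
    }
  }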
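Two behavioural notes behind the smaller changes: Kafka 3.x appears to treat
sendOffsetsToTransaction() with an empty offset map as a no-op, so the fencing
tests now send one real offset to force the expected KafkaException, and the
guard on newPartitionsInTransaction in flushNewPartitions() avoids awaiting a
request that is never enqueued when no new partitions were added. Finally,
clientInstanceId(Duration) is a newer Producer interface method (client
telemetry, KIP-714); the wrapper implements it by throwing
UnsupportedOperationException since the handler does not use telemetry.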