From d3a2f289af304ffc9864e55c78d93cbb42c11170 Mon Sep 17 00:00:00 2001 From: Suvodeep Pyne Date: Thu, 12 Dec 2024 14:25:44 -0800 Subject: [PATCH 1/5] Added listTopics() interface in StreamMetadataProvider --- .../kafka20/KafkaStreamMetadataProvider.java | 13 +++++++++ .../kafka30/KafkaStreamMetadataProvider.java | 13 +++++++++ .../KafkaPartitionLevelConsumerTest.java | 29 +++++++++++++++++++ .../spi/stream/StreamMetadataProvider.java | 28 ++++++++++++++++++ 4 files changed, 83 insertions(+) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java index 9683c4a31dc5..dc9e5d667ef4 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.collections4.CollectionUtils; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.PartitionInfo; @@ -166,6 +167,18 @@ public Map getCurrentPartitionLagState( return perPartitionLag; } + @Override + public List listTopics(Duration timeout) { + Map> namePartitionsMap = _consumer.listTopics(timeout); + if (namePartitionsMap == null) { + return Collections.emptyList(); + } + return namePartitionsMap.keySet() + .stream() + .map(topic -> new TopicMetadata().setName(topic)) + .collect(Collectors.toList()); + } + @Override public void close() throws IOException { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java index 3e260e733b32..a440e3b728a2 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.collections4.CollectionUtils; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.PartitionInfo; @@ -166,6 +167,18 @@ public Map getCurrentPartitionLagState( return perPartitionLag; } + @Override + public List listTopics(Duration timeout) { + Map> namePartitionsMap = _consumer.listTopics(timeout); + if (namePartitionsMap == null) { + return Collections.emptyList(); + } + return namePartitionsMap.keySet() + .stream() + .map(topic -> new TopicMetadata().setName(topic)) + .collect(Collectors.toList()); + } + @Override public void close() throws IOException { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java index 37c309ebd318..a79d549e5526 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java @@ -18,11 +18,15 @@ */ package org.apache.pinot.plugin.stream.kafka30; +import java.time.Duration; import java.time.Instant; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -39,6 +43,7 @@ import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; import org.apache.pinot.spi.stream.StreamMessage; import org.apache.pinot.spi.stream.StreamMessageMetadata; +import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -399,4 +404,28 @@ public void testOffsetsExpired() } assertEquals(messageBatch.getOffsetOfNextBatch().toString(), "700"); } + + @Test + public void testListTopics() { + String streamType = "kafka"; + String streamKafkaBrokerList = _kafkaBrokerAddress; + String streamKafkaConsumerType = "simple"; + String clientId = "clientId"; + String tableNameWithType = "tableName_REALTIME"; + + Map streamConfigMap = new HashMap<>(); + streamConfigMap.put("streamType", streamType); + streamConfigMap.put("stream.kafka.topic.name", "NON_EXISTING_TOPIC"); + streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList); + streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType); + streamConfigMap.put("stream.kafka.consumer.factory.class.name", getKafkaConsumerFactoryName()); + streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass"); + StreamConfig streamConfig = new StreamConfig(tableNameWithType, streamConfigMap); + + KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig); + List topics = + streamMetadataProvider.listTopics(Duration.ofSeconds(60)); + Set topicNames = topics.stream().map(StreamMetadataProvider.TopicMetadata::getName).collect(Collectors.toSet()); + assertEquals(topicNames, Set.of(TEST_TOPIC_1, TEST_TOPIC_2, TEST_TOPIC_3)); + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index 78847c06271b..0bbda2ca55d1 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -106,6 +107,33 @@ default Map getCurrentPartitionLagState( return result; } + /** + * Fetches the list of available topics/streams + * + * @param timeout Timeout for fetching the list of topics. If this is null, the implementation should use a default + * timeout. + * @return List of topics + */ + default List listTopics(Duration timeout) { + throw new UnsupportedOperationException(); + } + + /** + * Represents the metadata of a topic. This can be used to represent the topic name and other metadata in the future. + */ + class TopicMetadata { + private String _name; + + public String getName() { + return _name; + } + + public TopicMetadata setName(String name) { + _name = name; + return this; + } + } + class UnknownLagState extends PartitionLagState { } } From df2b3b4038c637f0704c07b3161cc8d92a484d49 Mon Sep 17 00:00:00 2001 From: Suvodeep Pyne Date: Thu, 12 Dec 2024 14:40:17 -0800 Subject: [PATCH 2/5] fix checkstyle --- .../stream/kafka30/KafkaPartitionLevelConsumerTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java index a79d549e5526..a26506b5324d 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java @@ -423,9 +423,10 @@ public void testListTopics() { StreamConfig streamConfig = new StreamConfig(tableNameWithType, streamConfigMap); KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig); - List topics = - streamMetadataProvider.listTopics(Duration.ofSeconds(60)); - Set topicNames = topics.stream().map(StreamMetadataProvider.TopicMetadata::getName).collect(Collectors.toSet()); + List topics = streamMetadataProvider.listTopics(Duration.ofSeconds(60)); + Set topicNames = topics.stream() + .map(StreamMetadataProvider.TopicMetadata::getName) + .collect(Collectors.toSet()); assertEquals(topicNames, Set.of(TEST_TOPIC_1, TEST_TOPIC_2, TEST_TOPIC_3)); } } From 3aef43fc8e93b9c02fa6e55f61abb07ddd0d01c6 Mon Sep 17 00:00:00 2001 From: Suvodeep Pyne Date: Thu, 12 Dec 2024 15:26:02 -0800 Subject: [PATCH 3/5] Fixed unit tests --- .../stream/kafka30/KafkaPartitionLevelConsumerTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java index a26506b5324d..78b09e02d17a 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.apache.kafka.clients.producer.KafkaProducer; @@ -424,9 +423,9 @@ public void testListTopics() { KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig); List topics = streamMetadataProvider.listTopics(Duration.ofSeconds(60)); - Set topicNames = topics.stream() + List topicNames = topics.stream() .map(StreamMetadataProvider.TopicMetadata::getName) - .collect(Collectors.toSet()); - assertEquals(topicNames, Set.of(TEST_TOPIC_1, TEST_TOPIC_2, TEST_TOPIC_3)); + .collect(Collectors.toList()); + assertTrue(topicNames.containsAll(List.of(TEST_TOPIC_1, TEST_TOPIC_2, TEST_TOPIC_3))); } } From aa9ba36dd7c20fc6225d4042f253f9355427b8aa Mon Sep 17 00:00:00 2001 From: Suvodeep Pyne Date: Thu, 12 Dec 2024 15:43:08 -0800 Subject: [PATCH 4/5] Removed Duration arg from listTopics() API --- .../plugin/stream/kafka20/KafkaStreamMetadataProvider.java | 4 ++-- .../plugin/stream/kafka30/KafkaStreamMetadataProvider.java | 4 ++-- .../stream/kafka30/KafkaPartitionLevelConsumerTest.java | 3 +-- .../org/apache/pinot/spi/stream/StreamMetadataProvider.java | 5 +---- 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java index dc9e5d667ef4..541c87331edc 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java @@ -168,8 +168,8 @@ public Map getCurrentPartitionLagState( } @Override - public List listTopics(Duration timeout) { - Map> namePartitionsMap = _consumer.listTopics(timeout); + public List listTopics() { + Map> namePartitionsMap = _consumer.listTopics(); if (namePartitionsMap == null) { return Collections.emptyList(); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java index a440e3b728a2..442601261867 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java @@ -168,8 +168,8 @@ public Map getCurrentPartitionLagState( } @Override - public List listTopics(Duration timeout) { - Map> namePartitionsMap = _consumer.listTopics(timeout); + public List listTopics() { + Map> namePartitionsMap = _consumer.listTopics(); if (namePartitionsMap == null) { return Collections.emptyList(); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java index 78b09e02d17a..73d4a54bc31f 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.plugin.stream.kafka30; -import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.List; @@ -422,7 +421,7 @@ public void testListTopics() { StreamConfig streamConfig = new StreamConfig(tableNameWithType, streamConfigMap); KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig); - List topics = streamMetadataProvider.listTopics(Duration.ofSeconds(60)); + List topics = streamMetadataProvider.listTopics(); List topicNames = topics.stream() .map(StreamMetadataProvider.TopicMetadata::getName) .collect(Collectors.toList()); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index 0bbda2ca55d1..72df84ddfd23 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -20,7 +20,6 @@ import java.io.Closeable; import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -110,11 +109,9 @@ default Map getCurrentPartitionLagState( /** * Fetches the list of available topics/streams * - * @param timeout Timeout for fetching the list of topics. If this is null, the implementation should use a default - * timeout. * @return List of topics */ - default List listTopics(Duration timeout) { + default List listTopics() { throw new UnsupportedOperationException(); } From 34952e0a3b9dbe25a6571bd25b0e2043badcd8c6 Mon Sep 17 00:00:00 2001 From: Suvodeep Pyne Date: Fri, 13 Dec 2024 08:45:18 -0800 Subject: [PATCH 5/5] Changed TopicMetadata to interface --- .../kafka20/KafkaStreamMetadataProvider.java | 17 +++++++++++++++-- .../kafka30/KafkaStreamMetadataProvider.java | 16 ++++++++++++++-- .../KafkaPartitionLevelConsumerTest.java | 2 +- .../spi/stream/StreamMetadataProvider.java | 15 +++------------ 4 files changed, 33 insertions(+), 17 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java index 541c87331edc..bf837b54e5c8 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java @@ -168,17 +168,30 @@ public Map getCurrentPartitionLagState( } @Override - public List listTopics() { + public List getTopics() { Map> namePartitionsMap = _consumer.listTopics(); if (namePartitionsMap == null) { return Collections.emptyList(); } return namePartitionsMap.keySet() .stream() - .map(topic -> new TopicMetadata().setName(topic)) + .map(topic -> new KafkaTopicMetadata().setName(topic)) .collect(Collectors.toList()); } + public static class KafkaTopicMetadata implements TopicMetadata { + private String _name; + + public String getName() { + return _name; + } + + public KafkaTopicMetadata setName(String name) { + _name = name; + return this; + } + } + @Override public void close() throws IOException { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java index 442601261867..5fec5ddec2d3 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java @@ -168,17 +168,29 @@ public Map getCurrentPartitionLagState( } @Override - public List listTopics() { + public List getTopics() { Map> namePartitionsMap = _consumer.listTopics(); if (namePartitionsMap == null) { return Collections.emptyList(); } return namePartitionsMap.keySet() .stream() - .map(topic -> new TopicMetadata().setName(topic)) + .map(topic -> new KafkaTopicMetadata().setName(topic)) .collect(Collectors.toList()); } + public static class KafkaTopicMetadata implements TopicMetadata { + private String _name; + + public String getName() { + return _name; + } + + public KafkaTopicMetadata setName(String name) { + _name = name; + return this; + } + } @Override public void close() throws IOException { diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java index 73d4a54bc31f..878ea742700e 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java @@ -421,7 +421,7 @@ public void testListTopics() { StreamConfig streamConfig = new StreamConfig(tableNameWithType, streamConfigMap); KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider(clientId, streamConfig); - List topics = streamMetadataProvider.listTopics(); + List topics = streamMetadataProvider.getTopics(); List topicNames = topics.stream() .map(StreamMetadataProvider.TopicMetadata::getName) .collect(Collectors.toList()); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java index 72df84ddfd23..85bb2801a1f6 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java @@ -111,24 +111,15 @@ default Map getCurrentPartitionLagState( * * @return List of topics */ - default List listTopics() { + default List getTopics() { throw new UnsupportedOperationException(); } /** * Represents the metadata of a topic. This can be used to represent the topic name and other metadata in the future. */ - class TopicMetadata { - private String _name; - - public String getName() { - return _name; - } - - public TopicMetadata setName(String name) { - _name = name; - return this; - } + interface TopicMetadata { + String getName(); } class UnknownLagState extends PartitionLagState {