[BUG] KStreams doesn't work with KoP #1743
It's weird: I tried adding a test to KoP, but it passed.

diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/AnomalyDetectionLambdaTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/AnomalyDetectionLambdaTest.java
new file mode 100644
index 0000000..63194a0
--- /dev/null
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/AnomalyDetectionLambdaTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.pulsar.handlers.kop.streams;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class AnomalyDetectionLambdaTest extends KafkaStreamsTestBase {
+
+ @Override
+ protected void createTopics() throws Exception {
+ admin.topics().createPartitionedTopic("UserClicks", 1);
+ admin.topics().createPartitionedTopic("AnomalousUsers", 1);
+ }
+
+ @Override
+ protected @NonNull String getApplicationIdPrefix() {
+ return "anomaly-detection-lambda-example";
+ }
+
+ @Override
+ protected void extraSetup() throws Exception {
+ // No ops
+ }
+
+ @Override
+ protected Class<?> getKeySerdeClass() {
+ return Serdes.String().getClass();
+ }
+
+ @Override
+ protected Class<?> getValueSerdeClass() {
+ return Serdes.String().getClass();
+ }
+
+ @Test
+ public void test() throws Exception {
+ final Serde<String> stringSerde = Serdes.String();
+ final Serde<Long> longSerde = Serdes.Long();
+
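+ // Re-key by username, count clicks per user in 1-minute windows, and keep users with at least 3 clicks.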
+ final KStream<String, String> views = builder.stream("UserClicks");
+ final KTable<Windowed<String>, Long> anomalousUsers = views
+ .map((ignoredKey, username) -> new KeyValue<>(username, username))
+ .groupByKey()
+ .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
+ .count()
+ .filter((windowedUserId, count) -> count >= 3);
+ final KStream<String, Long> anomalousUsersForConsole = anomalousUsers
+ .toStream()
+ .filter((windowedUserId, count) -> count != null)
+ .map((windowedUserId, count) -> new KeyValue<>(windowedUserId.toString(), count));
+
+ anomalousUsersForConsole.to("AnomalousUsers", Produced.with(stringSerde, longSerde));
+
+ anomalousUsersForConsole.to("AnomalousUsers", Produced.with(stringSerde, longSerde));
+
+ final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
+ streams.cleanUp();
+ streams.start();
+
+ final Properties consumerProps = newKafkaConsumerProperties();
+ consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
+ final KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(consumerProps);
+ consumer.subscribe(Collections.singleton("AnomalousUsers"));
+
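+ // Synchronously produce the sample click events so they are all in the topic before polling.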
+ final KafkaProducer<String, String> producer = new KafkaProducer<>(newKafkaProducerProperties());
+ final String[] values = {"alice", "alice", "bob", "alice", "alice", "charlie"};
+ for (String value : values) {
+ producer.send(new ProducerRecord<>("UserClicks", value)).get();
+ }
+
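+ // Poll until the 4 expected anomaly records have been received.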
+ int i = 0;
+ while (i < 4) {
+ final ConsumerRecords<String, Long> records = consumer.poll(Duration.ofSeconds(1));
+ records.forEach(record -> log.info("XYZ received {}", record.value()));
+ i += records.count();
+ }
+
+ streams.close();
+ }
+}
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java
index 88a0ad7..b79cb44 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KafkaStreamsTestBase.java
@@ -36,7 +36,7 @@ public abstract class KafkaStreamsTestBase extends KopProtocolHandlerTestBase {
protected String bootstrapServers;
@Getter
private int testNo = 0; // the suffix of the prefix of test topic name or application id, etc.
- private Properties streamsConfiguration;
+ protected Properties streamsConfiguration;
protected StreamsBuilder builder; // the builder to build `kafkaStreams` and other objects of Kafka Streams
protected KafkaStreams kafkaStreams;

I will test 2.10.3.4 soon.
Then I found the error described in this issue:
From https://kafka.apache.org/33/documentation/streams/developer-guide/config-streams#replication-factor-parm we can see we need to set the `replication.factor` config explicitly. There are two workarounds:
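One of them is presumably to pin the replication factor in the Streams configuration. A minimal sketch (the application id and bootstrap address are placeholders, not taken from this issue):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ReplicationFactorConfigSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "anomaly-detection-lambda-example"); // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        // Since Kafka Streams 3.0, replication.factor defaults to -1 ("use the broker's
        // default"), which requires broker-side support; pinning it to a value the
        // cluster can satisfy (1 for a single standalone broker) sidesteps that.
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
        System.out.println(props);
    }
}
```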
I upgraded to 2.11.0.4, and got this:
In the broker logs:
Docker compose file:
Okay, I will verify 2.11.0.4 as well.
Yeah, the error logs will be generated. But it seems that the Kafka Streams example still works well?

$ bin/kafka-topics.sh --create --topic UserClicks \
--bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Created topic UserClicks.
$ bin/kafka-topics.sh --create --topic AnomalousUsers \
--bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Created topic AnomalousUsers.
$ bin/kafka-console-consumer.sh --topic AnomalousUsers --from-beginning \
--bootstrap-server localhost:9092 \
--property print.key=true \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
[alice@1681801260000/1681801320000] 4

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic UserClicks
>alice
>alice
>bob
>alice
>alice
>charlie
>^C
It seems to be related to the topic id. I'm going to look deeper into this issue.
This issue is not related to the topic id. These topics will be created eventually. You can run the following commands via Kafka's CLI:

bin/kafka-topics.sh --create --topic UserClicks \
--bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-topics.sh --create --topic AnomalousUsers \
--bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
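For reference, the same topics can also be created programmatically. A minimal sketch with Kafka's AdminClient, assuming the same bootstrap address as the commands above:

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicsSketch {
    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        try (Admin admin = Admin.create(props)) {
            // Same partitions and replication factor as the CLI commands above.
            admin.createTopics(List.of(
                    new NewTopic("UserClicks", 1, (short) 1),
                    new NewTopic("AnomalousUsers", 1, (short) 1)))
                .all().get(); // block until both topics are created
        }
    }
}
```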
Then, run the Kafka Streams application. After the following logs appear:
You will see the topics are created.
Even with the Kafka server, you will still see the following warning logs on the client side.
However, there are no error logs in Kafka's logs, because Kafka does not print any error log if the requested topic does not exist. With KoP, more error logs appeared at the client side, like:

That's because KoP keeps the 3-second default for this config, while Kafka's quickstart `config/server.properties` overrides the equivalent `group.initial.rebalance.delay.ms` to 0:

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
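To make the effect of this config visible from the client, here is a minimal sketch (the topic name, group id, and bootstrap address are assumptions) that logs how long the initial rebalance of a fresh consumer group takes:

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class InitialRebalanceDelayDemo {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-delay-demo"); // a fresh group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        final long start = System.currentTimeMillis();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("UserClicks"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Nothing to do for a brand-new group.
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // With group.initial.rebalance.delay.ms=3000 (the default) this prints
                    // roughly 3 seconds; with the quickstart override of 0 it is almost immediate.
                    System.out.printf("Assigned %s after %d ms%n",
                            partitions, System.currentTimeMillis() - start);
                }
            });
            // The group join and rebalance only progress inside poll().
            consumer.poll(Duration.ofSeconds(10));
        }
    }
}
```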
First thing I want to say: I tried it with Kafka, and it just worked:
How did I do it?
Only 3 WARNs appeared.
I think warning logs should not be treated as a bug. And after #1801, KoP won't print warning or error logs when receiving metadata requests for topics that do not exist. You mentioned "it worked" for Kafka multiple times. However, how could you verify it did not work for KoP? Do you think warning logs mean "it did not work"? BTW, could you check my previous comment?
To avoid the effect of this config, the …
Sorry for taking so long. Last time I got stuck trying to start Pulsar locally. I'm now trying it out from scratch. Can you please explain that step you wrote, which follows topic creation?
Why do I need to wait for this log, when with Kafka I don't need to wait for anything after creating the topics? I have searched the broker logs and have not found that line after topic creation. I will address everything you wrote.
Ok, now it is working! I have modified my docker-compose-cluster.yaml to be exactly like the one in KoP (I think the main change was the version upgrade). The only thing that is different compared to Kafka is the WARN from the client:
This WARN appears many times. Here is the log for it:
This is how it looks when running the example against Kafka:
You say that after your PR, those WARNs will not appear anymore?
Can you explain the group config property again? In Kafka I don't have any issue. Are you saying I should configure that property for KoP? I tried adding this to my Pulsar env in the docker compose file:
but I still see a lot of them:
There are two issues.
kafka_2.13-3.3.1$ grep "group.initial.rebalance" config/server.properties
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
group.initial.rebalance.delay.ms=0
Yes, both Kafka and KoP produce this WARN on the client, BUT Kafka has this WARN message appear once, and KoP many times. See my earlier comment for exactly how many times; I pasted the output for both.
I guess I don't understand the motivation for talking about that config. What do we aim to solve by using it?
Yes, it should be an issue. I think you can open another one for that; this issue has already gone on long enough.
Because of the difference in that config, KoP might have the following logs while Kafka does not:
Opened: #1858
Describe the bug
I get the following exception when using a KStreams example based on the official Kafka Streams Examples repository.
When I run the file, I get the following exception:
To Reproduce
Steps to reproduce the behavior:
docker compose up -d using the following docker-compose file
Run the AnomalyDetectionLambdaExample
Expected behavior
Should work