Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Remove usage of Scala classes from Kafka in tests (#1653)
Browse files Browse the repository at this point in the history
### Motivation

`KStreamAggregationTest` uses a Scala class `ConsoleConsumer`, which
might lead to a `NoClassDefFoundError` in branch-2.10:
https://github.com/streamnative/kop/actions/runs/3797937683/jobs/6459789362

### Modifications

Replace the `ConsoleConsumer` use with a `KafkaConsumer` and the same
formatting logic on each received record.

(cherry picked from commit 88120f6)
  • Loading branch information
BewareMyPower committed Dec 29, 2022
1 parent e9675b7 commit d7287cd
Showing 1 changed file with 43 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -32,9 +31,13 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kafka.tools.ConsoleConsumer;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
Expand Down Expand Up @@ -77,6 +80,9 @@
* Tests for KStream aggregation.
*/
public class KStreamAggregationTest extends KafkaStreamsTestBase {

private final AtomicInteger groupIdIndex = new AtomicInteger(0);

private String streamOneInput;
private String outputTopic;
private String userSessionsStream;
Expand Down Expand Up @@ -719,35 +725,42 @@ private <K, V> List<KeyValue<K, KeyValue<V, Long>>> receiveMessagesWithTimestamp

private <K, V> String readWindowedKeyedMessagesViaConsoleConsumer(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
final Class innerClass,
final Class<?> innerClass,
final int numMessages,
final boolean printTimestamp) {
final ByteArrayOutputStream newConsole = new ByteArrayOutputStream();
final PrintStream originalStream = System.out;
try (final PrintStream newStream = new PrintStream(newConsole)) {
System.setOut(newStream);

final String keySeparator = ", ";
// manually construct the console consumer argument array
final String[] args = new String[] {
"--bootstrap-server", bootstrapServers,
"--from-beginning",
"--property", "print.key=true",
"--property", "print.timestamp=" + printTimestamp,
"--topic", outputTopic,
"--max-messages", String.valueOf(numMessages),
"--property", "key.deserializer=" + keyDeserializer.getClass().getName(),
"--property", "value.deserializer=" + valueDeserializer.getClass().getName(),
"--property", "key.separator=" + keySeparator,
"--property", "key.deserializer." + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
+ "=" + Serdes.serdeFrom(innerClass).getClass().getName()
};

ConsoleConsumer.messageCount_$eq(0); //reset the message count
ConsoleConsumer.run(new ConsoleConsumer.ConsumerConfig(args));
newStream.flush();
System.setOut(originalStream);
return newConsole.toString();
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-" + groupIdIndex.getAndIncrement());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

final Map<String, String> configs = new HashMap<>();
Serde<?> serde = Serdes.serdeFrom(innerClass);
configs.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, serde.getClass().getName());
serde.close();
keyDeserializer.configure(configs, true);

final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton(outputTopic));
final StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < numMessages; ) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<byte[], byte[]> record : records) {
if (printTimestamp) {
stringBuilder.append(record.timestampType());
stringBuilder.append(":");
stringBuilder.append(record.timestamp());
stringBuilder.append(", ");
}
stringBuilder.append(keyDeserializer.deserialize(outputTopic, record.key()).toString());
stringBuilder.append(", ");
stringBuilder.append(valueDeserializer.deserialize(outputTopic, record.value()).toString());
stringBuilder.append("\n");
i++;
}
}
consumer.close();
return stringBuilder.toString();
}
}

0 comments on commit d7287cd

Please sign in to comment.