diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java index dab2f37452..17aba52cef 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/streams/KStreamAggregationTest.java @@ -19,8 +19,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -32,9 +31,13 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import kafka.tools.ConsoleConsumer; +import java.util.concurrent.atomic.AtomicInteger; import lombok.NonNull; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; @@ -77,6 +80,9 @@ * Tests for KStream aggregation. */ public class KStreamAggregationTest extends KafkaStreamsTestBase { + + private final AtomicInteger groupIdIndex = new AtomicInteger(0); + private String streamOneInput; private String outputTopic; private String userSessionsStream; @@ -719,35 +725,42 @@ private List>> receiveMessagesWithTimestamp private String readWindowedKeyedMessagesViaConsoleConsumer(final Deserializer keyDeserializer, final Deserializer valueDeserializer, - final Class innerClass, + final Class innerClass, final int numMessages, final boolean printTimestamp) { - final ByteArrayOutputStream newConsole = new ByteArrayOutputStream(); - final PrintStream originalStream = System.out; - try (final PrintStream newStream = new PrintStream(newConsole)) { - System.setOut(newStream); - - final String keySeparator = ", "; - // manually construct the console consumer argument array - final String[] args = new String[] { - "--bootstrap-server", bootstrapServers, - "--from-beginning", - "--property", "print.key=true", - "--property", "print.timestamp=" + printTimestamp, - "--topic", outputTopic, - "--max-messages", String.valueOf(numMessages), - "--property", "key.deserializer=" + keyDeserializer.getClass().getName(), - "--property", "value.deserializer=" + valueDeserializer.getClass().getName(), - "--property", "key.separator=" + keySeparator, - "--property", "key.deserializer." + StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS - + "=" + Serdes.serdeFrom(innerClass).getClass().getName() - }; - - ConsoleConsumer.messageCount_$eq(0); //reset the message count - ConsoleConsumer.run(new ConsoleConsumer.ConsumerConfig(args)); - newStream.flush(); - System.setOut(originalStream); - return newConsole.toString(); + final Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-" + groupIdIndex.getAndIncrement()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + final Map configs = new HashMap<>(); + Serde serde = Serdes.serdeFrom(innerClass); + configs.put(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, serde.getClass().getName()); + serde.close(); + keyDeserializer.configure(configs, true); + + final KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singleton(outputTopic)); + final StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < numMessages; ) { + final ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + for (ConsumerRecord record : records) { + if (printTimestamp) { + stringBuilder.append(record.timestampType()); + stringBuilder.append(":"); + stringBuilder.append(record.timestamp()); + stringBuilder.append(", "); + } + stringBuilder.append(keyDeserializer.deserialize(outputTopic, record.key()).toString()); + stringBuilder.append(", "); + stringBuilder.append(valueDeserializer.deserialize(outputTopic, record.value()).toString()); + stringBuilder.append("\n"); + i++; + } } + consumer.close(); + return stringBuilder.toString(); } }