From 71752604a56d6ffaa0972d416636369e37ee6262 Mon Sep 17 00:00:00 2001
From: Philipp Schirmer
Date: Tue, 7 Jan 2025 10:12:08 +0100
Subject: [PATCH] Replace kafka-junit with testcontainers (#264)

---
 build.gradle.kts                                   |   1 +
 gradle.properties                                  |   4 +-
 streams-bootstrap-cli/build.gradle.kts             |   5 +-
 .../test/java/com/bakdata/kafka/CliTest.java       |  49 +--
 .../kafka/integration/RunProducerAppTest.java      |  46 +--
 .../kafka/integration/RunStreamsAppTest.java       |  61 ++--
 .../kafka/integration/StreamsCleanUpTest.java      | 116 +++---
 streams-bootstrap-core/build.gradle.kts            |   7 +-
 .../kafka/util/ConsumerGroupClient.java            |  23 +-
 .../com/bakdata/kafka/util/TopicClient.java        |  36 +-
 .../test/java/com/bakdata/kafka/TestUtil.java      |  49 ---
 .../bakdata/kafka/integration/KafkaTest.java       |  31 +-
 .../ProducerCleanUpRunnerTest.java                 |  28 +-
 .../kafka/integration/ProducerRunnerTest.java      |  19 +-
 .../integration/StreamsCleanUpRunnerTest.java      | 341 +++++++++++-------
 .../kafka/integration/StreamsRunnerTest.java       |  93 +++--
 .../kafka/util/SchemaTopicClientTest.java          | 207 +++++------
 .../bakdata/kafka/util/TopicClientTest.java        |  64 ++--
 .../bakdata/kafka/KafkaContainerHelper.java        | 142 ++++++++
 .../java/com/bakdata/kafka/TestUtil.java           |  18 +-
 20 files changed, 782 insertions(+), 558 deletions(-)
 delete mode 100644 streams-bootstrap-core/src/test/java/com/bakdata/kafka/TestUtil.java
 create mode 100644 streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java
 rename {streams-bootstrap-cli/src/test => streams-bootstrap-core/src/testFixtures}/java/com/bakdata/kafka/TestUtil.java (64%)

diff --git a/build.gradle.kts b/build.gradle.kts
index 5cfa071e..9f0deeff 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -67,6 +67,7 @@ configure {
 subprojects {
     apply(plugin = "java-library")
+    apply(plugin = "java-test-fixtures")
     apply(plugin = "io.freefair.lombok")

     configure {
diff --git a/gradle.properties b/gradle.properties
index c496d4e5..01c84aa8 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,9 +1,9 @@
 version=3.1.1-SNAPSHOT
 org.gradle.caching=true
-# running Kafka JUnit in parallel causes problems
+# running Kafka Streams in parallel causes problems with colliding consumer groups
 org.gradle.parallel=false
 kafkaVersion=3.6.1
-kafkaJunitVersion=3.6.0
+testContainersVersion=1.20.4
 confluentVersion=7.6.0
 fluentKafkaVersion=2.14.0
 junitVersion=5.10.2
diff --git a/streams-bootstrap-cli/build.gradle.kts b/streams-bootstrap-cli/build.gradle.kts
index 15548118..0f7da6d3 100644
--- a/streams-bootstrap-cli/build.gradle.kts
+++ b/streams-bootstrap-cli/build.gradle.kts
@@ -17,10 +17,7 @@ dependencies {
     val mockitoVersion: String by project
     testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion)
     testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)
-    val kafkaJunitVersion: String by project
-    testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) {
-        exclude(group = "org.slf4j", module = "slf4j-log4j12")
-    }
+    testImplementation(testFixtures(project(":streams-bootstrap-core")))
     testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2")
     val fluentKafkaVersion: String by project
     testImplementation(
diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
index 05060156..efb7c9df 100644
--- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
+++ 
b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,23 +24,23 @@ package com.bakdata.kafka; +import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static net.mguenther.kafka.junit.Wait.delay; +import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; +import com.bakdata.kafka.util.ImprovedAdminClient; import com.ginsberg.junit.exit.ExpectSystemExitWithStatus; +import java.time.Duration; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; -import net.mguenther.kafka.junit.EmbeddedKafkaCluster; -import net.mguenther.kafka.junit.KeyValue; -import net.mguenther.kafka.junit.ReadKeyValues; -import net.mguenther.kafka.junit.SendKeyValues; -import net.mguenther.kafka.junit.TopicConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes.StringSerde; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Consumed; import org.junit.jupiter.api.Test; +import org.testcontainers.kafka.ConfluentKafkaContainer; class CliTest { @@ -214,7 +214,7 @@ public SerdeConfig defaultSerializationConfig() { @ExpectSystemExitWithStatus(1) void shouldExitWithErrorInTopology() throws InterruptedException { final String input = "input"; - try (final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster(); + try (final ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -235,23 +235,23 @@ public SerdeConfig defaultSerializationConfig() { } })) { kafkaCluster.start(); - kafkaCluster.createTopic(TopicConfig.withName(input).build()); runApp(app, - "--bootstrap-server", kafkaCluster.getBrokerList(), + "--bootstrap-server", kafkaCluster.getBootstrapServers(), "--input-topics", input ); - kafkaCluster.send(SendKeyValues.to(input, List.of(new KeyValue<>("foo", "bar")))); - delay(10, TimeUnit.SECONDS); + new KafkaContainerHelper(kafkaCluster).send() + .to(input, List.of(new KeyValue<>("foo", "bar"))); + Thread.sleep(Duration.ofSeconds(10).toMillis()); } } @Test @ExpectSystemExitWithStatus(0) - void shouldExitWithSuccessCodeOnShutdown() throws InterruptedException { + void shouldExitWithSuccessCodeOnShutdown() { final String input = "input"; final String output = "output"; - try (final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster(); + try (final ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -270,22 +270,25 @@ public SerdeConfig defaultSerializationConfig() { } })) { kafkaCluster.start(); - kafkaCluster.createTopic(TopicConfig.withName(input).build()); - kafkaCluster.createTopic(TopicConfig.withName(output).build()); + final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(kafkaCluster); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + 
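// Note: every test in this patch boots the broker via TestUtil.newKafkaCluster(); the moved
// testFixtures version of that factory is not shown in this excerpt. A minimal sketch of what
// it can look like on Testcontainers 1.20.x (the cp-kafka image tag is an assumption, not
// taken from the patch):
//
//     import org.testcontainers.kafka.ConfluentKafkaContainer;
//     import org.testcontainers.utility.DockerImageName;
//
//     public final class TestUtil {
//         private TestUtil() {
//         }
//
//         public static ConfluentKafkaContainer newKafkaCluster() {
//             // org.testcontainers.kafka.ConfluentKafkaContainer replaces the deprecated
//             // KafkaContainer; getBootstrapServers() supersedes kafka-junit's getBrokerList()
//             return new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));
//         }
//     }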
admin.getTopicClient().createTopic(output, DEFAULT_TOPIC_SETTINGS, emptyMap()); + } runApp(app, - "--bootstrap-server", kafkaCluster.getBrokerList(), + "--bootstrap-server", kafkaCluster.getBootstrapServers(), "--input-topics", input, "--output-topic", output ); - kafkaCluster.send(SendKeyValues.to(input, List.of(new KeyValue<>("foo", "bar")))); - delay(10, TimeUnit.SECONDS); - final List> keyValues = kafkaCluster.read(ReadKeyValues.from(output)); + kafkaContainerHelper.send() + .to(input, List.of(new KeyValue<>("foo", "bar"))); + final List> keyValues = kafkaContainerHelper.read() + .from(output, Duration.ofSeconds(10)); assertThat(keyValues) .hasSize(1) .anySatisfy(kv -> { - assertThat(kv.getKey()).isEqualTo("foo"); - assertThat(kv.getValue()).isEqualTo("bar"); + assertThat(kv.key()).isEqualTo("foo"); + assertThat(kv.value()).isEqualTo("bar"); }); } } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index b5b74aa5..99a5ef60 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,9 +25,9 @@ package com.bakdata.kafka.integration; import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static net.mguenther.kafka.junit.Wait.delay; import static org.assertj.core.api.Assertions.assertThat; +import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.KafkaProducerApplication; import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; @@ -35,15 +35,12 @@ import com.bakdata.kafka.SerializerConfig; import com.bakdata.kafka.SimpleKafkaProducerApplication; import com.bakdata.kafka.TestRecord; +import com.bakdata.kafka.util.ImprovedAdminClient; import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import net.mguenther.kafka.junit.EmbeddedKafkaCluster; -import net.mguenther.kafka.junit.ReadKeyValues; -import net.mguenther.kafka.junit.TopicConfig; +import java.time.Duration; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -53,12 +50,17 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; +@Testcontainers class RunProducerAppTest { - private static final int TIMEOUT_SECONDS = 10; + private static final Duration TIMEOUT = Duration.ofSeconds(10); @RegisterExtension final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); - private final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster(); + @Container + private final 
ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); @BeforeEach void setup() { @@ -73,7 +75,6 @@ void tearDown() { @Test void shouldRunApp() throws InterruptedException { final String output = "output"; - this.kafkaCluster.createTopic(TopicConfig.withName(output).useDefaults()); try (final KafkaProducerApplication app = new SimpleKafkaProducerApplication<>(() -> new ProducerApp() { @Override public ProducerRunnable buildRunnable(final ProducerBuilder builder) { @@ -90,30 +91,29 @@ public SerializerConfig defaultSerializationConfig() { return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class); } })) { - app.setBootstrapServers(this.kafkaCluster.getBrokerList()); + app.setBootstrapServers(this.kafkaCluster.getBootstrapServers()); app.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl()); app.setOutputTopic(output); - app.setKafkaConfig(Map.of( - ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" - )); app.run(); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, TestRecord.class) + final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); + assertThat(kafkaContainerHelper.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class) .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryMockExtension.getUrl()) - .build())) + .from(output, TIMEOUT)) .hasSize(1) .anySatisfy(kv -> { - assertThat(kv.getKey()).isEqualTo("foo"); - assertThat(kv.getValue().getContent()).isEqualTo("bar"); + assertThat(kv.key()).isEqualTo("foo"); + assertThat(kv.value().getContent()).isEqualTo("bar"); }); app.clean(); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - assertThat(this.kafkaCluster.exists(app.getOutputTopic())) - .as("Output topic is deleted") - .isFalse(); + Thread.sleep(TIMEOUT.toMillis()); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + assertThat(admin.getTopicClient().exists(app.getOutputTopic())) + .as("Output topic is deleted") + .isFalse(); + } } } } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index 5657040b..62c52b27 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,54 +24,48 @@ package com.bakdata.kafka.integration; +import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static net.mguenther.kafka.junit.Wait.delay; +import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; +import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.KafkaStreamsApplication; import com.bakdata.kafka.SimpleKafkaStreamsApplication; import com.bakdata.kafka.test_applications.Mirror; +import com.bakdata.kafka.util.ImprovedAdminClient; +import java.time.Duration; import java.util.List; import java.util.Map; -import 
java.util.concurrent.TimeUnit; -import net.mguenther.kafka.junit.EmbeddedKafkaCluster; -import net.mguenther.kafka.junit.KeyValue; -import net.mguenther.kafka.junit.ReadKeyValues; -import net.mguenther.kafka.junit.SendKeyValuesTransactional; -import net.mguenther.kafka.junit.TopicConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; +import org.apache.kafka.streams.KeyValue; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; +@Testcontainers @ExtendWith(MockitoExtension.class) class RunStreamsAppTest { - private static final int TIMEOUT_SECONDS = 10; - private final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster(); - - @BeforeEach - void setup() { - this.kafkaCluster.start(); - } - - @AfterEach - void tearDown() { - this.kafkaCluster.stop(); - } + private static final Duration TIMEOUT = Duration.ofSeconds(10); + @Container + private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); @Test - void shouldRunApp() throws InterruptedException { + void shouldRunApp() { final String input = "input"; final String output = "output"; - this.kafkaCluster.createTopic(TopicConfig.withName(input).useDefaults()); - this.kafkaCluster.createTopic(TopicConfig.withName(output).useDefaults()); + final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient().createTopic(output, DEFAULT_TOPIC_SETTINGS, emptyMap()); + } try (final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(Mirror::new)) { - app.setBootstrapServers(this.kafkaCluster.getBrokerList()); + app.setBootstrapServers(this.kafkaCluster.getBootstrapServers()); app.setKafkaConfig(Map.of( ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" )); @@ -79,17 +73,14 @@ void shouldRunApp() throws InterruptedException { app.setOutputTopic(output); // run in Thread because the application blocks indefinitely new Thread(app).start(); - final SendKeyValuesTransactional kvSendKeyValuesTransactionalBuilder = - SendKeyValuesTransactional.inTransaction(input, List.of(new KeyValue<>("foo", "bar"))) - .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .build(); - this.kafkaCluster.send(kvSendKeyValuesTransactionalBuilder); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, String.class) + kafkaContainerHelper.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .to(input, List.of(new KeyValue<>("foo", "bar"))); + assertThat(kafkaContainerHelper.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) - .build())) + .from(output, TIMEOUT)) .hasSize(1); } } diff --git 
a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 2f56fc8d..d62051ec 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,45 +25,48 @@ package com.bakdata.kafka.integration; +import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static net.mguenther.kafka.junit.Wait.delay; +import static java.util.Collections.emptyMap; import com.bakdata.kafka.CloseFlagApp; +import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.KafkaStreamsApplication; import com.bakdata.kafka.SimpleKafkaStreamsApplication; import com.bakdata.kafka.test_applications.WordCount; +import com.bakdata.kafka.util.ImprovedAdminClient; +import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; -import net.mguenther.kafka.junit.EmbeddedKafkaCluster; -import net.mguenther.kafka.junit.KeyValue; -import net.mguenther.kafka.junit.ReadKeyValues; -import net.mguenther.kafka.junit.SendValuesTransactional; -import net.mguenther.kafka.junit.TopicConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; +@Testcontainers @Slf4j @ExtendWith(SoftAssertionsExtension.class) @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) class StreamsCleanUpTest { - private static final int TIMEOUT_SECONDS = 10; - private final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster(); + private static final Duration TIMEOUT = Duration.ofSeconds(10); + @Container + private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); @InjectSoftAssertions private SoftAssertions softly; @@ -76,28 +79,22 @@ private static void runApp(final KafkaStreamsApplication app) throws Interrup // run in Thread because the application blocks indefinitely new Thread(app).start(); // Wait until stream application has consumed all data - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - } - - @BeforeEach - void setup() throws InterruptedException { - this.kafkaCluster.start(); - 
delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - } - - @AfterEach - void tearDown() throws InterruptedException { - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - this.kafkaCluster.stop(); + Thread.sleep(TIMEOUT.toMillis()); } @Test void shouldClean() throws InterruptedException { try (final KafkaStreamsApplication app = this.createWordCountApplication()) { - final SendValuesTransactional sendRequest = - SendValuesTransactional.inTransaction(app.getInputTopics().get(0), - List.of("blub", "bla", "blub")).useDefaults(); - this.kafkaCluster.send(sendRequest); + final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); + try (final ImprovedAdminClient admin = new KafkaContainerHelper(this.kafkaCluster).admin()) { + admin.getTopicClient().createTopic(app.getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() + .to(app.getInputTopics().get(0), List.of( + new KeyValue<>(null, "blub"), + new KeyValue<>(null, "bla"), + new KeyValue<>(null, "blub") + )); final List> expectedValues = List.of( new KeyValue<>("blub", 1L), @@ -107,13 +104,18 @@ void shouldClean() throws InterruptedException { this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app); // Wait until all stream application are completely stopped before triggering cleanup - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); app.clean(); - this.softly.assertThat(this.kafkaCluster.exists(app.getOutputTopic())) - .as("Output topic is deleted") - .isFalse(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + this.softly.assertThat(admin.getTopicClient().exists(app.getOutputTopic())) + .as("Output topic is deleted") + .isFalse(); + } + try (final ImprovedAdminClient admin = new KafkaContainerHelper(this.kafkaCluster).admin()) { + admin.getTopicClient().createTopic(app.getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 2nd run", app); } } @@ -121,10 +123,16 @@ void shouldClean() throws InterruptedException { @Test void shouldReset() throws InterruptedException { try (final KafkaStreamsApplication app = this.createWordCountApplication()) { - final SendValuesTransactional sendRequest = - SendValuesTransactional.inTransaction(app.getInputTopics().get(0), - List.of("blub", "bla", "blub")).useDefaults(); - this.kafkaCluster.send(sendRequest); + final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); + try (final ImprovedAdminClient admin = new KafkaContainerHelper(this.kafkaCluster).admin()) { + admin.getTopicClient().createTopic(app.getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() + .to(app.getInputTopics().get(0), List.of( + new KeyValue<>(null, "blub"), + new KeyValue<>(null, "bla"), + new KeyValue<>(null, "blub") + )); final List> expectedValues = List.of( new KeyValue<>("blub", 1L), @@ -134,9 +142,15 @@ void shouldReset() throws InterruptedException { this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app); // Wait until all stream application are completely stopped before triggering cleanup - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); app.reset(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + this.softly.assertThat(admin.getTopicClient().exists(app.getOutputTopic())) + .as("Output topic exists") + 
.isTrue(); + } + final List> entriesTwice = expectedValues.stream() .flatMap(entry -> Stream.of(entry, entry)) .collect(Collectors.toList()); @@ -147,19 +161,16 @@ void shouldReset() throws InterruptedException { @Test void shouldCallClose() throws InterruptedException { try (final CloseFlagApp app = this.createCloseFlagApplication()) { - this.kafkaCluster.createTopic(TopicConfig.withName(app.getInputTopics().get(0)).useDefaults()); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + try (final ImprovedAdminClient admin = new KafkaContainerHelper(this.kafkaCluster).admin()) { + admin.getTopicClient().createTopic(app.getInputTopics().get(0), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + Thread.sleep(TIMEOUT.toMillis()); this.softly.assertThat(app.isClosed()).isFalse(); this.softly.assertThat(app.isAppClosed()).isFalse(); - // if we don't run the app, the coordinator will be unavailable - runAppAndClose(app); - this.softly.assertThat(app.isAppClosed()).isTrue(); - app.setAppClosed(false); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); app.clean(); this.softly.assertThat(app.isAppClosed()).isTrue(); app.setAppClosed(false); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); app.reset(); this.softly.assertThat(app.isAppClosed()).isTrue(); } @@ -172,10 +183,13 @@ private CloseFlagApp createCloseFlagApplication() { return this.configure(app); } - private List> readOutputTopic(final String outputTopic) throws InterruptedException { - final ReadKeyValues readRequest = ReadKeyValues.from(outputTopic, Long.class) - .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class).build(); - return this.kafkaCluster.read(readRequest); + private List> readOutputTopic(final String outputTopic) { + final List> records = new KafkaContainerHelper(this.kafkaCluster).read() + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class) + .from(outputTopic, TIMEOUT); + return records.stream() + .map(record -> new KeyValue<>(record.key(), record.value())) + .collect(Collectors.toList()); } private void runAndAssertContent(final Iterable> expectedValues, @@ -197,7 +211,7 @@ private KafkaStreamsApplication createWordCountApplication() { } private > T configure(final T application) { - application.setBootstrapServers(this.kafkaCluster.getBrokerList()); + application.setBootstrapServers(this.kafkaCluster.getBootstrapServers()); application.setKafkaConfig(Map.of( StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0", ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts index 27675109..2d20fb1d 100644 --- a/streams-bootstrap-core/build.gradle.kts +++ b/streams-bootstrap-core/build.gradle.kts @@ -38,10 +38,9 @@ dependencies { name = "schema-registry-mock-junit5", version = fluentKafkaVersion ) - val kafkaJunitVersion: String by project - testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = kafkaJunitVersion) { - exclude(group = "org.slf4j", module = "slf4j-log4j12") - } + val testContainersVersion: String by project + testFixturesApi(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion) + testFixturesApi(group = "org.testcontainers", name = "kafka", version = testContainersVersion) testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) val log4jVersion: String by project testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = 
log4jVersion) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java index 3182f01d..9698658e 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -37,6 +37,7 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ConsumerGroupListing; +import org.apache.kafka.common.errors.GroupIdNotFoundException; /** * This class offers helpers to interact with Kafka consumer groups. @@ -74,7 +75,12 @@ public void deleteConsumerGroup(final String groupName) { } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); throw new KafkaAdminException("Failed to delete consumer group " + groupName, ex); - } catch (final ExecutionException | TimeoutException ex) { + } catch (final ExecutionException ex) { + if (ex.getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause(); + } + throw new KafkaAdminException("Failed to delete consumer group " + groupName, ex); + } catch (final TimeoutException ex) { throw new KafkaAdminException("Failed to delete consumer group " + groupName, ex); } } @@ -110,7 +116,12 @@ public Collection listGroups() { } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); throw new KafkaAdminException("Failed to list consumer groups", ex); - } catch (final ExecutionException | TimeoutException ex) { + } catch (final ExecutionException ex) { + if (ex.getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause(); + } + throw new KafkaAdminException("Failed to list consumer groups", ex); + } catch (final TimeoutException ex) { throw new KafkaAdminException("Failed to list consumer groups", ex); } } @@ -122,7 +133,11 @@ public Collection listGroups() { */ public void deleteGroupIfExists(final String groupName) { if (this.exists(groupName)) { - this.deleteConsumerGroup(groupName); + try { + this.deleteConsumerGroup(groupName); + } catch (final GroupIdNotFoundException e) { + // do nothing + } } } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java index 46b65bb6..318135ac 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -116,7 +116,12 @@ public void deleteTopic(final String topicName) { } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); throw failedToDeleteTopic(topicName, ex); - } catch (final ExecutionException | TimeoutException ex) { + } catch (final ExecutionException ex) { + if (ex.getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause(); + } + throw 
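// The ExecutionException branches added in ConsumerGroupClient and TopicClient all follow one
// pattern: unwrap the cause, rethrow it directly if it is a RuntimeException (so callers such
// as deleteGroupIfExists can catch GroupIdNotFoundException), otherwise wrap it. A hypothetical
// helper, not part of this patch, that captures the pattern once:
//
//     import java.util.concurrent.ExecutionException;
//     import java.util.concurrent.TimeUnit;
//     import java.util.concurrent.TimeoutException;
//     import java.util.function.Supplier;
//     import org.apache.kafka.common.KafkaFuture;
//
//     static <T> T getOrUnwrap(final KafkaFuture<T> future,
//             final Supplier<RuntimeException> wrapper) {
//         try {
//             return future.get(10L, TimeUnit.SECONDS);
//         } catch (final InterruptedException ex) {
//             Thread.currentThread().interrupt();
//             throw wrapper.get();
//         } catch (final ExecutionException ex) {
//             if (ex.getCause() instanceof RuntimeException) {
//                 throw (RuntimeException) ex.getCause(); // e.g. GroupIdNotFoundException
//             }
//             throw wrapper.get();
//         } catch (final TimeoutException ex) {
//             throw wrapper.get();
//         }
//     }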
failedToDeleteTopic(topicName, ex); + } catch (final TimeoutException ex) { throw failedToDeleteTopic(topicName, ex); } if (this.exists(topicName)) { @@ -149,7 +154,12 @@ public TopicSettings describe(final String topicName) { } catch (final InterruptedException e) { Thread.currentThread().interrupt(); throw failedToRetrieveTopicDescription(topicName, e); - } catch (final ExecutionException | TimeoutException e) { + } catch (final ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } + throw failedToRetrieveTopicDescription(topicName, e); + } catch (final TimeoutException e) { throw failedToRetrieveTopicDescription(topicName, e); } } @@ -174,9 +184,11 @@ public boolean exists(final String topicName) { } catch (final ExecutionException e) { if (e.getCause() instanceof UnknownTopicOrPartitionException) { return false; - } else { - throw failedToCheckIfTopicExists(topicName, e); } + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } + throw failedToCheckIfTopicExists(topicName, e); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); throw failedToCheckIfTopicExists(topicName, e); @@ -203,7 +215,12 @@ public void createTopic(final String topicName, final TopicSettings settings, fi } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); throw failedToCreateTopic(topicName, ex); - } catch (final ExecutionException | TimeoutException ex) { + } catch (final ExecutionException ex) { + if (ex.getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause(); + } + throw failedToCreateTopic(topicName, ex); + } catch (final TimeoutException ex) { throw failedToCreateTopic(topicName, ex); } } @@ -222,7 +239,12 @@ public Collection listTopics() { } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); throw failedToListTopics(ex); - } catch (final ExecutionException | TimeoutException ex) { + } catch (final ExecutionException ex) { + if (ex.getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause(); + } + throw failedToListTopics(ex); + } catch (final TimeoutException ex) { throw failedToListTopics(ex); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/TestUtil.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/TestUtil.java deleted file mode 100644 index a40268a6..00000000 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/TestUtil.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2024 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.kafka; - -import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith; -import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.newClusterConfig; -import static net.mguenther.kafka.junit.EmbeddedKafkaConfig.brokers; - -import lombok.experimental.UtilityClass; -import net.mguenther.kafka.junit.EmbeddedKafkaCluster; -import net.mguenther.kafka.junit.EmbeddedKafkaConfig.EmbeddedKafkaConfigBuilder; - -@UtilityClass -public class TestUtil { - public static EmbeddedKafkaCluster newKafkaCluster() { - return provisionWith(newClusterConfig() - .configure(newKafkaConfig() - .build()) - .build()); - } - - public static EmbeddedKafkaConfigBuilder newKafkaConfig() { - return brokers() - .with("transaction.state.log.num.partitions", 10) - .with("offsets.topic.num.partitions", 10); - } -} diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java index 8432ab78..6a6e5d37 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,39 +24,36 @@ package com.bakdata.kafka.integration; +import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.KafkaEndpointConfig; import com.bakdata.kafka.TestUtil; import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; -import net.mguenther.kafka.junit.EmbeddedKafkaCluster; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; +@Testcontainers abstract class KafkaTest { @RegisterExtension final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); - final EmbeddedKafkaCluster kafkaCluster = TestUtil.newKafkaCluster(); - - @BeforeEach - void setup() { - this.kafkaCluster.start(); - } - - @AfterEach - void tearDown() { - this.kafkaCluster.stop(); - } + @Container + private final ConfluentKafkaContainer kafkaCluster = TestUtil.newKafkaCluster(); KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { return KafkaEndpointConfig.builder() - .bootstrapServers(this.kafkaCluster.getBrokerList()) + .bootstrapServers(this.kafkaCluster.getBootstrapServers()) .build(); } KafkaEndpointConfig createEndpoint() { return KafkaEndpointConfig.builder() - .bootstrapServers(this.kafkaCluster.getBrokerList()) + .bootstrapServers(this.kafkaCluster.getBootstrapServers()) .schemaRegistryUrl(this.schemaRegistryMockExtension.getUrl()) .build(); } + + KafkaContainerHelper newContainerHelper() { + return new KafkaContainerHelper(this.kafkaCluster); + } } diff --git 
a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java index 05f3381c..4630cb6b 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -42,12 +42,15 @@ import com.bakdata.kafka.test_applications.AvroKeyProducer; import com.bakdata.kafka.test_applications.AvroValueProducer; import com.bakdata.kafka.test_applications.StringProducer; +import com.bakdata.kafka.util.ImprovedAdminClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import java.io.IOException; +import java.time.Duration; import java.util.List; -import net.mguenther.kafka.junit.KeyValue; -import net.mguenther.kafka.junit.ReadKeyValues; +import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.KeyValue; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; @@ -94,7 +97,7 @@ private static void run(final ExecutableApp executableAp } @Test - void shouldDeleteTopic() throws InterruptedException { + void shouldDeleteTopic() { try (final ConfiguredProducerApp app = createStringApplication(); final ExecutableProducerApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { @@ -106,9 +109,11 @@ void shouldDeleteTopic() throws InterruptedException { clean(executableApp); - this.softly.assertThat(this.kafkaCluster.exists(app.getTopics().getOutputTopic())) - .as("Output topic is deleted") - .isFalse(); + try (final ImprovedAdminClient admin = this.newContainerHelper().admin()) { + this.softly.assertThat(admin.getTopicClient().exists(app.getTopics().getOutputTopic())) + .as("Output topic is deleted") + .isFalse(); + } } } @@ -167,9 +172,12 @@ public ProducerCleanUpConfiguration setupCleanUp( .build()); } - private List> readOutputTopic(final String outputTopic) throws InterruptedException { - final ReadKeyValues readRequest = ReadKeyValues.from(outputTopic).build(); - return this.kafkaCluster.read(readRequest); + private List> readOutputTopic(final String outputTopic) { + final List> records = + this.newContainerHelper().read().from(outputTopic, Duration.ofSeconds(1L)); + return records.stream() + .map(record -> new org.apache.kafka.streams.KeyValue<>(record.key(), record.value())) + .collect(Collectors.toList()); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java index 1c0b23bd..98cb4821 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free 
of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -31,9 +31,11 @@ import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerRunner; import com.bakdata.kafka.ProducerTopicConfig; +import java.time.Duration; import java.util.List; -import net.mguenther.kafka.junit.KeyValue; -import net.mguenther.kafka.junit.ReadKeyValues; +import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.KeyValue; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; @@ -56,7 +58,7 @@ static ConfiguredProducerApp configureApp(final ProducerApp app, fi } @Test - void shouldRunApp() throws InterruptedException { + void shouldRunApp() { try (final ConfiguredProducerApp app = createStringApplication(); final ProducerRunner runner = app.withEndpoint(this.createEndpointWithoutSchemaRegistry()) .createRunner()) { @@ -68,9 +70,12 @@ void shouldRunApp() throws InterruptedException { } } - private List> readOutputTopic(final String outputTopic) throws InterruptedException { - final ReadKeyValues readRequest = ReadKeyValues.from(outputTopic).build(); - return this.kafkaCluster.read(readRequest); + private List> readOutputTopic(final String outputTopic) { + final List> records = + this.newContainerHelper().read().from(outputTopic, Duration.ofSeconds(1L)); + return records.stream() + .map(record -> new KeyValue<>(record.key(), record.value())) + .collect(Collectors.toList()); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index a80ae6cc..01c8bd0d 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,8 +25,9 @@ package com.bakdata.kafka.integration; +import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; import static com.bakdata.kafka.integration.StreamsRunnerTest.configureApp; -import static net.mguenther.kafka.junit.Wait.delay; +import static java.util.Collections.emptyMap; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -37,6 +38,7 @@ import com.bakdata.kafka.ExecutableApp; import com.bakdata.kafka.ExecutableStreamsApp; import com.bakdata.kafka.HasTopicHooks.TopicHook; +import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsCleanUpConfiguration; import com.bakdata.kafka.StreamsCleanUpRunner; @@ -55,22 +57,17 @@ import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; +import java.time.Duration; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; -import 
net.mguenther.kafka.junit.KeyValue; -import net.mguenther.kafka.junit.ReadKeyValues; -import net.mguenther.kafka.junit.SendKeyValuesTransactional; -import net.mguenther.kafka.junit.SendValuesTransactional; -import net.mguenther.kafka.junit.TopicConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; @@ -85,7 +82,7 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) class StreamsCleanUpRunnerTest extends KafkaTest { - private static final int TIMEOUT_SECONDS = 10; + private static final Duration TIMEOUT = Duration.ofSeconds(10); @InjectSoftAssertions private SoftAssertions softly; @Mock @@ -120,18 +117,22 @@ private static ConfiguredStreamsApp createMirrorKeyApplication() { } private static void reset(final ExecutableApp app) { - app.createCleanUpRunner().reset(); + try (final StreamsCleanUpRunner cleanUpRunner = app.createCleanUpRunner()) { + cleanUpRunner.reset(); + } } private static void clean(final ExecutableApp app) { - app.createCleanUpRunner().clean(); + try (final CleanUpRunner cleanUpRunner = app.createCleanUpRunner()) { + cleanUpRunner.clean(); + } } private static void run(final ExecutableApp app) throws InterruptedException { try (final StreamsRunner runner = app.createRunner()) { StreamsRunnerTest.run(runner); // Wait until stream application has consumed all data - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); } } @@ -140,11 +141,17 @@ void shouldDeleteTopic() throws InterruptedException { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final SendValuesTransactional sendRequest = SendValuesTransactional - .inTransaction(app.getTopics().getInputTopics().get(0), List.of("blub", "bla", "blub")) - .useDefaults(); - this.kafkaCluster.send(sendRequest); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>(null, "blub"), + new KeyValue<>(null, "bla"), + new KeyValue<>(null, "blub") + )); final List> expectedValues = List.of(new KeyValue<>("blub", 1L), @@ -156,12 +163,14 @@ void shouldDeleteTopic() throws InterruptedException { this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); clean(executableApp); - this.softly.assertThat(this.kafkaCluster.exists(app.getTopics().getOutputTopic())) - .as("Output topic is deleted") - .isFalse(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + this.softly.assertThat(admin.getTopicClient().exists(app.getTopics().getOutputTopic())) + .as("Output topic is deleted") + 
.isFalse(); + } } } @@ -170,11 +179,17 @@ void shouldDeleteConsumerGroup() throws InterruptedException { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final SendValuesTransactional sendRequest = SendValuesTransactional - .inTransaction(app.getTopics().getInputTopics().get(0), List.of("blub", "bla", "blub")) - .useDefaults(); - this.kafkaCluster.send(sendRequest); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>(null, "blub"), + new KeyValue<>(null, "bla"), + new KeyValue<>(null, "blub") + )); final List> expectedValues = List.of(new KeyValue<>("blub", 1L), @@ -186,17 +201,19 @@ void shouldDeleteConsumerGroup() throws InterruptedException { this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); - try (final ConsumerGroupClient adminClient = this.createAdminClient().getConsumerGroupClient()) { - this.softly.assertThat(adminClient.exists(app.getUniqueAppId())) + try (final ImprovedAdminClient adminClient = this.createAdminClient(); + final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { + this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId())) .as("Consumer group exists") .isTrue(); } - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); clean(executableApp); - try (final ConsumerGroupClient adminClient = this.createAdminClient().getConsumerGroupClient()) { - this.softly.assertThat(adminClient.exists(app.getUniqueAppId())) + try (final ImprovedAdminClient adminClient = this.createAdminClient(); + final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { + this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId())) .as("Consumer group is deleted") .isFalse(); } @@ -208,11 +225,17 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final SendValuesTransactional sendRequest = SendValuesTransactional - .inTransaction(app.getTopics().getInputTopics().get(0), List.of("blub", "bla", "blub")) - .useDefaults(); - this.kafkaCluster.send(sendRequest); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>(null, "blub"), + new KeyValue<>(null, "bla"), + new KeyValue<>(null, "blub") + )); final List> expectedValues = List.of(new KeyValue<>("blub", 1L), @@ -224,17 +247,19 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); - try 
(final ConsumerGroupClient adminClient = this.createAdminClient().getConsumerGroupClient()) { - this.softly.assertThat(adminClient.exists(app.getUniqueAppId())) + try (final ImprovedAdminClient adminClient = this.createAdminClient(); + final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { + this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId())) .as("Consumer group exists") .isTrue(); } - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); - try (final ConsumerGroupClient adminClient = this.createAdminClient().getConsumerGroupClient()) { - adminClient.deleteConsumerGroup(app.getUniqueAppId()); - this.softly.assertThat(adminClient.exists(app.getUniqueAppId())) + try (final ImprovedAdminClient adminClient = this.createAdminClient(); + final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { + consumerGroupClient.deleteConsumerGroup(app.getUniqueAppId()); + this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId())) .as("Consumer group is deleted") .isFalse(); } @@ -248,15 +273,19 @@ void shouldDeleteInternalTopics() throws InterruptedException { final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); - final SendKeyValuesTransactional sendRequest = SendKeyValuesTransactional - .inTransaction(app.getTopics().getInputTopics().get(0), - Collections.singletonList(new KeyValue<>("key 1", testRecord))) + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryMockExtension.getUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) - .build(); - this.kafkaCluster.send(sendRequest); + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>("key 1", testRecord) + )); run(executableApp); @@ -268,22 +297,26 @@ void shouldDeleteInternalTopics() throws InterruptedException { uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"; final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC; - for (final String inputTopic : inputTopics) { - this.softly.assertThat(this.kafkaCluster.exists(inputTopic)).isTrue(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + for (final String inputTopic : inputTopics) { + this.softly.assertThat(admin.getTopicClient().exists(inputTopic)).isTrue(); + } + this.softly.assertThat(admin.getTopicClient().exists(internalTopic)).isTrue(); + this.softly.assertThat(admin.getTopicClient().exists(backingTopic)).isTrue(); + this.softly.assertThat(admin.getTopicClient().exists(manualTopic)).isTrue(); } - this.softly.assertThat(this.kafkaCluster.exists(internalTopic)).isTrue(); - this.softly.assertThat(this.kafkaCluster.exists(backingTopic)).isTrue(); - this.softly.assertThat(this.kafkaCluster.exists(manualTopic)).isTrue(); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); reset(executableApp); - for (final String inputTopic : inputTopics) { - this.softly.assertThat(this.kafkaCluster.exists(inputTopic)).isTrue(); + try (final 
ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + for (final String inputTopic : inputTopics) { + this.softly.assertThat(admin.getTopicClient().exists(inputTopic)).isTrue(); + } + this.softly.assertThat(admin.getTopicClient().exists(internalTopic)).isFalse(); + this.softly.assertThat(admin.getTopicClient().exists(backingTopic)).isFalse(); + this.softly.assertThat(admin.getTopicClient().exists(manualTopic)).isTrue(); } - this.softly.assertThat(this.kafkaCluster.exists(internalTopic)).isFalse(); - this.softly.assertThat(this.kafkaCluster.exists(backingTopic)).isFalse(); - this.softly.assertThat(this.kafkaCluster.exists(manualTopic)).isTrue(); } } @@ -293,33 +326,41 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); - final SendKeyValuesTransactional sendRequest = SendKeyValuesTransactional - .inTransaction(app.getTopics().getInputTopics().get(0), - Collections.singletonList(new KeyValue<>("key 1", testRecord))) + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryMockExtension.getUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) - .build(); - this.kafkaCluster.send(sendRequest); + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>("key 1", testRecord) + )); run(executableApp); final List inputTopics = app.getTopics().getInputTopics(); final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC; - for (final String inputTopic : inputTopics) { - this.softly.assertThat(this.kafkaCluster.exists(inputTopic)).isTrue(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + for (final String inputTopic : inputTopics) { + this.softly.assertThat(admin.getTopicClient().exists(inputTopic)).isTrue(); + } + this.softly.assertThat(admin.getTopicClient().exists(manualTopic)).isTrue(); } - this.softly.assertThat(this.kafkaCluster.exists(manualTopic)).isTrue(); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); clean(executableApp); - for (final String inputTopic : inputTopics) { - this.softly.assertThat(this.kafkaCluster.exists(inputTopic)).isTrue(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + for (final String inputTopic : inputTopics) { + this.softly.assertThat(admin.getTopicClient().exists(inputTopic)).isTrue(); + } + this.softly.assertThat(admin.getTopicClient().exists(manualTopic)).isFalse(); } - this.softly.assertThat(this.kafkaCluster.exists(manualTopic)).isFalse(); } } @@ -328,11 +369,17 @@ void shouldDeleteState() throws InterruptedException { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final SendValuesTransactional sendRequest = SendValuesTransactional - .inTransaction(app.getTopics().getInputTopics().get(0), List.of("blub", "bla", "blub")) - .useDefaults(); - this.kafkaCluster.send(sendRequest); - 
delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>(null, "blub"), + new KeyValue<>(null, "bla"), + new KeyValue<>(null, "blub") + )); final List> expectedValues = List.of(new KeyValue<>("blub", 1L), @@ -344,7 +391,7 @@ void shouldDeleteState() throws InterruptedException { this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "All entries are once in the input topic after the 1st run"); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); reset(executableApp); run(executableApp); @@ -361,10 +408,17 @@ void shouldReprocessAlreadySeenRecords() throws InterruptedException { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final SendValuesTransactional sendRequest = SendValuesTransactional - .inTransaction(app.getTopics().getInputTopics().get(0), List.of("a", "b", "c")) - .useDefaults(); - this.kafkaCluster.send(sendRequest); + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>(null, "a"), + new KeyValue<>(null, "b"), + new KeyValue<>(null, "c") + )); run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 3); @@ -372,7 +426,7 @@ void shouldReprocessAlreadySeenRecords() throws InterruptedException { this.assertSize(app.getTopics().getOutputTopic(), 3); // Wait until all stream application are completely stopped before triggering cleanup - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); reset(executableApp); run(executableApp); @@ -388,18 +442,23 @@ void shouldDeleteValueSchema() final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); - final SendValuesTransactional sendRequest = SendValuesTransactional - .inTransaction(inputTopic, Collections.singletonList(testRecord)) + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryMockExtension.getUrl()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) - .build(); - this.kafkaCluster.send(sendRequest); + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>(null, testRecord) + )); run(executableApp); // Wait until all stream application are completely stopped before triggering cleanup - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); final String outputTopic = 
app.getTopics().getOutputTopic(); this.softly.assertThat(client.getAllSubjects()) .contains(outputTopic + "-value", inputTopic + "-value"); @@ -418,18 +477,23 @@ void shouldDeleteKeySchema() final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); - final SendKeyValuesTransactional sendRequest = SendKeyValuesTransactional - .inTransaction(inputTopic, Collections.singletonList(new KeyValue<>(testRecord, "val"))) + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryMockExtension.getUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) - .build(); - this.kafkaCluster.send(sendRequest); + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>(testRecord, "val") + )); run(executableApp); // Wait until all stream application are completely stopped before triggering cleanup - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); final String outputTopic = app.getTopics().getOutputTopic(); this.softly.assertThat(client.getAllSubjects()) .contains(outputTopic + "-key", inputTopic + "-key"); @@ -448,19 +512,24 @@ void shouldDeleteSchemaOfInternalTopics() final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); - final SendKeyValuesTransactional sendRequest = SendKeyValuesTransactional - .inTransaction(inputTopic, Collections.singletonList(new KeyValue<>("key 1", testRecord))) + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryMockExtension.getUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) - .build(); - this.kafkaCluster.send(sendRequest); + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>("key 1", testRecord) + )); run(executableApp); // Wait until all stream application are completely stopped before triggering cleanup - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); final String inputSubject = inputTopic + "-value"; final String uniqueAppId = app.getUniqueAppId(); final String internalSubject = @@ -487,19 +556,24 @@ void shouldDeleteSchemaOfIntermediateTopics() final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); - final SendKeyValuesTransactional sendRequest = SendKeyValuesTransactional - .inTransaction(inputTopic, 
Collections.singletonList(new KeyValue<>("key 1", testRecord))) + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.schemaRegistryMockExtension.getUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) - .build(); - this.kafkaCluster.send(sendRequest); + .to(app.getTopics().getInputTopics().get(0), List.of( + new KeyValue<>("key 1", testRecord) + )); run(executableApp); // Wait until all stream application are completely stopped before triggering cleanup - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); final String inputSubject = inputTopic + "-value"; final String manualSubject = ComplexTopologyApplication.THROUGH_TOPIC + "-value"; this.softly.assertThat(client.getAllSubjects()) @@ -521,6 +595,7 @@ void shouldCallCleanupHookForInternalTopics() { verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-repartition"); verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000008-changelog"); verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"); + verify(this.topicHook).close(); verifyNoMoreInteractions(this.topicHook); } } @@ -536,17 +611,15 @@ void shouldCallCleanUpHookForAllTopics() { verify(this.topicHook).deleted(uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"); verify(this.topicHook).deleted(ComplexTopologyApplication.THROUGH_TOPIC); verify(this.topicHook).deleted(app.getTopics().getOutputTopic()); + verify(this.topicHook).close(); verifyNoMoreInteractions(this.topicHook); } } @Test - void shouldNotThrowExceptionOnMissingInputTopic() throws InterruptedException { + void shouldNotThrowExceptionOnMissingInputTopic() { try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { - // if we don't run the app, the coordinator will be unavailable - run(executableApp); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); this.softly.assertThatCode(() -> clean(executableApp)).doesNotThrowAnyException(); } } @@ -557,10 +630,14 @@ void shouldThrowExceptionOnResetterError() throws InterruptedException { final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpoint()); final StreamsRunner runner = executableApp.createRunner()) { - this.kafkaCluster.createTopic(TopicConfig.withName(app.getTopics().getInputTopics().get(0)).useDefaults()); + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getInputTopics().get(0), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } StreamsRunnerTest.run(runner); // Wait until stream application has consumed all data - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); // should throw exception because consumer group is still active this.softly.assertThatThrownBy(() -> reset(executableApp)) .isInstanceOf(CleanUpException.class) @@ -573,10 +650,20 @@ void shouldReprocessAlreadySeenRecordsWithPattern() throws 
InterruptedException try (final ConfiguredStreamsApp app = createWordCountPatternApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - this.kafkaCluster.send(SendValuesTransactional.inTransaction("input_topic", - Arrays.asList("a", "b")).useDefaults()); - this.kafkaCluster.send(SendValuesTransactional.inTransaction("another_topic", - List.of("c")).useDefaults()); + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient() + .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + kafkaContainerHelper.send() + .to("input_topic", List.of( + new KeyValue<>(null, "a"), + new KeyValue<>(null, "b") + )); + kafkaContainerHelper.send() + .to("another_topic", List.of( + new KeyValue<>(null, "c") + )); run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 3); @@ -584,7 +671,7 @@ void shouldReprocessAlreadySeenRecordsWithPattern() throws InterruptedException this.assertSize(app.getTopics().getOutputTopic(), 3); // Wait until all stream application are completely stopped before triggering cleanup - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); reset(executableApp); run(executableApp); @@ -593,7 +680,10 @@ void shouldReprocessAlreadySeenRecordsWithPattern() throws InterruptedException } private ConfiguredStreamsApp createComplexApplication() { - this.kafkaCluster.createTopic(TopicConfig.withName(ComplexTopologyApplication.THROUGH_TOPIC).useDefaults()); + try (final ImprovedAdminClient admin = this.newContainerHelper().admin()) { + admin.getTopicClient() + .createTopic(ComplexTopologyApplication.THROUGH_TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); + } return configureApp(new ComplexTopologyApplication(), StreamsTopicConfig.builder() .inputTopics(List.of("input")) .outputTopic("output") @@ -601,7 +691,10 @@ private ConfiguredStreamsApp createComplexApplication() { } private ConfiguredStreamsApp createComplexCleanUpHookApplication() { - this.kafkaCluster.createTopic(TopicConfig.withName(ComplexTopologyApplication.THROUGH_TOPIC).useDefaults()); + try (final ImprovedAdminClient admin = this.newContainerHelper().admin()) { + admin.getTopicClient() + .createTopic(ComplexTopologyApplication.THROUGH_TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); + } return configureApp(new ComplexTopologyApplication() { @Override public StreamsCleanUpConfiguration setupCleanUp( @@ -619,22 +712,24 @@ private ImprovedAdminClient createAdminClient() { return ImprovedAdminClient.create(this.createEndpoint().createKafkaProperties()); } - private List> readOutputTopic(final String outputTopic) throws InterruptedException { - final ReadKeyValues readRequest = ReadKeyValues.from(outputTopic, Long.class) - .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class).build(); - return this.kafkaCluster.read(readRequest); + private List> readOutputTopic(final String outputTopic) { + final List> records = this.newContainerHelper().read() + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class) + .from(outputTopic, TIMEOUT); + return records.stream() + .map(record -> new KeyValue<>(record.key(), record.value())) + .collect(Collectors.toList()); } private void assertContent(final String outputTopic, - final Iterable> expectedValues, final String description) - throws InterruptedException { + final Iterable> expectedValues, final String 
description) { final List> output = this.readOutputTopic(outputTopic); this.softly.assertThat(output) .as(description) .containsExactlyInAnyOrderElementsOf(expectedValues); } - private void assertSize(final String outputTopic, final int expectedMessageCount) throws InterruptedException { + private void assertSize(final String outputTopic, final int expectedMessageCount) { final List> records = this.readOutputTopic(outputTopic); this.softly.assertThat(records).hasSize(expectedMessageCount); } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java index 839f05b2..ad4f045d 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,13 +24,15 @@ package com.bakdata.kafka.integration; -import static net.mguenther.kafka.junit.Wait.delay; +import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; +import static java.util.Collections.emptyMap; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.ConfiguredStreamsApp; +import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsExecutionOptions; @@ -39,15 +41,12 @@ import com.bakdata.kafka.TopologyBuilder; import com.bakdata.kafka.test_applications.LabeledInputTopics; import com.bakdata.kafka.test_applications.Mirror; +import com.bakdata.kafka.util.ImprovedAdminClient; import java.lang.Thread.UncaughtExceptionHandler; +import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import lombok.Getter; -import net.mguenther.kafka.junit.KeyValue; -import net.mguenther.kafka.junit.ReadKeyValues; -import net.mguenther.kafka.junit.SendKeyValuesTransactional; -import net.mguenther.kafka.junit.TopicConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes.StringSerde; @@ -55,6 +54,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KafkaStreams.StateListener; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.errors.StreamsException; @@ -75,7 +75,7 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) class StreamsRunnerTest extends KafkaTest { - private static final int TIMEOUT_SECONDS = 10; + private static final Duration TIMEOUT = Duration.ofSeconds(10); @Mock private StreamsUncaughtExceptionHandler uncaughtExceptionHandler; @Mock @@ -122,32 +122,31 @@ private static ConfiguredStreamsApp createErrorApplication() { } @Test - void shouldRunApp() throws InterruptedException { + void shouldRunApp() { try (final ConfiguredStreamsApp 
app = createMirrorApplication(); final StreamsRunner runner = app.withEndpoint(this.createEndpointWithoutSchemaRegistry()) .createRunner()) { final String inputTopic = app.getTopics().getInputTopics().get(0); - this.kafkaCluster.createTopic(TopicConfig.withName(inputTopic).useDefaults()); final String outputTopic = app.getTopics().getOutputTopic(); - this.kafkaCluster.createTopic(TopicConfig.withName(outputTopic).useDefaults()); + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient().createTopic(outputTopic, DEFAULT_TOPIC_SETTINGS, emptyMap()); + } run(runner); - final SendKeyValuesTransactional kvSendKeyValuesTransactionalBuilder = - SendKeyValuesTransactional.inTransaction(inputTopic, List.of(new KeyValue<>("foo", "bar"))) - .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .build(); - this.kafkaCluster.send(kvSendKeyValuesTransactionalBuilder); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - this.softly.assertThat(this.kafkaCluster.read(ReadKeyValues.from(outputTopic, String.class, String.class) + kafkaContainerHelper.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .to(inputTopic, List.of(new KeyValue<>("foo", "bar"))); + this.softly.assertThat(kafkaContainerHelper.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) - .build())) + .from(outputTopic, TIMEOUT)) .hasSize(1); } } @Test - void shouldUseMultipleLabeledInputTopics() throws InterruptedException { + void shouldUseMultipleLabeledInputTopics() { try (final ConfiguredStreamsApp app = createLabeledInputTopicsApplication(); final StreamsRunner runner = app.withEndpoint(this.createEndpointWithoutSchemaRegistry()) .createRunner()) { @@ -155,25 +154,25 @@ void shouldUseMultipleLabeledInputTopics() throws InterruptedException { final String inputTopic1 = inputTopics.get(0); final String inputTopic2 = inputTopics.get(1); final String outputTopic = app.getTopics().getOutputTopic(); - this.kafkaCluster.createTopic(TopicConfig.withName(inputTopic1).useDefaults()); - this.kafkaCluster.createTopic(TopicConfig.withName(inputTopic2).useDefaults()); - this.kafkaCluster.createTopic(TopicConfig.withName(outputTopic).useDefaults()); + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient().createTopic(inputTopic1, DEFAULT_TOPIC_SETTINGS, emptyMap()); + admin.getTopicClient().createTopic(inputTopic2, DEFAULT_TOPIC_SETTINGS, emptyMap()); + admin.getTopicClient().createTopic(outputTopic, DEFAULT_TOPIC_SETTINGS, emptyMap()); + } run(runner); - this.kafkaCluster.send( - SendKeyValuesTransactional.inTransaction(inputTopic1, List.of(new KeyValue<>("foo", "bar"))) - .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .build()); - this.kafkaCluster.send( - SendKeyValuesTransactional.inTransaction(inputTopic2, List.of(new KeyValue<>("foo", "baz"))) - .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) 
- .build()); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - this.softly.assertThat(this.kafkaCluster.read(ReadKeyValues.from(outputTopic, String.class, String.class) + kafkaContainerHelper.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .to(inputTopic1, List.of(new KeyValue<>("foo", "bar"))); + kafkaContainerHelper.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .to(inputTopic2, List.of(new KeyValue<>("foo", "baz"))); + this.softly.assertThat(kafkaContainerHelper.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) - .build())) + .from(outputTopic, TIMEOUT)) .hasSize(2); } } @@ -190,7 +189,7 @@ void shouldThrowOnMissingInputTopic() throws InterruptedException { final Thread thread = run(runner); final CapturingUncaughtExceptionHandler handler = (CapturingUncaughtExceptionHandler) thread.getUncaughtExceptionHandler(); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + Thread.sleep(TIMEOUT.toMillis()); this.softly.assertThat(thread.isAlive()).isFalse(); this.softly.assertThat(handler.getLastException()).isInstanceOf(MissingSourceTopicException.class); verify(this.uncaughtExceptionHandler).handle(any()); @@ -208,19 +207,19 @@ void shouldCloseOnMapError() throws InterruptedException { .uncaughtExceptionHandler(() -> this.uncaughtExceptionHandler) .build())) { final String inputTopic = app.getTopics().getInputTopics().get(0); - this.kafkaCluster.createTopic(TopicConfig.withName(inputTopic).useDefaults()); final String outputTopic = app.getTopics().getOutputTopic(); - this.kafkaCluster.createTopic(TopicConfig.withName(outputTopic).useDefaults()); + final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + admin.getTopicClient().createTopic(outputTopic, DEFAULT_TOPIC_SETTINGS, emptyMap()); + } final Thread thread = run(runner); final CapturingUncaughtExceptionHandler handler = (CapturingUncaughtExceptionHandler) thread.getUncaughtExceptionHandler(); - final SendKeyValuesTransactional kvSendKeyValuesTransactionalBuilder = - SendKeyValuesTransactional.inTransaction(inputTopic, List.of(new KeyValue<>("foo", "bar"))) - .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .build(); - this.kafkaCluster.send(kvSendKeyValuesTransactionalBuilder); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + kafkaContainerHelper.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .to(inputTopic, List.of(new KeyValue<>("foo", "bar"))); + Thread.sleep(TIMEOUT.toMillis()); this.softly.assertThat(thread.isAlive()).isFalse(); this.softly.assertThat(handler.getLastException()).isInstanceOf(StreamsException.class) .satisfies(e -> this.softly.assertThat(e.getCause()).hasMessage("Error in map")); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java index 536854dd..9472184e 100644 --- 
a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,9 +25,11 @@ package com.bakdata.kafka.util; +import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static net.mguenther.kafka.junit.Wait.delay; +import static java.util.Collections.emptyMap; +import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.TestRecord; import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; @@ -36,154 +38,155 @@ import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import java.io.IOException; import java.time.Duration; -import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; -import net.mguenther.kafka.junit.EmbeddedKafkaCluster; -import net.mguenther.kafka.junit.SendValuesTransactional; -import net.mguenther.kafka.junit.TopicConfig; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.streams.KeyValue; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; +@Testcontainers @Slf4j @ExtendWith(SoftAssertionsExtension.class) class SchemaTopicClientTest { - private static final int TIMEOUT_SECONDS = 10; + private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String TOPIC = "topic"; @RegisterExtension final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); - private final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster(); + @Container + private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); @InjectSoftAssertions SoftAssertions softly; - @BeforeEach - void setup() { - this.kafkaCluster.start(); - } - - @AfterEach - void teardown() { - this.kafkaCluster.stop(); - } - @Test void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() throws InterruptedException, IOException, RestClientException { - this.kafkaCluster.createTopic(TopicConfig.withName(TOPIC).useDefaults()); - this.softly.assertThat(this.kafkaCluster.exists(TOPIC)) - .as("Topic is created") - .isTrue(); - - final SendValuesTransactional sendRequest = SendValuesTransactional - .inTransaction(TOPIC, List.of(TestRecord.newBuilder().setContent("foo").build())) - .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) - .build(); - 
this.kafkaCluster.send(sendRequest); - - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); - this.softly.assertThat(client.getAllSubjects()) - .contains(TOPIC + "-value"); - - try (final SchemaTopicClient schemaTopicClient = this.createClientWithSchemaRegistry()) { - schemaTopicClient.deleteTopicAndResetSchemaRegistry(TOPIC); + final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + final TopicClient topicClient = admin.getTopicClient(); + topicClient.createTopic(TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); + this.softly.assertThat(topicClient.exists(TOPIC)) + .as("Topic is created") + .isTrue(); + + kafkaContainerHelper.send() + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, + this.schemaRegistryMockExtension.getUrl()) + .to(TOPIC, List.of( + new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) + )); + + final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); + this.softly.assertThat(client.getAllSubjects()) + .contains(TOPIC + "-value"); + + try (final SchemaTopicClient schemaTopicClient = this.createClientWithSchemaRegistry()) { + schemaTopicClient.deleteTopicAndResetSchemaRegistry(TOPIC); + } + + Thread.sleep(TIMEOUT.toMillis()); + + this.softly.assertThat(client.getAllSubjects()) + .doesNotContain(TOPIC + "-value"); + this.softly.assertThat(topicClient.exists(TOPIC)) + .isFalse(); } - - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - - this.softly.assertThat(client.getAllSubjects()) - .doesNotContain(TOPIC + "-value"); - this.softly.assertThat(this.kafkaCluster.exists(TOPIC)) - .isFalse(); } @Test void shouldResetSchema() throws InterruptedException, IOException, RestClientException { - this.kafkaCluster.createTopic(TopicConfig.withName(TOPIC).useDefaults()); - this.softly.assertThat(this.kafkaCluster.exists(TOPIC)) - .as("Topic is created") - .isTrue(); - - final SendValuesTransactional sendRequest = SendValuesTransactional - .inTransaction(TOPIC, List.of(TestRecord.newBuilder().setContent("foo").build())) - .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) - .build(); - this.kafkaCluster.send(sendRequest); - - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); - this.softly.assertThat(client.getAllSubjects()) - .contains(TOPIC + "-value"); - - try (final SchemaTopicClient schemaTopicClient = this.createClientWithSchemaRegistry()) { - schemaTopicClient.resetSchemaRegistry(TOPIC); + final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + final TopicClient topicClient = admin.getTopicClient(); + topicClient.createTopic(TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); + this.softly.assertThat(topicClient.exists(TOPIC)) + .as("Topic is created") + .isTrue(); + + kafkaContainerHelper.send() + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, + this.schemaRegistryMockExtension.getUrl()) + .to(TOPIC, List.of( + new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) + )); + + 
final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); + this.softly.assertThat(client.getAllSubjects()) + .contains(TOPIC + "-value"); + + try (final SchemaTopicClient schemaTopicClient = this.createClientWithSchemaRegistry()) { + schemaTopicClient.resetSchemaRegistry(TOPIC); + } + + Thread.sleep(TIMEOUT.toMillis()); + + this.softly.assertThat(client.getAllSubjects()) + .doesNotContain(TOPIC + "-value"); + this.softly.assertThat(topicClient.exists(TOPIC)) + .isTrue(); } - - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - - this.softly.assertThat(client.getAllSubjects()) - .doesNotContain(TOPIC + "-value"); - this.softly.assertThat(this.kafkaCluster.exists(TOPIC)) - .isTrue(); } @Test void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws InterruptedException, RestClientException, IOException { - this.kafkaCluster.createTopic(TopicConfig.withName(TOPIC).useDefaults()); - this.softly.assertThat(this.kafkaCluster.exists(TOPIC)) - .as("Topic is created") - .isTrue(); - - final SendValuesTransactional sendRequest = SendValuesTransactional - .inTransaction(TOPIC, List.of(TestRecord.newBuilder().setContent("foo").build())) - .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) - .build(); - this.kafkaCluster.send(sendRequest); - - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); - this.softly.assertThat(client.getAllSubjects()) - .contains(TOPIC + "-value"); - - try (final SchemaTopicClient schemaTopicClient = this.createClientWithNoSchemaRegistry()) { - schemaTopicClient.deleteTopicAndResetSchemaRegistry(TOPIC); + final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); + try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + final TopicClient topicClient = admin.getTopicClient(); + topicClient.createTopic(TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); + this.softly.assertThat(topicClient.exists(TOPIC)) + .as("Topic is created") + .isTrue(); + + kafkaContainerHelper.send() + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, + this.schemaRegistryMockExtension.getUrl()) + .to(TOPIC, List.of( + new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) + )); + + final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); + this.softly.assertThat(client.getAllSubjects()) + .contains(TOPIC + "-value"); + + try (final SchemaTopicClient schemaTopicClient = this.createClientWithNoSchemaRegistry()) { + schemaTopicClient.deleteTopicAndResetSchemaRegistry(TOPIC); + } + + Thread.sleep(TIMEOUT.toMillis()); + this.softly.assertThat(client.getAllSubjects()) + .contains(TOPIC + "-value"); + this.softly.assertThat(topicClient.exists(TOPIC)) + .isFalse(); } - - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - this.softly.assertThat(client.getAllSubjects()) - .contains(TOPIC + "-value"); - this.softly.assertThat(this.kafkaCluster.exists(TOPIC)) - .isFalse(); } private SchemaTopicClient createClientWithSchemaRegistry() { final Map kafkaProperties = Map.of( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList() + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBootstrapServers() ); return SchemaTopicClient.create(kafkaProperties, 
this.schemaRegistryMockExtension.getUrl(), - Duration.of(TIMEOUT_SECONDS, ChronoUnit.SECONDS)); + TIMEOUT); } private SchemaTopicClient createClientWithNoSchemaRegistry() { final Map kafkaProperties = Map.of( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBrokerList() + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBootstrapServers() ); - return SchemaTopicClient.create(kafkaProperties, Duration.of(TIMEOUT_SECONDS, ChronoUnit.SECONDS)); + return SchemaTopicClient.create(kafkaProperties, TIMEOUT); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java index 80c762a4..d3111756 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,42 +24,25 @@ package com.bakdata.kafka.util; -import static com.bakdata.kafka.TestUtil.newKafkaConfig; +import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; +import static com.bakdata.kafka.TestUtil.newKafkaCluster; import static java.util.Collections.emptyMap; -import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith; -import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.newClusterConfig; -import static net.mguenther.kafka.junit.Wait.delay; import static org.assertj.core.api.Assertions.assertThat; import java.time.Duration; import java.util.Map; -import java.util.concurrent.TimeUnit; -import net.mguenther.kafka.junit.EmbeddedKafkaCluster; -import net.mguenther.kafka.junit.TopicConfig; import org.apache.kafka.clients.admin.AdminClientConfig; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.ConfluentKafkaContainer; +@Testcontainers class TopicClientTest { private static final Duration CLIENT_TIMEOUT = Duration.ofSeconds(10L); - private final EmbeddedKafkaCluster kafkaCluster = provisionWith(newClusterConfig() - .configure(newKafkaConfig() - .withNumberOfBrokers(2) - .build()) - .build()); - - @BeforeEach - void setup() throws InterruptedException { - this.kafkaCluster.start(); - delay(20, TimeUnit.SECONDS); - } - - @AfterEach - void teardown() { - this.kafkaCluster.stop(); - } + @Container + private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster(); @Test void shouldNotFindTopic() { @@ -70,8 +53,10 @@ void shouldNotFindTopic() { @Test void shouldFindTopic() throws InterruptedException { - this.kafkaCluster.createTopic(TopicConfig.withName("exists").useDefaults()); - delay((int) CLIENT_TIMEOUT.toSeconds()); + try (final TopicClient client = this.createClient()) { + client.createTopic("exists", DEFAULT_TOPIC_SETTINGS, emptyMap()); + } + Thread.sleep(CLIENT_TIMEOUT.toMillis()); try (final TopicClient client = this.createClient()) { assertThat(client.exists("exists")).isTrue(); } @@ -79,9 +64,11 @@ void shouldFindTopic() throws InterruptedException { @Test void shouldListTopics() throws InterruptedException { - 
this.kafkaCluster.createTopic(TopicConfig.withName("foo").useDefaults());
-        this.kafkaCluster.createTopic(TopicConfig.withName("bar").useDefaults());
-        delay((int) CLIENT_TIMEOUT.toSeconds());
+        try (final TopicClient client = this.createClient()) {
+            client.createTopic("foo", DEFAULT_TOPIC_SETTINGS, emptyMap());
+            client.createTopic("bar", DEFAULT_TOPIC_SETTINGS, emptyMap());
+        }
+        Thread.sleep(CLIENT_TIMEOUT.toMillis());
         try (final TopicClient client = this.createClient()) {
             assertThat(client.listTopics())
                     .hasSize(2)
@@ -91,8 +78,10 @@ void shouldListTopics() throws InterruptedException {
 
     @Test
     void shouldDeleteTopic() throws InterruptedException {
-        this.kafkaCluster.createTopic(TopicConfig.withName("foo").useDefaults());
-        delay((int) CLIENT_TIMEOUT.toSeconds());
+        try (final TopicClient client = this.createClient()) {
+            client.createTopic("foo", DEFAULT_TOPIC_SETTINGS, emptyMap());
+        }
+        Thread.sleep(CLIENT_TIMEOUT.toMillis());
         try (final TopicClient client = this.createClient()) {
             assertThat(client.listTopics())
                     .hasSize(1)
@@ -109,21 +98,22 @@ void shouldCreateTopic() throws InterruptedException {
             assertThat(client.exists("topic")).isFalse();
             final TopicSettings settings = TopicSettings.builder()
                     .partitions(5)
-                    .replicationFactor((short) 2)
+//                    .replicationFactor((short) 2) // FIXME setup testcontainers with multiple brokers
+                    .replicationFactor((short) 1)
                     .build();
             client.createTopic("topic", settings, emptyMap());
-            delay((int) CLIENT_TIMEOUT.toSeconds(), TimeUnit.SECONDS);
+            Thread.sleep(CLIENT_TIMEOUT.toMillis());
             assertThat(client.exists("topic")).isTrue();
             assertThat(client.describe("topic"))
                     .satisfies(info -> {
-                        assertThat(info.getReplicationFactor()).isEqualTo((short) 2);
+                        assertThat(info.getReplicationFactor()).isEqualTo((short) 1);
                         assertThat(info.getPartitions()).isEqualTo(5);
                     });
         }
     }
 
     private TopicClient createClient() {
-        final String brokerList = this.kafkaCluster.getBrokerList();
+        final String brokerList = this.kafkaCluster.getBootstrapServers();
         final Map<String, Object> config = Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
         return TopicClient.create(config, CLIENT_TIMEOUT);
     }
diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java
new file mode 100644
index 00000000..c1f4d8d8
--- /dev/null
+++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java
@@ -0,0 +1,142 @@
+/*
+ * MIT License
+ *
+ * Copyright (c) 2025 bakdata
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package com.bakdata.kafka;
+
+import com.bakdata.kafka.util.ImprovedAdminClient;
+import com.bakdata.kafka.util.TopicSettings;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+
+@RequiredArgsConstructor
+public class KafkaContainerHelper {
+
+    public static final TopicSettings DEFAULT_TOPIC_SETTINGS = TopicSettings.builder()
+            .partitions(1)
+            .replicationFactor((short) 1)
+            .build();
+    private final @NonNull ConfluentKafkaContainer kafkaCluster;
+
+    private static <K, V> List<ConsumerRecord<K, V>> pollAll(final Consumer<K, V> consumer, final Duration timeout) {
+        final List<ConsumerRecord<K, V>> records = new ArrayList<>();
+        ConsumerRecords<K, V> poll = consumer.poll(timeout);
+        while (!poll.isEmpty()) {
+            poll.forEach(records::add);
+            poll = consumer.poll(timeout);
+        }
+        return records;
+    }
+
+    public SenderBuilder send() {
+        return new SenderBuilder(Map.of(
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
+        ));
+    }
+
+    public ReaderBuilder read() {
+        return new ReaderBuilder(Map.of(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
+        ));
+    }
+
+    public ImprovedAdminClient admin() {
+        return ImprovedAdminClient.create(Map.of(
+                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaContainerHelper.this.kafkaCluster.getBootstrapServers()
+        ));
+    }
+
+    @RequiredArgsConstructor
+    public class SenderBuilder {
+
+        private final @NonNull Map<String, Object> properties;
+
+        public SenderBuilder with(final String key, final Object value) {
+            final Map<String, Object> newProperties = new HashMap<>(this.properties);
+            newProperties.put(key, value);
+            return new SenderBuilder(Map.copyOf(newProperties));
+        }
+
+        public <K, V> void to(final String topic, final Iterable<KeyValue<K, V>> records) {
+            final Map<String, Object> producerConfig = new HashMap<>(this.properties);
+            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                    KafkaContainerHelper.this.kafkaCluster.getBootstrapServers());
+            try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
+                records.forEach(kv -> producer.send(new ProducerRecord<>(topic, kv.key, kv.value)));
+            }
+        }
+
+    }
+
+    @RequiredArgsConstructor
+    public class ReaderBuilder {
+
+        private final @NonNull Map<String, Object> properties;
+
+        public ReaderBuilder with(final String key, final Object value) {
+            final Map<String, Object> newProperties = new HashMap<>(this.properties);
+            newProperties.put(key, value);
+            return new ReaderBuilder(Map.copyOf(newProperties));
+        }
+
+        public <K, V> List<ConsumerRecord<K, V>> from(final String output, final Duration timeout) {
+            final Map<String, Object> consumerConfig = new HashMap<>(this.properties);
+            consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+                    KafkaContainerHelper.this.kafkaCluster.getBootstrapServers());
+            try (final Consumer<K, V> consumer = new KafkaConsumer<>(consumerConfig)) {
+                final List<PartitionInfo> partitionInfos = consumer.listTopics().get(output);
+                final List<TopicPartition> topicPartitions = partitionInfos.stream()
+                        .map(partition -> new TopicPartition(partition.topic(), partition.partition()))
+                        .collect(Collectors.toList());
+                consumer.assign(topicPartitions);
+                consumer.seekToBeginning(topicPartitions);
+                return pollAll(consumer, timeout);
+            }
+        }
+
+    }
+}
diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/TestUtil.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java
similarity index 64%
rename from streams-bootstrap-cli/src/test/java/com/bakdata/kafka/TestUtil.java
rename to streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java
index c89ffa4f..1af3402d 100644
--- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/TestUtil.java
+++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java
@@ -1,7 +1,7 @@
 /*
  * MIT License
  *
- * Copyright (c) 2024 bakdata
+ * Copyright (c) 2025 bakdata
  *
  * Permission is hereby granted, free of charge, to any person obtaining a copy
  * of this software and associated documentation files (the "Software"), to deal
@@ -24,21 +24,13 @@
 package com.bakdata.kafka;
 
-import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
-import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.newClusterConfig;
-import static net.mguenther.kafka.junit.EmbeddedKafkaConfig.brokers;
-
 import lombok.experimental.UtilityClass;
-import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
+import org.testcontainers.kafka.ConfluentKafkaContainer;
+import org.testcontainers.utility.DockerImageName;
 
 @UtilityClass
 public class TestUtil {
-    public static EmbeddedKafkaCluster newKafkaCluster() {
-        return provisionWith(newClusterConfig()
-                .configure(brokers()
-                        .with("transaction.state.log.num.partitions", 10)
-                        .with("offsets.topic.num.partitions", 10)
-                        .build())
-                .build());
+    public static ConfluentKafkaContainer newKafkaCluster() {
+        return new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));
     }
 }
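
For reference, the new testFixtures pieces are meant to compose: TestUtil.newKafkaCluster() provisions the ConfluentKafkaContainer and KafkaContainerHelper wraps it for producing, reading, and admin access. The sketch below is illustrative rather than part of the patch; the test class name and topic are made up, and it assumes the testFixtures artifact of streams-bootstrap-core plus Testcontainers' JUnit 5 extension are on the test classpath.

import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS;
import static com.bakdata.kafka.TestUtil.newKafkaCluster;
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;

import com.bakdata.kafka.KafkaContainerHelper;
import com.bakdata.kafka.util.ImprovedAdminClient;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;

@Testcontainers
class FixtureRoundTripTest {

    // non-static @Container: the extension starts and stops the broker per test
    @Container
    private final ConfluentKafkaContainer kafkaCluster = newKafkaCluster();

    @Test
    void shouldRoundTrip() {
        final KafkaContainerHelper helper = new KafkaContainerHelper(this.kafkaCluster);
        // create the topic explicitly instead of relying on broker auto-creation
        try (final ImprovedAdminClient admin = helper.admin()) {
            admin.getTopicClient().createTopic("demo", DEFAULT_TOPIC_SETTINGS, emptyMap());
        }
        // send() defaults to String serializers; override via with(...) as the tests above do
        helper.send()
                .to("demo", List.of(new KeyValue<>("foo", "bar")));
        // read() seeks to the beginning and polls until a poll comes back empty
        final List<ConsumerRecord<String, String>> records = helper.read()
                .from("demo", Duration.ofSeconds(10));
        assertThat(records).hasSize(1);
    }
}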
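
Note that newKafkaCluster() provisions a single broker, which is why DEFAULT_TOPIC_SETTINGS uses one partition with replication factor 1, and why shouldCreateTopic in TopicClientTest pins the replication factor to 1 until a multi-broker Testcontainers setup is in place (see the FIXME above).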