From cc28c79a839ea35f9896c58ee48cf4f4989b1808 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 20 Jan 2025 10:12:22 +0100 Subject: [PATCH] Add methods for simplified testing of Kafka endpoints (#270) --- .../test/java/com/bakdata/kafka/CliTest.java | 44 +-- .../kafka/integration/RunProducerAppTest.java | 40 +-- .../kafka/integration/RunStreamsAppTest.java | 35 +-- .../kafka/integration/StreamsCleanUpTest.java | 70 ++--- streams-bootstrap-core/build.gradle.kts | 2 +- .../com/bakdata/kafka/util/TopicClient.java | 24 ++ .../ProducerCleanUpRunnerTest.java | 12 +- .../kafka/integration/ProducerRunnerTest.java | 10 +- .../integration/StreamsCleanUpRunnerTest.java | 265 ++++++++---------- .../kafka/integration/StreamsRunnerTest.java | 54 ++-- .../kafka/util/SchemaTopicClientTest.java | 75 ++--- .../bakdata/kafka/util/TopicClientTest.java | 22 +- .../bakdata/kafka/KafkaContainerHelper.java | 142 ---------- .../java/com/bakdata/kafka}/KafkaTest.java | 38 ++- .../java/com/bakdata/kafka/TestUtil.java | 36 --- .../com/bakdata/kafka/KafkaTestClient.java | 132 +++++++++ .../java/com/bakdata/kafka/ReaderBuilder.java | 111 ++++++++ .../java/com/bakdata/kafka/SenderBuilder.java | 129 +++++++++ 18 files changed, 691 insertions(+), 550 deletions(-) delete mode 100644 streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java rename streams-bootstrap-core/src/{test/java/com/bakdata/kafka/integration => testFixtures/java/com/bakdata/kafka}/KafkaTest.java (64%) delete mode 100644 streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java create mode 100644 streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java create mode 100644 streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java create mode 100644 streams-bootstrap-test/src/main/java/com/bakdata/kafka/SenderBuilder.java diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index 7912746e..b777d932 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -24,20 +24,22 @@ package com.bakdata.kafka; -import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static java.util.Collections.emptyMap; +import static com.bakdata.kafka.KafkaTest.POLL_TIMEOUT; +import static com.bakdata.kafka.KafkaTest.newCluster; import static org.assertj.core.api.Assertions.assertThat; -import com.bakdata.kafka.util.ImprovedAdminClient; +import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.ginsberg.junit.exit.ExpectSystemExitWithStatus; import java.time.Duration; import java.util.List; import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes.StringSerde; -import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.kstream.Consumed; import org.junit.jupiter.api.Test; import org.testcontainers.kafka.KafkaContainer; @@ -214,7 +216,7 @@ public SerdeConfig defaultSerializationConfig() { 
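For orientation before the individual hunks: the patch's new test entry point is KafkaTestClient, built from a KafkaEndpointConfig, with SenderBuilder and ReaderBuilder behind its send() and read() methods. A minimal sketch of the round trip the migrated tests perform, using only calls that appear in this patch; the topic name and record values are illustrative:

    // Sketch based on the hunks below; assumes a running Testcontainers broker.
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.testcontainers.kafka.KafkaContainer;
    import com.bakdata.kafka.KafkaEndpointConfig;
    import com.bakdata.kafka.KafkaTest;
    import com.bakdata.kafka.KafkaTestClient;
    import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;

    class KafkaTestClientRoundTrip {
        static List<ConsumerRecord<String, String>> roundTrip(final KafkaContainer kafkaCluster) {
            // The client is built from an endpoint config instead of wrapping the container
            final KafkaTestClient testClient = new KafkaTestClient(KafkaEndpointConfig.builder()
                    .bootstrapServers(kafkaCluster.getBootstrapServers())
                    .build());
            testClient.createTopic("demo-topic"); // convenience overload, see TopicClient below
            // Serializers are configured per send via with(...)
            testClient.send()
                    .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                    .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                    .to("demo-topic", List.of(new SimpleProducerRecord<>("foo", "bar")));
            // Deserializers likewise per read, polling for at most POLL_TIMEOUT
            return testClient.read()
                    .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                    .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                    .from("demo-topic", KafkaTest.POLL_TIMEOUT);
        }
    }

Because the configuration travels with each call, one client instance serves string, Avro, and admin use cases alike.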
@ExpectSystemExitWithStatus(1) void shouldExitWithErrorInTopology() throws InterruptedException { final String input = "input"; - try (final KafkaContainer kafkaCluster = newKafkaCluster(); + try (final KafkaContainer kafkaCluster = newCluster(); final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -240,8 +242,12 @@ public SerdeConfig defaultSerializationConfig() { "--bootstrap-server", kafkaCluster.getBootstrapServers(), "--input-topics", input ); - new KafkaContainerHelper(kafkaCluster).send() - .to(input, List.of(new KeyValue<>("foo", "bar"))); + new KafkaTestClient(KafkaEndpointConfig.builder() + .bootstrapServers(kafkaCluster.getBootstrapServers()) + .build()).send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .to(input, List.of(new SimpleProducerRecord<>("foo", "bar"))); Thread.sleep(Duration.ofSeconds(10).toMillis()); } } @@ -251,7 +257,7 @@ public SerdeConfig defaultSerializationConfig() { void shouldExitWithSuccessCodeOnShutdown() { final String input = "input"; final String output = "output"; - try (final KafkaContainer kafkaCluster = newKafkaCluster(); + try (final KafkaContainer kafkaCluster = newCluster(); final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -270,20 +276,24 @@ public SerdeConfig defaultSerializationConfig() { } })) { kafkaCluster.start(); - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(kafkaCluster); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient().createTopic(output, DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + final KafkaTestClient testClient = new KafkaTestClient(KafkaEndpointConfig.builder() + .bootstrapServers(kafkaCluster.getBootstrapServers()) + .build()); + testClient.createTopic(output); runApp(app, "--bootstrap-server", kafkaCluster.getBootstrapServers(), "--input-topics", input, "--output-topic", output ); - kafkaContainerHelper.send() - .to(input, List.of(new KeyValue<>("foo", "bar"))); - final List> keyValues = kafkaContainerHelper.read() - .from(output, Duration.ofSeconds(10)); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .to(input, List.of(new SimpleProducerRecord<>("foo", "bar"))); + final List> keyValues = testClient.read() + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .from(output, POLL_TIMEOUT); assertThat(keyValues) .hasSize(1) .anySatisfy(kv -> { diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index 40bff219..f5249942 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -24,11 +24,11 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.TestUtil.newKafkaCluster; import static org.assertj.core.api.Assertions.assertThat; -import com.bakdata.kafka.KafkaContainerHelper; import 
com.bakdata.kafka.KafkaProducerApplication; +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.KafkaTestClient; import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerBuilder; import com.bakdata.kafka.ProducerRunnable; @@ -36,7 +36,6 @@ import com.bakdata.kafka.SimpleKafkaProducerApplication; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.util.ImprovedAdminClient; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import java.time.Duration; @@ -45,29 +44,10 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.KafkaContainer; -@Testcontainers -class RunProducerAppTest { +class RunProducerAppTest extends KafkaTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); - private static final String SCHEMA_REGISTRY_URL = "mock://"; - @Container - private final KafkaContainer kafkaCluster = newKafkaCluster(); - - @BeforeEach - void setup() { - this.kafkaCluster.start(); - } - - @AfterEach - void tearDown() { - this.kafkaCluster.stop(); - } @Test void shouldRunApp() throws InterruptedException { @@ -88,16 +68,16 @@ public SerializerConfig defaultSerializationConfig() { return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class); } })) { - app.setBootstrapServers(this.kafkaCluster.getBootstrapServers()); - app.setSchemaRegistryUrl(SCHEMA_REGISTRY_URL); + app.setBootstrapServers(this.getBootstrapServers()); + final String schemaRegistryUrl = this.getSchemaRegistryUrl(); + app.setSchemaRegistryUrl(schemaRegistryUrl); app.setOutputTopic(output); app.run(); - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - assertThat(kafkaContainerHelper.read() + final KafkaTestClient testClient = this.newTestClient(); + assertThat(testClient.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) - .from(output, TIMEOUT)) + .from(output, POLL_TIMEOUT)) .hasSize(1) .anySatisfy(kv -> { assertThat(kv.key()).isEqualTo("foo"); @@ -105,7 +85,7 @@ public SerializerConfig defaultSerializationConfig() { }); app.clean(); Thread.sleep(TIMEOUT.toMillis()); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin()) { assertThat(admin.getTopicClient().exists(app.getOutputTopic())) .as("Output topic is deleted") .isFalse(); diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index c1b46273..60569ad8 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -24,48 +24,35 @@ package com.bakdata.kafka.integration; 
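Note what the RunProducerAppTest hunk above drops: the explicit AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG property. The endpoint config behind KafkaTest#newTestClient() already carries the schema registry URL, so an Avro read only names its deserializers. A minimal sketch, assuming a test class that extends KafkaTest and the TestRecord Avro type used throughout these tests:

    // Sketch; lives inside a subclass of KafkaTest (assumption), so
    // newTestClient() and POLL_TIMEOUT are inherited/imported from there.
    private List<ConsumerRecord<String, TestRecord>> readAvroOutput(final String topic) {
        return this.newTestClient().read()
                .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class)
                .from(topic, POLL_TIMEOUT); // schema registry comes from the endpoint config
    }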
-import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; -import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.KafkaStreamsApplication; +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.KafkaTestClient; +import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SimpleKafkaStreamsApplication; import com.bakdata.kafka.test_applications.Mirror; -import com.bakdata.kafka.util.ImprovedAdminClient; -import java.time.Duration; import java.util.List; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KeyValue; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.KafkaContainer; -@Testcontainers @ExtendWith(MockitoExtension.class) -class RunStreamsAppTest { - private static final Duration TIMEOUT = Duration.ofSeconds(10); - @Container - private final KafkaContainer kafkaCluster = newKafkaCluster(); +class RunStreamsAppTest extends KafkaTest { @Test void shouldRunApp() { final String input = "input"; final String output = "output"; - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient().createTopic(output, DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(output); try (final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(Mirror::new)) { - app.setBootstrapServers(this.kafkaCluster.getBootstrapServers()); + app.setBootstrapServers(this.getBootstrapServers()); app.setKafkaConfig(Map.of( ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" )); @@ -73,14 +60,14 @@ void shouldRunApp() { app.setOutputTopic(output); // run in Thread because the application blocks indefinitely new Thread(app).start(); - kafkaContainerHelper.send() + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .to(input, List.of(new KeyValue<>("foo", "bar"))); - assertThat(kafkaContainerHelper.read() + .to(input, List.of(new SimpleProducerRecord<>("foo", "bar"))); + assertThat(testClient.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) - .from(output, TIMEOUT)) + .from(output, POLL_TIMEOUT)) .hasSize(1); } } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index 5f8cd2b5..c309c6f8 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -25,13 +25,11 @@ package com.bakdata.kafka.integration; 
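The boilerplate removed in the hunks above (@Testcontainers, @Container, manual start() and stop()) is absorbed by the KafkaTest base class, whose body lies outside this excerpt. A sketch of the skeleton the migrated tests now share, inferred from how the hunks use it; the topic name is illustrative:

    // Sketch: KafkaTest (per the hunks) owns the container lifecycle and exposes
    // getBootstrapServers(), getSchemaRegistryUrl(), and newTestClient().
    class ExampleKafkaTest extends KafkaTest {
        @Test
        void shouldRoundTrip() {
            final KafkaTestClient testClient = this.newTestClient();
            testClient.createTopic("output"); // created with default topic settings
            // Applications under test are pointed at the same broker via
            // this.getBootstrapServers() and, for Avro, this.getSchemaRegistryUrl()
        }
    }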
-import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static java.util.Collections.emptyMap; - import com.bakdata.kafka.CloseFlagApp; -import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.KafkaStreamsApplication; +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.KafkaTestClient; +import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SimpleKafkaStreamsApplication; import com.bakdata.kafka.test_applications.WordCount; import com.bakdata.kafka.util.ImprovedAdminClient; @@ -43,7 +41,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.assertj.core.api.SoftAssertions; @@ -54,19 +55,13 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.KafkaContainer; -@Testcontainers @Slf4j @ExtendWith(SoftAssertionsExtension.class) @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) -class StreamsCleanUpTest { +class StreamsCleanUpTest extends KafkaTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); - @Container - private final KafkaContainer kafkaCluster = newKafkaCluster(); @InjectSoftAssertions private SoftAssertions softly; @@ -85,15 +80,15 @@ private static void runApp(final KafkaStreamsApplication app) throws Interrup @Test void shouldClean() throws InterruptedException { try (final KafkaStreamsApplication app = this.createWordCountApplication()) { - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - try (final ImprovedAdminClient admin = new KafkaContainerHelper(this.kafkaCluster).admin()) { - admin.getTopicClient().createTopic(app.getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getOutputTopic()); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getInputTopics().get(0), List.of( - new KeyValue<>(null, "blub"), - new KeyValue<>(null, "bla"), - new KeyValue<>(null, "blub") + new SimpleProducerRecord<>(null, "blub"), + new SimpleProducerRecord<>(null, "bla"), + new SimpleProducerRecord<>(null, "blub") )); final List> expectedValues = List.of( @@ -107,15 +102,13 @@ void shouldClean() throws InterruptedException { Thread.sleep(TIMEOUT.toMillis()); app.clean(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin()) { this.softly.assertThat(admin.getTopicClient().exists(app.getOutputTopic())) .as("Output topic is deleted") .isFalse(); } - try (final ImprovedAdminClient admin = new KafkaContainerHelper(this.kafkaCluster).admin()) { - 
admin.getTopicClient().createTopic(app.getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + testClient.createTopic(app.getOutputTopic()); this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 2nd run", app); } } @@ -123,15 +116,15 @@ void shouldClean() throws InterruptedException { @Test void shouldReset() throws InterruptedException { try (final KafkaStreamsApplication app = this.createWordCountApplication()) { - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - try (final ImprovedAdminClient admin = new KafkaContainerHelper(this.kafkaCluster).admin()) { - admin.getTopicClient().createTopic(app.getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getOutputTopic()); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getInputTopics().get(0), List.of( - new KeyValue<>(null, "blub"), - new KeyValue<>(null, "bla"), - new KeyValue<>(null, "blub") + new SimpleProducerRecord<>(null, "blub"), + new SimpleProducerRecord<>(null, "bla"), + new SimpleProducerRecord<>(null, "blub") )); final List> expectedValues = List.of( @@ -145,7 +138,7 @@ void shouldReset() throws InterruptedException { Thread.sleep(TIMEOUT.toMillis()); app.reset(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin()) { this.softly.assertThat(admin.getTopicClient().exists(app.getOutputTopic())) .as("Output topic exists") .isTrue(); @@ -161,9 +154,7 @@ void shouldReset() throws InterruptedException { @Test void shouldCallClose() throws InterruptedException { try (final CloseFlagApp app = this.createCloseFlagApplication()) { - try (final ImprovedAdminClient admin = new KafkaContainerHelper(this.kafkaCluster).admin()) { - admin.getTopicClient().createTopic(app.getInputTopics().get(0), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + this.newTestClient().createTopic(app.getInputTopics().get(0)); Thread.sleep(TIMEOUT.toMillis()); this.softly.assertThat(app.isClosed()).isFalse(); this.softly.assertThat(app.isAppClosed()).isFalse(); @@ -184,9 +175,10 @@ private CloseFlagApp createCloseFlagApplication() { } private List> readOutputTopic(final String outputTopic) { - final List> records = new KafkaContainerHelper(this.kafkaCluster).read() + final List> records = this.newTestClient().read() + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class) - .from(outputTopic, TIMEOUT); + .from(outputTopic, POLL_TIMEOUT); return records.stream() .map(consumerRecord -> new KeyValue<>(consumerRecord.key(), consumerRecord.value())) .collect(Collectors.toList()); @@ -211,7 +203,7 @@ private KafkaStreamsApplication createWordCountApplication() { } private > T configure(final T application) { - application.setBootstrapServers(this.kafkaCluster.getBootstrapServers()); + application.setBootstrapServers(this.getBootstrapServers()); application.setKafkaConfig(Map.of( StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, "0", ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts index c85e0870..77d4a1f5 100644 --- a/streams-bootstrap-core/build.gradle.kts +++ 
b/streams-bootstrap-core/build.gradle.kts @@ -31,7 +31,7 @@ dependencies { testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion) testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion) - testImplementation(project(":streams-bootstrap-test")) + testFixturesApi(project(":streams-bootstrap-test")) val testContainersVersion: String by project testFixturesApi(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion) testFixturesApi(group = "org.testcontainers", name = "kafka", version = testContainersVersion) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java index 318135ac..65dcb037 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java @@ -24,6 +24,8 @@ package com.bakdata.kafka.util; +import static java.util.Collections.emptyMap; + import java.time.Duration; import java.util.Collection; import java.util.List; @@ -102,6 +104,18 @@ public void createIfNotExists(final String topicName, final TopicSettings settin } } + /** + * Creates a new Kafka topic with the specified settings if it does not yet exist. + * + * @param topicName the topic name + * @param settings settings for number of partitions and replication factor + * @see #createTopic(String, TopicSettings, Map) + * @see #exists(String) + */ + public void createIfNotExists(final String topicName, final TopicSettings settings) { + this.createIfNotExists(topicName, settings, emptyMap()); + } + /** * Delete a Kafka topic. * @@ -225,6 +239,16 @@ public void createTopic(final String topicName, final TopicSettings settings, fi } } + /** + * Creates a new Kafka topic with the specified settings. + * + * @param topicName the topic name + * @param settings settings for number of partitions and replication factor + */ + public void createTopic(final String topicName, final TopicSettings settings) { + this.createTopic(topicName, settings, emptyMap()); + } + /** * List Kafka topics.
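The two overloads added here delegate with emptyMap(), so call sites no longer thread an empty config map through every topic creation. Together with KafkaTestClient.defaultTopicSettings() (used in SchemaTopicClientTest further down), creating topics in tests reduces to the following sketch; the topic name is illustrative and testClient is assumed to be a KafkaTestClient:

    // Both clients are AutoCloseable and released by try-with-resources,
    // matching the pattern used in the test hunks below
    try (final ImprovedAdminClient admin = testClient.admin();
            final TopicClient topicClient = admin.getTopicClient()) {
        topicClient.createTopic("topic-a", KafkaTestClient.defaultTopicSettings().build());
        // Idempotent variant for topics that may already exist
        topicClient.createIfNotExists("topic-a", KafkaTestClient.defaultTopicSettings().build());
    }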
* diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java index 155890b0..13263734 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java @@ -35,6 +35,7 @@ import com.bakdata.kafka.ExecutableApp; import com.bakdata.kafka.ExecutableProducerApp; import com.bakdata.kafka.HasTopicHooks.TopicHook; +import com.bakdata.kafka.KafkaTest; import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerCleanUpConfiguration; import com.bakdata.kafka.ProducerTopicConfig; @@ -46,10 +47,11 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import java.io.IOException; -import java.time.Duration; import java.util.List; import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.streams.KeyValue; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; @@ -109,7 +111,7 @@ void shouldDeleteTopic() { clean(executableApp); - try (final ImprovedAdminClient admin = this.newContainerHelper().admin()) { + try (final ImprovedAdminClient admin = this.newTestClient().admin()) { this.softly.assertThat(admin.getTopicClient().exists(app.getTopics().getOutputTopic())) .as("Output topic is deleted") .isFalse(); @@ -173,8 +175,10 @@ public ProducerCleanUpConfiguration setupCleanUp( } private List> readOutputTopic(final String outputTopic) { - final List> records = - this.newContainerHelper().read().from(outputTopic, Duration.ofSeconds(1L)); + final List> records = this.newTestClient().read() + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .from(outputTopic, POLL_TIMEOUT); return records.stream() .map(StreamsCleanUpRunnerTest::toKeyValue) .collect(Collectors.toList()); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java index 7653a83c..9daec3ce 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java @@ -28,13 +28,15 @@ import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.ConfiguredProducerApp; +import com.bakdata.kafka.KafkaTest; import com.bakdata.kafka.ProducerApp; import com.bakdata.kafka.ProducerRunner; import com.bakdata.kafka.ProducerTopicConfig; -import java.time.Duration; import java.util.List; import java.util.stream.Collectors; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.streams.KeyValue; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; @@ -71,8 +73,10 @@ void shouldRunApp() { } private List> readOutputTopic(final String outputTopic) { - 
final List> records = - this.newContainerHelper().read().from(outputTopic, Duration.ofSeconds(1L)); + final List> records = this.newTestClient().read() + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .from(outputTopic, POLL_TIMEOUT); return records.stream() .map(StreamsCleanUpRunnerTest::toKeyValue) .collect(Collectors.toList()); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index 5e8cfcea..58ca2f85 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -25,9 +25,7 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; import static com.bakdata.kafka.integration.StreamsRunnerTest.configureApp; -import static java.util.Collections.emptyMap; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -38,7 +36,9 @@ import com.bakdata.kafka.ExecutableApp; import com.bakdata.kafka.ExecutableStreamsApp; import com.bakdata.kafka.HasTopicHooks.TopicHook; -import com.bakdata.kafka.KafkaContainerHelper; +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.KafkaTestClient; +import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsCleanUpConfiguration; import com.bakdata.kafka.StreamsCleanUpRunner; @@ -52,9 +52,9 @@ import com.bakdata.kafka.test_applications.WordCountPattern; import com.bakdata.kafka.util.ConsumerGroupClient; import com.bakdata.kafka.util.ImprovedAdminClient; +import com.bakdata.kafka.util.TopicClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroSerializer; import java.io.IOException; import java.time.Duration; @@ -66,6 +66,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KeyValue; import org.assertj.core.api.SoftAssertions; @@ -145,16 +146,15 @@ void shouldDeleteTopic() throws InterruptedException { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(null, "blub"), - new KeyValue<>(null, "bla"), - new KeyValue<>(null, "blub") + new SimpleProducerRecord<>(null, "blub"), + new SimpleProducerRecord<>(null, "bla"), + new SimpleProducerRecord<>(null, "blub") )); final List> expectedValues = @@ -170,8 +170,9 @@ void shouldDeleteTopic() throws InterruptedException { Thread.sleep(TIMEOUT.toMillis()); clean(executableApp); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - this.softly.assertThat(admin.getTopicClient().exists(app.getTopics().getOutputTopic())) + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { + this.softly.assertThat(topicClient.exists(app.getTopics().getOutputTopic())) .as("Output topic is deleted") .isFalse(); } @@ -183,16 +184,15 @@ void shouldDeleteConsumerGroup() throws InterruptedException { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(null, "blub"), - new KeyValue<>(null, "bla"), - new KeyValue<>(null, "blub") + new SimpleProducerRecord<>(null, "blub"), + new SimpleProducerRecord<>(null, "bla"), + new SimpleProducerRecord<>(null, "blub") )); final List> expectedValues = @@ -205,7 +205,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); - try (final ImprovedAdminClient adminClient = this.createAdminClient(); + try (final ImprovedAdminClient adminClient = testClient.admin(); final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId())) .as("Consumer group exists") @@ -215,7 +215,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { Thread.sleep(TIMEOUT.toMillis()); clean(executableApp); - try (final ImprovedAdminClient adminClient = this.createAdminClient(); + try (final ImprovedAdminClient adminClient = testClient.admin(); final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId())) .as("Consumer group is deleted") @@ -229,16 +229,15 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), 
DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(null, "blub"), - new KeyValue<>(null, "bla"), - new KeyValue<>(null, "blub") + new SimpleProducerRecord<>(null, "blub"), + new SimpleProducerRecord<>(null, "bla"), + new SimpleProducerRecord<>(null, "blub") )); final List> expectedValues = @@ -251,7 +250,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); - try (final ImprovedAdminClient adminClient = this.createAdminClient(); + try (final ImprovedAdminClient adminClient = testClient.admin(); final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId())) .as("Consumer group exists") @@ -260,7 +259,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept Thread.sleep(TIMEOUT.toMillis()); - try (final ImprovedAdminClient adminClient = this.createAdminClient(); + try (final ImprovedAdminClient adminClient = testClient.admin(); final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { consumerGroupClient.deleteConsumerGroup(app.getUniqueAppId()); this.softly.assertThat(consumerGroupClient.exists(app.getUniqueAppId())) @@ -277,17 +276,13 @@ void shouldDeleteInternalTopics() throws InterruptedException { final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>("key 1", testRecord) + new SimpleProducerRecord<>("key 1", testRecord) )); run(executableApp); @@ -300,25 +295,27 @@ void shouldDeleteInternalTopics() throws InterruptedException { uniqueAppId + "-KSTREAM-REDUCE-STATE-STORE-0000000003-changelog"; final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC; - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { for (final String inputTopic : inputTopics) { - this.softly.assertThat(admin.getTopicClient().exists(inputTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(inputTopic)).isTrue(); } - this.softly.assertThat(admin.getTopicClient().exists(internalTopic)).isTrue(); - 
this.softly.assertThat(admin.getTopicClient().exists(backingTopic)).isTrue(); - this.softly.assertThat(admin.getTopicClient().exists(manualTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(internalTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(backingTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(manualTopic)).isTrue(); } Thread.sleep(TIMEOUT.toMillis()); reset(executableApp); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { for (final String inputTopic : inputTopics) { - this.softly.assertThat(admin.getTopicClient().exists(inputTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(inputTopic)).isTrue(); } - this.softly.assertThat(admin.getTopicClient().exists(internalTopic)).isFalse(); - this.softly.assertThat(admin.getTopicClient().exists(backingTopic)).isFalse(); - this.softly.assertThat(admin.getTopicClient().exists(manualTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(internalTopic)).isFalse(); + this.softly.assertThat(topicClient.exists(backingTopic)).isFalse(); + this.softly.assertThat(topicClient.exists(manualTopic)).isTrue(); } } } @@ -329,17 +326,13 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>("key 1", testRecord) + new SimpleProducerRecord<>("key 1", testRecord) )); run(executableApp); @@ -347,21 +340,23 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { final List inputTopics = app.getTopics().getInputTopics(); final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC; - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { for (final String inputTopic : inputTopics) { - this.softly.assertThat(admin.getTopicClient().exists(inputTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(inputTopic)).isTrue(); } - this.softly.assertThat(admin.getTopicClient().exists(manualTopic)).isTrue(); + this.softly.assertThat(topicClient.exists(manualTopic)).isTrue(); } Thread.sleep(TIMEOUT.toMillis()); clean(executableApp); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { for (final String inputTopic : inputTopics) { - this.softly.assertThat(admin.getTopicClient().exists(inputTopic)).isTrue(); + 
this.softly.assertThat(topicClient.exists(inputTopic)).isTrue(); } - this.softly.assertThat(admin.getTopicClient().exists(manualTopic)).isFalse(); + this.softly.assertThat(topicClient.exists(manualTopic)).isFalse(); } } } @@ -371,16 +366,15 @@ void shouldDeleteState() throws InterruptedException { try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountApplication(); final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(null, "blub"), - new KeyValue<>(null, "bla"), - new KeyValue<>(null, "blub") + new SimpleProducerRecord<>(null, "blub"), + new SimpleProducerRecord<>(null, "bla"), + new SimpleProducerRecord<>(null, "blub") )); final List<KeyValue<String, Long>> expectedValues = @@ -410,16 +404,15 @@ void shouldReprocessAlreadySeenRecords() throws InterruptedException { try (final ConfiguredStreamsApp<StreamsApp> app = createWordCountApplication(); final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(null, "a"), - new KeyValue<>(null, "b"), - new KeyValue<>(null, "c") + new SimpleProducerRecord<>(null, "a"), + new SimpleProducerRecord<>(null, "b"), + new SimpleProducerRecord<>(null, "c") )); run(executableApp); @@ -444,16 +437,13 @@ void shouldDeleteValueSchema() final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(null, testRecord) + new SimpleProducerRecord<>(null, testRecord) )); run(executableApp); @@ -478,16 +469,13 @@ void shouldDeleteKeySchema() final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>(testRecord, "val") + new SimpleProducerRecord<>(testRecord, "val") )); run(executableApp); @@ -512,17 +500,13 @@ void shouldDeleteSchemaOfInternalTopics() final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>("key 1", testRecord) + new SimpleProducerRecord<>("key 1", testRecord) )); run(executableApp); @@ -554,17 +538,13 @@ void shouldDeleteSchemaOfIntermediateTopics() final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( - new KeyValue<>("key 1", 
testRecord) + new SimpleProducerRecord<>("key 1", testRecord) )); run(executableApp); @@ -627,11 +607,8 @@ void shouldThrowExceptionOnResetterError() throws InterruptedException { final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpoint()); final StreamsRunner runner = executableApp.createRunner()) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getInputTopics().get(0), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getInputTopics().get(0)); StreamsRunnerTest.run(runner); // Wait until stream application has consumed all data Thread.sleep(TIMEOUT.toMillis()); @@ -647,19 +624,20 @@ void shouldReprocessAlreadySeenRecordsWithPattern() throws InterruptedException try (final ConfiguredStreamsApp app = createWordCountPatternApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient() - .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); - } - kafkaContainerHelper.send() + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(app.getTopics().getOutputTopic()); + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to("input_topic", List.of( - new KeyValue<>(null, "a"), - new KeyValue<>(null, "b") + new SimpleProducerRecord<>(null, "a"), + new SimpleProducerRecord<>(null, "b") )); - kafkaContainerHelper.send() + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) + .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to("another_topic", List.of( - new KeyValue<>(null, "c") + new SimpleProducerRecord<>(null, "c") )); run(executableApp); @@ -677,10 +655,7 @@ void shouldReprocessAlreadySeenRecordsWithPattern() throws InterruptedException } private ConfiguredStreamsApp createComplexApplication() { - try (final ImprovedAdminClient admin = this.newContainerHelper().admin()) { - admin.getTopicClient() - .createTopic(ComplexTopologyApplication.THROUGH_TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + this.newTestClient().createTopic(ComplexTopologyApplication.THROUGH_TOPIC); return configureApp(new ComplexTopologyApplication(), StreamsTopicConfig.builder() .inputTopics(List.of("input")) .outputTopic("output") @@ -688,10 +663,7 @@ private ConfiguredStreamsApp createComplexApplication() { } private ConfiguredStreamsApp createComplexCleanUpHookApplication() { - try (final ImprovedAdminClient admin = this.newContainerHelper().admin()) { - admin.getTopicClient() - .createTopic(ComplexTopologyApplication.THROUGH_TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + this.newTestClient().createTopic(ComplexTopologyApplication.THROUGH_TOPIC); return configureApp(new ComplexTopologyApplication() { @Override public StreamsCleanUpConfiguration setupCleanUp( @@ -705,14 +677,11 @@ public StreamsCleanUpConfiguration setupCleanUp( .build()); } - private ImprovedAdminClient createAdminClient() { - return 
ImprovedAdminClient.create(this.createEndpoint().createKafkaProperties()); - } - private List> readOutputTopic(final String outputTopic) { - final List> records = this.newContainerHelper().read() + final List> records = this.newTestClient().read() + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class) - .from(outputTopic, TIMEOUT); + .from(outputTopic, POLL_TIMEOUT); return records.stream() .map(StreamsCleanUpRunnerTest::toKeyValue) .collect(Collectors.toList()); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java index 7e9441ba..bfe14596 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java @@ -24,15 +24,15 @@ package com.bakdata.kafka.integration; -import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static java.util.Collections.emptyMap; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.ConfiguredStreamsApp; -import com.bakdata.kafka.KafkaContainerHelper; +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.KafkaTestClient; +import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.SerdeConfig; import com.bakdata.kafka.StreamsApp; import com.bakdata.kafka.StreamsExecutionOptions; @@ -41,7 +41,6 @@ import com.bakdata.kafka.TopologyBuilder; import com.bakdata.kafka.test_applications.LabeledInputTopics; import com.bakdata.kafka.test_applications.Mirror; -import com.bakdata.kafka.util.ImprovedAdminClient; import java.lang.Thread.UncaughtExceptionHandler; import java.time.Duration; import java.util.List; @@ -54,7 +53,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KafkaStreams.StateListener; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.errors.StreamsException; @@ -128,19 +126,17 @@ void shouldRunApp() { .createRunner()) { final String inputTopic = app.getTopics().getInputTopics().get(0); final String outputTopic = app.getTopics().getOutputTopic(); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient().createTopic(outputTopic, DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(outputTopic); run(runner); - kafkaContainerHelper.send() + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .to(inputTopic, List.of(new KeyValue<>("foo", "bar"))); - this.softly.assertThat(kafkaContainerHelper.read() + .to(inputTopic, List.of(new SimpleProducerRecord<>("foo", "bar"))); + this.softly.assertThat(testClient.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class) - .from(outputTopic, TIMEOUT)) + .from(outputTopic, POLL_TIMEOUT)) .hasSize(1); } } @@ -154,25 +150,23 @@ void shouldUseMultipleLabeledInputTopics() { final String inputTopic1 = inputTopics.get(0); final String inputTopic2 = inputTopics.get(1); final String outputTopic = app.getTopics().getOutputTopic(); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient().createTopic(inputTopic1, DEFAULT_TOPIC_SETTINGS, emptyMap()); - admin.getTopicClient().createTopic(inputTopic2, DEFAULT_TOPIC_SETTINGS, emptyMap()); - admin.getTopicClient().createTopic(outputTopic, DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(inputTopic1); + testClient.createTopic(inputTopic2); + testClient.createTopic(outputTopic); run(runner); - kafkaContainerHelper.send() + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .to(inputTopic1, List.of(new KeyValue<>("foo", "bar"))); - kafkaContainerHelper.send() + .to(inputTopic1, List.of(new SimpleProducerRecord<>("foo", "bar"))); + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .to(inputTopic2, List.of(new KeyValue<>("foo", "baz"))); - this.softly.assertThat(kafkaContainerHelper.read() + .to(inputTopic2, List.of(new SimpleProducerRecord<>("foo", "baz"))); + this.softly.assertThat(testClient.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) - .from(outputTopic, TIMEOUT)) + .from(outputTopic, POLL_TIMEOUT)) .hasSize(2); } } @@ -208,17 +202,15 @@ void shouldCloseOnMapError() throws InterruptedException { .build())) { final String inputTopic = app.getTopics().getInputTopics().get(0); final String outputTopic = app.getTopics().getOutputTopic(); - final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - admin.getTopicClient().createTopic(outputTopic, DEFAULT_TOPIC_SETTINGS, emptyMap()); - } + final KafkaTestClient testClient = this.newTestClient(); + testClient.createTopic(outputTopic); final Thread thread = run(runner); final CapturingUncaughtExceptionHandler handler = (CapturingUncaughtExceptionHandler) thread.getUncaughtExceptionHandler(); - kafkaContainerHelper.send() + testClient.send() .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) - .to(inputTopic, List.of(new KeyValue<>("foo", "bar"))); + .to(inputTopic, List.of(new SimpleProducerRecord<>("foo", "bar"))); Thread.sleep(TIMEOUT.toMillis()); this.softly.assertThat(thread.isAlive()).isFalse(); this.softly.assertThat(handler.getLastException()).isInstanceOf(StreamsException.class) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java index 2775ef33..932a3819 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java +++ 
b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java @@ -25,16 +25,14 @@ package com.bakdata.kafka.util; -import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static com.bakdata.kafka.TestUtil.newKafkaCluster; -import static java.util.Collections.emptyMap; +import static com.bakdata.kafka.KafkaTestClient.defaultTopicSettings; -import com.bakdata.kafka.KafkaContainerHelper; +import com.bakdata.kafka.KafkaTest; +import com.bakdata.kafka.KafkaTestClient; +import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.bakdata.kafka.TestRecord; -import com.bakdata.kafka.TestTopologyFactory; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import java.io.IOException; import java.time.Duration; @@ -43,25 +41,18 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.common.serialization.StringSerializer; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.KafkaContainer; -@Testcontainers @Slf4j @ExtendWith(SoftAssertionsExtension.class) -class SchemaTopicClientTest { +class SchemaTopicClientTest extends KafkaTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String TOPIC = "topic"; - private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry(); - @Container - private final KafkaContainer kafkaCluster = newKafkaCluster(); @InjectSoftAssertions SoftAssertions softly; @@ -69,19 +60,19 @@ class SchemaTopicClientTest { @Test void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() throws InterruptedException, IOException, RestClientException { - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - final TopicClient topicClient = admin.getTopicClient(); - topicClient.createTopic(TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); + final KafkaTestClient testClient = this.newTestClient(); + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { + topicClient.createTopic(TOPIC, defaultTopicSettings().build()); this.softly.assertThat(topicClient.exists(TOPIC)) .as("Topic is created") .isTrue(); - kafkaContainerHelper.send() + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .to(TOPIC, List.of( - new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) + new SimpleProducerRecord<>(null, TestRecord.newBuilder().setContent("foo").build()) )); final SchemaRegistryClient client = 
this.getSchemaRegistryClient(); @@ -103,19 +94,19 @@ void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() @Test void shouldResetSchema() throws InterruptedException, IOException, RestClientException { - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - final TopicClient topicClient = admin.getTopicClient(); - topicClient.createTopic(TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); + final KafkaTestClient testClient = this.newTestClient(); + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { + topicClient.createTopic(TOPIC, defaultTopicSettings().build()); this.softly.assertThat(topicClient.exists(TOPIC)) .as("Topic is created") .isTrue(); - kafkaContainerHelper.send() + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .to(TOPIC, List.of( - new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) + new SimpleProducerRecord<>(null, TestRecord.newBuilder().setContent("foo").build()) )); final SchemaRegistryClient client = this.getSchemaRegistryClient(); @@ -138,19 +129,19 @@ void shouldResetSchema() throws InterruptedException, IOException, RestClientExc @Test void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws InterruptedException, RestClientException, IOException { - final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); - try (final ImprovedAdminClient admin = kafkaContainerHelper.admin()) { - final TopicClient topicClient = admin.getTopicClient(); - topicClient.createTopic(TOPIC, DEFAULT_TOPIC_SETTINGS, emptyMap()); + final KafkaTestClient testClient = this.newTestClient(); + try (final ImprovedAdminClient admin = testClient.admin(); + final TopicClient topicClient = admin.getTopicClient()) { + topicClient.createTopic(TOPIC, defaultTopicSettings().build()); this.softly.assertThat(topicClient.exists(TOPIC)) .as("Topic is created") .isTrue(); - kafkaContainerHelper.send() + testClient.send() + .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .to(TOPIC, List.of( - new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) + new SimpleProducerRecord<>(null, TestRecord.newBuilder().setContent("foo").build()) )); final SchemaRegistryClient client = this.getSchemaRegistryClient(); @@ -171,24 +162,16 @@ void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws Interr private SchemaTopicClient createClientWithSchemaRegistry() { final Map<String, Object> kafkaProperties = Map.of( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBootstrapServers() + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBootstrapServers() ); return SchemaTopicClient.create(kafkaProperties, this.getSchemaRegistryUrl(), TIMEOUT); } private SchemaTopicClient createClientWithNoSchemaRegistry() { final Map<String, Object> kafkaProperties = Map.of( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBootstrapServers() + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBootstrapServers() ); return
SchemaTopicClient.create(kafkaProperties, TIMEOUT); } - private String getSchemaRegistryUrl() { - return this.testTopologyFactory.getSchemaRegistryUrl(); - } - - private SchemaRegistryClient getSchemaRegistryClient() { - return this.testTopologyFactory.getSchemaRegistryClient(); - } - } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java index ac738b72..268b2279 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java @@ -24,25 +24,19 @@ package com.bakdata.kafka.util; -import static com.bakdata.kafka.KafkaContainerHelper.DEFAULT_TOPIC_SETTINGS; -import static com.bakdata.kafka.TestUtil.newKafkaCluster; +import static com.bakdata.kafka.KafkaTestClient.defaultTopicSettings; import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; +import com.bakdata.kafka.KafkaTest; import java.time.Duration; import java.util.Map; import org.apache.kafka.clients.admin.AdminClientConfig; import org.junit.jupiter.api.Test; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.KafkaContainer; -@Testcontainers -class TopicClientTest { +class TopicClientTest extends KafkaTest { private static final Duration CLIENT_TIMEOUT = Duration.ofSeconds(10L); - @Container - private final KafkaContainer kafkaCluster = newKafkaCluster(); @Test void shouldNotFindTopic() { @@ -54,7 +48,7 @@ void shouldNotFindTopic() { @Test void shouldFindTopic() throws InterruptedException { try (final TopicClient client = this.createClient()) { - client.createTopic("exists", DEFAULT_TOPIC_SETTINGS, emptyMap()); + client.createTopic("exists", defaultTopicSettings().build()); } Thread.sleep(CLIENT_TIMEOUT.toMillis()); try (final TopicClient client = this.createClient()) { @@ -65,8 +59,8 @@ void shouldFindTopic() throws InterruptedException { @Test void shouldListTopics() throws InterruptedException { try (final TopicClient client = this.createClient()) { - client.createTopic("foo", DEFAULT_TOPIC_SETTINGS, emptyMap()); - client.createTopic("bar", DEFAULT_TOPIC_SETTINGS, emptyMap()); + client.createTopic("foo", defaultTopicSettings().build()); + client.createTopic("bar", defaultTopicSettings().build()); } Thread.sleep(CLIENT_TIMEOUT.toMillis()); try (final TopicClient client = this.createClient()) { @@ -79,7 +73,7 @@ void shouldListTopics() throws InterruptedException { @Test void shouldDeleteTopic() throws InterruptedException { try (final TopicClient client = this.createClient()) { - client.createTopic("foo", DEFAULT_TOPIC_SETTINGS, emptyMap()); + client.createTopic("foo", defaultTopicSettings().build()); } Thread.sleep(CLIENT_TIMEOUT.toMillis()); try (final TopicClient client = this.createClient()) { @@ -113,7 +107,7 @@ void shouldCreateTopic() throws InterruptedException { } private TopicClient createClient() { - final String brokerList = this.kafkaCluster.getBootstrapServers(); + final String brokerList = this.getBootstrapServers(); final Map<String, Object> config = Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); return TopicClient.create(config, CLIENT_TIMEOUT); } diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java deleted
file mode 100644 index 51fa262e..00000000 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaContainerHelper.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2025 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.kafka; - -import com.bakdata.kafka.util.ImprovedAdminClient; -import com.bakdata.kafka.util.TopicSettings; -import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KeyValue; -import org.testcontainers.kafka.KafkaContainer; - -@RequiredArgsConstructor -public class KafkaContainerHelper { - - public static final TopicSettings DEFAULT_TOPIC_SETTINGS = TopicSettings.builder() - .partitions(1) - .replicationFactor((short) 1) - .build(); - private final @NonNull KafkaContainer kafkaCluster; - - private static <K, V> List<ConsumerRecord<K, V>> pollAll(final Consumer<K, V> consumer, final Duration timeout) { - final List<ConsumerRecord<K, V>> records = new ArrayList<>(); - ConsumerRecords<K, V> poll = consumer.poll(timeout); - while (!poll.isEmpty()) { - poll.forEach(records::add); - poll = consumer.poll(timeout); - } - return records; - } - - public SenderBuilder send() { - return new SenderBuilder(Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class - )); - } - - public ReaderBuilder read() { - return new ReaderBuilder(Map.of( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class - )); - } - - public ImprovedAdminClient admin() { - return ImprovedAdminClient.create(Map.of( - AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaContainerHelper.this.kafkaCluster.getBootstrapServers() - )); - } - - @RequiredArgsConstructor - public class SenderBuilder { - - private final @NonNull Map<String, Object> properties; - - public SenderBuilder with(final String key, final Object value) { - final Map<String, Object> newProperties = new HashMap<>(this.properties); - newProperties.put(key, value); - return new SenderBuilder(Map.copyOf(newProperties)); - } - - public <K, V> void to(final String topic, final Iterable<KeyValue<K, V>> records) { - final Map<String, Object> producerConfig = new HashMap<>(this.properties); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, - KafkaContainerHelper.this.kafkaCluster.getBootstrapServers()); - try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) { - records.forEach(kv -> producer.send(new ProducerRecord<>(topic, kv.key, kv.value))); - } - } - - } - - @RequiredArgsConstructor - public class ReaderBuilder { - - private final @NonNull Map<String, Object> properties; - - public ReaderBuilder with(final String key, final Object value) { - final Map<String, Object> newProperties = new HashMap<>(this.properties); - newProperties.put(key, value); - return new ReaderBuilder(Map.copyOf(newProperties)); - } - - public <K, V> List<ConsumerRecord<K, V>> from(final String output, final Duration timeout) { - final Map<String, Object> consumerConfig = new HashMap<>(this.properties); - consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - KafkaContainerHelper.this.kafkaCluster.getBootstrapServers()); - try (final Consumer<K, V> consumer = new KafkaConsumer<>(consumerConfig)) { - final List<PartitionInfo> partitionInfos = consumer.listTopics().get(output); - final List<TopicPartition> topicPartitions = partitionInfos.stream() - .map(partition -> new TopicPartition(partition.topic(), partition.partition())) - .collect(Collectors.toList()); - consumer.assign(topicPartitions); - consumer.seekToBeginning(topicPartitions); - return pollAll(consumer, timeout); - } - } - - } -} diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java similarity index 64% rename from streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java rename to streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java index f04f2001..94872064 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java @@ -22,45 +22,53 @@ * SOFTWARE.
*/ -package com.bakdata.kafka.integration; +package com.bakdata.kafka; -import com.bakdata.kafka.KafkaContainerHelper; -import com.bakdata.kafka.KafkaEndpointConfig; -import com.bakdata.kafka.TestTopologyFactory; -import com.bakdata.kafka.TestUtil; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import java.time.Duration; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.kafka.KafkaContainer; +import org.testcontainers.utility.DockerImageName; @Testcontainers -abstract class KafkaTest { +public abstract class KafkaTest { + protected static final Duration POLL_TIMEOUT = Duration.ofSeconds(10); private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry(); @Container - private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster(); + private final KafkaContainer kafkaCluster = newCluster(); - KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { + public static KafkaContainer newCluster() { + return new KafkaContainer(DockerImageName.parse("apache/kafka-native") + .withTag("3.8.1")); + } + + protected KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { return KafkaEndpointConfig.builder() - .bootstrapServers(this.kafkaCluster.getBootstrapServers()) + .bootstrapServers(this.getBootstrapServers()) .build(); } - KafkaEndpointConfig createEndpoint() { + protected KafkaEndpointConfig createEndpoint() { return KafkaEndpointConfig.builder() - .bootstrapServers(this.kafkaCluster.getBootstrapServers()) + .bootstrapServers(this.getBootstrapServers()) .schemaRegistryUrl(this.getSchemaRegistryUrl()) .build(); } - KafkaContainerHelper newContainerHelper() { - return new KafkaContainerHelper(this.kafkaCluster); + protected String getBootstrapServers() { + return this.kafkaCluster.getBootstrapServers(); + } + + protected KafkaTestClient newTestClient() { + return new KafkaTestClient(this.createEndpoint()); } - String getSchemaRegistryUrl() { + protected String getSchemaRegistryUrl() { return this.testTopologyFactory.getSchemaRegistryUrl(); } - SchemaRegistryClient getSchemaRegistryClient() { + protected SchemaRegistryClient getSchemaRegistryClient() { return this.testTopologyFactory.getSchemaRegistryClient(); } } diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java deleted file mode 100644 index d9a4d7fc..00000000 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/TestUtil.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2025 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.kafka; - -import lombok.experimental.UtilityClass; -import org.testcontainers.kafka.KafkaContainer; -import org.testcontainers.utility.DockerImageName; - -@UtilityClass -public class TestUtil { - public static KafkaContainer newKafkaCluster() { - return new KafkaContainer(DockerImageName.parse("apache/kafka-native:3.8.1")); - } -} diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java new file mode 100644 index 00000000..1d1a68c1 --- /dev/null +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/KafkaTestClient.java @@ -0,0 +1,132 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import static java.util.Collections.emptyMap; + +import com.bakdata.kafka.util.ImprovedAdminClient; +import com.bakdata.kafka.util.TopicClient; +import com.bakdata.kafka.util.TopicSettings; +import com.bakdata.kafka.util.TopicSettings.TopicSettingsBuilder; +import java.util.Map; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; + +/** + * Client that supports communication with Kafka clusters in test setups, including topic management, reading from + * and sending to topics. + */ +@RequiredArgsConstructor +public class KafkaTestClient { + + private final @NonNull KafkaEndpointConfig endpointConfig; + + /** + * Create a new {@code TopicSettingsBuilder} which uses a single partition and a replication factor of 1 + * @return default topic settings + */ + public static TopicSettingsBuilder defaultTopicSettings() { + return TopicSettings.builder() + .partitions(1) + .replicationFactor((short) 1); + } + + /** + * Prepare sending new data to the cluster + * @return configured {@code SenderBuilder} + */ + public SenderBuilder send() { + return new SenderBuilder(this.endpointConfig.createKafkaProperties()); + } + + /** + * Prepare reading data from the cluster.
{@link ConsumerConfig#AUTO_OFFSET_RESET_CONFIG} is configured to + * {@link OffsetResetStrategy#EARLIEST} + * @return configured {@code ReaderBuilder} + */ + public ReaderBuilder read() { + return new ReaderBuilder(this.endpointConfig.createKafkaProperties()) + .with(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString()); + } + + /** + * Create a new {@code ImprovedAdminClient} for the cluster + * @return configured admin client + */ + public ImprovedAdminClient admin() { + return ImprovedAdminClient.create(this.endpointConfig.createKafkaProperties()); + } + + /** + * Creates a new Kafka topic with the specified settings. + * + * @param topicName the topic name + * @param settings settings for the number of partitions and the replication factor + * @param config topic configuration + */ + public void createTopic(final String topicName, final TopicSettings settings, final Map<String, String> config) { + try (final ImprovedAdminClient admin = this.admin(); + final TopicClient topicClient = admin.getTopicClient()) { + topicClient.createTopic(topicName, settings, config); + } + } + + /** + * Creates a new Kafka topic with the specified settings. No topic configs are applied. + * + * @param topicName the topic name + * @param settings settings for the number of partitions and the replication factor + * @see #createTopic(String, TopicSettings, Map) + */ + public void createTopic(final String topicName, final TopicSettings settings) { + this.createTopic(topicName, settings, emptyMap()); + } + + /** + * Creates a new Kafka topic with default settings. + * + * @param topicName the topic name + * @see #createTopic(String, TopicSettings) + * @see #defaultTopicSettings() + */ + public void createTopic(final String topicName) { + this.createTopic(topicName, defaultTopicSettings().build()); + } + + /** + * Checks whether a Kafka topic exists. + * + * @param topicName the topic name + * @return whether a Kafka topic with the specified name exists + */ + public boolean existsTopic(final String topicName) { + try (final ImprovedAdminClient admin = this.admin(); + final TopicClient topicClient = admin.getTopicClient()) { + return topicClient.exists(topicName); + } + } +} diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java new file mode 100644 index 00000000..0d867dac --- /dev/null +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ReaderBuilder.java @@ -0,0 +1,111 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; + +/** + * Read data from a Kafka cluster + */ +@RequiredArgsConstructor +public class ReaderBuilder { + + private final @NonNull Map<String, Object> properties; + + private static <K, V> List<ConsumerRecord<K, V>> pollAll(final Consumer<K, V> consumer, final Duration timeout) { + final List<ConsumerRecord<K, V>> records = new ArrayList<>(); + ConsumerRecords<K, V> poll; + do { + poll = consumer.poll(timeout); + poll.forEach(records::add); + } while (!poll.isEmpty()); + return records; + } + + private static <K, V> List<ConsumerRecord<K, V>> readAll(final Consumer<K, V> consumer, final String topic, + final Duration timeout) { + final List<PartitionInfo> partitionInfos = consumer.listTopics().get(topic); + final List<TopicPartition> topicPartitions = partitionInfos.stream() + .map(ReaderBuilder::toTopicPartition) + .collect(Collectors.toList()); + consumer.assign(topicPartitions); + return pollAll(consumer, timeout); + } + + private static TopicPartition toTopicPartition(final PartitionInfo partition) { + return new TopicPartition(partition.topic(), partition.partition()); + } + + /** + * Add a consumer configuration + * @param key configuration key + * @param value configuration value + * @return {@code ReaderBuilder} with added configuration + */ + public ReaderBuilder with(final String key, final Object value) { + final Map<String, Object> newProperties = new HashMap<>(this.properties); + newProperties.put(key, value); + return new ReaderBuilder(Map.copyOf(newProperties)); + } + + /** + * Read all data from a topic. This method is idempotent, meaning calling it multiple times will read the same + * data unless the data in the topic changes. + * @param topic topic to read from + * @param timeout consumer poll timeout + * @return consumed records + * @param <K> type of keys + * @param <V> type of values + */ + public <K, V> List<ConsumerRecord<K, V>> from(final String topic, final Duration timeout) { + try (final Consumer<K, V> consumer = this.createConsumer()) { + return readAll(consumer, topic, timeout); + } + } + + /** + * Create a new {@code Consumer} for a Kafka cluster + * @return {@code Consumer} + * @param <K> type of keys + * @param <V> type of values + */ + public <K, V> Consumer<K, V> createConsumer() { + return new KafkaConsumer<>(this.properties); + } + +} diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/SenderBuilder.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/SenderBuilder.java new file mode 100644 index 00000000..dfb4dead --- /dev/null +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/SenderBuilder.java @@ -0,0 +1,129 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE.
+ */ + +package com.bakdata.kafka; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.Value; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; + +/** + * Send data to a Kafka cluster + */ +@RequiredArgsConstructor +public class SenderBuilder { + + private final @NonNull Map<String, Object> properties; + + /** + * Add a producer configuration + * @param key configuration key + * @param value configuration value + * @return {@code SenderBuilder} with added configuration + */ + public SenderBuilder with(final String key, final Object value) { + final Map<String, Object> newProperties = new HashMap<>(this.properties); + newProperties.put(key, value); + return new SenderBuilder(Map.copyOf(newProperties)); + } + + /** + * Send data to a topic + * @param topic topic to send to + * @param records records to send + * @param <K> type of keys + * @param <V> type of values + */ + public <K, V> void to(final String topic, final Iterable<SimpleProducerRecord<K, V>> records) { + try (final Producer<K, V> producer = this.createProducer()) { + records.forEach(kv -> producer.send(kv.toProducerRecord(topic))); + } + } + + /** + * Create a new {@code Producer} for a Kafka cluster + * @return {@code Producer} + * @param <K> type of keys + * @param <V> type of values + */ + public <K, V> Producer<K, V> createProducer() { + return new KafkaProducer<>(this.properties); + } + + /** + * Represents a {@link ProducerRecord} without topic assignment + * @param <K> type of keys + * @param <V> type of values + */ + @Value + @RequiredArgsConstructor + public static class SimpleProducerRecord<K, V> { + K key; + V value; + Instant timestamp; + Iterable<Header> headers; + + /** + * Create a new {@code SimpleProducerRecord} without timestamp and headers + * @param key key + * @param value value + */ + public SimpleProducerRecord(final K key, final V value) { + this(key, value, (Instant) null); + } + + /** + * Create a new {@code SimpleProducerRecord} without headers + * @param key key + * @param value value + * @param timestamp timestamp + */ + public SimpleProducerRecord(final K key, final V value, final Instant timestamp) { + this(key, value, timestamp, null); + } + + /** + * Create a new {@code SimpleProducerRecord} without timestamp + * @param key key + * @param value value + * @param headers headers + */ + public SimpleProducerRecord(final K key, final V value, final Iterable<Header> headers) { + this(key, value, null, headers); + } + + private ProducerRecord<K, V> toProducerRecord(final String topic) { + final Long timestampMillis = this.timestamp == null ? null : this.timestamp.toEpochMilli(); + return new ProducerRecord<>(topic, null, timestampMillis, this.key, this.value, this.headers); + } + } + +}
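Reviewer note, not part of the patch: a minimal sketch of how the fixtures introduced above compose in a downstream test. The test class and topic name are hypothetical; KafkaTest, KafkaTestClient, POLL_TIMEOUT, SimpleProducerRecord, and the send()/read() builders are the API added in this change.

    import static org.assertj.core.api.Assertions.assertThat;

    import com.bakdata.kafka.KafkaTest;
    import com.bakdata.kafka.KafkaTestClient;
    import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.jupiter.api.Test;

    // Hypothetical usage sketch; class and topic names are made up.
    class ExampleRoundTripTest extends KafkaTest {

        @Test
        void shouldRoundTripRecords() {
            final KafkaTestClient testClient = this.newTestClient();
            // defaultTopicSettings(): single partition, replication factor 1
            testClient.createTopic("example-topic");

            testClient.send()
                    .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                    .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                    .to("example-topic", List.of(new SimpleProducerRecord<>("foo", "bar")));

            // read() sets auto.offset.reset=earliest, so this consumes from the beginning
            // and keeps polling until an empty poll signals the end of the topic
            final List<ConsumerRecord<String, String>> records = testClient.read()
                    .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                    .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                    .from("example-topic", POLL_TIMEOUT);
            assertThat(records).hasSize(1);
        }
    }

Because with() copies the properties into a new builder, a partially configured SenderBuilder or ReaderBuilder can be shared between tests and specialized per call without mutating shared state.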
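Likewise, a short sketch of the SimpleProducerRecord constructor overloads, which cover the timestamp and header cases the old KeyValue-based helper could not express. The values are made up; RecordHeader is Kafka's stock Header implementation.

    import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
    import java.nio.charset.StandardCharsets;
    import java.time.Instant;
    import java.util.List;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeader;

    // Hypothetical examples, not part of the patch.
    final class SimpleProducerRecordExamples {

        static List<SimpleProducerRecord<String, String>> examples() {
            // key and value only; timestamp and headers stay null
            final SimpleProducerRecord<String, String> plain = new SimpleProducerRecord<>("foo", "bar");

            // explicit timestamp, converted to epoch millis when the ProducerRecord is built
            final SimpleProducerRecord<String, String> timestamped =
                    new SimpleProducerRecord<>("foo", "bar", Instant.parse("2025-01-20T10:12:22Z"));

            // headers without a timestamp
            final List<Header> headers =
                    List.of(new RecordHeader("origin", "test".getBytes(StandardCharsets.UTF_8)));
            final SimpleProducerRecord<String, String> withHeaders =
                    new SimpleProducerRecord<>("foo", "bar", headers);

            return List.of(plain, timestamped, withHeaders);
        }

        private SimpleProducerRecordExamples() {
        }
    }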