From d967deb44338f1c595f0aed9bfce057a55dcf62a Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 20 Jan 2025 11:56:58 +0100 Subject: [PATCH] Use Awaitility (#273) --- gradle.properties | 1 + .../com/bakdata/kafka/KafkaApplication.java | 6 +- .../test/java/com/bakdata/kafka/CliTest.java | 13 +- .../kafka/integration/RunProducerAppTest.java | 5 +- .../kafka/integration/RunStreamsAppTest.java | 3 - .../kafka/integration/StreamsCleanUpTest.java | 52 +++---- streams-bootstrap-core/build.gradle.kts | 2 + .../kafka/util/ConsumerGroupClient.java | 68 +++++++++ .../com/bakdata/kafka/util/TopicClient.java | 101 ++++++++------ .../kafka/integration/ProducerRunnerTest.java | 5 - .../integration/StreamsCleanUpRunnerTest.java | 119 ++++++++-------- .../kafka/integration/StreamsRunnerTest.java | 16 ++- .../kafka/util/SchemaTopicClientTest.java | 17 +-- .../bakdata/kafka/util/TopicClientTest.java | 22 +-- .../java/com/bakdata/kafka/KafkaTest.java | 31 +++++ .../bakdata/kafka/ConsumerGroupVerifier.java | 129 ++++++++++++++++++ 16 files changed, 405 insertions(+), 185 deletions(-) create mode 100644 streams-bootstrap-test/src/main/java/com/bakdata/kafka/ConsumerGroupVerifier.java diff --git a/gradle.properties b/gradle.properties index 86973b74..dc74ebe6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -10,4 +10,5 @@ junitVersion=5.11.4 mockitoVersion=5.15.2 assertJVersion=3.27.2 log4jVersion=2.24.3 +awaitilityVersion=4.2.2 org.gradle.jvmargs=-Xmx4096m diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index bac2a236..9f91614b 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -95,7 +95,7 @@ public abstract class KafkaApplication kafkaConfig = emptyMap(); /** - *
<p>This methods needs to be called in the executable custom application class inheriting from
+ * <p>This method needs to be called in the executable custom application class inheriting from
* {@code KafkaApplication}.</p>
* <p>This method calls System exit</p>
* @@ -109,7 +109,7 @@ public static void startApplication(final KafkaApplication } /** - *
<p>This methods needs to be called in the executable custom application class inheriting from
+ * <p>This method needs to be called in the executable custom application class inheriting from
* {@code KafkaApplication}.</p>
* * @param app An instance of the custom application class. diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index b777d932..120e58f4 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -27,6 +27,7 @@ import static com.bakdata.kafka.KafkaTest.POLL_TIMEOUT; import static com.bakdata.kafka.KafkaTest.newCluster; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord; import com.ginsberg.junit.exit.ExpectSystemExitWithStatus; @@ -46,8 +47,10 @@ class CliTest { - private static void runApp(final KafkaStreamsApplication app, final String... args) { - new Thread(() -> KafkaApplication.startApplication(app, args)).start(); + private static Thread runApp(final KafkaStreamsApplication app, final String... args) { + final Thread thread = new Thread(() -> KafkaApplication.startApplication(app, args)); + thread.start(); + return thread; } @Test @@ -214,7 +217,7 @@ public SerdeConfig defaultSerializationConfig() { @Test @ExpectSystemExitWithStatus(1) - void shouldExitWithErrorInTopology() throws InterruptedException { + void shouldExitWithErrorInTopology() { final String input = "input"; try (final KafkaContainer kafkaCluster = newCluster(); final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() { @@ -238,7 +241,7 @@ public SerdeConfig defaultSerializationConfig() { })) { kafkaCluster.start(); - runApp(app, + final Thread thread = runApp(app, "--bootstrap-server", kafkaCluster.getBootstrapServers(), "--input-topics", input ); @@ -248,7 +251,7 @@ public SerdeConfig defaultSerializationConfig() { .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(input, List.of(new SimpleProducerRecord<>("foo", "bar"))); - Thread.sleep(Duration.ofSeconds(10).toMillis()); + await("Thread is dead").atMost(Duration.ofSeconds(10L)).until(() -> !thread.isAlive()); } } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index f5249942..528a7751 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -38,7 +38,6 @@ import com.bakdata.kafka.util.ImprovedAdminClient; import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; -import java.time.Duration; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -47,10 +46,9 @@ import org.junit.jupiter.api.Test; class RunProducerAppTest extends KafkaTest { - private static final Duration TIMEOUT = Duration.ofSeconds(10); @Test - void shouldRunApp() throws InterruptedException { + void shouldRunApp() { final String output = "output"; try (final KafkaProducerApplication app = new SimpleKafkaProducerApplication<>(() -> new ProducerApp() { @Override @@ -84,7 +82,6 @@ public SerializerConfig defaultSerializationConfig() { 
assertThat(kv.value().getContent()).isEqualTo("bar"); }); app.clean(); - Thread.sleep(TIMEOUT.toMillis()); try (final ImprovedAdminClient admin = testClient.admin()) { assertThat(admin.getTopicClient().exists(app.getOutputTopic())) .as("Output topic is deleted") diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java index 60569ad8..d6742200 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java @@ -39,10 +39,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; -@ExtendWith(MockitoExtension.class) class RunStreamsAppTest extends KafkaTest { @Test diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java index c309c6f8..4a1676cd 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java @@ -33,7 +33,6 @@ import com.bakdata.kafka.SimpleKafkaStreamsApplication; import com.bakdata.kafka.test_applications.WordCount; import com.bakdata.kafka.util.ImprovedAdminClient; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -52,33 +51,15 @@ import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; @Slf4j @ExtendWith(SoftAssertionsExtension.class) -@ExtendWith(MockitoExtension.class) -@MockitoSettings(strictness = Strictness.STRICT_STUBS) class StreamsCleanUpTest extends KafkaTest { - private static final Duration TIMEOUT = Duration.ofSeconds(10); @InjectSoftAssertions private SoftAssertions softly; - private static void runAppAndClose(final KafkaStreamsApplication app) throws InterruptedException { - runApp(app); - app.stop(); - } - - private static void runApp(final KafkaStreamsApplication app) throws InterruptedException { - // run in Thread because the application blocks indefinitely - new Thread(app).start(); - // Wait until stream application has consumed all data - Thread.sleep(TIMEOUT.toMillis()); - } - @Test - void shouldClean() throws InterruptedException { + void shouldClean() { try (final KafkaStreamsApplication app = this.createWordCountApplication()) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getOutputTopic()); @@ -98,8 +79,8 @@ void shouldClean() throws InterruptedException { ); this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app); - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + // Wait until all stream applications are completely stopped before triggering cleanup + this.awaitClosed(app.createExecutableApp()); app.clean(); try (final ImprovedAdminClient admin = 
testClient.admin()) { @@ -114,7 +95,7 @@ void shouldClean() throws InterruptedException { } @Test - void shouldReset() throws InterruptedException { + void shouldReset() { try (final KafkaStreamsApplication app = this.createWordCountApplication()) { final KafkaTestClient testClient = this.newTestClient(); testClient.createTopic(app.getOutputTopic()); @@ -134,8 +115,8 @@ void shouldReset() throws InterruptedException { ); this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app); - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + // Wait until all stream applications are completely stopped before triggering cleanup + this.awaitClosed(app.createExecutableApp()); app.reset(); try (final ImprovedAdminClient admin = testClient.admin()) { @@ -152,21 +133,31 @@ void shouldReset() throws InterruptedException { } @Test - void shouldCallClose() throws InterruptedException { + void shouldCallClose() { try (final CloseFlagApp app = this.createCloseFlagApplication()) { this.newTestClient().createTopic(app.getInputTopics().get(0)); - Thread.sleep(TIMEOUT.toMillis()); this.softly.assertThat(app.isClosed()).isFalse(); this.softly.assertThat(app.isAppClosed()).isFalse(); app.clean(); this.softly.assertThat(app.isAppClosed()).isTrue(); app.setAppClosed(false); - Thread.sleep(TIMEOUT.toMillis()); app.reset(); this.softly.assertThat(app.isAppClosed()).isTrue(); } } + private void runAppAndClose(final KafkaStreamsApplication app) { + this.runApp(app); + app.stop(); + } + + private void runApp(final KafkaStreamsApplication app) { + // run in Thread because the application blocks indefinitely + new Thread(app).start(); + // Wait until stream application has consumed all data + this.awaitProcessing(app.createExecutableApp()); + } + private CloseFlagApp createCloseFlagApplication() { final CloseFlagApp app = new CloseFlagApp(); app.setInputTopics(List.of("input")); @@ -185,9 +176,8 @@ private List> readOutputTopic(final String outputTopic) { } private void runAndAssertContent(final Iterable> expectedValues, - final String description, final KafkaStreamsApplication app) - throws InterruptedException { - runAppAndClose(app); + final String description, final KafkaStreamsApplication app) { + this.runAppAndClose(app); final List> output = this.readOutputTopic(app.getOutputTopic()); this.softly.assertThat(output) diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts index 77d4a1f5..c5018061 100644 --- a/streams-bootstrap-core/build.gradle.kts +++ b/streams-bootstrap-core/build.gradle.kts @@ -38,6 +38,8 @@ dependencies { testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) val log4jVersion: String by project testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) + val awaitilityVersion: String by project + testFixturesApi(group = "org.awaitility", name = "awaitility", version = awaitilityVersion) } tasks.withType { diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java index b0322838..79d220a0 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/ConsumerGroupClient.java @@ -36,7 +36,10 @@ import 
lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
/**
@@ -68,6 +71,14 @@ private static KafkaAdminException failedToListGroups(final Throwable ex) {
 return new KafkaAdminException("Failed to list consumer groups", ex);
 }
+ private static KafkaAdminException failedToListOffsets(final String groupName, final Throwable ex) {
+ return new KafkaAdminException("Failed to list offsets for consumer group " + groupName, ex);
+ }
+
+ private static KafkaAdminException failedToDescribeGroup(final String groupName, final Throwable ex) {
+ return new KafkaAdminException("Failed to describe consumer group " + groupName, ex);
+ }
+
 /**
 * Delete a consumer group.
 *
@@ -93,6 +104,63 @@ public void deleteConsumerGroup(final String groupName) {
 }
 }
+ /**
+ * Describe a consumer group.
+ *
+ * @param groupName the consumer group name
+ * @return consumer group description
+ */
+ public ConsumerGroupDescription describe(final String groupName) {
+ log.info("Describing consumer group '{}'", groupName);
+ try {
+ final ConsumerGroupDescription description =
+ this.adminClient.describeConsumerGroups(List.of(groupName))
+ .all()
+ .get(this.timeout.toSeconds(), TimeUnit.SECONDS)
+ .get(groupName);
+ log.info("Described consumer group '{}'", groupName);
+ return description;
+ } catch (final InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw failedToDescribeGroup(groupName, ex);
+ } catch (final ExecutionException ex) {
+ if (ex.getCause() instanceof RuntimeException) {
+ throw (RuntimeException) ex.getCause();
+ }
+ throw failedToDescribeGroup(groupName, ex);
+ } catch (final TimeoutException ex) {
+ throw failedToDescribeGroup(groupName, ex);
+ }
+ }
+
+ /**
+ * List offsets for a consumer group.
+ * + * @param groupName the consumer group name + * @return consumer group offsets + */ + public Map listOffsets(final String groupName) { + log.info("Listing offsets for consumer group '{}'", groupName); + try { + final Map offsets = + this.adminClient.listConsumerGroupOffsets(groupName) + .partitionsToOffsetAndMetadata(groupName) + .get(this.timeout.toSeconds(), TimeUnit.SECONDS); + log.info("Listed offsets for consumer group '{}'", groupName); + return offsets; + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + throw failedToListOffsets(groupName, ex); + } catch (final ExecutionException ex) { + if (ex.getCause() instanceof RuntimeException) { + throw (RuntimeException) ex.getCause(); + } + throw failedToListOffsets(groupName, ex); + } catch (final TimeoutException ex) { + throw failedToListOffsets(groupName, ex); + } + } + @Override public void close() { this.adminClient.close(); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java index 65dcb037..82c5ba58 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/util/TopicClient.java @@ -33,16 +33,20 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.jooq.lambda.Seq; /** * This class offers helpers to interact with Kafka topics. @@ -73,10 +77,6 @@ private static KafkaAdminException failedToRetrieveTopicDescription(final String return new KafkaAdminException("Failed to retrieve description of topic " + topicName, e); } - private static KafkaAdminException failedToCheckIfTopicExists(final String topicName, final Throwable e) { - return new KafkaAdminException("Failed to check if Kafka topic " + topicName + " exists", e); - } - private static KafkaAdminException failedToListTopics(final Throwable ex) { return new KafkaAdminException("Failed to list topics", ex); } @@ -85,6 +85,10 @@ private static KafkaAdminException failedToCreateTopic(final String topicName, f return new KafkaAdminException("Failed to create topic " + topicName, ex); } + private static KafkaAdminException failedToListOffsets(final Throwable ex) { + return new KafkaAdminException("Failed to list offsets", ex); + } + /** * Creates a new Kafka topic with the specified number of partitions if it does not yet exist. If the topic exists, * its configuration is not updated. 
@@ -150,32 +154,17 @@ public void deleteTopic(final String topicName) {
 * @return settings of topic including number of partitions and replicationFactor
 */
 public TopicSettings describe(final String topicName) {
- try {
- final Map<String, KafkaFuture<TopicDescription>> kafkaTopicMap =
- this.adminClient.describeTopics(List.of(topicName)).topicNameValues();
- final TopicDescription description =
- kafkaTopicMap.get(topicName).get(this.timeout.toSeconds(), TimeUnit.SECONDS);
- final List partitions = description.partitions();
- final int replicationFactor = partitions.stream()
- .findFirst()
- .map(TopicPartitionInfo::replicas)
- .map(List::size)
- .orElseThrow(() -> new IllegalStateException("Topic " + topicName + " has no partitions"));
- return TopicSettings.builder()
- .replicationFactor((short) replicationFactor)
- .partitions(partitions.size())
- .build();
- } catch (final InterruptedException e) {
- Thread.currentThread().interrupt();
- throw failedToRetrieveTopicDescription(topicName, e);
- } catch (final ExecutionException e) {
- if (e.getCause() instanceof RuntimeException) {
- throw (RuntimeException) e.getCause();
- }
- throw failedToRetrieveTopicDescription(topicName, e);
- } catch (final TimeoutException e) {
- throw failedToRetrieveTopicDescription(topicName, e);
- }
+ final TopicDescription description = this.getDescription(topicName);
+ final List partitions = description.partitions();
+ final int replicationFactor = partitions.stream()
+ .findFirst()
+ .map(TopicPartitionInfo::replicas)
+ .map(List::size)
+ .orElseThrow(() -> new IllegalStateException("Topic " + topicName + " has no partitions"));
+ return TopicSettings.builder()
+ .replicationFactor((short) replicationFactor)
+ .partitions(partitions.size())
+ .build();
 }
 
 @Override
@@ -190,24 +179,57 @@ public void close() {
 * @return whether a Kafka topic with the specified name exists or not
 */
 public boolean exists(final String topicName) {
+ final Collection topics = this.listTopics();
+ return topics.stream()
+ .anyMatch(t -> t.equals(topicName));
+ }
+
+ /**
+ * Describe a Kafka topic.
+ *
+ * @param topicName the topic name
+ * @return topic description
+ */
+ public TopicDescription getDescription(final String topicName) {
 try {
 final Map<String, KafkaFuture<TopicDescription>> kafkaTopicMap =
 this.adminClient.describeTopics(List.of(topicName)).topicNameValues();
- kafkaTopicMap.get(topicName).get(this.timeout.toSeconds(), TimeUnit.SECONDS);
- return true;
+ return kafkaTopicMap.get(topicName).get(this.timeout.toSeconds(), TimeUnit.SECONDS);
 } catch (final ExecutionException e) {
- if (e.getCause() instanceof UnknownTopicOrPartitionException) {
- return false;
 }
+ if (e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException) e.getCause();
+ }
+ throw failedToRetrieveTopicDescription(topicName, e);
+ } catch (final InterruptedException e) {
 Thread.currentThread().interrupt();
+ throw failedToRetrieveTopicDescription(topicName, e);
+ } catch (final TimeoutException e) {
+ throw failedToRetrieveTopicDescription(topicName, e);
+ }
+ }
+
+ /**
+ * List offsets for a set of partitions.
+ * + * @param topicPartitions partitions to list offsets for + * @return partition offsets + */ + public Map listOffsets(final Iterable topicPartitions) { + try { + final Map offsetRequest = Seq.seq(topicPartitions) + .toMap(Function.identity(), o -> OffsetSpec.latest()); + return this.adminClient.listOffsets(offsetRequest).all() + .get(this.timeout.toSeconds(), TimeUnit.SECONDS); + } catch (final ExecutionException e) { if (e.getCause() instanceof RuntimeException) { throw (RuntimeException) e.getCause(); } - throw failedToCheckIfTopicExists(topicName, e); + throw failedToListOffsets(e); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); - throw failedToCheckIfTopicExists(topicName, e); + throw failedToListOffsets(e); } catch (final TimeoutException e) { - throw failedToCheckIfTopicExists(topicName, e); + throw failedToListOffsets(e); } } @@ -237,6 +259,9 @@ public void createTopic(final String topicName, final TopicSettings settings, fi } catch (final TimeoutException ex) { throw failedToCreateTopic(topicName, ex); } + if (!this.exists(topicName)) { + throw new IllegalStateException(String.format("Creation of topic %s failed", topicName)); + } } /** diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java index 9daec3ce..75daedc5 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerRunnerTest.java @@ -43,13 +43,8 @@ import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.junit.jupiter.MockitoSettings; -import org.mockito.quality.Strictness; @ExtendWith(SoftAssertionsExtension.class) -@ExtendWith(MockitoExtension.class) -@MockitoSettings(strictness = Strictness.STRICT_STUBS) class ProducerRunnerTest extends KafkaTest { @InjectSoftAssertions private SoftAssertions softly; diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index 58ca2f85..e1fc3c96 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -57,7 +57,6 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.KafkaAvroSerializer; import java.io.IOException; -import java.time.Duration; import java.util.List; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -83,7 +82,6 @@ @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.STRICT_STUBS) class StreamsCleanUpRunnerTest extends KafkaTest { - private static final Duration TIMEOUT = Duration.ofSeconds(10); @InjectSoftAssertions private SoftAssertions softly; @Mock @@ -133,16 +131,8 @@ private static void clean(final ExecutableApp app } } - private static void run(final ExecutableApp app) throws InterruptedException { - try (final StreamsRunner runner = app.createRunner()) { - StreamsRunnerTest.run(runner); - // Wait until stream application has consumed all data - Thread.sleep(TIMEOUT.toMillis()); - } 
- } - @Test - void shouldDeleteTopic() throws InterruptedException { + void shouldDeleteTopic() { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { @@ -163,11 +153,11 @@ void shouldDeleteTopic() throws InterruptedException { new KeyValue<>("blub", 2L) ); - run(executableApp); + this.run(executableApp); this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(executableApp); clean(executableApp); try (final ImprovedAdminClient admin = testClient.admin(); @@ -180,7 +170,7 @@ void shouldDeleteTopic() throws InterruptedException { } @Test - void shouldDeleteConsumerGroup() throws InterruptedException { + void shouldDeleteConsumerGroup() { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { @@ -201,7 +191,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { new KeyValue<>("blub", 2L) ); - run(executableApp); + this.run(executableApp); this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); @@ -212,7 +202,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { .isTrue(); } - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(executableApp); clean(executableApp); try (final ImprovedAdminClient adminClient = testClient.admin(); @@ -225,7 +215,7 @@ void shouldDeleteConsumerGroup() throws InterruptedException { } @Test - void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedException { + void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { @@ -246,7 +236,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept new KeyValue<>("blub", 2L) ); - run(executableApp); + this.run(executableApp); this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "WordCount contains all elements after first run"); @@ -257,7 +247,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept .isTrue(); } - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(executableApp); try (final ImprovedAdminClient adminClient = testClient.admin(); final ConsumerGroupClient consumerGroupClient = adminClient.getConsumerGroupClient()) { @@ -271,7 +261,7 @@ void shouldNotThrowAnErrorIfConsumerGroupDoesNotExist() throws InterruptedExcept } @Test - void shouldDeleteInternalTopics() throws InterruptedException { + void shouldDeleteInternalTopics() { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { @@ -285,7 +275,7 @@ void shouldDeleteInternalTopics() throws InterruptedException { new SimpleProducerRecord<>("key 1", testRecord) )); - run(executableApp); + this.run(executableApp); final List inputTopics = app.getTopics().getInputTopics(); final String uniqueAppId = app.getUniqueAppId(); @@ -305,7 +295,7 @@ void shouldDeleteInternalTopics() throws InterruptedException { this.softly.assertThat(topicClient.exists(manualTopic)).isTrue(); } - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(executableApp); 
reset(executableApp); try (final ImprovedAdminClient admin = testClient.admin(); @@ -321,7 +311,7 @@ void shouldDeleteInternalTopics() throws InterruptedException { } @Test - void shouldDeleteIntermediateTopics() throws InterruptedException { + void shouldDeleteIntermediateTopics() { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint())) { @@ -335,7 +325,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { new SimpleProducerRecord<>("key 1", testRecord) )); - run(executableApp); + this.run(executableApp); final List inputTopics = app.getTopics().getInputTopics(); final String manualTopic = ComplexTopologyApplication.THROUGH_TOPIC; @@ -348,7 +338,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { this.softly.assertThat(topicClient.exists(manualTopic)).isTrue(); } - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(executableApp); clean(executableApp); try (final ImprovedAdminClient admin = testClient.admin(); @@ -362,7 +352,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { } @Test - void shouldDeleteState() throws InterruptedException { + void shouldDeleteState() { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { @@ -383,14 +373,14 @@ void shouldDeleteState() throws InterruptedException { new KeyValue<>("blub", 2L) ); - run(executableApp); + this.run(executableApp); this.assertContent(app.getTopics().getOutputTopic(), expectedValues, "All entries are once in the input topic after the 1st run"); - Thread.sleep(TIMEOUT.toMillis()); + this.awaitClosed(executableApp); reset(executableApp); - run(executableApp); + this.run(executableApp); final List> entriesTwice = expectedValues.stream() .flatMap(entry -> Stream.of(entry, entry)) .collect(Collectors.toList()); @@ -400,7 +390,7 @@ void shouldDeleteState() throws InterruptedException { } @Test - void shouldReprocessAlreadySeenRecords() throws InterruptedException { + void shouldReprocessAlreadySeenRecords() { try (final ConfiguredStreamsApp app = createWordCountApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint( this.createEndpointWithoutSchemaRegistry())) { @@ -415,23 +405,23 @@ void shouldReprocessAlreadySeenRecords() throws InterruptedException { new SimpleProducerRecord<>(null, "c") )); - run(executableApp); + this.run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 3); - run(executableApp); + this.run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 3); - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + // Wait until all stream applications are completely stopped before triggering cleanup + this.awaitClosed(executableApp); reset(executableApp); - run(executableApp); + this.run(executableApp); this.assertSize(app.getTopics().getOutputTopic(), 6); } } @Test void shouldDeleteValueSchema() - throws InterruptedException, IOException, RestClientException { + throws IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorValueApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.getSchemaRegistryClient()) { @@ -447,10 +437,10 @@ void shouldDeleteValueSchema() new SimpleProducerRecord<>(null, testRecord) )); - 
run(executableApp); + this.run(executableApp); - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + // Wait until all stream applications are completely stopped before triggering cleanup + this.awaitClosed(executableApp); final String outputTopic = app.getTopics().getOutputTopic(); this.softly.assertThat(client.getAllSubjects()) .contains(outputTopic + "-value", inputTopic + "-value"); @@ -463,7 +453,7 @@ void shouldDeleteValueSchema() @Test void shouldDeleteKeySchema() - throws InterruptedException, IOException, RestClientException { + throws IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.getSchemaRegistryClient()) { @@ -478,10 +468,10 @@ void shouldDeleteKeySchema() new SimpleProducerRecord<>(testRecord, "val") )); - run(executableApp); + this.run(executableApp); - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + // Wait until all stream applications are completely stopped before triggering cleanup + this.awaitClosed(executableApp); final String outputTopic = app.getTopics().getOutputTopic(); this.softly.assertThat(client.getAllSubjects()) .contains(outputTopic + "-key", inputTopic + "-key"); @@ -494,7 +484,7 @@ void shouldDeleteKeySchema() @Test void shouldDeleteSchemaOfInternalTopics() - throws InterruptedException, IOException, RestClientException { + throws IOException, RestClientException { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.getSchemaRegistryClient()) { @@ -509,10 +499,10 @@ void shouldDeleteSchemaOfInternalTopics() new SimpleProducerRecord<>("key 1", testRecord) )); - run(executableApp); + this.run(executableApp); - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + // Wait until all stream applications are completely stopped before triggering cleanup + this.awaitClosed(executableApp); final String inputSubject = inputTopic + "-value"; final String uniqueAppId = app.getUniqueAppId(); final String internalSubject = @@ -532,7 +522,7 @@ void shouldDeleteSchemaOfInternalTopics() @Test void shouldDeleteSchemaOfIntermediateTopics() - throws InterruptedException, IOException, RestClientException { + throws IOException, RestClientException { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); final SchemaRegistryClient client = this.getSchemaRegistryClient()) { @@ -547,10 +537,10 @@ void shouldDeleteSchemaOfIntermediateTopics() new SimpleProducerRecord<>("key 1", testRecord) )); - run(executableApp); + this.run(executableApp); - // Wait until all stream application are completely stopped before triggering cleanup - Thread.sleep(TIMEOUT.toMillis()); + // Wait until all stream applications are completely stopped before triggering cleanup + this.awaitClosed(executableApp); final String inputSubject = inputTopic + "-value"; final String manualSubject = ComplexTopologyApplication.THROUGH_TOPIC + "-value"; this.softly.assertThat(client.getAllSubjects()) @@ -602,16 +592,15 @@ void shouldNotThrowExceptionOnMissingInputTopic() { } @Test - void 
shouldThrowExceptionOnResetterError() throws InterruptedException {
+ void shouldThrowExceptionOnResetterError() {
 try (final ConfiguredStreamsApp app = createMirrorKeyApplication();
- final ExecutableStreamsApp executableApp = app.withEndpoint(
- this.createEndpoint());
+ final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint());
 final StreamsRunner runner = executableApp.createRunner()) {
 final KafkaTestClient testClient = this.newTestClient();
 testClient.createTopic(app.getTopics().getInputTopics().get(0));
 StreamsRunnerTest.run(runner);
 // Wait until stream application has consumed all data
- Thread.sleep(TIMEOUT.toMillis());
+ this.awaitActive(executableApp);
 // should throw exception because consumer group is still active
 this.softly.assertThatThrownBy(() -> reset(executableApp))
 .isInstanceOf(CleanUpException.class)
@@ -620,7 +609,7 @@
 }
 
 @Test
- void shouldReprocessAlreadySeenRecordsWithPattern() throws InterruptedException {
+ void shouldReprocessAlreadySeenRecordsWithPattern() {
 try (final ConfiguredStreamsApp app = createWordCountPatternApplication();
 final ExecutableStreamsApp executableApp = app.withEndpoint(
 this.createEndpointWithoutSchemaRegistry())) {
@@ -640,20 +629,28 @@ void shouldReprocessAlreadySeenRecordsWithPattern() throws InterruptedException
 new SimpleProducerRecord<>(null, "c")
 ));
 
- run(executableApp);
+ this.run(executableApp);
 this.assertSize(app.getTopics().getOutputTopic(), 3);
- run(executableApp);
+ this.run(executableApp);
 this.assertSize(app.getTopics().getOutputTopic(), 3);
 
- // Wait until all stream application are completely stopped before triggering cleanup
- Thread.sleep(TIMEOUT.toMillis());
+ // Wait until all stream applications are completely stopped before triggering cleanup
+ this.awaitClosed(executableApp);
 
 reset(executableApp);
- run(executableApp);
+ this.run(executableApp);
 this.assertSize(app.getTopics().getOutputTopic(), 6);
 }
 }
 
+ private void run(final ExecutableStreamsApp app) {
+ try (final StreamsRunner runner = app.createRunner()) {
+ StreamsRunnerTest.run(runner);
+ // Wait until stream application has consumed all data
+ this.awaitProcessing(app);
+ }
+ }
+
 private ConfiguredStreamsApp createComplexApplication() {
 this.newTestClient().createTopic(ComplexTopologyApplication.THROUGH_TOPIC);
 return configureApp(new ComplexTopologyApplication(), StreamsTopicConfig.builder()
diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java
index bfe14596..4dbe0270 100644
--- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java
+++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsRunnerTest.java
@@ -24,6 +24,7 @@
 
 package com.bakdata.kafka.integration;
 
+import static org.awaitility.Awaitility.await;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -73,7 +74,6 @@
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
 class StreamsRunnerTest extends KafkaTest {
- private static final Duration TIMEOUT = Duration.ofSeconds(10);
 @Mock
 private StreamsUncaughtExceptionHandler uncaughtExceptionHandler;
 @Mock
@@ -119,6 +119,10 @@ private static ConfiguredStreamsApp createErrorApplication() {
 .build());
 }
 
+ private static void
awaitThreadIsDead(final Thread thread) { + await("Thread is dead").atMost(Duration.ofSeconds(10)).until(() -> !thread.isAlive()); + } + @Test void shouldRunApp() { try (final ConfiguredStreamsApp app = createMirrorApplication(); @@ -172,7 +176,7 @@ void shouldUseMultipleLabeledInputTopics() { } @Test - void shouldThrowOnMissingInputTopic() throws InterruptedException { + void shouldThrowOnMissingInputTopic() { when(this.uncaughtExceptionHandler.handle(any())).thenReturn(StreamThreadExceptionResponse.SHUTDOWN_CLIENT); try (final ConfiguredStreamsApp app = createMirrorApplication(); final StreamsRunner runner = app.withEndpoint(this.createEndpointWithoutSchemaRegistry()) @@ -183,8 +187,7 @@ void shouldThrowOnMissingInputTopic() throws InterruptedException { final Thread thread = run(runner); final CapturingUncaughtExceptionHandler handler = (CapturingUncaughtExceptionHandler) thread.getUncaughtExceptionHandler(); - Thread.sleep(TIMEOUT.toMillis()); - this.softly.assertThat(thread.isAlive()).isFalse(); + awaitThreadIsDead(thread); this.softly.assertThat(handler.getLastException()).isInstanceOf(MissingSourceTopicException.class); verify(this.uncaughtExceptionHandler).handle(any()); verify(this.stateListener).onChange(State.ERROR, State.PENDING_ERROR); @@ -192,7 +195,7 @@ void shouldThrowOnMissingInputTopic() throws InterruptedException { } @Test - void shouldCloseOnMapError() throws InterruptedException { + void shouldCloseOnMapError() { when(this.uncaughtExceptionHandler.handle(any())).thenReturn(StreamThreadExceptionResponse.SHUTDOWN_CLIENT); try (final ConfiguredStreamsApp app = createErrorApplication(); final StreamsRunner runner = app.withEndpoint(this.createEndpointWithoutSchemaRegistry()) @@ -211,8 +214,7 @@ void shouldCloseOnMapError() throws InterruptedException { .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .to(inputTopic, List.of(new SimpleProducerRecord<>("foo", "bar"))); - Thread.sleep(TIMEOUT.toMillis()); - this.softly.assertThat(thread.isAlive()).isFalse(); + awaitThreadIsDead(thread); this.softly.assertThat(handler.getLastException()).isInstanceOf(StreamsException.class) .satisfies(e -> this.softly.assertThat(e.getCause()).hasMessage("Error in map")); verify(this.uncaughtExceptionHandler).handle(any()); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java index 932a3819..41c83499 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java @@ -51,7 +51,7 @@ @Slf4j @ExtendWith(SoftAssertionsExtension.class) class SchemaTopicClientTest extends KafkaTest { - private static final Duration TIMEOUT = Duration.ofSeconds(10); + private static final Duration CLIENT_TIMEOUT = Duration.ofSeconds(10); private static final String TOPIC = "topic"; @InjectSoftAssertions @@ -59,7 +59,7 @@ class SchemaTopicClientTest extends KafkaTest { @Test void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() - throws InterruptedException, IOException, RestClientException { + throws IOException, RestClientException { final KafkaTestClient testClient = this.newTestClient(); try (final ImprovedAdminClient admin = testClient.admin(); final TopicClient topicClient = admin.getTopicClient()) { @@ -83,8 +83,6 @@ void 
shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() schemaTopicClient.deleteTopicAndResetSchemaRegistry(TOPIC); } - Thread.sleep(TIMEOUT.toMillis()); - this.softly.assertThat(client.getAllSubjects()) .doesNotContain(TOPIC + "-value"); this.softly.assertThat(topicClient.exists(TOPIC)) @@ -93,7 +91,7 @@ void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() } @Test - void shouldResetSchema() throws InterruptedException, IOException, RestClientException { + void shouldResetSchema() throws IOException, RestClientException { final KafkaTestClient testClient = this.newTestClient(); try (final ImprovedAdminClient admin = testClient.admin(); final TopicClient topicClient = admin.getTopicClient()) { @@ -117,8 +115,6 @@ void shouldResetSchema() throws InterruptedException, IOException, RestClientExc schemaTopicClient.resetSchemaRegistry(TOPIC); } - Thread.sleep(TIMEOUT.toMillis()); - this.softly.assertThat(client.getAllSubjects()) .doesNotContain(TOPIC + "-value"); this.softly.assertThat(topicClient.exists(TOPIC)) @@ -127,7 +123,7 @@ void shouldResetSchema() throws InterruptedException, IOException, RestClientExc } @Test - void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws InterruptedException, RestClientException, + void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws RestClientException, IOException { final KafkaTestClient testClient = this.newTestClient(); try (final ImprovedAdminClient admin = testClient.admin(); @@ -152,7 +148,6 @@ void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws Interr schemaTopicClient.deleteTopicAndResetSchemaRegistry(TOPIC); } - Thread.sleep(TIMEOUT.toMillis()); this.softly.assertThat(client.getAllSubjects()) .contains(TOPIC + "-value"); this.softly.assertThat(topicClient.exists(TOPIC)) @@ -164,14 +159,14 @@ private SchemaTopicClient createClientWithSchemaRegistry() { final Map kafkaProperties = Map.of( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBootstrapServers() ); - return SchemaTopicClient.create(kafkaProperties, this.getSchemaRegistryUrl(), TIMEOUT); + return SchemaTopicClient.create(kafkaProperties, this.getSchemaRegistryUrl(), CLIENT_TIMEOUT); } private SchemaTopicClient createClientWithNoSchemaRegistry() { final Map kafkaProperties = Map.of( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.getBootstrapServers() ); - return SchemaTopicClient.create(kafkaProperties, TIMEOUT); + return SchemaTopicClient.create(kafkaProperties, CLIENT_TIMEOUT); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java index 268b2279..9b3845dd 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/TopicClientTest.java @@ -46,24 +46,18 @@ void shouldNotFindTopic() { } @Test - void shouldFindTopic() throws InterruptedException { + void shouldFindTopic() { try (final TopicClient client = this.createClient()) { client.createTopic("exists", defaultTopicSettings().build()); - } - Thread.sleep(CLIENT_TIMEOUT.toMillis()); - try (final TopicClient client = this.createClient()) { assertThat(client.exists("exists")).isTrue(); } } @Test - void shouldListTopics() throws InterruptedException { + void shouldListTopics() { try (final TopicClient client = this.createClient()) { client.createTopic("foo", defaultTopicSettings().build()); client.createTopic("bar", defaultTopicSettings().build()); - 
} - Thread.sleep(CLIENT_TIMEOUT.toMillis()); - try (final TopicClient client = this.createClient()) { assertThat(client.listTopics()) .hasSize(2) .containsExactlyInAnyOrder("foo", "bar"); @@ -71,15 +65,10 @@ void shouldListTopics() throws InterruptedException { } @Test - void shouldDeleteTopic() throws InterruptedException { + void shouldDeleteTopic() { try (final TopicClient client = this.createClient()) { client.createTopic("foo", defaultTopicSettings().build()); - } - Thread.sleep(CLIENT_TIMEOUT.toMillis()); - try (final TopicClient client = this.createClient()) { - assertThat(client.listTopics()) - .hasSize(1) - .containsExactlyInAnyOrder("foo"); + assertThat(client.exists("foo")).isTrue(); client.deleteTopic("foo"); assertThat(client.listTopics()) .isEmpty(); @@ -87,7 +76,7 @@ void shouldDeleteTopic() throws InterruptedException { } @Test - void shouldCreateTopic() throws InterruptedException { + void shouldCreateTopic() { try (final TopicClient client = this.createClient()) { assertThat(client.exists("topic")).isFalse(); final TopicSettings settings = TopicSettings.builder() @@ -96,7 +85,6 @@ void shouldCreateTopic() throws InterruptedException { .replicationFactor((short) 1) .build(); client.createTopic("topic", settings, emptyMap()); - Thread.sleep(CLIENT_TIMEOUT.toMillis()); assertThat(client.exists("topic")).isTrue(); assertThat(client.describe("topic")) .satisfies(info -> { diff --git a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java index 94872064..20a05f59 100644 --- a/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java +++ b/streams-bootstrap-core/src/testFixtures/java/com/bakdata/kafka/KafkaTest.java @@ -26,6 +26,8 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import java.time.Duration; +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionFactory; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.kafka.KafkaContainer; @@ -43,6 +45,12 @@ public static KafkaContainer newCluster() { .withTag("3.8.1")); } + private static ConditionFactory await() { + return Awaitility.await() + .pollInterval(Duration.ofSeconds(2L)) + .atMost(Duration.ofSeconds(20L)); + } + protected KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { return KafkaEndpointConfig.builder() .bootstrapServers(this.getBootstrapServers()) @@ -71,4 +79,27 @@ protected String getSchemaRegistryUrl() { protected SchemaRegistryClient getSchemaRegistryClient() { return this.testTopologyFactory.getSchemaRegistryClient(); } + + protected void awaitProcessing(final ExecutableStreamsApp app) { + this.awaitActive(app); + final ConsumerGroupVerifier verifier = ConsumerGroupVerifier.verify(app); + await() + .alias("Consumer group has finished processing") + .until(verifier::hasFinishedProcessing); + } + + protected void awaitActive(final ExecutableStreamsApp app) { + final ConsumerGroupVerifier verifier = ConsumerGroupVerifier.verify(app); + await() + .alias("Consumer group is active") + .until(verifier::isActive); + } + + protected void awaitClosed(final ExecutableStreamsApp app) { + final ConsumerGroupVerifier verifier = ConsumerGroupVerifier.verify(app); + await() + .alias("Consumer group is closed") + .until(verifier::isClosed); + } + } diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ConsumerGroupVerifier.java 
b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ConsumerGroupVerifier.java new file mode 100644 index 00000000..b7260919 --- /dev/null +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/ConsumerGroupVerifier.java @@ -0,0 +1,129 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka; + +import com.bakdata.kafka.util.ConsumerGroupClient; +import com.bakdata.kafka.util.ImprovedAdminClient; +import com.bakdata.kafka.util.TopicClient; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.TopicPartition; + +/** + * Utility class to verify the state of a Kafka consumer group + */ +@Slf4j +@RequiredArgsConstructor +public class ConsumerGroupVerifier { + + private final @NonNull String group; + private final @NonNull Supplier adminClientSupplier; + + /** + * Create a new verifier from an {@code ExecutableStreamsApp} + * @param app app to create verifier from + * @return verifier + */ + public static ConsumerGroupVerifier verify(final ExecutableStreamsApp app) { + final EffectiveAppConfiguration config = app.getEffectiveConfig(); + final ImprovedStreamsConfig streamsConfig = new ImprovedStreamsConfig(app.getConfig()); + return new ConsumerGroupVerifier(streamsConfig.getAppId(), config::createAdminClient); + } + + /** + * Check whether consumer group has state {@link ConsumerGroupState#STABLE} + * @return true if consumer group has state {@link ConsumerGroupState#STABLE} + */ + public boolean isActive() { + return this.getState() == ConsumerGroupState.STABLE; + } + + /** + * Check whether consumer group has state {@link ConsumerGroupState#EMPTY} + * @return true if consumer group has state {@link ConsumerGroupState#EMPTY} + */ + public boolean isClosed() { + return this.getState() == ConsumerGroupState.EMPTY; + } + + /** + * Get current state of consumer group + * @return current state of consumer group + */ + public ConsumerGroupState getState() { + try (final ImprovedAdminClient admin = this.adminClientSupplier.get(); + final ConsumerGroupClient 
consumerGroupClient = admin.getConsumerGroupClient()) { + final ConsumerGroupDescription description = consumerGroupClient.describe(this.group); + final ConsumerGroupState state = description.state(); + log.debug("Consumer group '{}' has state {}", this.group, state); + return state; + } + } + + /** + * Check whether consumer group has assigned partitions and lag is 0 + * @return true if consumer group has assigned partitions and lag is 0 + */ + public boolean hasFinishedProcessing() { + return this.computeLag().filter(lag -> lag == 0).isPresent(); + } + + /** + * Compute lag of consumer group + * @return lag of consumer group. If no partitions are assigned, an empty {@code Optional} is returned + */ + public Optional computeLag() { + try (final ImprovedAdminClient admin = this.adminClientSupplier.get(); + final ConsumerGroupClient consumerGroupClient = admin.getConsumerGroupClient(); + final TopicClient topicClient = admin.getTopicClient()) { + final Map consumerOffsets = + consumerGroupClient.listOffsets(this.group); + log.debug("Consumer group '{}' has {} subscribed partitions", this.group, consumerOffsets.size()); + if (consumerOffsets.isEmpty()) { + return Optional.empty(); + } + final Map partitionOffsets = + topicClient.listOffsets(consumerOffsets.keySet()); + final long lag = consumerOffsets.entrySet().stream() + .mapToLong(e -> { + final TopicPartition topicPartition = e.getKey(); + final OffsetAndMetadata consumerOffset = e.getValue(); + final ListOffsetsResultInfo partitionOffset = partitionOffsets.get(topicPartition); + return partitionOffset.offset() - consumerOffset.offset(); + }) + .sum(); + log.debug("Consumer group '{}' has lag {}", this.group, lag); + return Optional.of(lag); + } + } +}
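
Usage note (appended by the editor, not part of the committed patch): this change replaces fixed Thread.sleep waits with condition polling. Tests extending the KafkaTest fixture call awaitActive, awaitProcessing, or awaitClosed, which poll the application's consumer group through the new ConsumerGroupVerifier until it is stable, fully caught up, or empty. Below is a minimal sketch of how a downstream test would use this, assuming a hypothetical app factory createMyApplication() and a test class living in the com.bakdata.kafka package; everything else in the sketch is introduced by this patch or ships with Awaitility and JUnit.

import static org.awaitility.Awaitility.await;

import java.time.Duration;
import org.junit.jupiter.api.Test;

class MyStreamsAppTest extends KafkaTest {

    @Test
    void shouldProcessAllRecords() {
        // createMyApplication() is a hypothetical stand-in for factories like createWordCountApplication()
        try (final ConfiguredStreamsApp<StreamsApp> app = createMyApplication();
                final ExecutableStreamsApp<StreamsApp> executableApp =
                        app.withEndpoint(this.createEndpointWithoutSchemaRegistry());
                final StreamsRunner runner = executableApp.createRunner()) {
            // the runner blocks until shutdown, so start it on its own thread
            // (StreamsRunnerTest.run(runner) in this patch does the same)
            new Thread(runner).start();
            // poll until the consumer group is STABLE and its lag reaches 0, instead of sleeping
            this.awaitProcessing(executableApp);

            // the same condition can be expressed directly with Awaitility and the verifier
            final ConsumerGroupVerifier verifier = ConsumerGroupVerifier.verify(executableApp);
            await("all input records processed")
                    .atMost(Duration.ofSeconds(20L))
                    .until(verifier::hasFinishedProcessing);
        }
    }
}

Because computeLag() returns an empty Optional while the group has no assigned partitions, hasFinishedProcessing() only reports true once the group is both assigned and caught up; KafkaTest#awaitProcessing therefore waits for the group to become STABLE first. This is what makes the fixed 10-second sleeps removed throughout this patch unnecessary.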