Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Awaitility #273

Merged
merged 48 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
d313f77
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
aea9e77
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
eb08875
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
b8b38dc
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
3c4afb3
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
9cffdcf
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
9a3a968
Bump CI versions
yannick-roeder Jan 16, 2025
ad822fe
Fix Helm publish
yannick-roeder Jan 16, 2025
5ef5e4c
Fix Helm publish
yannick-roeder Jan 16, 2025
f191c92
Fix Helm publish
yannick-roeder Jan 16, 2025
faac25d
Fix Helm publish
yannick-roeder Jan 16, 2025
1309e18
Fix Helm publish
yannick-roeder Jan 16, 2025
0dbeb38
Fix Helm publish
yannick-roeder Jan 16, 2025
8759f60
Merge remote-tracking branch 'origin/fix/ci' into feature/test-improv…
philipp94831 Jan 16, 2025
f353ec1
Add methods for simplified testing of Kafka endpoints
philipp94831 Jan 16, 2025
692796c
Merge remote-tracking branch 'origin/master' into feature/test-improv…
philipp94831 Jan 17, 2025
21a0293
Update
philipp94831 Jan 17, 2025
f2fd574
Update
philipp94831 Jan 17, 2025
5637380
Update
philipp94831 Jan 17, 2025
16de422
Add docs
philipp94831 Jan 17, 2025
b8b23e2
Add docs
philipp94831 Jan 17, 2025
90f6856
Use Awaitility
philipp94831 Jan 17, 2025
f68a1c3
Use Awaitility
philipp94831 Jan 17, 2025
25ed943
Use Awaitility
philipp94831 Jan 17, 2025
0a2d27b
Use Awaitility
philipp94831 Jan 17, 2025
4bf9fb5
Update
philipp94831 Jan 17, 2025
282a7be
Merge branch 'feature/test-improvements' into feature/awaitility
philipp94831 Jan 17, 2025
d0c5e3c
Update
philipp94831 Jan 17, 2025
8307752
Update
philipp94831 Jan 17, 2025
dbe16f7
Update
philipp94831 Jan 17, 2025
464d759
Update
philipp94831 Jan 17, 2025
9c5f6dc
Update
philipp94831 Jan 17, 2025
afa135c
Merge branch 'feature/test-improvements' into feature/awaitility
philipp94831 Jan 17, 2025
97ccd80
Update
philipp94831 Jan 17, 2025
0bc1440
Update
philipp94831 Jan 17, 2025
857a910
Update
philipp94831 Jan 17, 2025
83fc24c
Update
philipp94831 Jan 17, 2025
9f667ea
Update
philipp94831 Jan 17, 2025
3c98bea
Update
philipp94831 Jan 17, 2025
1601954
Update
philipp94831 Jan 20, 2025
a1ecd38
Update
philipp94831 Jan 20, 2025
a4b4c33
Merge branch 'feature/test-improvements' into feature/awaitility
philipp94831 Jan 20, 2025
09f386c
Address review
philipp94831 Jan 20, 2025
3d7300b
Address review
philipp94831 Jan 20, 2025
2045f42
Address review
philipp94831 Jan 20, 2025
87e21cd
Address review
philipp94831 Jan 20, 2025
89c2edf
Merge branch 'feature/test-improvements' into feature/awaitility
philipp94831 Jan 20, 2025
42c3270
Merge remote-tracking branch 'origin/master' into feature/awaitility
philipp94831 Jan 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ junitVersion=5.11.4
mockitoVersion=5.15.2
assertJVersion=3.27.2
log4jVersion=2.24.3
awaitilityVersion=4.2.2
org.gradle.jvmargs=-Xmx4096m
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -95,7 +95,7 @@ public abstract class KafkaApplication<R extends Runner, CR extends CleanUpRunne
private Map<String, String> kafkaConfig = emptyMap();

/**
* <p>This methods needs to be called in the executable custom application class inheriting from
* <p>This method needs to be called in the executable custom application class inheriting from
* {@code KafkaApplication}.</p>
* <p>This method calls System exit</p>
*
Expand All @@ -109,7 +109,7 @@ public static void startApplication(final KafkaApplication<?, ?, ?, ?, ?, ?, ?>
}

/**
* <p>This methods needs to be called in the executable custom application class inheriting from
* <p>This method needs to be called in the executable custom application class inheriting from
* {@code KafkaApplication}.</p>
*
* @param app An instance of the custom application class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static com.bakdata.kafka.KafkaTest.POLL_TIMEOUT;
import static com.bakdata.kafka.KafkaTest.newCluster;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
import com.ginsberg.junit.exit.ExpectSystemExitWithStatus;
Expand All @@ -46,8 +47,10 @@

class CliTest {

private static void runApp(final KafkaStreamsApplication<?> app, final String... args) {
new Thread(() -> KafkaApplication.startApplication(app, args)).start();
private static Thread runApp(final KafkaStreamsApplication<?> app, final String... args) {
final Thread thread = new Thread(() -> KafkaApplication.startApplication(app, args));
thread.start();
return thread;
}

@Test
Expand Down Expand Up @@ -214,7 +217,7 @@ public SerdeConfig defaultSerializationConfig() {

@Test
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorInTopology() throws InterruptedException {
void shouldExitWithErrorInTopology() {
final String input = "input";
try (final KafkaContainer kafkaCluster = newCluster();
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
Expand All @@ -238,7 +241,7 @@ public SerdeConfig defaultSerializationConfig() {
})) {
kafkaCluster.start();

runApp(app,
final Thread thread = runApp(app,
"--bootstrap-server", kafkaCluster.getBootstrapServers(),
"--input-topics", input
);
Expand All @@ -248,7 +251,7 @@ public SerdeConfig defaultSerializationConfig() {
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.to(input, List.of(new SimpleProducerRecord<>("foo", "bar")));
Thread.sleep(Duration.ofSeconds(10).toMillis());
await("Thread is dead").atMost(Duration.ofSeconds(10L)).until(() -> !thread.isAlive());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.bakdata.kafka.util.ImprovedAdminClient;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -47,10 +46,9 @@
import org.junit.jupiter.api.Test;

class RunProducerAppTest extends KafkaTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);

@Test
void shouldRunApp() throws InterruptedException {
void shouldRunApp() {
final String output = "output";
try (final KafkaProducerApplication<?> app = new SimpleKafkaProducerApplication<>(() -> new ProducerApp() {
@Override
Expand Down Expand Up @@ -84,7 +82,6 @@ public SerializerConfig defaultSerializationConfig() {
assertThat(kv.value().getContent()).isEqualTo("bar");
});
app.clean();
Thread.sleep(TIMEOUT.toMillis());
try (final ImprovedAdminClient admin = testClient.admin()) {
assertThat(admin.getTopicClient().exists(app.getOutputTopic()))
.as("Output topic is deleted")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class RunStreamsAppTest extends KafkaTest {

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.bakdata.kafka.SimpleKafkaStreamsApplication;
import com.bakdata.kafka.test_applications.WordCount;
import com.bakdata.kafka.util.ImprovedAdminClient;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -52,33 +51,15 @@
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@Slf4j
@ExtendWith(SoftAssertionsExtension.class)
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
class StreamsCleanUpTest extends KafkaTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
@InjectSoftAssertions
private SoftAssertions softly;

private static void runAppAndClose(final KafkaStreamsApplication<?> app) throws InterruptedException {
runApp(app);
app.stop();
}

private static void runApp(final KafkaStreamsApplication<?> app) throws InterruptedException {
// run in Thread because the application blocks indefinitely
new Thread(app).start();
// Wait until stream application has consumed all data
Thread.sleep(TIMEOUT.toMillis());
}

@Test
void shouldClean() throws InterruptedException {
void shouldClean() {
try (final KafkaStreamsApplication<?> app = this.createWordCountApplication()) {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getOutputTopic());
Expand All @@ -98,8 +79,8 @@ void shouldClean() throws InterruptedException {
);
this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app);

// Wait until all stream application are completely stopped before triggering cleanup
Thread.sleep(TIMEOUT.toMillis());
// Wait until all stream applications are completely stopped before triggering cleanup
this.awaitClosed(app.createExecutableApp());
app.clean();

try (final ImprovedAdminClient admin = testClient.admin()) {
Expand All @@ -114,7 +95,7 @@ void shouldClean() throws InterruptedException {
}

@Test
void shouldReset() throws InterruptedException {
void shouldReset() {
try (final KafkaStreamsApplication<?> app = this.createWordCountApplication()) {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(app.getOutputTopic());
Expand All @@ -134,8 +115,8 @@ void shouldReset() throws InterruptedException {
);
this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app);

// Wait until all stream application are completely stopped before triggering cleanup
Thread.sleep(TIMEOUT.toMillis());
// Wait until all stream applications are completely stopped before triggering cleanup
this.awaitClosed(app.createExecutableApp());
app.reset();

try (final ImprovedAdminClient admin = testClient.admin()) {
Expand All @@ -152,21 +133,31 @@ void shouldReset() throws InterruptedException {
}

@Test
void shouldCallClose() throws InterruptedException {
void shouldCallClose() {
try (final CloseFlagApp app = this.createCloseFlagApplication()) {
this.newTestClient().createTopic(app.getInputTopics().get(0));
Thread.sleep(TIMEOUT.toMillis());
this.softly.assertThat(app.isClosed()).isFalse();
this.softly.assertThat(app.isAppClosed()).isFalse();
app.clean();
this.softly.assertThat(app.isAppClosed()).isTrue();
app.setAppClosed(false);
Thread.sleep(TIMEOUT.toMillis());
app.reset();
this.softly.assertThat(app.isAppClosed()).isTrue();
}
}

private void runAppAndClose(final KafkaStreamsApplication<?> app) {
this.runApp(app);
app.stop();
}

private void runApp(final KafkaStreamsApplication<?> app) {
// run in Thread because the application blocks indefinitely
new Thread(app).start();
// Wait until stream application has consumed all data
this.awaitProcessing(app.createExecutableApp());
}

private CloseFlagApp createCloseFlagApplication() {
final CloseFlagApp app = new CloseFlagApp();
app.setInputTopics(List.of("input"));
Expand All @@ -185,9 +176,8 @@ private List<KeyValue<String, Long>> readOutputTopic(final String outputTopic) {
}

private void runAndAssertContent(final Iterable<? extends KeyValue<String, Long>> expectedValues,
final String description, final KafkaStreamsApplication<?> app)
throws InterruptedException {
runAppAndClose(app);
final String description, final KafkaStreamsApplication<?> app) {
this.runAppAndClose(app);

final List<KeyValue<String, Long>> output = this.readOutputTopic(app.getOutputTopic());
this.softly.assertThat(output)
Expand Down
2 changes: 2 additions & 0 deletions streams-bootstrap-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ dependencies {
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
val log4jVersion: String by project
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
val awaitilityVersion: String by project
testFixturesApi(group = "org.awaitility", name = "awaitility", version = awaitilityVersion)
}

tasks.withType<Test> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;

/**
Expand Down Expand Up @@ -68,6 +71,14 @@ private static KafkaAdminException failedToListGroups(final Throwable ex) {
return new KafkaAdminException("Failed to list consumer groups", ex);
}

private static KafkaAdminException failedToListOffsets(final String groupName, final Throwable ex) {
return new KafkaAdminException("Failed to list offsets for consumer group" + groupName, ex);
}

private static KafkaAdminException failedToDescribeGroup(final String groupName, final Throwable ex) {
return new KafkaAdminException("Failed to describe consumer group" + groupName, ex);
}

/**
* Delete a consumer group.
*
Expand All @@ -93,6 +104,63 @@ public void deleteConsumerGroup(final String groupName) {
}
}

/**
* Describe a consumer group.
*
* @param groupName the consumer group name
* @return consumer group description
*/
public ConsumerGroupDescription describe(final String groupName) {
log.info("Describing consumer group '{}'", groupName);
try {
final ConsumerGroupDescription description =
this.adminClient.describeConsumerGroups(List.of(groupName))
.all()
.get(this.timeout.toSeconds(), TimeUnit.SECONDS)
.get(groupName);
log.info("Described consumer group '{}'", groupName);
return description;
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw failedToDescribeGroup(groupName, ex);
} catch (final ExecutionException ex) {
if (ex.getCause() instanceof RuntimeException) {
throw (RuntimeException) ex.getCause();
}
throw failedToDescribeGroup(groupName, ex);
} catch (final TimeoutException ex) {
throw failedToDescribeGroup(groupName, ex);
}
}

/**
* List offsets for a consumer group.
*
* @param groupName the consumer group name
* @return consumer group offsets
*/
public Map<TopicPartition, OffsetAndMetadata> listOffsets(final String groupName) {
log.info("Listing offsets for consumer group '{}'", groupName);
try {
final Map<TopicPartition, OffsetAndMetadata> offsets =
this.adminClient.listConsumerGroupOffsets(groupName)
.partitionsToOffsetAndMetadata(groupName)
.get(this.timeout.toSeconds(), TimeUnit.SECONDS);
log.info("Listed offsets for consumer group '{}'", groupName);
return offsets;
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw failedToListOffsets(groupName, ex);
} catch (final ExecutionException ex) {
if (ex.getCause() instanceof RuntimeException) {
throw (RuntimeException) ex.getCause();
}
throw failedToListOffsets(groupName, ex);
} catch (final TimeoutException ex) {
throw failedToListOffsets(groupName, ex);
}
}

@Override
public void close() {
this.adminClient.close();
Expand Down
Loading