Skip to content

Commit

Permalink
Make methods running before application start public
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 23, 2025
1 parent a725afb commit d40198d
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 4 deletions.
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ include(
":streams-bootstrap-test",
":streams-bootstrap-large-messages",
":streams-bootstrap-cli",
":streams-bootstrap-cli-test",
)
6 changes: 6 additions & 0 deletions streams-bootstrap-cli-test/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
description = "Utils for testing your Kafka Streams Application"

dependencies {
api(project(":streams-bootstrap-test"))
api(project(":streams-bootstrap-cli"))
}
3 changes: 3 additions & 0 deletions streams-bootstrap-cli-test/lombok.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# This file is generated by the 'io.freefair.lombok' Gradle plugin
config.stopBubbling = true
lombok.addLombokGeneratedAnnotation = true
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.lang.Thread.UncaughtExceptionHandler;
import lombok.Getter;

@Getter
public class CapturingUncaughtExceptionHandler implements UncaughtExceptionHandler {
private Throwable lastException;

@Override
public void uncaughtException(final Thread t, final Throwable e) {
this.lastException = e;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import com.bakdata.fluent_kafka_streams_tests.TestTopology;
import java.lang.Thread.UncaughtExceptionHandler;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Delegate;
import picocli.CommandLine;

@RequiredArgsConstructor
public final class TestApplicationHelper {

@Delegate
private final @NonNull TestTopologyFactory topologyFactory;

public static TestApplicationHelper withoutSchemaRegistry() {
return new TestApplicationHelper(TestTopologyFactory.withoutSchemaRegistry());
}

public static TestApplicationHelper withSchemaRegistry() {
return new TestApplicationHelper(TestTopologyFactory.withSchemaRegistry());
}

public static TestApplicationHelper withSchemaRegistry(final String schemaRegistryUrl) {
return new TestApplicationHelper(TestTopologyFactory.withSchemaRegistry(schemaRegistryUrl));
}

public Thread runApplication(final KafkaStreamsApplication<? extends StreamsApp> app) {
this.configure(app);
new CommandLine(app); // initialize all mixins
app.onApplicationStart();
final Thread thread = new Thread(app);
final UncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler();
thread.setUncaughtExceptionHandler(handler);
thread.start();
return thread;
}

public ConsumerGroupVerifier verify(final KafkaStreamsApplication<? extends StreamsApp> app) {
this.configure(app);
final KafkaEndpointConfig endpointConfig = app.getEndpointConfig();
final KafkaTestClient testClient = new KafkaTestClient(endpointConfig);
try (final ConfiguredStreamsApp<? extends StreamsApp> configuredApp = app.createConfiguredApp()) {
final String uniqueAppId = configuredApp.getUniqueAppId();
return new ConsumerGroupVerifier(uniqueAppId, testClient::admin);
}
}

public ConfiguredStreamsApp<? extends StreamsApp> createConfiguredApp(
final KafkaStreamsApplication<? extends StreamsApp> app) {
this.configure(app);
app.prepareRun();
return app.createConfiguredApp();
}

public <K, V> TestTopology<K, V> createTopology(final KafkaStreamsApplication<? extends StreamsApp> app) {
final ConfiguredStreamsApp<? extends StreamsApp> configuredApp = this.createConfiguredApp(app);
return this.<K, V>createTopology(configuredApp);
}

public <K, V> TestTopology<K, V> createTopologyExtension(final KafkaStreamsApplication<? extends StreamsApp> app) {
final ConfiguredStreamsApp<? extends StreamsApp> configuredApp = this.createConfiguredApp(app);
return this.<K, V>createTopologyExtension(configuredApp);
}

public KafkaTestClient newTestClient(final String bootstrapServers) {
return new KafkaTestClient(KafkaEndpointConfig.builder()
.bootstrapServers(bootstrapServers)
.schemaRegistryUrl(this.getSchemaRegistryUrl())
.build());
}

public void configure(final KafkaStreamsApplication<? extends StreamsApp> app) {
app.setSchemaRegistryUrl(this.getSchemaRegistryUrl());
}

}
1 change: 1 addition & 0 deletions streams-bootstrap-cli/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {
testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion)
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)
testImplementation(testFixtures(project(":streams-bootstrap-core")))
testImplementation(project(":streams-bootstrap-cli-test"))
testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2")
val confluentVersion: String by project
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.bakdata.kafka.KafkaTestClient;
import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
import com.bakdata.kafka.SimpleKafkaStreamsApplication;
import com.bakdata.kafka.TestApplicationHelper;
import com.bakdata.kafka.TestTopologyFactory;
import com.bakdata.kafka.test_applications.Mirror;
import java.nio.file.Path;
Expand All @@ -57,8 +58,7 @@ void shouldRunApp() {
app.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir));
app.setInputTopics(List.of(input));
app.setOutputTopic(output);
// run in Thread because the application blocks indefinitely
new Thread(app).start();
TestApplicationHelper.withoutSchemaRegistry().runApplication(app);
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.bakdata.kafka.KafkaTestClient;
import com.bakdata.kafka.SenderBuilder.SimpleProducerRecord;
import com.bakdata.kafka.SimpleKafkaStreamsApplication;
import com.bakdata.kafka.TestApplicationHelper;
import com.bakdata.kafka.TestTopologyFactory;
import com.bakdata.kafka.test_applications.WordCount;
import com.bakdata.kafka.util.ImprovedAdminClient;
Expand Down Expand Up @@ -155,8 +156,7 @@ private void runAppAndClose(final KafkaStreamsApplication<?> app) {
}

private void runApp(final KafkaStreamsApplication<?> app) {
// run in Thread because the application blocks indefinitely
new Thread(app).start();
TestApplicationHelper.withoutSchemaRegistry().runApplication(app);
// Wait until stream application has consumed all data
this.awaitProcessing(app.createExecutableApp());
}
Expand Down

0 comments on commit d40198d

Please sign in to comment.