Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 23, 2025
1 parent 40623a9 commit 4aebdde
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
@RequiredArgsConstructor
public final class TestApplicationHelper {

private final @NonNull String bootstrapServers;
@Getter
private final @NonNull SchemaRegistryEnv schemaRegistryEnv;

Expand Down Expand Up @@ -77,14 +78,15 @@ public <K, V> TestTopology<K, V> createTopologyExtension(final KafkaStreamsAppli
return testTopologyFactory.createTopologyExtension(configuredApp);
}

public KafkaTestClient newTestClient(final String bootstrapServers) {
public KafkaTestClient newTestClient() {
return new KafkaTestClient(KafkaEndpointConfig.builder()
.bootstrapServers(bootstrapServers)
.bootstrapServers(this.bootstrapServers)
.schemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl())
.build());
}

public void configure(final KafkaStreamsApplication<? extends StreamsApp> app) {
app.setBootstrapServers(this.bootstrapServers);
app.setSchemaRegistryUrl(this.schemaRegistryEnv.getSchemaRegistryUrl());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ void shouldRunApp() {
final KafkaTestClient testClient = this.newTestClient();
testClient.createTopic(output);
try (final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(Mirror::new)) {
app.setBootstrapServers(this.getBootstrapServers());
app.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir));
app.setInputTopics(List.of(input));
app.setOutputTopic(output);
new TestApplicationHelper(withoutSchemaRegistry()).runApplication(app);
new TestApplicationHelper(this.getBootstrapServers(), withoutSchemaRegistry()).runApplication(app);
testClient.send()
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ private void runAppAndClose(final KafkaStreamsApplication<?> app) {
}

private void runApp(final KafkaStreamsApplication<?> app) {
new TestApplicationHelper(withoutSchemaRegistry()).runApplication(app);
new TestApplicationHelper(this.getBootstrapServers(), withoutSchemaRegistry()).runApplication(app);
// Wait until stream application has consumed all data
this.awaitProcessing(app.createExecutableApp());
}
Expand Down Expand Up @@ -198,7 +198,6 @@ private KafkaStreamsApplication<?> createWordCountApplication() {
}

private <T extends KafkaStreamsApplication<?>> T configure(final T application) {
application.setBootstrapServers(this.getBootstrapServers());
application.setKafkaConfig(TestTopologyFactory.createStreamsTestConfig(this.stateDir));
return application;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.mockito.Mockito.when;

import com.bakdata.kafka.AppConfiguration;
import com.bakdata.kafka.CapturingUncaughtExceptionHandler;
import com.bakdata.kafka.ConfiguredStreamsApp;
import com.bakdata.kafka.KafkaTest;
import com.bakdata.kafka.KafkaTestClient;
Expand All @@ -48,7 +49,6 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
Expand Down Expand Up @@ -229,16 +229,6 @@ private ConfiguredStreamsApp<StreamsApp> createErrorApplication() {
.build());
}

@Getter
private static class CapturingUncaughtExceptionHandler implements UncaughtExceptionHandler {
private Throwable lastException;

@Override
public void uncaughtException(final Thread t, final Throwable e) {
this.lastException = e;
}
}

private static class ErrorApplication implements StreamsApp {

@Override
Expand Down

0 comments on commit 4aebdde

Please sign in to comment.