From 49e71220c62e2e17a870c508db5b0d42ee63885b Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Tue, 20 Aug 2024 09:21:32 +0200 Subject: [PATCH] Add hook to prepare running of app (#256) --- README.md | 4 +- .../com/bakdata/kafka/KafkaApplication.java | 61 +++++++------------ .../kafka/KafkaStreamsApplication.java | 9 +++ .../kafka/SimpleKafkaProducerApplication.java | 15 +---- .../kafka/SimpleKafkaStreamsApplication.java | 16 +---- .../test/java/com/bakdata/kafka/CliTest.java | 12 ++-- .../java/com/bakdata/kafka/CloseFlagApp.java | 2 +- 7 files changed, 47 insertions(+), 72 deletions(-) diff --git a/README.md b/README.md index 33ff216f..b3f2d63c 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,7 @@ public class MyStreamsApplication extends KafkaStreamsApplication { } @Override - public StreamsApp createApp(final boolean cleanUp) { + public StreamsApp createApp() { return new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -163,7 +163,7 @@ public class MyProducerApplication extends KafkaProducerApplication { } @Override - public ProducerApp createApp(final boolean cleanUp) { + public ProducerApp createApp() { return new ProducerApp() { @Override public ProducerRunnable buildRunnable(final ProducerBuilder builder) { diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java index 9392847e..bac2a236 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -152,25 +152,15 @@ private static String[] addEnvironmentVariablesArguments(final String[] args) { /** * Create a new app that will be configured and executed according to this application. * - * @param cleanUp whether app is created for clean up purposes. In that case, the user might want - * to skip initialization of expensive resources. * @return app */ - public abstract A createApp(boolean cleanUp); - - /** - * Create a new app that will be configured and executed according to this application. - * - * @return app - */ - public A createApp() { - return this.createApp(false); - } + public abstract A createApp(); /** * Clean all resources associated with this application */ public void clean() { + this.prepareClean(); try (final CleanableApp cleanableApp = this.createCleanableApp()) { final CR cleanUpRunner = cleanableApp.getCleanUpRunner(); cleanUpRunner.clean(); @@ -197,6 +187,7 @@ public final void stop() { */ @Override public void run() { + this.prepareRun(); try (final RunnableApp runnableApp = this.createRunnableApp()) { final R runner = runnableApp.getRunner(); runner.run(); @@ -210,45 +201,25 @@ public KafkaEndpointConfig getEndpointConfig() { .build(); } - /** - * Create a new {@code ExecutableApp} that will be executed according to the requested command. - * @return {@code ExecutableApp} - */ - public final E createExecutableApp() { - return this.createExecutableApp(false); - } - /** * Create a new {@code ExecutableApp} that will be executed according to the requested command. * - * @param cleanUp whether app is created for clean up purposes. In that case, the user might want to skip - * initialization of expensive resources. * @return {@code ExecutableApp} */ - public final E createExecutableApp(final boolean cleanUp) { - final ConfiguredApp configuredStreamsApp = this.createConfiguredApp(cleanUp); + public final E createExecutableApp() { + final ConfiguredApp configuredStreamsApp = this.createConfiguredApp(); final KafkaEndpointConfig endpointConfig = this.getEndpointConfig(); return configuredStreamsApp.withEndpoint(endpointConfig); } - /** - * Create a new {@code ConfiguredApp} that will be executed according to this application. - * @return {@code ConfiguredApp} - */ - public final CA createConfiguredApp() { - return this.createConfiguredApp(false); - } - /** * Create a new {@code ConfiguredApp} that will be executed according to this application. * - * @param cleanUp whether {@code ConfiguredApp} is created for clean up purposes. In that case, the user might want - * to skip initialization of expensive resources. * @return {@code ConfiguredApp} */ - public final CA createConfiguredApp(final boolean cleanUp) { + public final CA createConfiguredApp() { final AppConfiguration configuration = this.createConfiguration(); - final A app = this.createApp(cleanUp); + final A app = this.createApp(); return this.createConfiguredApp(app, configuration); } @@ -266,7 +237,7 @@ public final AppConfiguration createConfiguration() { * @return {@code RunnableApp} */ public final RunnableApp createRunnableApp() { - final ExecutableApp app = this.createExecutableApp(false); + final ExecutableApp app = this.createExecutableApp(); final Optional executionOptions = this.createExecutionOptions(); final R runner = executionOptions.map(app::createRunner).orElseGet(app::createRunner); final RunnableApp runnableApp = new RunnableApp<>(app, runner, this.activeApps::remove); @@ -279,7 +250,7 @@ public final RunnableApp createRunnableApp() { * @return {@code CleanableApp} */ public final CleanableApp createCleanableApp() { - final ExecutableApp executableApp = this.createExecutableApp(true); + final ExecutableApp executableApp = this.createExecutableApp(); final CR cleanUpRunner = executableApp.createCleanUpRunner(); final CleanableApp cleanableApp = new CleanableApp<>(executableApp, cleanUpRunner, this.activeApps::remove); this.activeApps.add(cleanableApp); @@ -302,6 +273,20 @@ protected void onApplicationStart() { // do nothing by default } + /** + * Called before running the application, i.e., invoking {@link #run()} + */ + protected void prepareRun() { + // do nothing by default + } + + /** + * Called before cleaning the application, i.e., invoking {@link #clean()} + */ + protected void prepareClean() { + // do nothing by default + } + private void startApplication() { Runtime.getRuntime().addShutdownHook(new Thread(this::close)); this.onApplicationStart(); diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java index f0281306..3095b468 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java @@ -106,6 +106,7 @@ public void clean() { @Command(description = "Clear all state stores, consumer group offsets, and internal topics associated with the " + "Kafka Streams application.") public void reset() { + this.prepareClean(); try (final CleanableApp app = this.createCleanableApp()) { final StreamsCleanUpRunner runner = app.getCleanUpRunner(); runner.reset(); @@ -147,6 +148,14 @@ public final ConfiguredStreamsApp createConfiguredApp(final StreamsA return configuredApp; } + /** + * Called before cleaning the application, i.e., invoking {@link #clean()} or {@link #reset()} + */ + @Override + protected void prepareClean() { + super.prepareClean(); + } + /** * Create a {@link StateListener} to use for Kafka Streams. * diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java index e14e764a..3882199e 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import java.util.function.Function; import java.util.function.Supplier; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -34,18 +33,10 @@ */ @RequiredArgsConstructor public final class SimpleKafkaProducerApplication extends KafkaProducerApplication { - private final @NonNull Function appFactory; - - /** - * Create new {@code SimpleKafkaProducerApplication} - * @param appFactory factory to create {@code ProducerApp} without any parameters - */ - public SimpleKafkaProducerApplication(final Supplier appFactory) { - this(cleanUp -> appFactory.get()); - } + private final @NonNull Supplier appFactory; @Override - public ProducerApp createApp(final boolean cleanUp) { - return this.appFactory.apply(cleanUp); + public ProducerApp createApp() { + return this.appFactory.get(); } } diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java index 1279cdd8..7f3b828b 100644 --- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java +++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import java.util.function.Function; import java.util.function.Supplier; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -34,19 +33,10 @@ */ @RequiredArgsConstructor public final class SimpleKafkaStreamsApplication extends KafkaStreamsApplication { - - private final @NonNull Function appFactory; - - /** - * Create new {@code SimpleKafkaStreamsApplication} - * @param appFactory factory to create {@code StreamsApp} without any parameters - */ - public SimpleKafkaStreamsApplication(final Supplier appFactory) { - this(cleanUp -> appFactory.get()); - } + private final @NonNull Supplier appFactory; @Override - public StreamsApp createApp(final boolean cleanUp) { - return this.appFactory.apply(cleanUp); + public StreamsApp createApp() { + return this.appFactory.get(); } } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java index 42f6756e..c6cbff75 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java @@ -53,7 +53,7 @@ private static void runApp(final KafkaStreamsApplication app, final String... ar void shouldExitWithSuccessCode() { KafkaApplication.startApplication(new KafkaStreamsApplication() { @Override - public StreamsApp createApp(final boolean cleanUp) { + public StreamsApp createApp() { return new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -113,7 +113,7 @@ public SerdeConfig defaultSerializationConfig() { void shouldExitWithErrorCodeOnCleanupError() { KafkaApplication.startApplication(new KafkaStreamsApplication() { @Override - public StreamsApp createApp(final boolean cleanUp) { + public StreamsApp createApp() { return new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -149,7 +149,7 @@ public void clean() { void shouldExitWithErrorCodeOnMissingBootstrapServersParameter() { KafkaApplication.startApplication(new KafkaStreamsApplication() { @Override - public StreamsApp createApp(final boolean cleanUp) { + public StreamsApp createApp() { return new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -183,7 +183,7 @@ public void run() { void shouldExitWithErrorCodeOnInconsistentAppId() { KafkaApplication.startApplication(new KafkaStreamsApplication() { @Override - public StreamsApp createApp(final boolean cleanUp) { + public StreamsApp createApp() { return new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -295,7 +295,7 @@ public SerdeConfig defaultSerializationConfig() { void shouldExitWithErrorOnCleanupError() { KafkaApplication.startApplication(new KafkaStreamsApplication() { @Override - public StreamsApp createApp(final boolean cleanUp) { + public StreamsApp createApp() { return new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { @@ -325,7 +325,7 @@ public SerdeConfig defaultSerializationConfig() { void shouldParseArguments() { try (final KafkaStreamsApplication app = new KafkaStreamsApplication() { @Override - public StreamsApp createApp(final boolean cleanUp) { + public StreamsApp createApp() { return new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) { diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java index 640328eb..2d00025d 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java @@ -45,7 +45,7 @@ public void close() { } @Override - public StreamsApp createApp(final boolean cleanUp) { + public StreamsApp createApp() { return new StreamsApp() { @Override public void buildTopology(final TopologyBuilder builder) {