Skip to content

Commit

Permalink
Add hook to prepare running of app (#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 authored Aug 20, 2024
1 parent 8a75197 commit 49e7122
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 72 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class MyStreamsApplication extends KafkaStreamsApplication {
}

@Override
public StreamsApp createApp(final boolean cleanUp) {
public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down Expand Up @@ -163,7 +163,7 @@ public class MyProducerApplication extends KafkaProducerApplication {
}

@Override
public ProducerApp createApp(final boolean cleanUp) {
public ProducerApp createApp() {
return new ProducerApp() {
@Override
public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,25 +152,15 @@ private static String[] addEnvironmentVariablesArguments(final String[] args) {
/**
* Create a new app that will be configured and executed according to this application.
*
* @param cleanUp whether app is created for clean up purposes. In that case, the user might want
* to skip initialization of expensive resources.
* @return app
*/
public abstract A createApp(boolean cleanUp);

/**
* Create a new app that will be configured and executed according to this application.
*
* @return app
*/
public A createApp() {
return this.createApp(false);
}
public abstract A createApp();

/**
* Clean all resources associated with this application
*/
public void clean() {
this.prepareClean();
try (final CleanableApp<CR> cleanableApp = this.createCleanableApp()) {
final CR cleanUpRunner = cleanableApp.getCleanUpRunner();
cleanUpRunner.clean();
Expand All @@ -197,6 +187,7 @@ public final void stop() {
*/
@Override
public void run() {
this.prepareRun();
try (final RunnableApp<R> runnableApp = this.createRunnableApp()) {
final R runner = runnableApp.getRunner();
runner.run();
Expand All @@ -210,45 +201,25 @@ public KafkaEndpointConfig getEndpointConfig() {
.build();
}

/**
* Create a new {@code ExecutableApp} that will be executed according to the requested command.
* @return {@code ExecutableApp}
*/
public final E createExecutableApp() {
return this.createExecutableApp(false);
}

/**
* Create a new {@code ExecutableApp} that will be executed according to the requested command.
*
* @param cleanUp whether app is created for clean up purposes. In that case, the user might want to skip
* initialization of expensive resources.
* @return {@code ExecutableApp}
*/
public final E createExecutableApp(final boolean cleanUp) {
final ConfiguredApp<E> configuredStreamsApp = this.createConfiguredApp(cleanUp);
public final E createExecutableApp() {
final ConfiguredApp<E> configuredStreamsApp = this.createConfiguredApp();
final KafkaEndpointConfig endpointConfig = this.getEndpointConfig();
return configuredStreamsApp.withEndpoint(endpointConfig);
}

/**
* Create a new {@code ConfiguredApp} that will be executed according to this application.
* @return {@code ConfiguredApp}
*/
public final CA createConfiguredApp() {
return this.createConfiguredApp(false);
}

/**
* Create a new {@code ConfiguredApp} that will be executed according to this application.
*
* @param cleanUp whether {@code ConfiguredApp} is created for clean up purposes. In that case, the user might want
* to skip initialization of expensive resources.
* @return {@code ConfiguredApp}
*/
public final CA createConfiguredApp(final boolean cleanUp) {
public final CA createConfiguredApp() {
final AppConfiguration<T> configuration = this.createConfiguration();
final A app = this.createApp(cleanUp);
final A app = this.createApp();
return this.createConfiguredApp(app, configuration);
}

Expand All @@ -266,7 +237,7 @@ public final AppConfiguration<T> createConfiguration() {
* @return {@code RunnableApp}
*/
public final RunnableApp<R> createRunnableApp() {
final ExecutableApp<R, ?, O> app = this.createExecutableApp(false);
final ExecutableApp<R, ?, O> app = this.createExecutableApp();
final Optional<O> executionOptions = this.createExecutionOptions();
final R runner = executionOptions.map(app::createRunner).orElseGet(app::createRunner);
final RunnableApp<R> runnableApp = new RunnableApp<>(app, runner, this.activeApps::remove);
Expand All @@ -279,7 +250,7 @@ public final RunnableApp<R> createRunnableApp() {
* @return {@code CleanableApp}
*/
public final CleanableApp<CR> createCleanableApp() {
final ExecutableApp<R, CR, O> executableApp = this.createExecutableApp(true);
final ExecutableApp<R, CR, O> executableApp = this.createExecutableApp();
final CR cleanUpRunner = executableApp.createCleanUpRunner();
final CleanableApp<CR> cleanableApp = new CleanableApp<>(executableApp, cleanUpRunner, this.activeApps::remove);
this.activeApps.add(cleanableApp);
Expand All @@ -302,6 +273,20 @@ protected void onApplicationStart() {
// do nothing by default
}

/**
* Called before running the application, i.e., invoking {@link #run()}
*/
protected void prepareRun() {
// do nothing by default
}

/**
* Called before cleaning the application, i.e., invoking {@link #clean()}
*/
protected void prepareClean() {
// do nothing by default
}

private void startApplication() {
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
this.onApplicationStart();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public void clean() {
@Command(description = "Clear all state stores, consumer group offsets, and internal topics associated with the "
+ "Kafka Streams application.")
public void reset() {
this.prepareClean();
try (final CleanableApp<StreamsCleanUpRunner> app = this.createCleanableApp()) {
final StreamsCleanUpRunner runner = app.getCleanUpRunner();
runner.reset();
Expand Down Expand Up @@ -147,6 +148,14 @@ public final ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final StreamsA
return configuredApp;
}

/**
* Called before cleaning the application, i.e., invoking {@link #clean()} or {@link #reset()}
*/
@Override
protected void prepareClean() {
super.prepareClean();
}

/**
* Create a {@link StateListener} to use for Kafka Streams.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.bakdata.kafka;

import java.util.function.Function;
import java.util.function.Supplier;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand All @@ -34,18 +33,10 @@
*/
@RequiredArgsConstructor
public final class SimpleKafkaProducerApplication extends KafkaProducerApplication {
private final @NonNull Function<Boolean, ProducerApp> appFactory;

/**
* Create new {@code SimpleKafkaProducerApplication}
* @param appFactory factory to create {@code ProducerApp} without any parameters
*/
public SimpleKafkaProducerApplication(final Supplier<? extends ProducerApp> appFactory) {
this(cleanUp -> appFactory.get());
}
private final @NonNull Supplier<ProducerApp> appFactory;

@Override
public ProducerApp createApp(final boolean cleanUp) {
return this.appFactory.apply(cleanUp);
public ProducerApp createApp() {
return this.appFactory.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.bakdata.kafka;

import java.util.function.Function;
import java.util.function.Supplier;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand All @@ -34,19 +33,10 @@
*/
@RequiredArgsConstructor
public final class SimpleKafkaStreamsApplication extends KafkaStreamsApplication {

private final @NonNull Function<Boolean, StreamsApp> appFactory;

/**
* Create new {@code SimpleKafkaStreamsApplication}
* @param appFactory factory to create {@code StreamsApp} without any parameters
*/
public SimpleKafkaStreamsApplication(final Supplier<? extends StreamsApp> appFactory) {
this(cleanUp -> appFactory.get());
}
private final @NonNull Supplier<StreamsApp> appFactory;

@Override
public StreamsApp createApp(final boolean cleanUp) {
return this.appFactory.apply(cleanUp);
public StreamsApp createApp() {
return this.appFactory.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private static void runApp(final KafkaStreamsApplication app, final String... ar
void shouldExitWithSuccessCode() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
@Override
public StreamsApp createApp(final boolean cleanUp) {
public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down Expand Up @@ -113,7 +113,7 @@ public SerdeConfig defaultSerializationConfig() {
void shouldExitWithErrorCodeOnCleanupError() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
@Override
public StreamsApp createApp(final boolean cleanUp) {
public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down Expand Up @@ -149,7 +149,7 @@ public void clean() {
void shouldExitWithErrorCodeOnMissingBootstrapServersParameter() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
@Override
public StreamsApp createApp(final boolean cleanUp) {
public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down Expand Up @@ -183,7 +183,7 @@ public void run() {
void shouldExitWithErrorCodeOnInconsistentAppId() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
@Override
public StreamsApp createApp(final boolean cleanUp) {
public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down Expand Up @@ -295,7 +295,7 @@ public SerdeConfig defaultSerializationConfig() {
void shouldExitWithErrorOnCleanupError() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
@Override
public StreamsApp createApp(final boolean cleanUp) {
public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down Expand Up @@ -325,7 +325,7 @@ public SerdeConfig defaultSerializationConfig() {
void shouldParseArguments() {
try (final KafkaStreamsApplication app = new KafkaStreamsApplication() {
@Override
public StreamsApp createApp(final boolean cleanUp) {
public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void close() {
}

@Override
public StreamsApp createApp(final boolean cleanUp) {
public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
Expand Down

0 comments on commit 49e7122

Please sign in to comment.