diff --git a/README.md b/README.md
index b3f2d63c..5d66f18e 100644
--- a/README.md
+++ b/README.md
@@ -66,7 +66,7 @@ import java.util.Map;
 import org.apache.kafka.common.serialization.Serdes.StringSerde;
 import org.apache.kafka.streams.kstream.KStream;
 
-public class MyStreamsApplication extends KafkaStreamsApplication {
+public class MyStreamsApplication extends KafkaStreamsApplication<StreamsApp> {
     public static void main(final String[] args) {
         startApplication(new MyStreamsApplication(), args);
     }
@@ -157,7 +157,7 @@ import java.util.Map;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
-public class MyProducerApplication extends KafkaProducerApplication {
+public class MyProducerApplication extends KafkaProducerApplication<ProducerApp> {
     public static void main(final String[] args) {
         startApplication(new MyProducerApplication(), args);
     }
diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java
index 87b5a53d..cd71f14b 100644
--- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java
+++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaProducerApplication.java
@@ -37,7 +37,9 @@
  * <p>The base class for creating Kafka Producer applications.</p>
  * This class provides all configuration options provided by {@link KafkaApplication}.
  * To implement your Kafka Producer application inherit from this class and add your custom options. Run it by
- * calling {@link #startApplication(KafkaApplication, String[])} with a instance of your class from your main.
+ * calling {@link #startApplication(KafkaApplication, String[])} with an instance of your class from your main.
+ *
+ * @param <T> type of {@link ProducerApp} created by this application
  */
 @ToString(callSuper = true)
 @Getter
@@ -45,10 +47,9 @@
 @RequiredArgsConstructor
 @Slf4j
 @Command(description = "Run a Kafka Producer application")
-public abstract class KafkaProducerApplication extends
-        KafkaApplication<ProducerRunner, ProducerCleanUpRunner, ProducerExecutionOptions,
-        ExecutableProducerApp<ProducerTopicConfig>, ConfiguredProducerApp<ProducerApp>, ProducerTopicConfig,
-        ProducerApp> {
+public abstract class KafkaProducerApplication<T extends ProducerApp> extends
+        KafkaApplication<ProducerRunner, ProducerCleanUpRunner, ProducerExecutionOptions, ExecutableProducerApp<ProducerTopicConfig>,
+        ConfiguredProducerApp<T>, ProducerTopicConfig, T> {
 
     /**
      * Delete all output topics associated with the Kafka Producer application.
@@ -73,7 +74,7 @@ public final ProducerTopicConfig createTopicConfig() {
     }
 
     @Override
-    public final ConfiguredProducerApp<ProducerApp> createConfiguredApp(final ProducerApp app,
+    public final ConfiguredProducerApp<T> createConfiguredApp(final T app,
             final AppConfiguration<ProducerTopicConfig> configuration) {
         return new ConfiguredProducerApp<>(app, configuration);
     }
diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java
index 3095b468..6f561ad5 100644
--- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java
+++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/KafkaStreamsApplication.java
@@ -57,7 +57,9 @@
  *     <li>{@link #volatileGroupInstanceId}</li>
  * </ul>
  * To implement your Kafka Streams application inherit from this class and add your custom options. Run it by calling
- * {@link #startApplication(KafkaApplication, String[])} with a instance of your class from your main.
+ * {@link #startApplication(KafkaApplication, String[])} with an instance of your class from your main.
+ *
+ * @param <T> type of {@link StreamsApp} created by this application
  */
 @ToString(callSuper = true)
 @Getter
@@ -65,9 +67,9 @@
 @RequiredArgsConstructor
 @Slf4j
 @Command(description = "Run a Kafka Streams application.")
-public abstract class KafkaStreamsApplication extends
+public abstract class KafkaStreamsApplication<T extends StreamsApp> extends
         KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions,
-        ExecutableStreamsApp<StreamsTopicConfig>, ConfiguredStreamsApp<StreamsApp>, StreamsTopicConfig, StreamsApp> {
+        ExecutableStreamsApp<StreamsTopicConfig>, ConfiguredStreamsApp<T>, StreamsTopicConfig, T> {
     @CommandLine.Option(names = "--input-topics", description = "Input topics", split = ",")
     private List<String> inputTopics = emptyList();
     @CommandLine.Option(names = "--input-pattern", description = "Input pattern")
@@ -138,9 +140,9 @@ public final StreamsTopicConfig createTopicConfig() {
     }
 
     @Override
-    public final ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final StreamsApp app,
+    public final ConfiguredStreamsApp<T> createConfiguredApp(final T app,
             final AppConfiguration<StreamsTopicConfig> configuration) {
-        final ConfiguredStreamsApp<StreamsApp> configuredApp = new ConfiguredStreamsApp<>(app, configuration);
+        final ConfiguredStreamsApp<T> configuredApp = new ConfiguredStreamsApp<>(app, configuration);
         if (this.applicationId != null && !configuredApp.getUniqueAppId().equals(this.applicationId)) {
             throw new IllegalArgumentException(
                     "Application ID provided via --application-id does not match StreamsApp#getUniqueAppId()");
diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java
index 3882199e..2d6e77e3 100644
--- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java
+++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaProducerApplication.java
@@ -30,13 +30,15 @@
 
 /**
  * {@code KafkaProducerApplication} without any additional configuration options.
+ *
+ * @param <T> type of {@link ProducerApp} created by this application
  */
 @RequiredArgsConstructor
-public final class SimpleKafkaProducerApplication extends KafkaProducerApplication {
-    private final @NonNull Supplier<ProducerApp> appFactory;
+public final class SimpleKafkaProducerApplication<T extends ProducerApp> extends KafkaProducerApplication<T> {
+    private final @NonNull Supplier<T> appFactory;
 
     @Override
-    public ProducerApp createApp() {
+    public T createApp() {
         return this.appFactory.get();
     }
 }
diff --git a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java
index 7f3b828b..6ddd7ff7 100644
--- a/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java
+++ b/streams-bootstrap-cli/src/main/java/com/bakdata/kafka/SimpleKafkaStreamsApplication.java
@@ -30,13 +30,15 @@
 
 /**
  * {@code KafkaStreamsApplication} without any additional configuration options.
+ *
+ * @param <T> type of {@link StreamsApp} created by this application
  */
 @RequiredArgsConstructor
-public final class SimpleKafkaStreamsApplication extends KafkaStreamsApplication {
-    private final @NonNull Supplier<StreamsApp> appFactory;
+public final class SimpleKafkaStreamsApplication<T extends StreamsApp> extends KafkaStreamsApplication<T> {
+    private final @NonNull Supplier<T> appFactory;
 
     @Override
-    public StreamsApp createApp() {
+    public T createApp() {
         return this.appFactory.get();
     }
 }
diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
index c6cbff75..05060156 100644
--- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
+++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
@@ -44,14 +44,14 @@
 
 class CliTest {
 
-    private static void runApp(final KafkaStreamsApplication app, final String... args) {
+    private static void runApp(final KafkaStreamsApplication<?> app, final String... args) {
         new Thread(() -> KafkaApplication.startApplication(app, args)).start();
     }
 
     @Test
     @ExpectSystemExitWithStatus(0)
     void shouldExitWithSuccessCode() {
-        KafkaApplication.startApplication(new KafkaStreamsApplication() {
+        KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
             @Override
             public StreamsApp createApp() {
                 return new StreamsApp() {
@@ -86,7 +86,7 @@ public void run() {
     @Test
     @ExpectSystemExitWithStatus(1)
     void shouldExitWithErrorCodeOnRunError() {
-        KafkaApplication.startApplication(new SimpleKafkaStreamsApplication(() -> new StreamsApp() {
+        KafkaApplication.startApplication(new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
             @Override
             public void buildTopology(final TopologyBuilder builder) {
                 throw new UnsupportedOperationException();
@@ -111,7 +111,7 @@ public SerdeConfig defaultSerializationConfig() {
     @Test
     @ExpectSystemExitWithStatus(1)
     void shouldExitWithErrorCodeOnCleanupError() {
-        KafkaApplication.startApplication(new KafkaStreamsApplication() {
+        KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
             @Override
             public StreamsApp createApp() {
                 return new StreamsApp() {
@@ -147,7 +147,7 @@ public void clean() {
     @Test
     @ExpectSystemExitWithStatus(2)
     void shouldExitWithErrorCodeOnMissingBootstrapServersParameter() {
-        KafkaApplication.startApplication(new KafkaStreamsApplication() {
+        KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
             @Override
             public StreamsApp createApp() {
                 return new StreamsApp() {
@@ -181,7 +181,7 @@ public void run() {
     @Test
     @ExpectSystemExitWithStatus(1)
     void shouldExitWithErrorCodeOnInconsistentAppId() {
-        KafkaApplication.startApplication(new KafkaStreamsApplication() {
+        KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
             @Override
             public StreamsApp createApp() {
                 return new StreamsApp() {
@@ -215,7 +215,7 @@ public SerdeConfig defaultSerializationConfig() {
     void shouldExitWithErrorInTopology() throws InterruptedException {
         final String input = "input";
         try (final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();
-                final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication(() -> new StreamsApp() {
+                final KafkaStreamsApplication<StreamsApp> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
             @Override
             public void buildTopology(final TopologyBuilder builder) {
                 builder.streamInput(Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
@@ -252,7 +252,7 @@ void shouldExitWithSuccessCodeOnShutdown() throws InterruptedException {
         final String input = "input";
         final String output = "output";
         try (final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();
-                final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication(() -> new StreamsApp() {
+                final KafkaStreamsApplication<StreamsApp> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
             @Override
             public void buildTopology(final TopologyBuilder builder) {
                 builder.streamInput(Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
@@ -293,7 +293,7 @@ public SerdeConfig defaultSerializationConfig() {
     @Test
     @ExpectSystemExitWithStatus(1)
     void shouldExitWithErrorOnCleanupError() {
-        KafkaApplication.startApplication(new KafkaStreamsApplication() {
+        KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
             @Override
             public StreamsApp createApp() {
                 return new StreamsApp() {
@@ -323,7 +323,7 @@ public SerdeConfig defaultSerializationConfig() {
 
     @Test
     void shouldParseArguments() {
-        try (final KafkaStreamsApplication app = new KafkaStreamsApplication() {
+        try (final KafkaStreamsApplication<StreamsApp> app = new KafkaStreamsApplication<>() {
             @Override
             public StreamsApp createApp() {
                 return new StreamsApp() {
diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java
index 2d00025d..cdd7b0d9 100644
--- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java
+++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CloseFlagApp.java
@@ -33,7 +33,7 @@
 @NoArgsConstructor
 @Getter
 @Setter
-public class CloseFlagApp extends KafkaStreamsApplication {
+public class CloseFlagApp extends KafkaStreamsApplication<StreamsApp> {
 
     private boolean closed = false;
     private boolean appClosed = false;
diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java
index 2fa17df3..b5b74aa5 100644
--- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java
+++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java
@@ -74,7 +74,7 @@ void tearDown() {
     void shouldRunApp() throws InterruptedException {
         final String output = "output";
         this.kafkaCluster.createTopic(TopicConfig.withName(output).useDefaults());
-        try (final KafkaProducerApplication app = new SimpleKafkaProducerApplication(() -> new ProducerApp() {
+        try (final KafkaProducerApplication<ProducerApp> app = new SimpleKafkaProducerApplication<>(() -> new ProducerApp() {
             @Override
             public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
                 return () -> {
diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java
index 97db53d9..5657040b 100644
--- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java
+++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunStreamsAppTest.java
@@ -70,7 +70,7 @@ void shouldRunApp() throws InterruptedException {
         final String output = "output";
         this.kafkaCluster.createTopic(TopicConfig.withName(input).useDefaults());
         this.kafkaCluster.createTopic(TopicConfig.withName(output).useDefaults());
-        try (final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication(Mirror::new)) {
+        try (final KafkaStreamsApplication<StreamsApp> app = new SimpleKafkaStreamsApplication<>(Mirror::new)) {
             app.setBootstrapServers(this.kafkaCluster.getBrokerList());
             app.setKafkaConfig(Map.of(
                 ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java
index 9eadda6b..2f56fc8d 100644
--- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java
+++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpTest.java
@@ -67,12 +67,12 @@ class StreamsCleanUpTest {
     @InjectSoftAssertions
     private SoftAssertions softly;
 
-    private static void runAppAndClose(final KafkaStreamsApplication app) throws InterruptedException {
+    private static void runAppAndClose(final KafkaStreamsApplication<?> app) throws InterruptedException {
         runApp(app);
         app.stop();
     }
 
-    private static void runApp(final KafkaStreamsApplication app) throws InterruptedException {
+    private static void runApp(final KafkaStreamsApplication<?> app) throws InterruptedException {
         // run in Thread because the application blocks indefinitely
         new Thread(app).start();
         // Wait until stream application has consumed all data
@@ -93,7 +93,7 @@ void tearDown() throws InterruptedException {
 
     @Test
     void shouldClean() throws InterruptedException {
-        try (final KafkaStreamsApplication app = this.createWordCountApplication()) {
+        try (final KafkaStreamsApplication<StreamsApp> app = this.createWordCountApplication()) {
             final SendValuesTransactional<String> sendRequest =
                     SendValuesTransactional.inTransaction(app.getInputTopics().get(0),
                             List.of("blub", "bla", "blub")).useDefaults();
@@ -120,7 +120,7 @@ void shouldClean() throws InterruptedException {
 
     @Test
     void shouldReset() throws InterruptedException {
-        try (final KafkaStreamsApplication app = this.createWordCountApplication()) {
+        try (final KafkaStreamsApplication<StreamsApp> app = this.createWordCountApplication()) {
             final SendValuesTransactional<String> sendRequest =
                     SendValuesTransactional.inTransaction(app.getInputTopics().get(0),
                             List.of("blub", "bla", "blub")).useDefaults();
@@ -179,7 +179,7 @@ private List<KeyValue<String, Long>> readOutputTopic(final String outputTopic) t
     }
 
     private void runAndAssertContent(final Iterable<KeyValue<String, Long>> expectedValues,
-            final String description, final KafkaStreamsApplication app)
+            final String description, final KafkaStreamsApplication<?> app)
             throws InterruptedException {
         runAppAndClose(app);
 
@@ -189,14 +189,14 @@ private void runAndAssertContent(final Iterable<KeyValue<String, Long>> expecte
                 .containsExactlyInAnyOrderElementsOf(expectedValues);
     }
 
-    private KafkaStreamsApplication createWordCountApplication() {
-        final KafkaStreamsApplication application = new SimpleKafkaStreamsApplication(WordCount::new);
+    private KafkaStreamsApplication<StreamsApp> createWordCountApplication() {
+        final KafkaStreamsApplication<StreamsApp> application = new SimpleKafkaStreamsApplication<>(WordCount::new);
         application.setOutputTopic("word_output");
         application.setInputTopics(List.of("word_input"));
         return this.configure(application);
     }
 
-    private <T extends KafkaStreamsApplication> T configure(final T application) {
+    private <T extends KafkaStreamsApplication<StreamsApp>> T configure(final T application) {
         application.setBootstrapServers(this.kafkaCluster.getBrokerList());
         application.setKafkaConfig(Map.of(
                 StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0",
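
Taken together, the patch makes the CLI base classes generic over the app type they create: KafkaStreamsApplication<T extends StreamsApp>, KafkaProducerApplication<T extends ProducerApp>, and the Simple* variants wrapping a Supplier<T>. Below is a minimal usage sketch against the changed API. The class name MyMirrorApplication and the topic name are illustrative only, and the overridden StreamsApp methods (buildTopology, getUniqueAppId, defaultSerializationConfig) as well as the SerdeConfig constructor are assumptions inferred from the test code above, not confirmed by this diff.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.Consumed;

// Illustrative sketch only, not part of the patch; assumes the com.bakdata.kafka classes
// touched above (KafkaStreamsApplication, StreamsApp, TopologyBuilder, StreamsTopicConfig,
// SerdeConfig) are imported from the library.
public class MyMirrorApplication extends KafkaStreamsApplication<StreamsApp> {

    public static void main(final String[] args) {
        // parses CLI options such as --input-topics and starts the application
        startApplication(new MyMirrorApplication(), args);
    }

    @Override
    public StreamsApp createApp() {
        // with the new type parameter, createApp() returns T rather than the raw StreamsApp
        return new StreamsApp() {
            @Override
            public void buildTopology(final TopologyBuilder builder) {
                // copy every input record unchanged to a hypothetical output topic
                builder.streamInput(Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
                        .to("my-output-topic");
            }

            @Override
            public String getUniqueAppId(final StreamsTopicConfig topics) {
                return "my-mirror-application";
            }

            @Override
            public SerdeConfig defaultSerializationConfig() {
                return new SerdeConfig(StringSerde.class, StringSerde.class);
            }
        };
    }
}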