Skip to content

Commit

Permalink
Use generics to determine type of Streams/ProducerApp
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Dec 5, 2024
1 parent 7316811 commit cc84fd1
Show file tree
Hide file tree
Showing 10 changed files with 47 additions and 40 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ import java.util.Map;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

public class MyStreamsApplication extends KafkaStreamsApplication {
public class MyStreamsApplication extends KafkaStreamsApplication<StreamsApp> {
public static void main(final String[] args) {
startApplication(new MyStreamsApplication(), args);
}
Expand Down Expand Up @@ -157,7 +157,7 @@ import java.util.Map;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyProducerApplication extends KafkaProducerApplication {
public class MyProducerApplication extends KafkaProducerApplication<ProducerApp> {
public static void main(final String[] args) {
startApplication(new MyProducerApplication(), args);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@
* <p>The base class for creating Kafka Producer applications.</p>
* This class provides all configuration options provided by {@link KafkaApplication}.
* To implement your Kafka Producer application inherit from this class and add your custom options. Run it by
* calling {@link #startApplication(KafkaApplication, String[])} with a instance of your class from your main.
* calling {@link #startApplication(KafkaApplication, String[])} with an instance of your class from your main.
*
* @param <T> type of {@link ProducerApp} created by this application
*/
@ToString(callSuper = true)
@Getter
@Setter
@RequiredArgsConstructor
@Slf4j
@Command(description = "Run a Kafka Producer application")
public abstract class KafkaProducerApplication extends
KafkaApplication<ProducerRunner, ProducerCleanUpRunner, ProducerExecutionOptions,
ExecutableProducerApp<ProducerApp>, ConfiguredProducerApp<ProducerApp>, ProducerTopicConfig,
ProducerApp> {
public abstract class KafkaProducerApplication<T extends ProducerApp> extends
KafkaApplication<ProducerRunner, ProducerCleanUpRunner, ProducerExecutionOptions, ExecutableProducerApp<T>,
ConfiguredProducerApp<T>, ProducerTopicConfig, T> {

/**
* Delete all output topics associated with the Kafka Producer application.
Expand All @@ -73,7 +74,7 @@ public final ProducerTopicConfig createTopicConfig() {
}

@Override
public final ConfiguredProducerApp<ProducerApp> createConfiguredApp(final ProducerApp app,
public final ConfiguredProducerApp<T> createConfiguredApp(final T app,
final AppConfiguration<ProducerTopicConfig> configuration) {
return new ConfiguredProducerApp<>(app, configuration);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,19 @@
* <li>{@link #volatileGroupInstanceId}</li>
* </ul>
* To implement your Kafka Streams application inherit from this class and add your custom options. Run it by calling
* {@link #startApplication(KafkaApplication, String[])} with a instance of your class from your main.
* {@link #startApplication(KafkaApplication, String[])} with an instance of your class from your main.
*
* @param <T> type of {@link StreamsApp} created by this application
*/
@ToString(callSuper = true)
@Getter
@Setter
@RequiredArgsConstructor
@Slf4j
@Command(description = "Run a Kafka Streams application.")
public abstract class KafkaStreamsApplication extends
public abstract class KafkaStreamsApplication<T extends StreamsApp> extends
KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions,
ExecutableStreamsApp<StreamsApp>, ConfiguredStreamsApp<StreamsApp>, StreamsTopicConfig, StreamsApp> {
ExecutableStreamsApp<T>, ConfiguredStreamsApp<T>, StreamsTopicConfig, T> {
@CommandLine.Option(names = "--input-topics", description = "Input topics", split = ",")
private List<String> inputTopics = emptyList();
@CommandLine.Option(names = "--input-pattern", description = "Input pattern")
Expand Down Expand Up @@ -138,9 +140,9 @@ public final StreamsTopicConfig createTopicConfig() {
}

@Override
public final ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final StreamsApp app,
public final ConfiguredStreamsApp<T> createConfiguredApp(final T app,
final AppConfiguration<StreamsTopicConfig> configuration) {
final ConfiguredStreamsApp<StreamsApp> configuredApp = new ConfiguredStreamsApp<>(app, configuration);
final ConfiguredStreamsApp<T> configuredApp = new ConfiguredStreamsApp<>(app, configuration);
if (this.applicationId != null && !configuredApp.getUniqueAppId().equals(this.applicationId)) {
throw new IllegalArgumentException(
"Application ID provided via --application-id does not match StreamsApp#getUniqueAppId()");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@

/**
* {@code KafkaProducerApplication} without any additional configuration options.
*
* @param <T> type of {@link ProducerApp} created by this application
*/
@RequiredArgsConstructor
public final class SimpleKafkaProducerApplication extends KafkaProducerApplication {
private final @NonNull Supplier<ProducerApp> appFactory;
public final class SimpleKafkaProducerApplication<T extends ProducerApp> extends KafkaProducerApplication<T> {
private final @NonNull Supplier<T> appFactory;

@Override
public ProducerApp createApp() {
public T createApp() {
return this.appFactory.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@

/**
* {@code KafkaStreamsApplication} without any additional configuration options.
*
* @param <T> type of {@link StreamsApp} created by this application
*/
@RequiredArgsConstructor
public final class SimpleKafkaStreamsApplication extends KafkaStreamsApplication {
private final @NonNull Supplier<StreamsApp> appFactory;
public final class SimpleKafkaStreamsApplication<T extends StreamsApp> extends KafkaStreamsApplication<T> {
private final @NonNull Supplier<T> appFactory;

@Override
public StreamsApp createApp() {
public T createApp() {
return this.appFactory.get();
}
}
20 changes: 10 additions & 10 deletions streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@

class CliTest {

private static void runApp(final KafkaStreamsApplication app, final String... args) {
private static void runApp(final KafkaStreamsApplication<?> app, final String... args) {
new Thread(() -> KafkaApplication.startApplication(app, args)).start();
}

@Test
@ExpectSystemExitWithStatus(0)
void shouldExitWithSuccessCode() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
@Override
public StreamsApp createApp() {
return new StreamsApp() {
Expand Down Expand Up @@ -86,7 +86,7 @@ public void run() {
@Test
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorCodeOnRunError() {
KafkaApplication.startApplication(new SimpleKafkaStreamsApplication(() -> new StreamsApp() {
KafkaApplication.startApplication(new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
throw new UnsupportedOperationException();
Expand All @@ -111,7 +111,7 @@ public SerdeConfig defaultSerializationConfig() {
@Test
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorCodeOnCleanupError() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
@Override
public StreamsApp createApp() {
return new StreamsApp() {
Expand Down Expand Up @@ -147,7 +147,7 @@ public void clean() {
@Test
@ExpectSystemExitWithStatus(2)
void shouldExitWithErrorCodeOnMissingBootstrapServersParameter() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
@Override
public StreamsApp createApp() {
return new StreamsApp() {
Expand Down Expand Up @@ -181,7 +181,7 @@ public void run() {
@Test
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorCodeOnInconsistentAppId() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
@Override
public StreamsApp createApp() {
return new StreamsApp() {
Expand Down Expand Up @@ -215,7 +215,7 @@ public SerdeConfig defaultSerializationConfig() {
void shouldExitWithErrorInTopology() throws InterruptedException {
final String input = "input";
try (final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();
final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication(() -> new StreamsApp() {
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
builder.streamInput(Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
Expand Down Expand Up @@ -252,7 +252,7 @@ void shouldExitWithSuccessCodeOnShutdown() throws InterruptedException {
final String input = "input";
final String output = "output";
try (final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();
final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication(() -> new StreamsApp() {
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
builder.streamInput(Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
Expand Down Expand Up @@ -293,7 +293,7 @@ public SerdeConfig defaultSerializationConfig() {
@Test
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorOnCleanupError() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
@Override
public StreamsApp createApp() {
return new StreamsApp() {
Expand Down Expand Up @@ -323,7 +323,7 @@ public SerdeConfig defaultSerializationConfig() {

@Test
void shouldParseArguments() {
try (final KafkaStreamsApplication app = new KafkaStreamsApplication() {
try (final KafkaStreamsApplication<?> app = new KafkaStreamsApplication<>() {
@Override
public StreamsApp createApp() {
return new StreamsApp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
@NoArgsConstructor
@Getter
@Setter
public class CloseFlagApp extends KafkaStreamsApplication {
public class CloseFlagApp extends KafkaStreamsApplication<StreamsApp> {

private boolean closed = false;
private boolean appClosed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void tearDown() {
void shouldRunApp() throws InterruptedException {
final String output = "output";
this.kafkaCluster.createTopic(TopicConfig.withName(output).useDefaults());
try (final KafkaProducerApplication app = new SimpleKafkaProducerApplication(() -> new ProducerApp() {
try (final KafkaProducerApplication<?> app = new SimpleKafkaProducerApplication<>(() -> new ProducerApp() {
@Override
public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
return () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void shouldRunApp() throws InterruptedException {
final String output = "output";
this.kafkaCluster.createTopic(TopicConfig.withName(input).useDefaults());
this.kafkaCluster.createTopic(TopicConfig.withName(output).useDefaults());
try (final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication(Mirror::new)) {
try (final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(Mirror::new)) {
app.setBootstrapServers(this.kafkaCluster.getBrokerList());
app.setKafkaConfig(Map.of(
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ class StreamsCleanUpTest {
@InjectSoftAssertions
private SoftAssertions softly;

private static void runAppAndClose(final KafkaStreamsApplication app) throws InterruptedException {
private static void runAppAndClose(final KafkaStreamsApplication<?> app) throws InterruptedException {
runApp(app);
app.stop();
}

private static void runApp(final KafkaStreamsApplication app) throws InterruptedException {
private static void runApp(final KafkaStreamsApplication<?> app) throws InterruptedException {
// run in Thread because the application blocks indefinitely
new Thread(app).start();
// Wait until stream application has consumed all data
Expand All @@ -93,7 +93,7 @@ void tearDown() throws InterruptedException {

@Test
void shouldClean() throws InterruptedException {
try (final KafkaStreamsApplication app = this.createWordCountApplication()) {
try (final KafkaStreamsApplication<?> app = this.createWordCountApplication()) {
final SendValuesTransactional<String> sendRequest =
SendValuesTransactional.inTransaction(app.getInputTopics().get(0),
List.of("blub", "bla", "blub")).useDefaults();
Expand All @@ -120,7 +120,7 @@ void shouldClean() throws InterruptedException {

@Test
void shouldReset() throws InterruptedException {
try (final KafkaStreamsApplication app = this.createWordCountApplication()) {
try (final KafkaStreamsApplication<?> app = this.createWordCountApplication()) {
final SendValuesTransactional<String> sendRequest =
SendValuesTransactional.inTransaction(app.getInputTopics().get(0),
List.of("blub", "bla", "blub")).useDefaults();
Expand Down Expand Up @@ -179,7 +179,7 @@ private List<KeyValue<String, Long>> readOutputTopic(final String outputTopic) t
}

private void runAndAssertContent(final Iterable<? extends KeyValue<String, Long>> expectedValues,
final String description, final KafkaStreamsApplication app)
final String description, final KafkaStreamsApplication<?> app)
throws InterruptedException {
runAppAndClose(app);

Expand All @@ -189,14 +189,14 @@ private void runAndAssertContent(final Iterable<? extends KeyValue<String, Long>
.containsExactlyInAnyOrderElementsOf(expectedValues);
}

private KafkaStreamsApplication createWordCountApplication() {
final KafkaStreamsApplication application = new SimpleKafkaStreamsApplication(WordCount::new);
private KafkaStreamsApplication<?> createWordCountApplication() {
final KafkaStreamsApplication<?> application = new SimpleKafkaStreamsApplication<>(WordCount::new);
application.setOutputTopic("word_output");
application.setInputTopics(List.of("word_input"));
return this.configure(application);
}

private <T extends KafkaStreamsApplication> T configure(final T application) {
private <T extends KafkaStreamsApplication<?>> T configure(final T application) {
application.setBootstrapServers(this.kafkaCluster.getBrokerList());
application.setKafkaConfig(Map.of(
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0",
Expand Down

0 comments on commit cc84fd1

Please sign in to comment.