Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retain type of Streams/ProducerApp in CLI module #261

Merged
merged 1 commit into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ import java.util.Map;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

public class MyStreamsApplication extends KafkaStreamsApplication {
public class MyStreamsApplication extends KafkaStreamsApplication<StreamsApp> {
public static void main(final String[] args) {
startApplication(new MyStreamsApplication(), args);
}
Expand Down Expand Up @@ -157,7 +157,7 @@ import java.util.Map;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyProducerApplication extends KafkaProducerApplication {
public class MyProducerApplication extends KafkaProducerApplication<ProducerApp> {
public static void main(final String[] args) {
startApplication(new MyProducerApplication(), args);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,19 @@
* <p>The base class for creating Kafka Producer applications.</p>
* This class provides all configuration options provided by {@link KafkaApplication}.
* To implement your Kafka Producer application inherit from this class and add your custom options. Run it by
* calling {@link #startApplication(KafkaApplication, String[])} with a instance of your class from your main.
* calling {@link #startApplication(KafkaApplication, String[])} with an instance of your class from your main.
*
* @param <T> type of {@link ProducerApp} created by this application
*/
@ToString(callSuper = true)
@Getter
@Setter
@RequiredArgsConstructor
@Slf4j
@Command(description = "Run a Kafka Producer application")
public abstract class KafkaProducerApplication extends
KafkaApplication<ProducerRunner, ProducerCleanUpRunner, ProducerExecutionOptions,
ExecutableProducerApp<ProducerApp>, ConfiguredProducerApp<ProducerApp>, ProducerTopicConfig,
ProducerApp> {
public abstract class KafkaProducerApplication<T extends ProducerApp> extends
KafkaApplication<ProducerRunner, ProducerCleanUpRunner, ProducerExecutionOptions, ExecutableProducerApp<T>,
ConfiguredProducerApp<T>, ProducerTopicConfig, T> {

/**
* Delete all output topics associated with the Kafka Producer application.
Expand All @@ -73,7 +74,7 @@ public final ProducerTopicConfig createTopicConfig() {
}

@Override
public final ConfiguredProducerApp<ProducerApp> createConfiguredApp(final ProducerApp app,
public final ConfiguredProducerApp<T> createConfiguredApp(final T app,
final AppConfiguration<ProducerTopicConfig> configuration) {
return new ConfiguredProducerApp<>(app, configuration);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,19 @@
* <li>{@link #volatileGroupInstanceId}</li>
* </ul>
* To implement your Kafka Streams application inherit from this class and add your custom options. Run it by calling
* {@link #startApplication(KafkaApplication, String[])} with a instance of your class from your main.
* {@link #startApplication(KafkaApplication, String[])} with an instance of your class from your main.
*
* @param <T> type of {@link StreamsApp} created by this application
*/
@ToString(callSuper = true)
@Getter
@Setter
@RequiredArgsConstructor
@Slf4j
@Command(description = "Run a Kafka Streams application.")
public abstract class KafkaStreamsApplication extends
public abstract class KafkaStreamsApplication<T extends StreamsApp> extends
KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions,
ExecutableStreamsApp<StreamsApp>, ConfiguredStreamsApp<StreamsApp>, StreamsTopicConfig, StreamsApp> {
ExecutableStreamsApp<T>, ConfiguredStreamsApp<T>, StreamsTopicConfig, T> {
@CommandLine.Option(names = "--input-topics", description = "Input topics", split = ",")
private List<String> inputTopics = emptyList();
@CommandLine.Option(names = "--input-pattern", description = "Input pattern")
Expand Down Expand Up @@ -138,9 +140,9 @@ public final StreamsTopicConfig createTopicConfig() {
}

@Override
public final ConfiguredStreamsApp<StreamsApp> createConfiguredApp(final StreamsApp app,
public final ConfiguredStreamsApp<T> createConfiguredApp(final T app,
final AppConfiguration<StreamsTopicConfig> configuration) {
final ConfiguredStreamsApp<StreamsApp> configuredApp = new ConfiguredStreamsApp<>(app, configuration);
final ConfiguredStreamsApp<T> configuredApp = new ConfiguredStreamsApp<>(app, configuration);
if (this.applicationId != null && !configuredApp.getUniqueAppId().equals(this.applicationId)) {
throw new IllegalArgumentException(
"Application ID provided via --application-id does not match StreamsApp#getUniqueAppId()");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@

/**
* {@code KafkaProducerApplication} without any additional configuration options.
*
* @param <T> type of {@link ProducerApp} created by this application
*/
@RequiredArgsConstructor
public final class SimpleKafkaProducerApplication extends KafkaProducerApplication {
private final @NonNull Supplier<ProducerApp> appFactory;
public final class SimpleKafkaProducerApplication<T extends ProducerApp> extends KafkaProducerApplication<T> {
private final @NonNull Supplier<T> appFactory;

@Override
public ProducerApp createApp() {
public T createApp() {
return this.appFactory.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@

/**
* {@code KafkaStreamsApplication} without any additional configuration options.
*
* @param <T> type of {@link StreamsApp} created by this application
*/
@RequiredArgsConstructor
public final class SimpleKafkaStreamsApplication extends KafkaStreamsApplication {
private final @NonNull Supplier<StreamsApp> appFactory;
public final class SimpleKafkaStreamsApplication<T extends StreamsApp> extends KafkaStreamsApplication<T> {
private final @NonNull Supplier<T> appFactory;

@Override
public StreamsApp createApp() {
public T createApp() {
return this.appFactory.get();
}
}
20 changes: 10 additions & 10 deletions streams-bootstrap-cli/src/test/java/com/bakdata/kafka/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@

class CliTest {

private static void runApp(final KafkaStreamsApplication app, final String... args) {
private static void runApp(final KafkaStreamsApplication<?> app, final String... args) {
new Thread(() -> KafkaApplication.startApplication(app, args)).start();
}

@Test
@ExpectSystemExitWithStatus(0)
void shouldExitWithSuccessCode() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
@Override
public StreamsApp createApp() {
return new StreamsApp() {
Expand Down Expand Up @@ -86,7 +86,7 @@ public void run() {
@Test
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorCodeOnRunError() {
KafkaApplication.startApplication(new SimpleKafkaStreamsApplication(() -> new StreamsApp() {
KafkaApplication.startApplication(new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
throw new UnsupportedOperationException();
Expand All @@ -111,7 +111,7 @@ public SerdeConfig defaultSerializationConfig() {
@Test
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorCodeOnCleanupError() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
@Override
public StreamsApp createApp() {
return new StreamsApp() {
Expand Down Expand Up @@ -147,7 +147,7 @@ public void clean() {
@Test
@ExpectSystemExitWithStatus(2)
void shouldExitWithErrorCodeOnMissingBootstrapServersParameter() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
@Override
public StreamsApp createApp() {
return new StreamsApp() {
Expand Down Expand Up @@ -181,7 +181,7 @@ public void run() {
@Test
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorCodeOnInconsistentAppId() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
@Override
public StreamsApp createApp() {
return new StreamsApp() {
Expand Down Expand Up @@ -215,7 +215,7 @@ public SerdeConfig defaultSerializationConfig() {
void shouldExitWithErrorInTopology() throws InterruptedException {
final String input = "input";
try (final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();
final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication(() -> new StreamsApp() {
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
builder.streamInput(Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
Expand Down Expand Up @@ -252,7 +252,7 @@ void shouldExitWithSuccessCodeOnShutdown() throws InterruptedException {
final String input = "input";
final String output = "output";
try (final EmbeddedKafkaCluster kafkaCluster = newKafkaCluster();
final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication(() -> new StreamsApp() {
final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(() -> new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
builder.streamInput(Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
Expand Down Expand Up @@ -293,7 +293,7 @@ public SerdeConfig defaultSerializationConfig() {
@Test
@ExpectSystemExitWithStatus(1)
void shouldExitWithErrorOnCleanupError() {
KafkaApplication.startApplication(new KafkaStreamsApplication() {
KafkaApplication.startApplication(new KafkaStreamsApplication<>() {
@Override
public StreamsApp createApp() {
return new StreamsApp() {
Expand Down Expand Up @@ -323,7 +323,7 @@ public SerdeConfig defaultSerializationConfig() {

@Test
void shouldParseArguments() {
try (final KafkaStreamsApplication app = new KafkaStreamsApplication() {
try (final KafkaStreamsApplication<?> app = new KafkaStreamsApplication<>() {
@Override
public StreamsApp createApp() {
return new StreamsApp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
@NoArgsConstructor
@Getter
@Setter
public class CloseFlagApp extends KafkaStreamsApplication {
public class CloseFlagApp extends KafkaStreamsApplication<StreamsApp> {

private boolean closed = false;
private boolean appClosed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void tearDown() {
void shouldRunApp() throws InterruptedException {
final String output = "output";
this.kafkaCluster.createTopic(TopicConfig.withName(output).useDefaults());
try (final KafkaProducerApplication app = new SimpleKafkaProducerApplication(() -> new ProducerApp() {
try (final KafkaProducerApplication<?> app = new SimpleKafkaProducerApplication<>(() -> new ProducerApp() {
@Override
public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
return () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void shouldRunApp() throws InterruptedException {
final String output = "output";
this.kafkaCluster.createTopic(TopicConfig.withName(input).useDefaults());
this.kafkaCluster.createTopic(TopicConfig.withName(output).useDefaults());
try (final KafkaStreamsApplication app = new SimpleKafkaStreamsApplication(Mirror::new)) {
try (final KafkaStreamsApplication<?> app = new SimpleKafkaStreamsApplication<>(Mirror::new)) {
app.setBootstrapServers(this.kafkaCluster.getBrokerList());
app.setKafkaConfig(Map.of(
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ class StreamsCleanUpTest {
@InjectSoftAssertions
private SoftAssertions softly;

private static void runAppAndClose(final KafkaStreamsApplication app) throws InterruptedException {
private static void runAppAndClose(final KafkaStreamsApplication<?> app) throws InterruptedException {
runApp(app);
app.stop();
}

private static void runApp(final KafkaStreamsApplication app) throws InterruptedException {
private static void runApp(final KafkaStreamsApplication<?> app) throws InterruptedException {
// run in Thread because the application blocks indefinitely
new Thread(app).start();
// Wait until stream application has consumed all data
Expand All @@ -93,7 +93,7 @@ void tearDown() throws InterruptedException {

@Test
void shouldClean() throws InterruptedException {
try (final KafkaStreamsApplication app = this.createWordCountApplication()) {
try (final KafkaStreamsApplication<?> app = this.createWordCountApplication()) {
final SendValuesTransactional<String> sendRequest =
SendValuesTransactional.inTransaction(app.getInputTopics().get(0),
List.of("blub", "bla", "blub")).useDefaults();
Expand All @@ -120,7 +120,7 @@ void shouldClean() throws InterruptedException {

@Test
void shouldReset() throws InterruptedException {
try (final KafkaStreamsApplication app = this.createWordCountApplication()) {
try (final KafkaStreamsApplication<?> app = this.createWordCountApplication()) {
final SendValuesTransactional<String> sendRequest =
SendValuesTransactional.inTransaction(app.getInputTopics().get(0),
List.of("blub", "bla", "blub")).useDefaults();
Expand Down Expand Up @@ -179,7 +179,7 @@ private List<KeyValue<String, Long>> readOutputTopic(final String outputTopic) t
}

private void runAndAssertContent(final Iterable<? extends KeyValue<String, Long>> expectedValues,
final String description, final KafkaStreamsApplication app)
final String description, final KafkaStreamsApplication<?> app)
throws InterruptedException {
runAppAndClose(app);

Expand All @@ -189,14 +189,14 @@ private void runAndAssertContent(final Iterable<? extends KeyValue<String, Long>
.containsExactlyInAnyOrderElementsOf(expectedValues);
}

private KafkaStreamsApplication createWordCountApplication() {
final KafkaStreamsApplication application = new SimpleKafkaStreamsApplication(WordCount::new);
private KafkaStreamsApplication<?> createWordCountApplication() {
final KafkaStreamsApplication<?> application = new SimpleKafkaStreamsApplication<>(WordCount::new);
application.setOutputTopic("word_output");
application.setInputTopics(List.of("word_input"));
return this.configure(application);
}

private <T extends KafkaStreamsApplication> T configure(final T application) {
private <T extends KafkaStreamsApplication<?>> T configure(final T application) {
application.setBootstrapServers(this.kafkaCluster.getBrokerList());
application.setKafkaConfig(Map.of(
StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0",
Expand Down