From 96e2dadec3e4d415a1e2b5a3df7f4b4145044955 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Fri, 19 Apr 2024 10:57:51 +0200 Subject: [PATCH] Create v3 --- .../bakdata/kafka/LargeMessageProducerApp.java | 17 ++++++++++++----- .../bakdata/kafka/LargeMessageStreamsApp.java | 17 ++++++++++++----- .../com/bakdata/kafka/KafkaApplication.java | 3 +-- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageProducerApp.java b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageProducerApp.java index 48152900..acd2c2a0 100644 --- a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageProducerApp.java +++ b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageProducerApp.java @@ -29,17 +29,24 @@ */ public interface LargeMessageProducerApp extends ProducerApp { - static ProducerCleanUpConfiguration registerLargeMessageCleanUpHook(final ProducerCleanUpConfiguration configurer, - final EffectiveAppConfiguration configuration) { - return configurer.registerTopicHook( + /** + * Register a hook that cleans up LargeMessage files associated with a topic + * @param cleanUpConfiguration Configuration to register hook on + * @param configuration Configuration to create hook from + * @return {@code ProducerCleanUpConfiguration} with registered topic hook + * @see LargeMessageKafkaApplicationUtils#createLargeMessageCleanUpHook(EffectiveAppConfiguration) + */ + static ProducerCleanUpConfiguration registerLargeMessageCleanUpHook( + final ProducerCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration configuration) { + return cleanUpConfiguration.registerTopicHook( LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration)); } @Override default ProducerCleanUpConfiguration setupCleanUp( final EffectiveAppConfiguration configuration) { - final ProducerCleanUpConfiguration configurer = ProducerApp.super.setupCleanUp(configuration); - return registerLargeMessageCleanUpHook(configurer, configuration); + final ProducerCleanUpConfiguration cleanUpConfiguration = ProducerApp.super.setupCleanUp(configuration); + return registerLargeMessageCleanUpHook(cleanUpConfiguration, configuration); } } diff --git a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageStreamsApp.java b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageStreamsApp.java index 5437012c..585e87c2 100644 --- a/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageStreamsApp.java +++ b/streams-bootstrap-large-messages/src/main/java/com/bakdata/kafka/LargeMessageStreamsApp.java @@ -29,17 +29,24 @@ */ public interface LargeMessageStreamsApp extends StreamsApp { - static StreamsCleanUpConfiguration registerLargeMessageCleanUpHook(final StreamsCleanUpConfiguration configurer, - final EffectiveAppConfiguration configuration) { - return configurer.registerTopicHook( + /** + * Register a hook that cleans up LargeMessage files associated with a topic + * @param cleanUpConfiguration Configuration to register hook on + * @param configuration Configuration to create hook from + * @return {@code StreamsCleanUpConfiguration} with registered topic hook + * @see LargeMessageKafkaApplicationUtils#createLargeMessageCleanUpHook(EffectiveAppConfiguration) + */ + static StreamsCleanUpConfiguration registerLargeMessageCleanUpHook( + final StreamsCleanUpConfiguration cleanUpConfiguration, final EffectiveAppConfiguration configuration) { + return cleanUpConfiguration.registerTopicHook( LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(configuration)); } @Override default StreamsCleanUpConfiguration setupCleanUp( final EffectiveAppConfiguration configuration) { - final StreamsCleanUpConfiguration configurer = StreamsApp.super.setupCleanUp(configuration); - return registerLargeMessageCleanUpHook(configurer, configuration); + final StreamsCleanUpConfiguration cleanUpConfiguration = StreamsApp.super.setupCleanUp(configuration); + return registerLargeMessageCleanUpHook(cleanUpConfiguration, configuration); } } diff --git a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaApplication.java b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaApplication.java index 1b01d5ec..4f75db03 100644 --- a/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaApplication.java +++ b/streams-bootstrap-picocli/src/main/java/com/bakdata/kafka/KafkaApplication.java @@ -232,8 +232,7 @@ public final CA createConfiguredApp(final boolean cleanUp) { */ public final AppConfiguration createConfiguration() { final T topics = this.createTopicConfig(); - final Map kafkaConfig = this.getKafkaConfig(); - return new AppConfiguration<>(topics, kafkaConfig); + return new AppConfiguration<>(topics, this.kafkaConfig); } /**