diff --git a/kobuka-client/pom.xml b/kobuka-client/pom.xml index bd475c9..77de6b4 100644 --- a/kobuka-client/pom.xml +++ b/kobuka-client/pom.xml @@ -3,7 +3,7 @@ kobuka org.swisspush - 1.1.3-SNAPSHOT + 1.3.0-SNAPSHOT 4.0.0 @@ -15,6 +15,11 @@ kafka-clients provided + + org.apache.kafka + kafka-streams + provided + org.junit.jupiter junit-jupiter-engine diff --git a/kobuka-client/src/main/java/org/swisspush/kobuka/client/StreamsConfigBuilder.java b/kobuka-client/src/main/java/org/swisspush/kobuka/client/StreamsConfigBuilder.java new file mode 100644 index 0000000..b1bc79a --- /dev/null +++ b/kobuka-client/src/main/java/org/swisspush/kobuka/client/StreamsConfigBuilder.java @@ -0,0 +1,18 @@ +package org.swisspush.kobuka.client; + +import org.swisspush.kobuka.client.base.BaseCommonClientConfigBuilder; +import org.swisspush.kobuka.client.base.BaseStreamsConfigBuilder; + +public class StreamsConfigBuilder extends BaseStreamsConfigBuilder { + + public StreamsConfigBuilder() { + } + + public StreamsConfigBuilder(BaseStreamsConfigBuilder parent) { + copyFrom(parent); + } + + public StreamsConfigBuilder(BaseCommonClientConfigBuilder parent) { + copyFrom(parent); + } +} diff --git a/kobuka-client/src/main/java/org/swisspush/kobuka/client/base/BaseStreamsConfigBuilder.java b/kobuka-client/src/main/java/org/swisspush/kobuka/client/base/BaseStreamsConfigBuilder.java new file mode 100644 index 0000000..26470fe --- /dev/null +++ b/kobuka-client/src/main/java/org/swisspush/kobuka/client/base/BaseStreamsConfigBuilder.java @@ -0,0 +1,28 @@ +package org.swisspush.kobuka.client.base; + +import java.util.Map; +import java.util.function.Function; + +/** + * Base class for custom builders. Do not use directly. + */ +public class BaseStreamsConfigBuilder & ClientBuilderFunctions> + extends AbstractStreamsConfigBuilder implements ClientBuilderFunctions { + + public void copyFrom(BaseStreamsConfigBuilder parent) { + configs.putAll(parent.configs); + } + + public void copyFrom(BaseCommonClientConfigBuilder parent) { + configs.putAll(parent.configs); + } + + @Override + public Map build() { + return configs; + } + + public R transform(Function, R> fn) { + return fn.apply(this); + } +} diff --git a/kobuka-client/src/test/java/org/swisspush/kobuka/test/client/ConfigBuilderTest.java b/kobuka-client/src/test/java/org/swisspush/kobuka/test/client/ConfigBuilderTest.java index c1b3f3b..d9d766c 100644 --- a/kobuka-client/src/test/java/org/swisspush/kobuka/test/client/ConfigBuilderTest.java +++ b/kobuka-client/src/test/java/org/swisspush/kobuka/test/client/ConfigBuilderTest.java @@ -5,10 +5,12 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.StreamsConfig; import org.junit.jupiter.api.Test; import org.swisspush.kobuka.client.CommonClientConfigBuilder; import org.swisspush.kobuka.client.ConsumerConfigBuilder; import org.swisspush.kobuka.client.ProducerConfigBuilder; +import org.swisspush.kobuka.client.StreamsConfigBuilder; import java.util.Arrays; import java.util.Map; @@ -81,10 +83,18 @@ public void testInheritance() { .valueSerializer(StringDeserializer.class) .build(ProducerConfig::new); + StreamsConfig streamsConfig = + commonConfigBuilder + .transform(StreamsConfigBuilder::new) + .applicationId("hello") + .build(StreamsConfig::new); + assertEquals(Arrays.asList("localhost:9092", "otherhost:9092"), consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); assertEquals(Arrays.asList("localhost:9092", "otherhost:9092"), producerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + assertEquals(Arrays.asList("localhost:9092", "otherhost:9092"), + streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); } /** diff --git a/kobuka-gen/pom.xml b/kobuka-gen/pom.xml index 7a4aade..f6096d7 100644 --- a/kobuka-gen/pom.xml +++ b/kobuka-gen/pom.xml @@ -3,7 +3,7 @@ kobuka org.swisspush - 1.1.3-SNAPSHOT + 1.3.0-SNAPSHOT 4.0.0 @@ -14,6 +14,10 @@ org.apache.kafka kafka-clients + + org.apache.kafka + kafka-streams + com.squareup javapoet diff --git a/kobuka-gen/src/main/java/org/swisspush/kobuka/gen/Generator.java b/kobuka-gen/src/main/java/org/swisspush/kobuka/gen/Generator.java index 7ca71c5..5526461 100644 --- a/kobuka-gen/src/main/java/org/swisspush/kobuka/gen/Generator.java +++ b/kobuka-gen/src/main/java/org/swisspush/kobuka/gen/Generator.java @@ -6,6 +6,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.streams.StreamsConfig; import javax.lang.model.element.Modifier; import java.io.IOException; @@ -37,12 +38,17 @@ public static void main(String[] args) throws IOException { "AdminClientConfig", stream(AdminClientConfig.configDef()), rootDir); + generateBuilder(CLIENT_PACKAGE, + "StreamsConfig", + stream(StreamsConfig.configDef()), + rootDir); // Generate common keys Set commonKeys = new HashSet<>(ConsumerConfig.configDef().configKeys().keySet()); commonKeys.retainAll(ProducerConfig.configDef().configKeys().keySet()); commonKeys.retainAll(AdminClientConfig.configDef().configKeys().keySet()); + commonKeys.retainAll(StreamsConfig.configDef().configKeys().keySet()); Stream> commonConfigMap = AdminClientConfig.configDef().configKeys().entrySet().stream() .filter(entry -> commonKeys.contains(entry.getKey())); diff --git a/kobuka-spring/pom.xml b/kobuka-spring/pom.xml index da454ad..ad90d82 100644 --- a/kobuka-spring/pom.xml +++ b/kobuka-spring/pom.xml @@ -3,7 +3,7 @@ kobuka org.swisspush - 1.1.3-SNAPSHOT + 1.3.0-SNAPSHOT 4.0.0 diff --git a/pom.xml b/pom.xml index 981c70b..a609fbe 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.swisspush kobuka pom - 1.1.3-SNAPSHOT + 1.3.0-SNAPSHOT kobuka Config Builders for Kafka. A fluent API for configuring kafka clients. https://github.com/swisspush/kobuka @@ -109,6 +109,11 @@ kafka-clients 3.2.3 + + org.apache.kafka + kafka-streams + 3.2.3 + org.springframework.kafka spring-kafka