Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add modules for easy usage with Fluent Kafka Streams Tests and Large Message SerDe #188

Merged
merged 17 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
61d8a41
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
81b6189
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
7845e0c
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
f7a326f
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
b6a3154
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
37fd042
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
c84a31a
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
ce21ee1
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
c61c179
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
522a584
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
9dd46ee
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
c9a0553
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
f22dce5
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
50fbb95
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Sep 22, 2023
67258dd
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Oct 27, 2023
a896af3
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Oct 27, 2023
b870555
Add module for easy usage with Fluent Kafka Streams Tests
philipp94831 Oct 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 5 additions & 55 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
description = "Collection of commonly used modules when writing a Kafka Streams Application"


plugins {
`java-library`
id("net.researchgate.release") version "3.0.2"
id("com.bakdata.sonar") version "1.1.7"
id("com.bakdata.sonatype") version "1.1.7"
id("org.hildan.github.changelog") version "1.12.1"
id("com.github.davidmc24.gradle.plugin.avro") version "1.5.0"
id("io.freefair.lombok") version "6.6.1"
}

Expand All @@ -16,6 +11,7 @@ allprojects {

tasks.withType<Test> {
maxParallelForks = 1 // Embedded Kafka does not reliably work in parallel since Kafka 3.0
useJUnitPlatform()
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
}

repositories {
Expand Down Expand Up @@ -71,65 +67,19 @@ configure<com.bakdata.gradle.SonatypeSettings> {

configure<org.hildan.github.changelog.plugin.GitHubChangelogExtension> {
githubUser = "bakdata"
githubRepository = "streams-bootstrap"
futureVersionTag = findProperty("changelog.releaseVersion")?.toString()
sinceTag = findProperty("changelog.sinceTag")?.toString()
}

allprojects {
subprojects {
apply(plugin = "java-library")
apply(plugin = "io.freefair.lombok")

configure<JavaPluginExtension> {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}

dependencies {
val kafkaVersion: String by project
implementation(group = "org.apache.kafka", name = "kafka_2.13", version = kafkaVersion)

implementation(group = "info.picocli", name = "picocli", version = "4.7.0")
api(group = "org.apache.kafka", name = "kafka-streams", version = kafkaVersion)
api(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion)
val confluentVersion: String by project
implementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
api(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion)
val log4jVersion = "2.19.0"
implementation(group = "org.apache.logging.log4j", name = "log4j-core", version = log4jVersion)
implementation(group = "org.apache.logging.log4j", name = "log4j-slf4j-impl", version = log4jVersion)
implementation(group = "com.google.guava", name = "guava", version = "31.1-jre")
implementation(group = "org.jooq", name = "jool", version = "0.9.14")

val junitVersion = "5.9.1"
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion)
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-params", version = junitVersion)
testImplementation(group = "org.junit-pioneer", name = "junit-pioneer", version = "1.9.1")
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
testImplementation(group = "org.assertj", name = "assertj-core", version = "3.23.1")
val mockitoVersion = "4.11.0"
testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion)
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)

val fluentKafkaVersion = "2.9.0"
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "fluent-kafka-streams-tests-junit5",
version = fluentKafkaVersion
)
testImplementation(group = "org.apache.kafka", name = "kafka-streams-test-utils", version = kafkaVersion)
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "schema-registry-mock-junit5",
version = fluentKafkaVersion
)
testImplementation(group = "net.mguenther.kafka", name = "kafka-junit", version = "3.4.0") {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}

testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2")
}
}

tasks.withType<Test> {
useJUnitPlatform()
}

release {
Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ org.gradle.caching=true
org.gradle.parallel=true
kafkaVersion=3.4.0
confluentVersion=7.4.0
fluentKafkaVersion=2.10.0
org.gradle.jvmargs=-Xmx2048m
6 changes: 6 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@ pluginManagement {
}

rootProject.name = 'streams-bootstrap'

// Modules of this build: the core library plus the opt-in test and large-message helper modules
include(
    ":streams-bootstrap",
    ":streams-bootstrap-test",
    ":streams-bootstrap-large-messages",
)
6 changes: 6 additions & 0 deletions streams-bootstrap-large-messages/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
description = "Utils for using Large Message SerDe with your Kafka Streams Application"

dependencies {
    // Expose the core module so consumers get KafkaApplication/CleanUpRunner transitively
    api(project(":streams-bootstrap"))
    // Provides AbstractLargeMessageConfig and the storing client used for topic clean up
    implementation(group = "com.bakdata.kafka", name = "large-message-core", version = "2.5.1")
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# This file is generated by the 'io.freefair.lombok' Gradle plugin
config.stopBubbling = true
lombok.addLombokGeneratedAnnotation = true
lombok.extern.findbugs.addSuppressFBWarnings = true
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.util.function.Consumer;
import lombok.experimental.UtilityClass;

/**
 * Utility class offering helpers for combining {@code LargeMessageSerde} with a {@link KafkaApplication}
 */
@UtilityClass
public class LargeMessageKafkaApplicationUtils {
    /**
     * Build a hook that deletes the LargeMessage files belonging to a topic. The hook is backed by an
     * {@link AbstractLargeMessageConfig} created from {@link KafkaApplication#getKafkaProperties()}, so all
     * configuration required by that config must be present there.
     *
     * @param app {@code KafkaApplication} to create hook from
     * @return hook that cleans up LargeMessage files associated with a topic
     * @see CleanUpRunner#registerTopicCleanUpHook(Consumer)
     */
    public static Consumer<String> createLargeMessageCleanUpHook(final KafkaApplication app) {
        final AbstractLargeMessageConfig config = new AbstractLargeMessageConfig(app.getKafkaProperties());
        // The storing client is resolved eagerly; the returned hook only captures the client itself
        return config.getStorer()::deleteAllFiles;
    }

    /**
     * Create a LargeMessage clean up hook for the given application and register it on the given runner.
     *
     * @param app {@code KafkaApplication} to create hook from
     * @param cleanUpRunner {@code CleanUpRunner} to register hook on
     * @see #createLargeMessageCleanUpHook(KafkaApplication)
     */
    public static void registerLargeMessageCleanUpHook(final KafkaApplication app, final CleanUpRunner cleanUpRunner) {
        cleanUpRunner.registerTopicCleanUpHook(createLargeMessageCleanUpHook(app));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.util.function.Consumer;

/**
 * Kafka Producer Application that automatically removes files associated with {@code LargeMessageSerializer}
 */
public abstract class LargeMessageKafkaProducerApplication extends KafkaProducerApplication {

    /**
     * Provide a topic clean up hook that also deletes LargeMessage files of the topic.
     *
     * @return hook deleting LargeMessage files, built via {@link LargeMessageKafkaApplicationUtils}
     */
    @Override
    protected Consumer<String> createTopicCleanUpHook() {
        // Delegate to the shared utility so behavior stays in sync with the streams variant
        final Consumer<String> largeMessageHook =
                LargeMessageKafkaApplicationUtils.createLargeMessageCleanUpHook(this);
        return largeMessageHook;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

/**
 * Kafka Streams Application that automatically removes files associated with {@code LargeMessageSerde}
 */
public abstract class LargeMessageKafkaStreamsApplication extends KafkaStreamsApplication {

    /**
     * Register the LargeMessage clean up hook before running the regular clean up.
     *
     * @param cleanUpRunner {@code CleanUpRunner} the hook is registered on and the clean up is delegated to
     */
    @Override
    protected void cleanUpRun(final CleanUpRunner cleanUpRunner) {
        // Hook must be registered before delegating so topic deletion also removes LargeMessage files
        LargeMessageKafkaApplicationUtils.registerLargeMessageCleanUpHook(this, cleanUpRunner);
        super.cleanUpRun(cleanUpRunner);
    }

}
11 changes: 11 additions & 0 deletions streams-bootstrap-test/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
description = "Utils for testing your Kafka Streams Application"

dependencies {
    // Core module is part of the public API of this test helper
    api(project(":streams-bootstrap"))
    val fluentKafkaVersion: String by project
    // Re-export the JUnit 5 flavor of Fluent Kafka Streams Tests to consumers
    api("com.bakdata.fluent-kafka-streams-tests:fluent-kafka-streams-tests-junit5:$fluentKafkaVersion")
}
3 changes: 3 additions & 0 deletions streams-bootstrap-test/lombok.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# This file is generated by the 'io.freefair.lombok' Gradle plugin
config.stopBubbling = true
lombok.addLombokGeneratedAnnotation = true
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* MIT License
*
* Copyright (c) 2023 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import com.bakdata.fluent_kafka_streams_tests.TestTopology;
import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension;
import java.util.Map;
import java.util.function.Function;
import lombok.experimental.UtilityClass;

/**
 * Utility class with factory methods for testing a {@link KafkaStreamsApplication} using Fluent Kafka Streams Tests
 */
@UtilityClass
public class StreamsBootstrapTopologyFactory {

    /**
     * Create a {@code TestTopology} from a {@code KafkaStreamsApplication}. This also sets
     * {@link KafkaStreamsApplication#schemaRegistryUrl}.
     *
     * @param app KafkaStreamsApplication to create TestTopology from
     * @param <K> Default type of keys
     * @param <V> Default type of values
     * @return {@code TestTopology} that uses topology and configuration provided by {@code KafkaStreamsApplication}
     * @see KafkaStreamsApplication#getKafkaProperties()
     * @see KafkaStreamsApplication#createTopology()
     */
    public static <K, V> TestTopology<K, V> createTopologyWithSchemaRegistry(final KafkaStreamsApplication app) {
        final Function<String, Map<Object, Object>> properties = getKafkaPropertiesWithSchemaRegistryUrl(app);
        return new TestTopology<>(app::createTopology, properties);
    }

    /**
     * Create a {@code TestTopologyExtension} from a {@code KafkaStreamsApplication}. This also sets
     * {@link KafkaStreamsApplication#schemaRegistryUrl}.
     *
     * @param app KafkaStreamsApplication to create TestTopology from
     * @param <K> Default type of keys
     * @param <V> Default type of values
     * @return {@code TestTopologyExtension} that uses topology and configuration provided by
     * {@code KafkaStreamsApplication}
     * @see KafkaStreamsApplication#getKafkaProperties()
     * @see KafkaStreamsApplication#createTopology()
     */
    public static <K, V> TestTopologyExtension<K, V> createTopologyExtensionWithSchemaRegistry(
            final KafkaStreamsApplication app) {
        final Function<String, Map<Object, Object>> properties = getKafkaPropertiesWithSchemaRegistryUrl(app);
        return new TestTopologyExtension<>(app::createTopology, properties);
    }

    /**
     * Create a {@code TestTopology} from a {@code KafkaStreamsApplication}. This does not set
     * {@link KafkaStreamsApplication#schemaRegistryUrl}.
     *
     * @param app KafkaStreamsApplication to create TestTopology from
     * @param <K> Default type of keys
     * @param <V> Default type of values
     * @return {@code TestTopology} that uses topology and configuration provided by {@code KafkaStreamsApplication}
     * @see KafkaStreamsApplication#getKafkaProperties()
     * @see KafkaStreamsApplication#createTopology()
     */
    public static <K, V> TestTopology<K, V> createTopology(final KafkaStreamsApplication app) {
        final Map<Object, Object> kafkaProperties = app.getKafkaProperties();
        return new TestTopology<>(app::createTopology, kafkaProperties);
    }

    /**
     * Create a {@code TestTopologyExtension} from a {@code KafkaStreamsApplication}. This does not set
     * {@link KafkaStreamsApplication#schemaRegistryUrl}.
     *
     * @param app KafkaStreamsApplication to create TestTopology from
     * @param <K> Default type of keys
     * @param <V> Default type of values
     * @return {@code TestTopologyExtension} that uses topology and configuration provided by
     * {@code KafkaStreamsApplication}
     * @see KafkaStreamsApplication#getKafkaProperties()
     * @see KafkaStreamsApplication#createTopology()
     */
    public static <K, V> TestTopologyExtension<K, V> createTopologyExtension(final KafkaStreamsApplication app) {
        final Map<Object, Object> kafkaProperties = app.getKafkaProperties();
        return new TestTopologyExtension<>(app::createTopology, kafkaProperties);
    }

    /**
     * Get Kafka properties from a {@code KafkaStreamsApplication} after configuring
     * {@link KafkaStreamsApplication#schemaRegistryUrl}.
     *
     * @param app KafkaStreamsApplication to get Kafka properties of
     * @return function mapping a schema registry URL to the application's Kafka properties
     */
    public static Function<String, Map<Object, Object>> getKafkaPropertiesWithSchemaRegistryUrl(
            final KafkaApplication app) {
        // NOTE(review): the returned function mutates the application before reading its properties
        return url -> {
            app.setSchemaRegistryUrl(url);
            return app.getKafkaProperties();
        };
    }

}
Loading