diff --git a/gradle.properties b/gradle.properties index bc437f05..a45157a2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -5,7 +5,7 @@ org.gradle.parallel=false kafkaVersion=3.8.1 testContainersVersion=1.20.4 confluentVersion=7.8.0 -fluentKafkaVersion=2.16.0 +fluentKafkaVersion=3.0.0 junitVersion=5.11.4 mockitoVersion=5.15.2 assertJVersion=3.27.2 diff --git a/streams-bootstrap-cli/build.gradle.kts b/streams-bootstrap-cli/build.gradle.kts index 09391646..7cc8ba99 100644 --- a/streams-bootstrap-cli/build.gradle.kts +++ b/streams-bootstrap-cli/build.gradle.kts @@ -19,12 +19,8 @@ dependencies { testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion) testImplementation(testFixtures(project(":streams-bootstrap-core"))) testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2") - val fluentKafkaVersion: String by project - testImplementation( - group = "com.bakdata.fluent-kafka-streams-tests", - name = "schema-registry-mock-junit5", - version = fluentKafkaVersion - ) + val confluentVersion: String by project + testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) val log4jVersion: String by project testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion) } diff --git a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index d9d2ae56..40bff219 100644 --- a/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-cli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -36,7 +36,6 @@ import com.bakdata.kafka.SimpleKafkaProducerApplication; import com.bakdata.kafka.TestRecord; import com.bakdata.kafka.util.ImprovedAdminClient; -import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; @@ -49,7 +48,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.kafka.KafkaContainer; @@ -57,8 +55,7 @@ @Testcontainers class RunProducerAppTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); - @RegisterExtension - final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); + private static final String SCHEMA_REGISTRY_URL = "mock://"; @Container private final KafkaContainer kafkaCluster = newKafkaCluster(); @@ -92,15 +89,14 @@ public SerializerConfig defaultSerializationConfig() { } })) { app.setBootstrapServers(this.kafkaCluster.getBootstrapServers()); - app.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl()); + app.setSchemaRegistryUrl(SCHEMA_REGISTRY_URL); app.setOutputTopic(output); app.run(); final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster); assertThat(kafkaContainerHelper.read() .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL) .from(output, TIMEOUT)) .hasSize(1) .anySatisfy(kv -> { diff --git a/streams-bootstrap-core/build.gradle.kts b/streams-bootstrap-core/build.gradle.kts index c1d46256..c85e0870 100644 --- a/streams-bootstrap-core/build.gradle.kts +++ b/streams-bootstrap-core/build.gradle.kts @@ -31,13 +31,7 @@ dependencies { testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion) testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion) - val fluentKafkaVersion: String by project testImplementation(project(":streams-bootstrap-test")) - testImplementation( - group = "com.bakdata.fluent-kafka-streams-tests", - name = "schema-registry-mock-junit5", - version = fluentKafkaVersion - ) val testContainersVersion: String by project testFixturesApi(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion) testFixturesApi(group = "org.testcontainers", name = "kafka", version = testContainersVersion) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java index 1f3d1c6d..553fde3b 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/AvroMirrorTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -35,7 +35,7 @@ class AvroMirrorTest { private final ConfiguredStreamsApp app = createApp(); @RegisterExtension final TestTopologyExtension testTopology = - TestTopologyFactory.createTopologyExtensionWithSchemaRegistry(this.app); + TestTopologyFactory.withSchemaRegistry().createTopologyExtension(this.app); private static ConfiguredStreamsApp createApp() { final AppConfiguration configuration = new AppConfiguration<>(StreamsTopicConfig.builder() diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java index 52eeb988..f04f2001 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/KafkaTest.java @@ -26,17 +26,16 @@ import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.KafkaEndpointConfig; +import com.bakdata.kafka.TestTopologyFactory; import com.bakdata.kafka.TestUtil; -import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; -import org.junit.jupiter.api.extension.RegisterExtension; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.kafka.KafkaContainer; @Testcontainers abstract class KafkaTest { - @RegisterExtension - final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); + private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry(); @Container private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster(); @@ -49,11 +48,19 @@ KafkaEndpointConfig createEndpointWithoutSchemaRegistry() { KafkaEndpointConfig createEndpoint() { return KafkaEndpointConfig.builder() .bootstrapServers(this.kafkaCluster.getBootstrapServers()) - .schemaRegistryUrl(this.schemaRegistryMockExtension.getUrl()) + .schemaRegistryUrl(this.getSchemaRegistryUrl()) .build(); } KafkaContainerHelper newContainerHelper() { return new KafkaContainerHelper(this.kafkaCluster); } + + String getSchemaRegistryUrl() { + return this.testTopologyFactory.getSchemaRegistryUrl(); + } + + SchemaRegistryClient getSchemaRegistryClient() { + return this.testTopologyFactory.getSchemaRegistryClient(); + } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java index 5156ea60..155890b0 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/ProducerCleanUpRunnerTest.java @@ -121,7 +121,7 @@ void shouldDeleteTopic() { void shouldDeleteValueSchema() throws IOException, RestClientException { try (final ConfiguredProducerApp app = createAvroValueApplication(); final ExecutableProducerApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { + final SchemaRegistryClient client = this.getSchemaRegistryClient()) { run(executableApp); final String outputTopic = app.getTopics().getOutputTopic(); @@ -137,7 +137,7 @@ void shouldDeleteValueSchema() throws IOException, RestClientException { void shouldDeleteKeySchema() throws IOException, RestClientException { try (final ConfiguredProducerApp app = createAvroKeyApplication(); final ExecutableProducerApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { + final SchemaRegistryClient client = this.getSchemaRegistryClient()) { run(executableApp); final String outputTopic = app.getTopics().getOutputTopic(); diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java index 2b1162ed..5e8cfcea 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/integration/StreamsCleanUpRunnerTest.java @@ -283,8 +283,7 @@ void shouldDeleteInternalTopics() throws InterruptedException { .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( @@ -336,8 +335,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException { .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( @@ -443,7 +441,7 @@ void shouldDeleteValueSchema() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorValueApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { + final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); @@ -452,8 +450,7 @@ void shouldDeleteValueSchema() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( new KeyValue<>(null, testRecord) @@ -478,7 +475,7 @@ void shouldDeleteKeySchema() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = createMirrorKeyApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { + final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); @@ -487,8 +484,7 @@ void shouldDeleteKeySchema() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( new KeyValue<>(testRecord, "val") @@ -513,7 +509,7 @@ void shouldDeleteSchemaOfInternalTopics() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { + final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); @@ -522,8 +518,7 @@ void shouldDeleteSchemaOfInternalTopics() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( @@ -556,7 +551,7 @@ void shouldDeleteSchemaOfIntermediateTopics() throws InterruptedException, IOException, RestClientException { try (final ConfiguredStreamsApp app = this.createComplexApplication(); final ExecutableStreamsApp executableApp = app.withEndpoint(this.createEndpoint()); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) { + final SchemaRegistryClient client = this.getSchemaRegistryClient()) { final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build(); final String inputTopic = app.getTopics().getInputTopics().get(0); final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper(); @@ -565,8 +560,7 @@ void shouldDeleteSchemaOfIntermediateTopics() .createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap()); } kafkaContainerHelper.send() - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) .to(app.getTopics().getInputTopics().get(0), List.of( diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java index 909c85b7..2775ef33 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/util/SchemaTopicClientTest.java @@ -31,7 +31,7 @@ import com.bakdata.kafka.KafkaContainerHelper; import com.bakdata.kafka.TestRecord; -import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension; +import com.bakdata.kafka.TestTopologyFactory; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; @@ -49,7 +49,6 @@ import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.extension.RegisterExtension; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.kafka.KafkaContainer; @@ -60,8 +59,7 @@ class SchemaTopicClientTest { private static final Duration TIMEOUT = Duration.ofSeconds(10); private static final String TOPIC = "topic"; - @RegisterExtension - final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension(); + private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry(); @Container private final KafkaContainer kafkaCluster = newKafkaCluster(); @@ -81,13 +79,12 @@ void shouldDeleteTopicAndSchemaWhenSchemaRegistryUrlIsSet() kafkaContainerHelper.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .to(TOPIC, List.of( new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) )); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); + final SchemaRegistryClient client = this.getSchemaRegistryClient(); this.softly.assertThat(client.getAllSubjects()) .contains(TOPIC + "-value"); @@ -116,13 +113,12 @@ void shouldResetSchema() throws InterruptedException, IOException, RestClientExc kafkaContainerHelper.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .to(TOPIC, List.of( new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) )); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); + final SchemaRegistryClient client = this.getSchemaRegistryClient(); this.softly.assertThat(client.getAllSubjects()) .contains(TOPIC + "-value"); @@ -152,13 +148,12 @@ void shouldDeleteTopicAndKeepSchemaWhenSchemaRegistryUrlIsNotSet() throws Interr kafkaContainerHelper.send() .with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, - this.schemaRegistryMockExtension.getUrl()) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()) .to(TOPIC, List.of( new KeyValue<>(null, TestRecord.newBuilder().setContent("foo").build()) )); - final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient(); + final SchemaRegistryClient client = this.getSchemaRegistryClient(); this.softly.assertThat(client.getAllSubjects()) .contains(TOPIC + "-value"); @@ -178,8 +173,7 @@ private SchemaTopicClient createClientWithSchemaRegistry() { final Map kafkaProperties = Map.of( AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaCluster.getBootstrapServers() ); - return SchemaTopicClient.create(kafkaProperties, this.schemaRegistryMockExtension.getUrl(), - TIMEOUT); + return SchemaTopicClient.create(kafkaProperties, this.getSchemaRegistryUrl(), TIMEOUT); } private SchemaTopicClient createClientWithNoSchemaRegistry() { @@ -189,4 +183,12 @@ private SchemaTopicClient createClientWithNoSchemaRegistry() { return SchemaTopicClient.create(kafkaProperties, TIMEOUT); } + private String getSchemaRegistryUrl() { + return this.testTopologyFactory.getSchemaRegistryUrl(); + } + + private SchemaRegistryClient getSchemaRegistryClient() { + return this.testTopologyFactory.getSchemaRegistryClient(); + } + } diff --git a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java index c87e8cd7..2c6680b9 100644 --- a/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java +++ b/streams-bootstrap-test/src/main/java/com/bakdata/kafka/TestTopologyFactory.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,55 +24,99 @@ package com.bakdata.kafka; +import static java.util.Collections.emptyMap; + import com.bakdata.fluent_kafka_streams_tests.TestTopology; import com.bakdata.fluent_kafka_streams_tests.junit5.TestTopologyExtension; -import com.bakdata.kafka.KafkaEndpointConfig.KafkaEndpointConfigBuilder; +import io.confluent.kafka.schemaregistry.SchemaProvider; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; +import java.util.List; import java.util.Map; -import java.util.function.Function; -import lombok.experimental.UtilityClass; +import java.util.Objects; +import java.util.UUID; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; /** - * Utility class that provides helpers for using Fluent Kafka Streams Tests with {@link ConfiguredStreamsApp} + * Class that provides helpers for using Fluent Kafka Streams Tests with {@link ConfiguredStreamsApp} */ -@UtilityClass -public class TestTopologyFactory { +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +public final class TestTopologyFactory { + + private static final String MOCK_URL_PREFIX = "mock://"; + private final String schemaRegistryUrl; /** - * Create a {@code TestTopology} from a {@code ConfiguredStreamsApp}. It injects are {@link KafkaEndpointConfig} - * with configured Schema Registry. - * - * @param app ConfiguredStreamsApp to create TestTopology from - * @param Default type of keys - * @param Default type of values - * @return {@code TestTopology} that uses topology and configuration provided by {@code ConfiguredStreamsApp} - * @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig) - * @see ConfiguredStreamsApp#createTopology(Map) + * Create a new {@code TestTopologyFactory} with no configured Schema Registry. + * @return {@code TestTopologyFactory} with no configured Schema Registry */ - public static TestTopology createTopologyWithSchemaRegistry( - final ConfiguredStreamsApp app) { - return new TestTopology<>(app::createTopology, getKafkaPropertiesWithSchemaRegistryUrl(app)); + public static TestTopologyFactory withoutSchemaRegistry() { + return withSchemaRegistry(null); } /** - * Create a {@code TestTopologyExtension} from a {@code ConfiguredStreamsApp}. It injects are - * {@link KafkaEndpointConfig} with configured Schema Registry. - * - * @param app ConfiguredStreamsApp to create TestTopology from - * @param Default type of keys - * @param Default type of values - * @return {@code TestTopologyExtension} that uses topology and configuration provided by {@code - * ConfiguredStreamsApp} - * @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig) - * @see ConfiguredStreamsApp#createTopology(Map) + * Create a new {@code TestTopologyFactory} with configured + * {@link io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry}. The scope is random in order to avoid + * collisions between different test instances as scopes are retained globally. + * @return {@code TestTopologyFactory} with configured Schema Registry */ - public static TestTopologyExtension createTopologyExtensionWithSchemaRegistry( - final ConfiguredStreamsApp app) { - return new TestTopologyExtension<>(app::createTopology, getKafkaPropertiesWithSchemaRegistryUrl(app)); + public static TestTopologyFactory withSchemaRegistry() { + return withSchemaRegistry(MOCK_URL_PREFIX + UUID.randomUUID()); + } + + /** + * Create a new {@code TestTopologyFactory} with configured Schema Registry. + * @param schemaRegistryUrl Schema Registry URL to use + * @return {@code TestTopologyFactory} with configured Schema Registry + */ + public static TestTopologyFactory withSchemaRegistry(final String schemaRegistryUrl) { + return new TestTopologyFactory(schemaRegistryUrl); + } + + /** + * Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and + * {@link org.apache.kafka.common.serialization.Serializer} using the {@code TestTopology} properties. + * @param testTopology {@code TestTopology} to use properties of + * @return {@code Configurator} + * @see TestTopology#getProperties() + */ + public static Configurator createConfigurator(final TestTopology testTopology) { + return new Configurator(testTopology.getProperties()); } /** - * Create a {@code TestTopology} from a {@code ConfiguredStreamsApp}. It injects are {@link KafkaEndpointConfig} - * without configured Schema Registry. + * Get Schema Registry URL if configured + * @return Schema Registry URL + * @throws NullPointerException if Schema Registry is not configured + */ + public String getSchemaRegistryUrl() { + return Objects.requireNonNull(this.schemaRegistryUrl, "Schema Registry is not configured"); + } + + /** + * Get {@code SchemaRegistryClient} for configured URL with default providers + * @return {@code SchemaRegistryClient} + * @throws NullPointerException if Schema Registry is not configured + */ + public SchemaRegistryClient getSchemaRegistryClient() { + return this.getSchemaRegistryClient(null); + } + + /** + * Get {@code SchemaRegistryClient} for configured URL + * @param providers list of {@code SchemaProvider} to use for {@code SchemaRegistryClient} + * @return {@code SchemaRegistryClient} + * @throws NullPointerException if Schema Registry is not configured + */ + public SchemaRegistryClient getSchemaRegistryClient(final List providers) { + return SchemaRegistryClientFactory.newClient(List.of(this.getSchemaRegistryUrl()), 0, providers, emptyMap(), + null); + } + + /** + * Create a {@code TestTopology} from a {@code ConfiguredStreamsApp}. It injects a {@link KafkaEndpointConfig} + * for test purposes with Schema Registry optionally configured. * * @param app ConfiguredStreamsApp to create TestTopology from * @param Default type of keys @@ -81,13 +125,13 @@ public static TestTopologyExtension createTopologyExtensionWithSche * @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig) * @see ConfiguredStreamsApp#createTopology(Map) */ - public static TestTopology createTopology(final ConfiguredStreamsApp app) { - return new TestTopology<>(app::createTopology, getKafkaProperties(app)); + public TestTopology createTopology(final ConfiguredStreamsApp app) { + return new TestTopology<>(app::createTopology, this.getKafkaProperties(app)); } /** - * Create a {@code TestTopologyExtension} from a {@code ConfiguredStreamsApp}. It injects are - * {@link KafkaEndpointConfig} without configured Schema Registry. + * Create a {@code TestTopologyExtension} from a {@code ConfiguredStreamsApp}. It injects a + * {@link KafkaEndpointConfig} for test purposes with Schema Registry optionally configured. * * @param app ConfiguredStreamsApp to create TestTopology from * @param Default type of keys @@ -97,51 +141,24 @@ public static TestTopology createTopology(final ConfiguredStreamsAp * @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig) * @see ConfiguredStreamsApp#createTopology(Map) */ - public static TestTopologyExtension createTopologyExtension( + public TestTopologyExtension createTopologyExtension( final ConfiguredStreamsApp app) { - return new TestTopologyExtension<>(app::createTopology, getKafkaProperties(app)); + return new TestTopologyExtension<>(app::createTopology, this.getKafkaProperties(app)); } /** - * Get Kafka properties from a {@code ConfiguredStreamsApp} after using a {@link KafkaEndpointConfig} with - * configured Schema Registry. + * Get Kafka properties from a {@code ConfiguredStreamsApp} using a {@link KafkaEndpointConfig} for test purposes + * with Schema Registry optionally configured. * * @param app ConfiguredStreamsApp to get Kafka properties of * @return Kafka properties * @see ConfiguredStreamsApp#getKafkaProperties(KafkaEndpointConfig) */ - public static Function> getKafkaPropertiesWithSchemaRegistryUrl( - final ConfiguredStreamsApp app) { - return schemaRegistryUrl -> getKafkaProperties(app, schemaRegistryUrl); - } - - /** - * Create {@code Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and - * {@link org.apache.kafka.common.serialization.Serializer} using the {@code TestTopology} properties. - * @param testTopology {@code TestTopology} to use properties of - * @return {@code Configurator} - * @see TestTopology#getProperties() - */ - public static Configurator createConfigurator(final TestTopology testTopology) { - return new Configurator(testTopology.getProperties()); - } - - private static Map getKafkaProperties(final ConfiguredStreamsApp app, - final String schemaRegistryUrl) { - final KafkaEndpointConfig endpointConfig = newEndpointConfig() - .schemaRegistryUrl(schemaRegistryUrl) - .build(); - return app.getKafkaProperties(endpointConfig); - } - - private static Map getKafkaProperties(final ConfiguredStreamsApp app) { - final KafkaEndpointConfig endpointConfig = newEndpointConfig() + public Map getKafkaProperties(final ConfiguredStreamsApp app) { + final KafkaEndpointConfig endpointConfig = KafkaEndpointConfig.builder() + .bootstrapServers("localhost:9092") + .schemaRegistryUrl(this.schemaRegistryUrl) .build(); return app.getKafkaProperties(endpointConfig); } - - private static KafkaEndpointConfigBuilder newEndpointConfig() { - return KafkaEndpointConfig.builder() - .bootstrapServers("localhost:9092"); - } }