Skip to content

Commit

Permalink
Use Confluent MockSchemaRegistry
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 8, 2025
1 parent 9c1d770 commit 4c73923
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 58 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ org.gradle.parallel=false
kafkaVersion=3.8.1
testContainersVersion=1.20.4
confluentVersion=7.8.0
fluentKafkaVersion=2.16.0
fluentKafkaVersion=2.16.1-SNAPSHOT
junitVersion=5.11.4
mockitoVersion=5.15.2
assertJVersion=3.27.2
Expand Down
8 changes: 2 additions & 6 deletions streams-bootstrap-cli/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@ dependencies {
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)
testImplementation(testFixtures(project(":streams-bootstrap-core")))
testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2")
val fluentKafkaVersion: String by project
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "schema-registry-mock-junit5",
version = fluentKafkaVersion
)
val log4jVersion: String by project
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
val confluentVersion: String by project
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.bakdata.kafka.SimpleKafkaProducerApplication;
import com.bakdata.kafka.TestRecord;
import com.bakdata.kafka.util.ImprovedAdminClient;
import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
Expand All @@ -49,16 +48,14 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
class RunProducerAppTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension();
private static final String SCHEMA_REGISTRY_URL = "mock://";
@Container
private final KafkaContainer kafkaCluster = newKafkaCluster();

Expand Down Expand Up @@ -92,15 +89,14 @@ public SerializerConfig defaultSerializationConfig() {
}
})) {
app.setBootstrapServers(this.kafkaCluster.getBootstrapServers());
app.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl());
app.setSchemaRegistryUrl(SCHEMA_REGISTRY_URL);
app.setOutputTopic(output);
app.run();
final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster);
assertThat(kafkaContainerHelper.read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.<String, TestRecord>from(output, TIMEOUT))
.hasSize(1)
.anySatisfy(kv -> {
Expand Down
6 changes: 0 additions & 6 deletions streams-bootstrap-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@ dependencies {
testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion)
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)

val fluentKafkaVersion: String by project
testImplementation(project(":streams-bootstrap-test"))
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "schema-registry-mock-junit5",
version = fluentKafkaVersion
)
val testContainersVersion: String by project
testFixturesApi(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion)
testFixturesApi(group = "org.testcontainers", name = "kafka", version = testContainersVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,28 @@

package com.bakdata.kafka.integration;

import static java.util.Collections.emptyMap;

import com.bakdata.kafka.KafkaContainerHelper;
import com.bakdata.kafka.KafkaEndpointConfig;
import com.bakdata.kafka.TestUtil;
import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
import java.util.List;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
abstract class KafkaTest {
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension();
static final String SCHEMA_REGISTRY_URL = "mock://";
@Container
private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster();

/**
 * Creates a {@link SchemaRegistryClient} for the mock registry used by these tests.
 *
 * <p>The {@code mock://} scheme in {@code SCHEMA_REGISTRY_URL} makes Confluent's factory
 * return a {@code MockSchemaRegistry}-backed client instead of opening a network
 * connection, so producers/consumers configured with the same URL share schemas with
 * this client in-process. NOTE(review): cache capacity is {@code 0} and providers,
 * configs ({@code emptyMap()}) and request headers are {@code null}/empty — presumably
 * fine for a mock; confirm against {@code SchemaRegistryClientFactory.newClient}.
 *
 * @return a client talking to the in-memory mock schema registry
 */
static SchemaRegistryClient getSchemaRegistryClient() {
return SchemaRegistryClientFactory.newClient(List.of(SCHEMA_REGISTRY_URL), 0, null, emptyMap(), null);
}

KafkaEndpointConfig createEndpointWithoutSchemaRegistry() {
return KafkaEndpointConfig.builder()
.bootstrapServers(this.kafkaCluster.getBootstrapServers())
Expand All @@ -49,7 +55,7 @@ KafkaEndpointConfig createEndpointWithoutSchemaRegistry() {
KafkaEndpointConfig createEndpoint() {
return KafkaEndpointConfig.builder()
.bootstrapServers(this.kafkaCluster.getBootstrapServers())
.schemaRegistryUrl(this.schemaRegistryMockExtension.getUrl())
.schemaRegistryUrl(SCHEMA_REGISTRY_URL)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void shouldDeleteTopic() {
void shouldDeleteValueSchema() throws IOException, RestClientException {
try (final ConfiguredProducerApp<ProducerApp> app = createAvroValueApplication();
final ExecutableProducerApp<ProducerApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) {
final SchemaRegistryClient client = getSchemaRegistryClient()) {
run(executableApp);

final String outputTopic = app.getTopics().getOutputTopic();
Expand All @@ -137,7 +137,7 @@ void shouldDeleteValueSchema() throws IOException, RestClientException {
void shouldDeleteKeySchema() throws IOException, RestClientException {
try (final ConfiguredProducerApp<ProducerApp> app = createAvroKeyApplication();
final ExecutableProducerApp<ProducerApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) {
final SchemaRegistryClient client = getSchemaRegistryClient()) {
run(executableApp);

final String outputTopic = app.getTopics().getOutputTopic();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,7 @@ void shouldDeleteInternalTopics() throws InterruptedException {
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down Expand Up @@ -336,8 +335,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException {
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down Expand Up @@ -443,7 +441,7 @@ void shouldDeleteValueSchema()
throws InterruptedException, IOException, RestClientException {
try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorValueApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) {
final SchemaRegistryClient client = getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
final String inputTopic = app.getTopics().getInputTopics().get(0);
final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper();
Expand All @@ -452,8 +450,7 @@ void shouldDeleteValueSchema()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
new KeyValue<>(null, testRecord)
Expand All @@ -478,7 +475,7 @@ void shouldDeleteKeySchema()
throws InterruptedException, IOException, RestClientException {
try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorKeyApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) {
final SchemaRegistryClient client = getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
final String inputTopic = app.getTopics().getInputTopics().get(0);
final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper();
Expand All @@ -487,8 +484,7 @@ void shouldDeleteKeySchema()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
new KeyValue<>(testRecord, "val")
Expand All @@ -513,7 +509,7 @@ void shouldDeleteSchemaOfInternalTopics()
throws InterruptedException, IOException, RestClientException {
try (final ConfiguredStreamsApp<StreamsApp> app = this.createComplexApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) {
final SchemaRegistryClient client = getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
final String inputTopic = app.getTopics().getInputTopics().get(0);
final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper();
Expand All @@ -522,8 +518,7 @@ void shouldDeleteSchemaOfInternalTopics()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down Expand Up @@ -556,7 +551,7 @@ void shouldDeleteSchemaOfIntermediateTopics()
throws InterruptedException, IOException, RestClientException {
try (final ConfiguredStreamsApp<StreamsApp> app = this.createComplexApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) {
final SchemaRegistryClient client = getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
final String inputTopic = app.getTopics().getInputTopics().get(0);
final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper();
Expand All @@ -565,8 +560,7 @@ void shouldDeleteSchemaOfIntermediateTopics()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down
Loading

0 comments on commit 4c73923

Please sign in to comment.