Skip to content

Commit

Permalink
Use Confluent MockSchemaRegistry (#269)
Browse files Browse the repository at this point in the history
Fluent Kafka Streams Tests removed Schema Registry support in version 3.0, so this commit switches the tests to Confluent's MockSchemaRegistry (`mock://` URLs).
  • Loading branch information
philipp94831 authored Jan 9, 2025
1 parent ab27dd6 commit 51f8614
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 133 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ org.gradle.parallel=false
kafkaVersion=3.8.1
testContainersVersion=1.20.4
confluentVersion=7.8.0
fluentKafkaVersion=2.16.0
fluentKafkaVersion=3.0.0
junitVersion=5.11.4
mockitoVersion=5.15.2
assertJVersion=3.27.2
Expand Down
8 changes: 2 additions & 6 deletions streams-bootstrap-cli/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@ dependencies {
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)
testImplementation(testFixtures(project(":streams-bootstrap-core")))
testImplementation(group = "com.ginsberg", name = "junit5-system-exit", version = "1.1.2")
val fluentKafkaVersion: String by project
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "schema-registry-mock-junit5",
version = fluentKafkaVersion
)
val confluentVersion: String by project
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
val log4jVersion: String by project
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.bakdata.kafka.SimpleKafkaProducerApplication;
import com.bakdata.kafka.TestRecord;
import com.bakdata.kafka.util.ImprovedAdminClient;
import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
Expand All @@ -49,16 +48,14 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
class RunProducerAppTest {
private static final Duration TIMEOUT = Duration.ofSeconds(10);
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension();
private static final String SCHEMA_REGISTRY_URL = "mock://";
@Container
private final KafkaContainer kafkaCluster = newKafkaCluster();

Expand Down Expand Up @@ -92,15 +89,14 @@ public SerializerConfig defaultSerializationConfig() {
}
})) {
app.setBootstrapServers(this.kafkaCluster.getBootstrapServers());
app.setSchemaRegistryUrl(this.schemaRegistryMockExtension.getUrl());
app.setSchemaRegistryUrl(SCHEMA_REGISTRY_URL);
app.setOutputTopic(output);
app.run();
final KafkaContainerHelper kafkaContainerHelper = new KafkaContainerHelper(this.kafkaCluster);
assertThat(kafkaContainerHelper.read()
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL)
.<String, TestRecord>from(output, TIMEOUT))
.hasSize(1)
.anySatisfy(kv -> {
Expand Down
6 changes: 0 additions & 6 deletions streams-bootstrap-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,7 @@ dependencies {
testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion)
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)

val fluentKafkaVersion: String by project
testImplementation(project(":streams-bootstrap-test"))
testImplementation(
group = "com.bakdata.fluent-kafka-streams-tests",
name = "schema-registry-mock-junit5",
version = fluentKafkaVersion
)
val testContainersVersion: String by project
testFixturesApi(group = "org.testcontainers", name = "junit-jupiter", version = testContainersVersion)
testFixturesApi(group = "org.testcontainers", name = "kafka", version = testContainersVersion)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -35,7 +35,7 @@ class AvroMirrorTest {
private final ConfiguredStreamsApp<MirrorWithNonDefaultSerde> app = createApp();
@RegisterExtension
final TestTopologyExtension<TestRecord, TestRecord> testTopology =
TestTopologyFactory.createTopologyExtensionWithSchemaRegistry(this.app);
TestTopologyFactory.withSchemaRegistry().createTopologyExtension(this.app);

private static ConfiguredStreamsApp<MirrorWithNonDefaultSerde> createApp() {
final AppConfiguration<StreamsTopicConfig> configuration = new AppConfiguration<>(StreamsTopicConfig.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,16 @@

import com.bakdata.kafka.KafkaContainerHelper;
import com.bakdata.kafka.KafkaEndpointConfig;
import com.bakdata.kafka.TestTopologyFactory;
import com.bakdata.kafka.TestUtil;
import com.bakdata.schemaregistrymock.junit5.SchemaRegistryMockExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.KafkaContainer;

@Testcontainers
abstract class KafkaTest {
@RegisterExtension
final SchemaRegistryMockExtension schemaRegistryMockExtension = new SchemaRegistryMockExtension();
private final TestTopologyFactory testTopologyFactory = TestTopologyFactory.withSchemaRegistry();
@Container
private final KafkaContainer kafkaCluster = TestUtil.newKafkaCluster();

Expand All @@ -49,11 +48,19 @@ KafkaEndpointConfig createEndpointWithoutSchemaRegistry() {
KafkaEndpointConfig createEndpoint() {
return KafkaEndpointConfig.builder()
.bootstrapServers(this.kafkaCluster.getBootstrapServers())
.schemaRegistryUrl(this.schemaRegistryMockExtension.getUrl())
.schemaRegistryUrl(this.getSchemaRegistryUrl())
.build();
}

KafkaContainerHelper newContainerHelper() {
return new KafkaContainerHelper(this.kafkaCluster);
}

String getSchemaRegistryUrl() {
return this.testTopologyFactory.getSchemaRegistryUrl();
}

SchemaRegistryClient getSchemaRegistryClient() {
return this.testTopologyFactory.getSchemaRegistryClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void shouldDeleteTopic() {
void shouldDeleteValueSchema() throws IOException, RestClientException {
try (final ConfiguredProducerApp<ProducerApp> app = createAvroValueApplication();
final ExecutableProducerApp<ProducerApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) {
final SchemaRegistryClient client = this.getSchemaRegistryClient()) {
run(executableApp);

final String outputTopic = app.getTopics().getOutputTopic();
Expand All @@ -137,7 +137,7 @@ void shouldDeleteValueSchema() throws IOException, RestClientException {
void shouldDeleteKeySchema() throws IOException, RestClientException {
try (final ConfiguredProducerApp<ProducerApp> app = createAvroKeyApplication();
final ExecutableProducerApp<ProducerApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) {
final SchemaRegistryClient client = this.getSchemaRegistryClient()) {
run(executableApp);

final String outputTopic = app.getTopics().getOutputTopic();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,7 @@ void shouldDeleteInternalTopics() throws InterruptedException {
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down Expand Up @@ -336,8 +335,7 @@ void shouldDeleteIntermediateTopics() throws InterruptedException {
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down Expand Up @@ -443,7 +441,7 @@ void shouldDeleteValueSchema()
throws InterruptedException, IOException, RestClientException {
try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorValueApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) {
final SchemaRegistryClient client = this.getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
final String inputTopic = app.getTopics().getInputTopics().get(0);
final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper();
Expand All @@ -452,8 +450,7 @@ void shouldDeleteValueSchema()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
new KeyValue<>(null, testRecord)
Expand All @@ -478,7 +475,7 @@ void shouldDeleteKeySchema()
throws InterruptedException, IOException, RestClientException {
try (final ConfiguredStreamsApp<StreamsApp> app = createMirrorKeyApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) {
final SchemaRegistryClient client = this.getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
final String inputTopic = app.getTopics().getInputTopics().get(0);
final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper();
Expand All @@ -487,8 +484,7 @@ void shouldDeleteKeySchema()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
new KeyValue<>(testRecord, "val")
Expand All @@ -513,7 +509,7 @@ void shouldDeleteSchemaOfInternalTopics()
throws InterruptedException, IOException, RestClientException {
try (final ConfiguredStreamsApp<StreamsApp> app = this.createComplexApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) {
final SchemaRegistryClient client = this.getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
final String inputTopic = app.getTopics().getInputTopics().get(0);
final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper();
Expand All @@ -522,8 +518,7 @@ void shouldDeleteSchemaOfInternalTopics()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down Expand Up @@ -556,7 +551,7 @@ void shouldDeleteSchemaOfIntermediateTopics()
throws InterruptedException, IOException, RestClientException {
try (final ConfiguredStreamsApp<StreamsApp> app = this.createComplexApplication();
final ExecutableStreamsApp<StreamsApp> executableApp = app.withEndpoint(this.createEndpoint());
final SchemaRegistryClient client = this.schemaRegistryMockExtension.getSchemaRegistryClient()) {
final SchemaRegistryClient client = this.getSchemaRegistryClient()) {
final TestRecord testRecord = TestRecord.newBuilder().setContent("key 1").build();
final String inputTopic = app.getTopics().getInputTopics().get(0);
final KafkaContainerHelper kafkaContainerHelper = this.newContainerHelper();
Expand All @@ -565,8 +560,7 @@ void shouldDeleteSchemaOfIntermediateTopics()
.createTopic(app.getTopics().getOutputTopic(), DEFAULT_TOPIC_SETTINGS, emptyMap());
}
kafkaContainerHelper.send()
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
this.schemaRegistryMockExtension.getUrl())
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl())
.with(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
.with(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
.to(app.getTopics().getInputTopics().get(0), List.of(
Expand Down
Loading

0 comments on commit 51f8614

Please sign in to comment.