Skip to content

Commit

Permalink
Use MockSchemaRegistry and remove custom mock
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jan 8, 2025
1 parent 510fa6e commit cde20aa
Show file tree
Hide file tree
Showing 38 changed files with 54 additions and 2,308 deletions.
1 change: 0 additions & 1 deletion fluent-kafka-streams-tests-junit4/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ description = "Provides the fluent Kafka Streams test framework."

dependencies {
api(project(":fluent-kafka-streams-tests"))
api(project(":schema-registry-mock"))

val junit4Version: String by project
api(group = "junit", name = "junit", version = junit4Version)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -151,8 +151,8 @@ public <K, V> TestTopologyRule<K, V> withDefaultSerde(final Serde<K> defaultKeyS
}

@Override
public TestTopologyRule<DefaultK, DefaultV> withSchemaRegistryMock(final SchemaRegistryMock schemaRegistryMock) {
return (TestTopologyRule<DefaultK, DefaultV>) super.withSchemaRegistryMock(schemaRegistryMock);
public TestTopologyRule<DefaultK, DefaultV> withSchemaRegistryUrl(final SchemaRegistryMock schemaRegistryMock) {
return (TestTopologyRule<DefaultK, DefaultV>) super.withSchemaRegistryUrl(schemaRegistryMock);
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -39,7 +39,7 @@ public class WordCountWitherTest {
public final TestTopologyRule<Object, String> testTopology =
new TestTopologyRule<>(this.app.getTopology(), WordCount.getKafkaProperties())
.withDefaultValueSerde(Serdes.String())
.withSchemaRegistryMock(new SchemaRegistryMock(List.of(new AvroSchemaProvider())));
.withSchemaRegistryUrl(new SchemaRegistryMock(List.of(new AvroSchemaProvider())));

@Test
public void shouldAggregateSameWordStream() {
Expand Down
1 change: 0 additions & 1 deletion fluent-kafka-streams-tests-junit5/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ description = "Provides the fluent Kafka Streams test framework."

dependencies {
api(project(":fluent-kafka-streams-tests"))
api(project(":schema-registry-mock"))

val junit5Version: String by project
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junit5Version)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -147,8 +147,8 @@ public <K, V> TestTopologyExtension<K, V> withDefaultSerde(final Serde<K> defaul
}

@Override
public TestTopologyExtension<DefaultK, DefaultV> withSchemaRegistryMock(
public TestTopologyExtension<DefaultK, DefaultV> withSchemaRegistryUrl(
final SchemaRegistryMock schemaRegistryMock) {
return (TestTopologyExtension<DefaultK, DefaultV>) super.withSchemaRegistryMock(schemaRegistryMock);
return (TestTopologyExtension<DefaultK, DefaultV>) super.withSchemaRegistryUrl(schemaRegistryMock);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -39,7 +39,7 @@ class WordCountWitherTest {
final TestTopologyExtension<Object, String> testTopology =
new TestTopologyExtension<>(this.app::getTopology, WordCount.getKafkaProperties())
.withDefaultValueSerde(Serdes.String())
.withSchemaRegistryMock(new SchemaRegistryMock(List.of(new AvroSchemaProvider())));
.withSchemaRegistryUrl(new SchemaRegistryMock(List.of(new AvroSchemaProvider())));

@Test
void shouldAggregateSameWordStream() {
Expand Down
6 changes: 4 additions & 2 deletions fluent-kafka-streams-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ dependencies {
"api"(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion)
"api"(group = "org.apache.kafka", name = "kafka-streams", version = kafkaVersion)
"api"(group = "org.apache.kafka", name = "kafka-streams-test-utils", version = kafkaVersion)
api(project(":schema-registry-mock"))
val confluentVersion: String by project
api(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion)
implementation(group = "io.confluent", name = "kafka-schema-serializer", version = confluentVersion)

val junit5Version: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junit5Version)
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junit5Version)
testImplementation(group = "org.apache.avro", name = "avro", version = "1.12.0")
val confluentVersion: String by project
testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion)
testImplementation(group = "io.confluent", name = "kafka-protobuf-provider", version = confluentVersion)
testImplementation(group = "io.confluent", name = "kafka-streams-protobuf-serde", version = confluentVersion)
testImplementation(group = "com.google.protobuf", name = "protobuf-java", version = "3.25.5")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -24,8 +24,12 @@

package com.bakdata.fluent_kafka_streams_tests;

import com.bakdata.schemaregistrymock.SchemaRegistryMock;
import static java.util.Collections.emptyMap;

import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.io.File;
import java.io.IOException;
Expand All @@ -36,6 +40,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
Expand Down Expand Up @@ -102,7 +107,7 @@
*/
@Getter
public class TestTopology<DefaultK, DefaultV> {
private final SchemaRegistryMock schemaRegistry;
private final String schemaRegistryUrl;
private final Function<? super Map<String, Object>, ? extends Topology> topologyFactory;
private final Map<String, Object> properties = new HashMap<>();
private final Collection<String> inputTopics = new HashSet<>();
Expand All @@ -121,8 +126,9 @@ public class TestTopology<DefaultK, DefaultV> {
protected TestTopology(final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
final Function<? super String, ? extends Map<String, ?>> propertiesFactory,
final Serde<DefaultK> defaultKeySerde,
final Serde<DefaultV> defaultValueSerde, final SchemaRegistryMock schemaRegistry) {
this.schemaRegistry = schemaRegistry;
final Serde<DefaultV> defaultValueSerde, final String schemaRegistryUrl) {
MockSchemaRegistry.validateAndMaybeGetMockScope(List.of(schemaRegistryUrl));
this.schemaRegistryUrl = schemaRegistryUrl;
this.topologyFactory = topologyFactory;
this.propertiesFactory = propertiesFactory;
this.defaultKeySerde = defaultKeySerde;
Expand All @@ -140,7 +146,7 @@ protected TestTopology(final Function<? super Map<String, Object>, ? extends Top
*/
public TestTopology(final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
final Function<? super String, ? extends Map<String, ?>> propertiesFactory) {
this(topologyFactory, propertiesFactory, null, null, new SchemaRegistryMock());
this(topologyFactory, propertiesFactory, null, null, "mock://");
}

/**
Expand Down Expand Up @@ -262,29 +268,27 @@ public <K> TestTopology<K, DefaultV> withDefaultKeySerde(final Serde<K> defaultK
public <K, V> TestTopology<K, V> withDefaultSerde(final Serde<K> defaultKeySerde,
final Serde<V> defaultValueSerde) {
return this.with(this.topologyFactory, this.propertiesFactory, defaultKeySerde, defaultValueSerde,
this.schemaRegistry);
this.schemaRegistryUrl);
}

/**
* Overrides the {@link SchemaRegistryMock}
* Overrides the schema registry url
*
* @param schemaRegistryMock {@code SchemaRegistryMock} to use
* @return Copy of current {@code TestTopology} with provided {@code SchemaRegistryMock}
* @param schemaRegistryUrl schema registry url to use
* @return Copy of current {@code TestTopology} with provided schema registry url
*/
public TestTopology<DefaultK, DefaultV> withSchemaRegistryMock(final SchemaRegistryMock schemaRegistryMock) {
public TestTopology<DefaultK, DefaultV> withSchemaRegistryUrl(final String schemaRegistryUrl) {
return this.with(this.topologyFactory, this.propertiesFactory, this.defaultKeySerde, this.defaultValueSerde,
schemaRegistryMock);
schemaRegistryUrl);
}

/**
* Start the {@code TestTopology} and create all required resources.
* <p>
* This method starts the {@link SchemaRegistryMock}, creates the state directory and creates a
* {@link TopologyTestDriver}.
* This method creates the state directory and creates a {@link TopologyTestDriver}.
*/
public void start() {
this.schemaRegistry.start();
this.properties.putAll(this.propertiesFactory.apply(this.getSchemaRegistryUrl()));
this.properties.putAll(this.propertiesFactory.apply(this.schemaRegistryUrl));
try {
this.stateDirectory = Files.createTempDirectory("fluent-kafka-streams");
} catch (final IOException e) {
Expand All @@ -309,13 +313,11 @@ public void start() {
}
}

protected <K, V> TestTopology<K, V> with(
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
final Function<? super String, ? extends Map<String, ?>> propertiesFactory, final Serde<K> defaultKeySerde,
final Serde<V> defaultValueSerde,
final SchemaRegistryMock schemaRegistry) {
return new TestTopology<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde,
schemaRegistry);
/**
* Get the client to the schema registry for setup or verifications.
*/
public SchemaRegistryClient getSchemaRegistry() {
return this.getSchemaRegistry(null);
}

/**
Expand Down Expand Up @@ -419,31 +421,19 @@ public TestOutput<DefaultK, DefaultV> tableOutput(final String topic) {
return this.streamOutput(topic).asTable();
}

/**
* Get the client to the schema registry for setup or verifications.
*/
public SchemaRegistryClient getSchemaRegistry() {
return this.schemaRegistry.getSchemaRegistryClient();
}

/**
* Get the URL of the schema registry in the format that is expected in Kafka Streams configurations.
*/
public String getSchemaRegistryUrl() {
return this.schemaRegistry.getUrl();
public SchemaRegistryClient getSchemaRegistry(final List<SchemaProvider> providers) {
return SchemaRegistryClientFactory.newClient(List.of(this.schemaRegistryUrl), 0, providers, emptyMap(), null);
}

/**
* Stop the {@code TestTopology} and cleaning up all resources.
* <p>
* This method closes the {@link TopologyTestDriver}, stops the {@link SchemaRegistryMock} and removes the state
* directory.
* This method closes the {@link TopologyTestDriver} and removes the state directory.
*/
public void stop() {
if (this.testDriver != null) {
this.testDriver.close();
}
this.schemaRegistry.stop();
try (final Stream<Path> stateFiles = Files.walk(this.stateDirectory)) {
stateFiles.sorted(Comparator.reverseOrder())
.map(Path::toFile)
Expand All @@ -453,6 +443,15 @@ public void stop() {
}
}

protected <K, V> TestTopology<K, V> with(
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
final Function<? super String, ? extends Map<String, ?>> propertiesFactory, final Serde<K> defaultKeySerde,
final Serde<V> defaultValueSerde,
final String schemaRegistryUrl) {
return new TestTopology<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde,
schemaRegistryUrl);
}

private Properties createProperties() {
final Properties props = new Properties();
props.putAll(this.properties);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -28,9 +28,6 @@
import static com.bakdata.fluent_kafka_streams_tests.test_types.proto.PersonOuterClass.Person;

import com.bakdata.fluent_kafka_streams_tests.test_applications.CountInhabitantsWithProto;
import com.bakdata.schemaregistrymock.SchemaRegistryMock;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -42,11 +39,8 @@ class CountInhabitantsWithProtoTest {

private final CountInhabitantsWithProto app = new CountInhabitantsWithProto();

private final SchemaRegistryMock mock =
new SchemaRegistryMock(Collections.singletonList(new ProtobufSchemaProvider()));
private final TestTopology<Object, Object> testTopology =
new TestTopology<>(this.app::getTopology, this::properties)
.withSchemaRegistryMock(this.mock);
new TestTopology<>(this.app::getTopology, this::properties);

static Person newPerson(final String name, final String city) {
return Person.newBuilder().setName(name).setCity(city).build();
Expand Down
13 changes: 0 additions & 13 deletions schema-registry-mock-junit4/build.gradle.kts

This file was deleted.

3 changes: 0 additions & 3 deletions schema-registry-mock-junit4/lombok.config

This file was deleted.

Loading

0 comments on commit cde20aa

Please sign in to comment.