From cde20aa06117947562148d4981aa1d32d3334d32 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Wed, 8 Jan 2025 21:16:03 +0100 Subject: [PATCH] Use MockSchemaRegistry and remove custom mock --- .../build.gradle.kts | 1 - .../junit4/TestTopologyRule.java | 6 +- .../junit4/WordCountWitherTest.java | 4 +- .../build.gradle.kts | 1 - .../junit5/TestTopologyExtension.java | 6 +- .../junit5/WordCountWitherTest.java | 4 +- fluent-kafka-streams-tests/build.gradle.kts | 6 +- .../TestTopology.java | 75 ++-- .../CountInhabitantsWithProtoTest.java | 10 +- schema-registry-mock-junit4/build.gradle.kts | 13 - schema-registry-mock-junit4/lombok.config | 3 - .../junit4/SchemaRegistryMockRule.java | 77 ---- .../junit4/SchemaRegistryMockRuleTest.java | 125 ------ schema-registry-mock-junit5/build.gradle.kts | 12 - schema-registry-mock-junit5/lombok.config | 3 - .../junit5/SchemaRegistryMockExtension.java | 92 ---- .../ProtobufRegistryMockExtensionTest.java | 189 -------- .../SchemaRegistryMockExtensionTest.java | 125 ------ .../src/test/resources/record.proto | 9 - schema-registry-mock/README.md | 113 ----- schema-registry-mock/build.gradle.kts | 15 - schema-registry-mock/lombok.config | 3 - .../AllSubjectsHandler.java | 49 --- .../AutoRegistrationHandler.java | 64 --- .../DeleteSubjectHandler.java | 50 --- .../ErrorResponseTransformer.java | 72 --- .../GetSubjectSchemaVersionHandler.java | 61 --- .../schemaregistrymock/GetVersionHandler.java | 61 --- .../ListVersionsHandler.java | 50 --- .../SchemaRegistryMock.java | 416 ------------------ .../schemaregistrymock/SubjectsHandler.java | 49 --- .../ErrorResponseTransformerTest.java | 86 ---- .../ProtobufRegistryMockTest.java | 200 --------- .../SchemaRegistryMockTest.java | 279 ------------ .../src/test/resources/log4j2.xml | 16 - .../src/test/resources/nested.proto | 6 - .../src/test/resources/record.proto | 9 - settings.gradle | 2 +- 38 files changed, 54 insertions(+), 2308 deletions(-) delete mode 100644 schema-registry-mock-junit4/build.gradle.kts delete mode 100644 schema-registry-mock-junit4/lombok.config delete mode 100644 schema-registry-mock-junit4/src/main/java/com/bakdata/schemaregistrymock/junit4/SchemaRegistryMockRule.java delete mode 100644 schema-registry-mock-junit4/src/test/java/com/bakdata/schemaregistrymock/junit4/SchemaRegistryMockRuleTest.java delete mode 100644 schema-registry-mock-junit5/build.gradle.kts delete mode 100644 schema-registry-mock-junit5/lombok.config delete mode 100644 schema-registry-mock-junit5/src/main/java/com/bakdata/schemaregistrymock/junit5/SchemaRegistryMockExtension.java delete mode 100644 schema-registry-mock-junit5/src/test/java/com/bakdata/schemaregistrymock/junit5/ProtobufRegistryMockExtensionTest.java delete mode 100644 schema-registry-mock-junit5/src/test/java/com/bakdata/schemaregistrymock/junit5/SchemaRegistryMockExtensionTest.java delete mode 100644 schema-registry-mock-junit5/src/test/resources/record.proto delete mode 100644 schema-registry-mock/README.md delete mode 100644 schema-registry-mock/build.gradle.kts delete mode 100644 schema-registry-mock/lombok.config delete mode 100644 schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/AllSubjectsHandler.java delete mode 100644 schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/AutoRegistrationHandler.java delete mode 100644 schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/DeleteSubjectHandler.java delete mode 100644 schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/ErrorResponseTransformer.java delete mode 100644 schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/GetSubjectSchemaVersionHandler.java delete mode 100644 schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/GetVersionHandler.java delete mode 100644 schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/ListVersionsHandler.java delete mode 100644 schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java delete mode 100644 schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SubjectsHandler.java delete mode 100644 schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/ErrorResponseTransformerTest.java delete mode 100644 schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/ProtobufRegistryMockTest.java delete mode 100644 schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java delete mode 100644 schema-registry-mock/src/test/resources/log4j2.xml delete mode 100644 schema-registry-mock/src/test/resources/nested.proto delete mode 100644 schema-registry-mock/src/test/resources/record.proto diff --git a/fluent-kafka-streams-tests-junit4/build.gradle.kts b/fluent-kafka-streams-tests-junit4/build.gradle.kts index 0e59432..363ad66 100644 --- a/fluent-kafka-streams-tests-junit4/build.gradle.kts +++ b/fluent-kafka-streams-tests-junit4/build.gradle.kts @@ -2,7 +2,6 @@ description = "Provides the fluent Kafka Streams test framework." dependencies { api(project(":fluent-kafka-streams-tests")) - api(project(":schema-registry-mock")) val junit4Version: String by project api(group = "junit", name = "junit", version = junit4Version) diff --git a/fluent-kafka-streams-tests-junit4/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.java b/fluent-kafka-streams-tests-junit4/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.java index 5facd5f..198df5a 100644 --- a/fluent-kafka-streams-tests-junit4/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.java +++ b/fluent-kafka-streams-tests-junit4/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -151,8 +151,8 @@ public TestTopologyRule withDefaultSerde(final Serde defaultKeyS } @Override - public TestTopologyRule withSchemaRegistryMock(final SchemaRegistryMock schemaRegistryMock) { - return (TestTopologyRule) super.withSchemaRegistryMock(schemaRegistryMock); + public TestTopologyRule withSchemaRegistryUrl(final SchemaRegistryMock schemaRegistryMock) { + return (TestTopologyRule) super.withSchemaRegistryUrl(schemaRegistryMock); } } diff --git a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWitherTest.java b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWitherTest.java index c3a1c50..1f67b67 100644 --- a/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWitherTest.java +++ b/fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWitherTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -39,7 +39,7 @@ public class WordCountWitherTest { public final TestTopologyRule testTopology = new TestTopologyRule<>(this.app.getTopology(), WordCount.getKafkaProperties()) .withDefaultValueSerde(Serdes.String()) - .withSchemaRegistryMock(new SchemaRegistryMock(List.of(new AvroSchemaProvider()))); + .withSchemaRegistryUrl(new SchemaRegistryMock(List.of(new AvroSchemaProvider()))); @Test public void shouldAggregateSameWordStream() { diff --git a/fluent-kafka-streams-tests-junit5/build.gradle.kts b/fluent-kafka-streams-tests-junit5/build.gradle.kts index b07f16c..498c9b2 100644 --- a/fluent-kafka-streams-tests-junit5/build.gradle.kts +++ b/fluent-kafka-streams-tests-junit5/build.gradle.kts @@ -2,7 +2,6 @@ description = "Provides the fluent Kafka Streams test framework." dependencies { api(project(":fluent-kafka-streams-tests")) - api(project(":schema-registry-mock")) val junit5Version: String by project testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junit5Version) diff --git a/fluent-kafka-streams-tests-junit5/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit5/TestTopologyExtension.java b/fluent-kafka-streams-tests-junit5/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit5/TestTopologyExtension.java index cd5792e..8cc0010 100644 --- a/fluent-kafka-streams-tests-junit5/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit5/TestTopologyExtension.java +++ b/fluent-kafka-streams-tests-junit5/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit5/TestTopologyExtension.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -147,8 +147,8 @@ public TestTopologyExtension withDefaultSerde(final Serde defaul } @Override - public TestTopologyExtension withSchemaRegistryMock( + public TestTopologyExtension withSchemaRegistryUrl( final SchemaRegistryMock schemaRegistryMock) { - return (TestTopologyExtension) super.withSchemaRegistryMock(schemaRegistryMock); + return (TestTopologyExtension) super.withSchemaRegistryUrl(schemaRegistryMock); } } diff --git a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWitherTest.java b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWitherTest.java index e9f9084..9ad610a 100644 --- a/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWitherTest.java +++ b/fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWitherTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -39,7 +39,7 @@ class WordCountWitherTest { final TestTopologyExtension testTopology = new TestTopologyExtension<>(this.app::getTopology, WordCount.getKafkaProperties()) .withDefaultValueSerde(Serdes.String()) - .withSchemaRegistryMock(new SchemaRegistryMock(List.of(new AvroSchemaProvider()))); + .withSchemaRegistryUrl(new SchemaRegistryMock(List.of(new AvroSchemaProvider()))); @Test void shouldAggregateSameWordStream() { diff --git a/fluent-kafka-streams-tests/build.gradle.kts b/fluent-kafka-streams-tests/build.gradle.kts index a892f4d..32cb89a 100644 --- a/fluent-kafka-streams-tests/build.gradle.kts +++ b/fluent-kafka-streams-tests/build.gradle.kts @@ -13,13 +13,15 @@ dependencies { "api"(group = "org.apache.kafka", name = "kafka-clients", version = kafkaVersion) "api"(group = "org.apache.kafka", name = "kafka-streams", version = kafkaVersion) "api"(group = "org.apache.kafka", name = "kafka-streams-test-utils", version = kafkaVersion) - api(project(":schema-registry-mock")) + val confluentVersion: String by project + api(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion) + implementation(group = "io.confluent", name = "kafka-schema-serializer", version = confluentVersion) val junit5Version: String by project testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junit5Version) testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junit5Version) testImplementation(group = "org.apache.avro", name = "avro", version = "1.12.0") - val confluentVersion: String by project + testImplementation(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) testImplementation(group = "io.confluent", name = "kafka-protobuf-provider", version = confluentVersion) testImplementation(group = "io.confluent", name = "kafka-streams-protobuf-serde", version = confluentVersion) testImplementation(group = "com.google.protobuf", name = "protobuf-java", version = "3.25.5") diff --git a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java index d1ff091..1a13150 100644 --- a/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java +++ b/fluent-kafka-streams-tests/src/main/java/com/bakdata/fluent_kafka_streams_tests/TestTopology.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -24,8 +24,12 @@ package com.bakdata.fluent_kafka_streams_tests; -import com.bakdata.schemaregistrymock.SchemaRegistryMock; +import static java.util.Collections.emptyMap; + +import io.confluent.kafka.schemaregistry.SchemaProvider; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory; +import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import java.io.File; import java.io.IOException; @@ -36,6 +40,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Properties; @@ -102,7 +107,7 @@ */ @Getter public class TestTopology { - private final SchemaRegistryMock schemaRegistry; + private final String schemaRegistryUrl; private final Function, ? extends Topology> topologyFactory; private final Map properties = new HashMap<>(); private final Collection inputTopics = new HashSet<>(); @@ -121,8 +126,9 @@ public class TestTopology { protected TestTopology(final Function, ? extends Topology> topologyFactory, final Function> propertiesFactory, final Serde defaultKeySerde, - final Serde defaultValueSerde, final SchemaRegistryMock schemaRegistry) { - this.schemaRegistry = schemaRegistry; + final Serde defaultValueSerde, final String schemaRegistryUrl) { + MockSchemaRegistry.validateAndMaybeGetMockScope(List.of(schemaRegistryUrl)); + this.schemaRegistryUrl = schemaRegistryUrl; this.topologyFactory = topologyFactory; this.propertiesFactory = propertiesFactory; this.defaultKeySerde = defaultKeySerde; @@ -140,7 +146,7 @@ protected TestTopology(final Function, ? extends Top */ public TestTopology(final Function, ? extends Topology> topologyFactory, final Function> propertiesFactory) { - this(topologyFactory, propertiesFactory, null, null, new SchemaRegistryMock()); + this(topologyFactory, propertiesFactory, null, null, "mock://"); } /** @@ -262,29 +268,27 @@ public TestTopology withDefaultKeySerde(final Serde defaultK public TestTopology withDefaultSerde(final Serde defaultKeySerde, final Serde defaultValueSerde) { return this.with(this.topologyFactory, this.propertiesFactory, defaultKeySerde, defaultValueSerde, - this.schemaRegistry); + this.schemaRegistryUrl); } /** - * Overrides the {@link SchemaRegistryMock} + * Overrides the schema registry url * - * @param schemaRegistryMock {@code SchemaRegistryMock} to use - * @return Copy of current {@code TestTopology} with provided {@code SchemaRegistryMock} + * @param schemaRegistryUrl schema registry url to use + * @return Copy of current {@code TestTopology} with provided schema registry url */ - public TestTopology withSchemaRegistryMock(final SchemaRegistryMock schemaRegistryMock) { + public TestTopology withSchemaRegistryUrl(final String schemaRegistryUrl) { return this.with(this.topologyFactory, this.propertiesFactory, this.defaultKeySerde, this.defaultValueSerde, - schemaRegistryMock); + schemaRegistryUrl); } /** * Start the {@code TestTopology} and create all required resources. *

- * This method starts the {@link SchemaRegistryMock}, creates the state directory and creates a - * {@link TopologyTestDriver}. + * This method creates the state directory and creates a {@link TopologyTestDriver}. */ public void start() { - this.schemaRegistry.start(); - this.properties.putAll(this.propertiesFactory.apply(this.getSchemaRegistryUrl())); + this.properties.putAll(this.propertiesFactory.apply(this.schemaRegistryUrl)); try { this.stateDirectory = Files.createTempDirectory("fluent-kafka-streams"); } catch (final IOException e) { @@ -309,13 +313,11 @@ public void start() { } } - protected TestTopology with( - final Function, ? extends Topology> topologyFactory, - final Function> propertiesFactory, final Serde defaultKeySerde, - final Serde defaultValueSerde, - final SchemaRegistryMock schemaRegistry) { - return new TestTopology<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, - schemaRegistry); + /** + * Get the client to the schema registry for setup or verifications. + */ + public SchemaRegistryClient getSchemaRegistry() { + return this.getSchemaRegistry(null); } /** @@ -419,31 +421,19 @@ public TestOutput tableOutput(final String topic) { return this.streamOutput(topic).asTable(); } - /** - * Get the client to the schema registry for setup or verifications. - */ - public SchemaRegistryClient getSchemaRegistry() { - return this.schemaRegistry.getSchemaRegistryClient(); - } - - /** - * Get the URL of the schema registry in the format that is expected in Kafka Streams configurations. - */ - public String getSchemaRegistryUrl() { - return this.schemaRegistry.getUrl(); + public SchemaRegistryClient getSchemaRegistry(final List providers) { + return SchemaRegistryClientFactory.newClient(List.of(this.schemaRegistryUrl), 0, providers, emptyMap(), null); } /** * Stop the {@code TestTopology} and cleaning up all resources. *

- * This method closes the {@link TopologyTestDriver}, stops the {@link SchemaRegistryMock} and removes the state - * directory. + * This method closes the {@link TopologyTestDriver} and removes the state directory. */ public void stop() { if (this.testDriver != null) { this.testDriver.close(); } - this.schemaRegistry.stop(); try (final Stream stateFiles = Files.walk(this.stateDirectory)) { stateFiles.sorted(Comparator.reverseOrder()) .map(Path::toFile) @@ -453,6 +443,15 @@ public void stop() { } } + protected TestTopology with( + final Function, ? extends Topology> topologyFactory, + final Function> propertiesFactory, final Serde defaultKeySerde, + final Serde defaultValueSerde, + final String schemaRegistryUrl) { + return new TestTopology<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, + schemaRegistryUrl); + } + private Properties createProperties() { final Properties props = new Properties(); props.putAll(this.properties); diff --git a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/CountInhabitantsWithProtoTest.java b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/CountInhabitantsWithProtoTest.java index 72a61b9..c3175f9 100644 --- a/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/CountInhabitantsWithProtoTest.java +++ b/fluent-kafka-streams-tests/src/test/java/com/bakdata/fluent_kafka_streams_tests/CountInhabitantsWithProtoTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2024 bakdata + * Copyright (c) 2025 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -28,9 +28,6 @@ import static com.bakdata.fluent_kafka_streams_tests.test_types.proto.PersonOuterClass.Person; import com.bakdata.fluent_kafka_streams_tests.test_applications.CountInhabitantsWithProto; -import com.bakdata.schemaregistrymock.SchemaRegistryMock; -import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; -import java.util.Collections; import java.util.Map; import org.apache.kafka.common.serialization.Serdes; import org.junit.jupiter.api.AfterEach; @@ -42,11 +39,8 @@ class CountInhabitantsWithProtoTest { private final CountInhabitantsWithProto app = new CountInhabitantsWithProto(); - private final SchemaRegistryMock mock = - new SchemaRegistryMock(Collections.singletonList(new ProtobufSchemaProvider())); private final TestTopology testTopology = - new TestTopology<>(this.app::getTopology, this::properties) - .withSchemaRegistryMock(this.mock); + new TestTopology<>(this.app::getTopology, this::properties); static Person newPerson(final String name, final String city) { return Person.newBuilder().setName(name).setCity(city).build(); diff --git a/schema-registry-mock-junit4/build.gradle.kts b/schema-registry-mock-junit4/build.gradle.kts deleted file mode 100644 index 478bbd7..0000000 --- a/schema-registry-mock-junit4/build.gradle.kts +++ /dev/null @@ -1,13 +0,0 @@ -description = "Mocks the HTTP endpoint of the schema registry for seamlessly testing topologies with Avro serdes" - -dependencies { - val junit4Version: String by project - api(group = "junit", name = "junit", version = junit4Version) - api(project(":schema-registry-mock")) - - testImplementation(group = "junit", name = "junit", version = junit4Version) -} - -tasks.test { - useJUnit() -} diff --git a/schema-registry-mock-junit4/lombok.config b/schema-registry-mock-junit4/lombok.config deleted file mode 100644 index 189c0be..0000000 --- a/schema-registry-mock-junit4/lombok.config +++ /dev/null @@ -1,3 +0,0 @@ -# This file is generated by the 'io.freefair.lombok' Gradle plugin -config.stopBubbling = true -lombok.addLombokGeneratedAnnotation = true diff --git a/schema-registry-mock-junit4/src/main/java/com/bakdata/schemaregistrymock/junit4/SchemaRegistryMockRule.java b/schema-registry-mock-junit4/src/main/java/com/bakdata/schemaregistrymock/junit4/SchemaRegistryMockRule.java deleted file mode 100644 index 586d1d7..0000000 --- a/schema-registry-mock-junit4/src/main/java/com/bakdata/schemaregistrymock/junit4/SchemaRegistryMockRule.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2019 bakdata GmbH - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ -package com.bakdata.schemaregistrymock.junit4; - -import com.bakdata.schemaregistrymock.SchemaRegistryMock; -import org.junit.rules.TestRule; -import org.junit.runner.Description; -import org.junit.runners.model.Statement; - -/** - *

The schema registry mock implements a few basic HTTP endpoints that are used by the Avro serdes.

- * In particular, - *
    - *
  • you can register a schema
  • - *
  • retrieve a schema by id.
  • - *
  • list and get schema versions of a subject
  • - *
- * - *

If you use the TestTopology of the fluent Kafka Streams test, you don't have to interact with this class at - * all.

- * - *

Without the test framework, you can use the mock as follows:

- *

- * public class SchemaRegistryMockTest {
- *     {@literal @Rule}
- *     public final SchemaRegistryMockRule schemaRegistry = new SchemaRegistryMockRule();
- *
- *     {@literal @Test}
- *     public void shouldRegisterKeySchema() throws IOException, RestClientException {
- *         final Schema keySchema = this.createSchema("key_schema");
- *         final int id = this.schemaRegistry.registerKeySchema("test-topic", keySchema);
- *
- *         final Schema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getById(id);
- *         assertThat(retrievedSchema).isEqualTo(keySchema);
- *     }
- * }
- * - * To retrieve the url of the schema registry for a Kafka Streams config, please use {@link #getUrl()} - */ -public class SchemaRegistryMockRule extends SchemaRegistryMock implements TestRule { - - @Override - public Statement apply(final Statement base, final Description description) { - return new Statement() { - @Override - public void evaluate() throws Throwable { - SchemaRegistryMockRule.this.start(); - try { - base.evaluate(); - } finally { - SchemaRegistryMockRule.this.stop(); - } - } - }; - } -} diff --git a/schema-registry-mock-junit4/src/test/java/com/bakdata/schemaregistrymock/junit4/SchemaRegistryMockRuleTest.java b/schema-registry-mock-junit4/src/test/java/com/bakdata/schemaregistrymock/junit4/SchemaRegistryMockRuleTest.java deleted file mode 100644 index 94bf58a..0000000 --- a/schema-registry-mock-junit4/src/test/java/com/bakdata/schemaregistrymock/junit4/SchemaRegistryMockRuleTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2023 bakdata GmbH - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ -package com.bakdata.schemaregistrymock.junit4; - -import static org.assertj.core.api.Assertions.assertThat; - -import io.confluent.kafka.schemaregistry.avro.AvroSchema; -import io.confluent.kafka.schemaregistry.client.SchemaMetadata; -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import org.apache.avro.Schema; -import org.junit.Rule; -import org.junit.Test; - -public class SchemaRegistryMockRuleTest { - @Rule - public final SchemaRegistryMockRule schemaRegistry = new SchemaRegistryMockRule(); - - private static Schema createSchema(final String name) { - return Schema.createRecord(name, "no doc", "", false, Collections.emptyList()); - } - - @Test - public void shouldRegisterKeySchema() throws IOException, RestClientException { - final Schema keySchema = createSchema("key_schema"); - final int id = this.schemaRegistry.registerKeySchema("test-topic", keySchema); - - final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema.rawSchema()).isEqualTo(keySchema); - } - - @Test - public void shouldRegisterValueSchema() throws IOException, RestClientException { - final Schema valueSchema = createSchema("value_schema"); - final int id = this.schemaRegistry.registerValueSchema("test-topic", valueSchema); - - final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema.rawSchema()).isEqualTo(valueSchema); - } - - @Test - public void shouldRegisterKeySchemaWithClient() throws IOException, RestClientException { - final Schema keySchema = createSchema("key_schema"); - final int id = - this.schemaRegistry.getSchemaRegistryClient().register("test-topic-key", new AvroSchema(keySchema)); - - final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema.rawSchema()).isEqualTo(keySchema); - } - - @Test - public void shouldRegisterValueSchemaWithClient() throws IOException, RestClientException { - final Schema valueSchema = createSchema("value_schema"); - final int id = - this.schemaRegistry.getSchemaRegistryClient().register("test-topic-value", new AvroSchema(valueSchema)); - - final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema.rawSchema()).isEqualTo(valueSchema); - } - - @Test - public void shouldHaveSchemaVersions() throws IOException, RestClientException { - final Schema valueSchema = createSchema("value_schema"); - final String topic = "test-topic"; - final int id = this.schemaRegistry.registerValueSchema(topic, valueSchema); - - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); - assertThat(versions).hasSize(1); - - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); - assertThat(metadata.getId()).isEqualTo(id); - final String schemaString = metadata.getSchema(); - final Schema retrievedSchema = new Schema.Parser().parse(schemaString); - assertThat(retrievedSchema).isEqualTo(valueSchema); - } - - @Test - public void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { - final Schema valueSchema1 = createSchema("value_schema"); - final String topic = "test-topic"; - final int id1 = this.schemaRegistry.registerValueSchema(topic, valueSchema1); - - final List fields = Collections.singletonList( - new Schema.Field("f1", Schema.create(Schema.Type.STRING), "", null)); - final Schema valueSchema2 = Schema.createRecord("value_schema", "no doc", "", false, fields); - final int id2 = this.schemaRegistry.registerValueSchema(topic, valueSchema2); - - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); - assertThat(versions).hasSize(2); - - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); - final int metadataId = metadata.getId(); - assertThat(metadataId) - .isNotEqualTo(id1) - .isEqualTo(id2); - final String schemaString = metadata.getSchema(); - final Schema retrievedSchema = new Schema.Parser().parse(schemaString); - assertThat(retrievedSchema).isEqualTo(valueSchema2); - } -} diff --git a/schema-registry-mock-junit5/build.gradle.kts b/schema-registry-mock-junit5/build.gradle.kts deleted file mode 100644 index 2cf01be..0000000 --- a/schema-registry-mock-junit5/build.gradle.kts +++ /dev/null @@ -1,12 +0,0 @@ -description = "Mocks the HTTP endpoint of the schema registry for seamlessly testing topologies with Avro serdes" - -dependencies { - val junit5Version: String by project - api(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junit5Version) - api(project(":schema-registry-mock")) - - testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junit5Version) - testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junit5Version) - val confluentVersion: String by project - testImplementation(group = "io.confluent", name = "kafka-protobuf-provider", version = confluentVersion) -} diff --git a/schema-registry-mock-junit5/lombok.config b/schema-registry-mock-junit5/lombok.config deleted file mode 100644 index 189c0be..0000000 --- a/schema-registry-mock-junit5/lombok.config +++ /dev/null @@ -1,3 +0,0 @@ -# This file is generated by the 'io.freefair.lombok' Gradle plugin -config.stopBubbling = true -lombok.addLombokGeneratedAnnotation = true diff --git a/schema-registry-mock-junit5/src/main/java/com/bakdata/schemaregistrymock/junit5/SchemaRegistryMockExtension.java b/schema-registry-mock-junit5/src/main/java/com/bakdata/schemaregistrymock/junit5/SchemaRegistryMockExtension.java deleted file mode 100644 index ee630bc..0000000 --- a/schema-registry-mock-junit5/src/main/java/com/bakdata/schemaregistrymock/junit5/SchemaRegistryMockExtension.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2024 bakdata GmbH - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ -package com.bakdata.schemaregistrymock.junit5; - -import com.bakdata.schemaregistrymock.SchemaRegistryMock; -import io.confluent.kafka.schemaregistry.SchemaProvider; -import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; -import java.util.List; -import org.junit.jupiter.api.extension.AfterEachCallback; -import org.junit.jupiter.api.extension.BeforeEachCallback; -import org.junit.jupiter.api.extension.ExtensionContext; - -/** - *

The schema registry mock implements a few basic HTTP endpoints that are used by the Avro serdes.

- * In particular, - *
    - *
  • you can register a schema
  • - *
  • retrieve a schema by id.
  • - *
  • list and get schema versions of a subject
  • - *
- * - *

If you use the TestTopology of the fluent Kafka Streams test, you don't have to interact with this class at - * all.

- * - *

Without the test framework, you can use the mock as follows:

- *

- * class SchemaRegistryMockTest {
- *     {@literal @RegisterExtension}
- *     final SchemaRegistryMockExtension schemaRegistry = new SchemaRegistryMockExtension();
- *
- *     {@literal @Test}
- *     void shouldRegisterKeySchema() throws IOException, RestClientException {
- *         final Schema keySchema = this.createSchema("key_schema");
- *         final int id = this.schemaRegistry.registerKeySchema("test-topic", keySchema);
- *
- *         final Schema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getById(id);
- *         assertThat(retrievedSchema).isEqualTo(keySchema);
- *     }
- * }
- * - * To retrieve the url of the schema registry for a Kafka Streams config, please use {@link #getUrl()} - */ -public class SchemaRegistryMockExtension extends SchemaRegistryMock implements BeforeEachCallback, AfterEachCallback { - /** - * Create a new {@code SchemaRegistryMockExtension} with default {@link SchemaProvider SchemaProviders}. - * - * @see #SchemaRegistryMockExtension(List) - */ - public SchemaRegistryMockExtension() { - this(null); - } - - /** - * Create a new {@code SchemaRegistryMockExtension} from {@link SchemaProvider SchemaProviders}. - * - * @param schemaProviders List of {@link SchemaProvider}. If null, {@link AvroSchemaProvider} will be used. - */ - public SchemaRegistryMockExtension(final List schemaProviders) { - super(schemaProviders); - } - - @Override - public void afterEach(final ExtensionContext context) { - this.stop(); - } - - @Override - public void beforeEach(final ExtensionContext context) { - this.start(); - } -} diff --git a/schema-registry-mock-junit5/src/test/java/com/bakdata/schemaregistrymock/junit5/ProtobufRegistryMockExtensionTest.java b/schema-registry-mock-junit5/src/test/java/com/bakdata/schemaregistrymock/junit5/ProtobufRegistryMockExtensionTest.java deleted file mode 100644 index 4ab8c04..0000000 --- a/schema-registry-mock-junit5/src/test/java/com/bakdata/schemaregistrymock/junit5/ProtobufRegistryMockExtensionTest.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2024 bakdata GmbH - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.schemaregistrymock.junit5; - -import static java.net.HttpURLConnection.HTTP_NOT_FOUND; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; - -import io.confluent.kafka.schemaregistry.ParsedSchema; -import io.confluent.kafka.schemaregistry.client.SchemaMetadata; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; -import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -class ProtobufRegistryMockExtensionTest { - @RegisterExtension - private final SchemaRegistryMockExtension schemaRegistry; - private final ParsedSchema schema; - - ProtobufRegistryMockExtensionTest() throws IOException { - this.schemaRegistry = new SchemaRegistryMockExtension(Collections.singletonList(new ProtobufSchemaProvider())); - try (final InputStream input = ProtobufRegistryMockExtensionTest.class.getResourceAsStream("/record.proto"); - final BufferedReader reader = new BufferedReader(new InputStreamReader(input))) { - this.schema = new ProtobufSchema(reader.lines().collect(Collectors.joining("\n"))); - } - } - - @Test - void shouldRegisterKeySchema() throws IOException, RestClientException { - final int id = this.schemaRegistry.registerKeySchema("test-topic", this.schema); - - final ParsedSchema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema).isEqualTo(this.schema); - } - - @Test - void shouldRegisterValueSchema() throws IOException, RestClientException { - final int id = this.schemaRegistry.registerValueSchema("test-topic", this.schema); - - final ParsedSchema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema).isEqualTo(this.schema); - } - - @Test - void shouldRegisterKeySchemaWithClient() throws IOException, RestClientException { - final int id = this.schemaRegistry.getSchemaRegistryClient().register("test-topic-key", this.schema); - - final ParsedSchema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema).isEqualTo(this.schema); - } - - @Test - void shouldRegisterValueSchemaWithClient() throws IOException, RestClientException { - final int id = this.schemaRegistry.getSchemaRegistryClient().register("test-topic-value", this.schema); - - final ParsedSchema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema).isEqualTo(this.schema); - } - - @Test - void shouldHaveSchemaVersions() throws IOException, RestClientException { - final String topic = "test-topic"; - final int id = this.schemaRegistry.registerValueSchema(topic, this.schema); - - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); - assertThat(versions).hasSize(1); - - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); - assertThat(metadata.getId()).isEqualTo(id); - final String schemaString = metadata.getSchema(); - final ParsedSchema retrievedSchema = new ProtobufSchema(schemaString); - assertThat(retrievedSchema).isEqualTo(this.schema); - } - - @Test - void shouldReturnAllSubjects() throws IOException, RestClientException { - this.schemaRegistry.registerKeySchema("test-topic", this.schema); - this.schemaRegistry.registerValueSchema("test-topic", this.schema); - final Collection allSubjects = this.schemaRegistry.getSchemaRegistryClient().getAllSubjects(); - assertThat(allSubjects).hasSize(2).containsExactly("test-topic-key", "test-topic-value"); - } - - - @Test - void shouldDeleteKeySchema() throws IOException, RestClientException { - this.schemaRegistry.registerKeySchema("test-topic", this.schema); - final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); - final Collection allSubjects = client.getAllSubjects(); - assertThat(allSubjects).hasSize(1).containsExactly("test-topic-key"); - this.schemaRegistry.deleteKeySchema("test-topic"); - final Collection subjectsAfterDeletion = client.getAllSubjects(); - assertThat(subjectsAfterDeletion).isEmpty(); - } - - @Test - void shouldDeleteValueSchema() throws IOException, RestClientException { - final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); - this.schemaRegistry.registerValueSchema("test-topic", this.schema); - final Collection allSubjects = client.getAllSubjects(); - assertThat(allSubjects).hasSize(1).containsExactly("test-topic-value"); - this.schemaRegistry.deleteValueSchema("test-topic"); - final Collection subjectsAfterDeletion = client.getAllSubjects(); - assertThat(subjectsAfterDeletion).isEmpty(); - } - - @Test - void shouldDeleteKeySchemaWithClient() throws IOException, RestClientException { - final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); - this.schemaRegistry.registerKeySchema("test-topic", this.schema); - final Collection allSubjects = client.getAllSubjects(); - assertThat(allSubjects).hasSize(1).containsExactly("test-topic-key"); - client.deleteSubject("test-topic-key"); - final Collection subjectsAfterDeletion = client.getAllSubjects(); - assertThat(subjectsAfterDeletion).isEmpty(); - } - - @Test - void shouldDeleteValueSchemaWithClient() throws IOException, RestClientException { - final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); - this.schemaRegistry.registerValueSchema("test-topic", this.schema); - final Collection allSubjects = client.getAllSubjects(); - assertThat(allSubjects).hasSize(1).containsExactly("test-topic-value"); - client.deleteSubject("test-topic-value"); - final Collection subjectsAfterDeletion = client.getAllSubjects(); - assertThat(subjectsAfterDeletion).isEmpty(); - } - - @Test - void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClientException { - final String topic = "test-topic"; - final int id = this.schemaRegistry.registerValueSchema(topic, this.schema); - - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); - assertThat(versions).hasSize(1); - - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); - assertThat(metadata.getId()).isEqualTo(id); - assertThat(this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value")) - .isNotNull(); - this.schemaRegistry.deleteValueSchema(topic); - assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value")) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient() - .getSchemaMetadata(topic + "-value", versions.get(0))) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - assertThatExceptionOfType(RestClientException.class) - .isThrownBy( - () -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value")) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - } - -} diff --git a/schema-registry-mock-junit5/src/test/java/com/bakdata/schemaregistrymock/junit5/SchemaRegistryMockExtensionTest.java b/schema-registry-mock-junit5/src/test/java/com/bakdata/schemaregistrymock/junit5/SchemaRegistryMockExtensionTest.java deleted file mode 100644 index 5433a28..0000000 --- a/schema-registry-mock-junit5/src/test/java/com/bakdata/schemaregistrymock/junit5/SchemaRegistryMockExtensionTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2024 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ -package com.bakdata.schemaregistrymock.junit5; - -import static org.assertj.core.api.Assertions.assertThat; - -import io.confluent.kafka.schemaregistry.avro.AvroSchema; -import io.confluent.kafka.schemaregistry.client.SchemaMetadata; -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import org.apache.avro.Schema; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -class SchemaRegistryMockExtensionTest { - @RegisterExtension - final SchemaRegistryMockExtension schemaRegistry = new SchemaRegistryMockExtension(); - - private static Schema createSchema(final String name) { - return Schema.createRecord(name, "no doc", "", false, Collections.emptyList()); - } - - @Test - void shouldRegisterKeySchema() throws IOException, RestClientException { - final Schema keySchema = createSchema("key_schema"); - final int id = this.schemaRegistry.registerKeySchema("test-topic", keySchema); - - final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema.rawSchema()).isEqualTo(keySchema); - } - - @Test - void shouldRegisterValueSchema() throws IOException, RestClientException { - final Schema valueSchema = createSchema("value_schema"); - final int id = this.schemaRegistry.registerValueSchema("test-topic", valueSchema); - - final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema.rawSchema()).isEqualTo(valueSchema); - } - - @Test - void shouldRegisterKeySchemaWithClient() throws IOException, RestClientException { - final Schema keySchema = createSchema("key_schema"); - final int id = - this.schemaRegistry.getSchemaRegistryClient().register("test-topic-key", new AvroSchema(keySchema)); - - final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema.rawSchema()).isEqualTo(keySchema); - } - - @Test - void shouldRegisterValueSchemaWithClient() throws IOException, RestClientException { - final Schema valueSchema = createSchema("value_schema"); - final int id = - this.schemaRegistry.getSchemaRegistryClient().register("test-topic-value", new AvroSchema(valueSchema)); - - final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema.rawSchema()).isEqualTo(valueSchema); - } - - @Test - void shouldHaveSchemaVersions() throws IOException, RestClientException { - final Schema valueSchema = createSchema("value_schema"); - final String topic = "test-topic"; - final int id = this.schemaRegistry.registerValueSchema(topic, valueSchema); - - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); - assertThat(versions).hasSize(1); - - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); - assertThat(metadata.getId()).isEqualTo(id); - final String schemaString = metadata.getSchema(); - final Schema retrievedSchema = new Schema.Parser().parse(schemaString); - assertThat(retrievedSchema).isEqualTo(valueSchema); - } - - @Test - void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { - final Schema valueSchema1 = createSchema("value_schema"); - final String topic = "test-topic"; - final int id1 = this.schemaRegistry.registerValueSchema(topic, valueSchema1); - - final List fields = Collections.singletonList( - new Schema.Field("f1", Schema.create(Schema.Type.STRING), "", null)); - final Schema valueSchema2 = Schema.createRecord("value_schema", "no doc", "", false, fields); - final int id2 = this.schemaRegistry.registerValueSchema(topic, valueSchema2); - - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); - assertThat(versions).hasSize(2); - - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); - final int metadataId = metadata.getId(); - assertThat(metadataId) - .isNotEqualTo(id1) - .isEqualTo(id2); - final String schemaString = metadata.getSchema(); - final Schema retrievedSchema = new Schema.Parser().parse(schemaString); - assertThat(retrievedSchema).isEqualTo(valueSchema2); - } -} diff --git a/schema-registry-mock-junit5/src/test/resources/record.proto b/schema-registry-mock-junit5/src/test/resources/record.proto deleted file mode 100644 index e1ee892..0000000 --- a/schema-registry-mock-junit5/src/test/resources/record.proto +++ /dev/null @@ -1,9 +0,0 @@ -syntax = "proto3"; -package com.bakdata.proto; - -import "nested.proto"; - -message Record { - string f1 = 1; - Nested f2 = 2; -} diff --git a/schema-registry-mock/README.md b/schema-registry-mock/README.md deleted file mode 100644 index 7562dc1..0000000 --- a/schema-registry-mock/README.md +++ /dev/null @@ -1,113 +0,0 @@ -[![Build Status](https://travis-ci.com/bakdata/fluent-kafka-streams-tests.svg?branch=master)](https://travis-ci.com/bakdata/fluent-kafka-streams-tests) -[![Sonarcloud status](https://sonarcloud.io/api/project_badges/measure?project=com.bakdata.fluent-kafka-streams-tests%3Afluent-kafka-streams-tests&metric=alert_status)](https://sonarcloud.io/dashboard?id=com.bakdata.fluent-kafka-streams-tests%3Afluent-kafka-streams-tests) -[![Code coverage](https://sonarcloud.io/api/project_badges/measure?project=com.bakdata.fluent-kafka-streams-tests%3Afluent-kafka-streams-tests&metric=coverage)](https://sonarcloud.io/dashboard?id=com.bakdata.fluent-kafka-streams-tests%3Afluent-kafka-streams-tests) -[![Maven](https://img.shields.io/maven-central/v/com.bakdata.fluent-kafka-streams-tests/schema-registry-mock.svg)](https://search.maven.org/search?q=g:com.bakdata.fluent-kafka-streams-tests%20AND%20a:schema-registry-mock&core=gav) - -Schema Registry Mock -==================== - -Mock your Schema Registry in Kafka Streams Tests. - -You can find a [blog post on medium](https://medium.com/bakdata/transparent-schema-registry-for-kafka-streams-6b43a3e7a15c) with some examples and detailed explanations of how the Schema Registry Mock works with the Fluent Kafka Streams Tests framework. - -## Getting Started -You can find the Schema Registry Mock via Maven Central. - -#### Gradle -```gradle -compile group: 'com.bakdata', name: 'schema-registry-mock-junit5', version: '2.0.0' -``` - -#### Maven -```xml - - com.bakdata - schema-registry-mock-junit5 - 2.0.0 - -``` - -There is also a junit4 version and one without any dependencies to a specific testing framework. - -For other build tools or versions, refer to the [overview of sonatype](https://search.maven.org/search?q=g:com.bakdata.fluent-kafka-streams-tests%20AND%20a:schema-registry-mock*&core=gav). - -## Using it in Tests - -There are two ways to use the Mock Schema Registry, -together with the Fluent Kafka Streams Tests -or as a standalone module in your existing test framework. - -### With Fluent Kafka Streams Tests - -Using the Mock Schema Registry with the Fluent Kafka Streams Tests is very straightforward. -All you need to do, is set your (default) serde to `GenericAvroSerde` or `SpecificAvroSerde`, so that your topology actually needs to use the Schema Registry. -The `TestTopology` takes care of updating the Kafka config with the url of the Mock Schema Registry for you. - -You can then write a simple test that uses Avro, without having to deal with the Schema Registry. - -```java -@Test -void shouldAggregateInhabitants() { - this.testTopology.input() - .add(new Person("Huey", "City1")) - .add(new Person("Dewey", "City2")) - .add(new Person("Louie", "City1")); - - this.testTopology.tableOutput().withValueType(City.class) - .expectNextRecord().hasKey("City1").hasValue(new City("City1", 2)) - .expectNextRecord().hasKey("City2").hasValue(new City("City2", 1)) - .expectNoMoreRecord(); -} -``` - - -### As a Standalone Module -To use this in your tests, you need to do three things: - -- Setup the `SchemaRegistryMock` as an extension of your test. -```java -class SchemaRegistryMockTest { - @RegisterExtension - final SchemaRegistryMockExtension schemaRegistry = new SchemaRegistryMockExtension(); - ... -} -``` -See the tests for the [junit4](../schema-registry-mock-junit4/src/test/java/com/bakdata/schemaregistrymock/junit4/SchemaRegistryMockRuleTest.java) and [framework agnostic](../schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java) setup. - - Set the serde of your key and/or value to `GenericAvroSerde` or `SpecificAvroSerde`, so that your topology actually needs to use the Schema Registry. - - Set the `AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG` to `schemaRegistry.getSchemaRegistryUrl()`, so that your test knows where to find the Schema Registry. -```java -@Test -void customTest() { - Properties properties = new Properties(); - properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.getSchemaRegistryUrl()); - ... - testDriver = new TopologyTestDriver(topology, properties); -} -``` - -After that, you can write a test that uses the Schema Registry in your testing framework. - - -## Development - -If you want to contribute to this project, you can simply clone the repository and build it via Gradle. -All dependencies should be included in the Gradle files, there are no external prerequisites. - -```bash -> git clone git@github.com:bakdata/fluent-kafka-streams-tests.git -> cd fluent-kafka-streams-tests && ./gradlew build -``` - -Please note, that we have [code styles](https://github.com/bakdata/bakdata-code-styles) for Java. -They are basically the Google style guide, with some small modifications. - -## Contributing - -We are happy if you want to contribute to this project. -If you find any bugs or have suggestions for improvements, please open an issue. -We are also happy to accept your PRs. -Just open an issue beforehand and let us know what you want to do and why. - -## License -This project is licensed under the MIT license. -Have a look at the [LICENSE](https://github.com/bakdata/fluent-kafka-streams-tests/blob/master/LICENSE) for more details. diff --git a/schema-registry-mock/build.gradle.kts b/schema-registry-mock/build.gradle.kts deleted file mode 100644 index 21ab0ec..0000000 --- a/schema-registry-mock/build.gradle.kts +++ /dev/null @@ -1,15 +0,0 @@ -description = "Mocks the HTTP endpoint of the schema registry for seamlessly testing topologies with Avro serdes" - -dependencies { - val confluentVersion: String by project - "api"(group = "io.confluent", name = "kafka-avro-serializer", version = confluentVersion) - "api"(group = "io.confluent", name = "kafka-schema-registry-client", version = confluentVersion) - "api"(group = "io.confluent", name = "kafka-streams-avro-serde", version = confluentVersion) - - implementation(group = "org.wiremock", name = "wiremock", version = "3.10.0") - - val junit5Version: String by project - testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junit5Version) - testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junit5Version) - testImplementation(group = "io.confluent", name = "kafka-protobuf-provider", version = confluentVersion) -} diff --git a/schema-registry-mock/lombok.config b/schema-registry-mock/lombok.config deleted file mode 100644 index 189c0be..0000000 --- a/schema-registry-mock/lombok.config +++ /dev/null @@ -1,3 +0,0 @@ -# This file is generated by the 'io.freefair.lombok' Gradle plugin -config.stopBubbling = true -lombok.addLombokGeneratedAnnotation = true diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/AllSubjectsHandler.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/AllSubjectsHandler.java deleted file mode 100644 index 9ded4ba..0000000 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/AllSubjectsHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2023 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.schemaregistrymock; - -import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; -import com.github.tomakehurst.wiremock.http.ResponseDefinition; -import com.github.tomakehurst.wiremock.stubbing.ServeEvent; -import java.util.Collection; - -class AllSubjectsHandler extends SubjectsHandler { - private final SchemaRegistryMock schemaRegistryMock; - - AllSubjectsHandler(final SchemaRegistryMock schemaRegistryMock) { - this.schemaRegistryMock = schemaRegistryMock; - } - - @Override - public ResponseDefinition transform(final ServeEvent serveEvent) { - final Collection body = this.schemaRegistryMock.listAllSubjects(); - return ResponseDefinitionBuilder.jsonResponse(body); - } - - @Override - public String getName() { - return AllSubjectsHandler.class.getSimpleName(); - } -} diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/AutoRegistrationHandler.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/AutoRegistrationHandler.java deleted file mode 100644 index 0d6cb87..0000000 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/AutoRegistrationHandler.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2023 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.schemaregistrymock; - -import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; -import com.github.tomakehurst.wiremock.http.ResponseDefinition; -import com.github.tomakehurst.wiremock.stubbing.ServeEvent; -import com.google.common.collect.Iterables; -import io.confluent.kafka.schemaregistry.ParsedSchema; -import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; -import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse; -import java.io.IOException; - -class AutoRegistrationHandler extends SubjectsHandler { - - private final SchemaRegistryMock schemaRegistryMock; - - AutoRegistrationHandler(final SchemaRegistryMock schemaRegistryMock) { - this.schemaRegistryMock = schemaRegistryMock; - } - - @Override - public ResponseDefinition transform(final ServeEvent serveEvent) { - final String subject = Iterables.get(this.urlSplitter.split(serveEvent.getRequest().getUrl()), 1); - try { - final RegisterSchemaRequest schemaRequest = - RegisterSchemaRequest.fromJson(serveEvent.getRequest().getBodyAsString()); - final ParsedSchema schema = this.schemaRegistryMock.parseSchema(schemaRequest); - final int id = this.schemaRegistryMock.register(subject, schema); - final RegisterSchemaResponse registerSchemaResponse = new RegisterSchemaResponse(); - registerSchemaResponse.setId(id); - return ResponseDefinitionBuilder.jsonResponse(registerSchemaResponse); - } catch (final IOException e) { - throw new IllegalArgumentException("Cannot parse schema registration request", e); - } - } - - @Override - public String getName() { - return AutoRegistrationHandler.class.getSimpleName(); - } -} diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/DeleteSubjectHandler.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/DeleteSubjectHandler.java deleted file mode 100644 index 7e86e22..0000000 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/DeleteSubjectHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2023 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.schemaregistrymock; - -import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; -import com.github.tomakehurst.wiremock.http.ResponseDefinition; -import com.github.tomakehurst.wiremock.stubbing.ServeEvent; -import java.util.List; - -class DeleteSubjectHandler extends SubjectsHandler { - private final SchemaRegistryMock schemaRegistryMock; - - DeleteSubjectHandler(final SchemaRegistryMock schemaRegistryMock) { - this.schemaRegistryMock = schemaRegistryMock; - } - - @Override - public ResponseDefinition transform(final ServeEvent serveEvent) { - final String subject = removeQueryParameters(this.getSubject(serveEvent.getRequest())); - final List ids = this.schemaRegistryMock.delete(subject); - return ResponseDefinitionBuilder.jsonResponse(ids); - } - - @Override - public String getName() { - return DeleteSubjectHandler.class.getSimpleName(); - } -} diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/ErrorResponseTransformer.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/ErrorResponseTransformer.java deleted file mode 100644 index e83121e..0000000 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/ErrorResponseTransformer.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2023 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.schemaregistrymock; - -import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; -import com.github.tomakehurst.wiremock.extension.ResponseDefinitionTransformerV2; -import com.github.tomakehurst.wiremock.http.ResponseDefinition; -import com.github.tomakehurst.wiremock.stubbing.ServeEvent; -import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -/** - * {@link ResponseDefinitionTransformerV2} that wraps other {@link ResponseDefinitionTransformerV2} and transforms - * potential errors. - * - *

- * The {@link io.confluent.kafka.schemaregistry.client.SchemaRegistryClient} requires errors to be in the format of - * {@link ErrorMessage}, so this class transforms any exception into this format. - */ -@Slf4j -@AllArgsConstructor -class ErrorResponseTransformer implements ResponseDefinitionTransformerV2 { - // Confluent's error codes are a superset of HTTP error codes. - // see https://docs.confluent.io/platform/current/kafka-rest/api.html#errors - public static final int INTERNAL_SERVER_ERROR_CODE = 500; - private final ResponseDefinitionTransformerV2 transformer; - - @Override - public ResponseDefinition transform(final ServeEvent serveEvent) { - try { - return this.transformer.transform(serveEvent); - } catch (final RuntimeException e) { - log.warn("An exception occurred while handling the schema registry request '{} {}'", - serveEvent.getRequest().getMethod(), serveEvent.getRequest().getUrl(), e); - final ErrorMessage body = new ErrorMessage(INTERNAL_SERVER_ERROR_CODE, e.getMessage()); - return ResponseDefinitionBuilder.jsonResponse(body, INTERNAL_SERVER_ERROR_CODE); - } - } - - @Override - public String getName() { - return this.transformer.getName(); - } - - @Override - public boolean applyGlobally() { - return this.transformer.applyGlobally(); - } -} diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/GetSubjectSchemaVersionHandler.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/GetSubjectSchemaVersionHandler.java deleted file mode 100644 index b8f221a..0000000 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/GetSubjectSchemaVersionHandler.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2023 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.schemaregistrymock; - -import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; -import com.github.tomakehurst.wiremock.http.ResponseDefinition; -import com.github.tomakehurst.wiremock.stubbing.ServeEvent; -import io.confluent.kafka.schemaregistry.ParsedSchema; -import io.confluent.kafka.schemaregistry.client.rest.entities.Schema; -import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; -import java.io.IOException; - -class GetSubjectSchemaVersionHandler extends SubjectsHandler { - - private final SchemaRegistryMock schemaRegistryMock; - - GetSubjectSchemaVersionHandler(final SchemaRegistryMock schemaRegistryMock) { - this.schemaRegistryMock = schemaRegistryMock; - } - - @Override - public ResponseDefinition transform(final ServeEvent serveEvent) { - final String subject = removeQueryParameters(this.getSubject(serveEvent.getRequest())); - try { - final RegisterSchemaRequest schemaRequest = - RegisterSchemaRequest.fromJson(serveEvent.getRequest().getBodyAsString()); - final ParsedSchema parsedSchema = this.schemaRegistryMock.parseSchema(schemaRequest); - final Schema schema = this.schemaRegistryMock.getSchema(subject, parsedSchema); - return ResponseDefinitionBuilder.jsonResponse(schema); - } catch (final IOException e) { - throw new IllegalArgumentException("Cannot parse schema registration request", e); - } - } - - @Override - public String getName() { - return GetSubjectSchemaVersionHandler.class.getSimpleName(); - } -} diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/GetVersionHandler.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/GetVersionHandler.java deleted file mode 100644 index e163a88..0000000 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/GetVersionHandler.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2023 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.schemaregistrymock; - -import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; -import com.github.tomakehurst.wiremock.http.ResponseDefinition; -import com.github.tomakehurst.wiremock.stubbing.ServeEvent; -import com.github.tomakehurst.wiremock.verification.LoggedRequest; -import com.google.common.collect.Iterables; -import io.confluent.kafka.schemaregistry.client.SchemaMetadata; - -class GetVersionHandler extends SubjectsHandler { - - private final SchemaRegistryMock schemaRegistryMock; - - GetVersionHandler(final SchemaRegistryMock schemaRegistryMock) { - this.schemaRegistryMock = schemaRegistryMock; - } - - @Override - public ResponseDefinition transform(final ServeEvent serveEvent) { - final LoggedRequest request = serveEvent.getRequest(); - final String versionStr = Iterables - .get(this.urlSplitter.split(removeQueryParameters(request.getUrl())), 3); - final SchemaMetadata metadata; - if ("latest".equals(versionStr)) { - metadata = this.schemaRegistryMock.getSubjectVersion(this.getSubject(request), versionStr); - } else { - final int version = Integer.parseInt(versionStr); - metadata = this.schemaRegistryMock.getSubjectVersion(this.getSubject(request), version); - } - return ResponseDefinitionBuilder.jsonResponse(metadata); - } - - @Override - public String getName() { - return GetVersionHandler.class.getSimpleName(); - } -} diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/ListVersionsHandler.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/ListVersionsHandler.java deleted file mode 100644 index feef837..0000000 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/ListVersionsHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2023 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.schemaregistrymock; - -import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; -import com.github.tomakehurst.wiremock.http.ResponseDefinition; -import com.github.tomakehurst.wiremock.stubbing.ServeEvent; -import java.util.List; - -class ListVersionsHandler extends SubjectsHandler { - - private final SchemaRegistryMock schemaRegistryMock; - - ListVersionsHandler(final SchemaRegistryMock schemaRegistryMock) { - this.schemaRegistryMock = schemaRegistryMock; - } - - @Override - public ResponseDefinition transform(final ServeEvent serveEvent) { - final List versions = this.schemaRegistryMock.listVersions(this.getSubject(serveEvent.getRequest())); - return ResponseDefinitionBuilder.jsonResponse(versions); - } - - @Override - public String getName() { - return ListVersionsHandler.class.getSimpleName(); - } -} diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java deleted file mode 100644 index d369f90..0000000 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java +++ /dev/null @@ -1,416 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2023 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ -package com.bakdata.schemaregistrymock; - -import static java.net.HttpURLConnection.HTTP_NOT_FOUND; - -import com.github.tomakehurst.wiremock.WireMockServer; -import com.github.tomakehurst.wiremock.client.MappingBuilder; -import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder; -import com.github.tomakehurst.wiremock.client.WireMock; -import com.github.tomakehurst.wiremock.core.WireMockConfiguration; -import com.github.tomakehurst.wiremock.extension.Extension; -import io.confluent.kafka.schemaregistry.ParsedSchema; -import io.confluent.kafka.schemaregistry.SchemaProvider; -import io.confluent.kafka.schemaregistry.avro.AvroSchema; -import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider; -import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.SchemaMetadata; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference; -import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString; -import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import io.confluent.kafka.serializers.subject.TopicNameStrategy; -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Stream; -import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; - -/** - *

The schema registry mock implements a few basic HTTP endpoints that are used by the Avro serdes.

- * In particular, - *
    - *
  • you can register a schema
  • - *
  • retrieve a schema by id.
  • - *
  • list and get schema versions of a subject
  • - *
- * - *

If you use the TestTopology of the fluent Kafka Streams test, you don't have to interact with this class at - * all.

- * - *

Without the test framework, you can use the mock as follows:

- *

- * class SchemaRegistryMockTest {
- *     private final SchemaRegistryMock schemaRegistry = new SchemaRegistryMock();
- *
- *     {@literal @BeforeEach}
- *     void setup() {
- *         schemaRegistry.start();
- *     }
- *
- *     {@literal @AfterEach}
- *     void teardown() {
- *         schemaRegistry.stop();
- *     }
- *
- *     {@literal @Test}
- *     void shouldRegisterKeySchema() throws IOException, RestClientException {
- *         final Schema keySchema = this.createSchema("key_schema");
- *         final int id = this.schemaRegistry.registerKeySchema("test-topic", keySchema);
- *
- *         final Schema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getById(id);
- *         assertThat(retrievedSchema).isEqualTo(keySchema);
- *     }
- * }
- * - * To retrieve the url of the schema registry for a Kafka Streams config, please use {@link #getUrl()} - */ -@Slf4j -public class SchemaRegistryMock { - private static final String ALL_SUBJECT_PATTERN = "/subjects"; - private static final String SCHEMA_PATH_PATTERN = "/subjects/[^/]+/versions"; - private static final String SCHEMA_BY_ID_PATTERN = "/schemas/ids/"; - private static final int IDENTITY_MAP_CAPACITY = 1000; - private static final String BOOLEAN_PATTERN = "false|true"; - - private final List schemaProviders; - private final SchemaRegistryClient client; - private final ListVersionsHandler listVersionsHandler; - private final GetVersionHandler getVersionHandler; - private final GetSubjectSchemaVersionHandler getSubjectSchemaVersionHandler; - private final AutoRegistrationHandler autoRegistrationHandler; - private final DeleteSubjectHandler deleteSubjectHandler; - private final AllSubjectsHandler allSubjectsHandler; - private final WireMockServer mockSchemaRegistry; - - /** - * Create a new {@code SchemaRegistryMock} from {@link SchemaProvider SchemaProviders}. - * - * @param schemaProviders List of {@link SchemaProvider}. If null, {@link AvroSchemaProvider} will be used. - */ - public SchemaRegistryMock(final List schemaProviders) { - this.schemaProviders = Optional.ofNullable(schemaProviders) - .orElseGet(() -> Collections.singletonList(new AvroSchemaProvider())); - this.client = new MockSchemaRegistryClient(schemaProviders); - this.listVersionsHandler = new ListVersionsHandler(this); - this.getVersionHandler = new GetVersionHandler(this); - this.getSubjectSchemaVersionHandler = new GetSubjectSchemaVersionHandler(this); - this.autoRegistrationHandler = new AutoRegistrationHandler(this); - this.deleteSubjectHandler = new DeleteSubjectHandler(this); - this.allSubjectsHandler = new AllSubjectsHandler(this); - this.mockSchemaRegistry = - new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort().extensions(this.extensions())); - } - - /** - * Create a new {@code SchemaRegistryMock} with default {@link SchemaProvider SchemaProviders}. - * - * @see #SchemaRegistryMock(List) - */ - public SchemaRegistryMock() { - this(null); - } - - private static MappingBuilder getSchema(final int id) { - return WireMock.get(WireMock.urlPathEqualTo(SCHEMA_BY_ID_PATTERN + (Integer) id)) - .withQueryParam("fetchMaxId", WireMock.matching(BOOLEAN_PATTERN)); - } - - private static MappingBuilder deleteSubject(final String subject) { - return WireMock.delete(WireMock.urlPathEqualTo(ALL_SUBJECT_PATTERN + "/" + subject)) - .withQueryParam("permanent", WireMock.equalTo("false")); - } - - private static MappingBuilder postSubjectVersion(final String subject) { - return WireMock.post(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN + "/" + subject)) - .withQueryParam("deleted", WireMock.matching(BOOLEAN_PATTERN)) - .withQueryParam("normalize", WireMock.matching(BOOLEAN_PATTERN)); - } - - private static MappingBuilder getSubjectVersions(final String subject) { - return WireMock.get(WireMock.urlPathEqualTo(ALL_SUBJECT_PATTERN + "/" + subject + "/versions")) - .withQueryParam("deletedOnly", WireMock.matching(BOOLEAN_PATTERN)) - .withQueryParam("deleted", WireMock.matching(BOOLEAN_PATTERN)); - } - - private static MappingBuilder getSubject(final String subject) { - return WireMock.get( - WireMock.urlPathMatching(ALL_SUBJECT_PATTERN + "/" + subject + "/versions/(?:latest|\\d+)")); - } - - private static SchemaString parsedSchemaToSchemaString(final ParsedSchema parsedSchema) { - final SchemaString schemaString = new SchemaString(parsedSchema.toString()); - schemaString.setSchemaType(parsedSchema.schemaType()); - schemaString.setReferences(parsedSchema.references()); - return schemaString; - } - - private static IllegalStateException internalError(final Exception e) { - return new IllegalStateException("Internal error in mock schema registry client", e); - } - - private static String getKeySubject(final String topic) { - return topic + "-key"; - } - - private static String getValueSubject(final String topic) { - return topic + "-value"; - } - - /** - * Start the {@code SchemaRegistryMock}. Subsequent calls will have no effect. - */ - public void start() { - if (this.mockSchemaRegistry.isRunning()) { - return; - } - - this.mockSchemaRegistry.start(); - this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN)) - .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); - this.mockSchemaRegistry.stubFor(WireMock.post(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN)) - .willReturn(WireMock.aResponse().withTransformers(this.autoRegistrationHandler.getName()))); - this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_PATH_PATTERN + "/(?:latest|\\d+)")) - .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); - this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(SCHEMA_BY_ID_PATTERN + "\\d+")) - .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); - this.mockSchemaRegistry.stubFor(WireMock.delete(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN + "/[^/]+")) - .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); - this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN)) - .willReturn(WireMock.aResponse().withTransformers(this.allSubjectsHandler.getName()))); - this.mockSchemaRegistry.stubFor(postSubjectVersion("[^/]+") - .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); - } - - /** - * Stop the {@code SchemaRegistryMock}. - */ - public void stop() { - this.mockSchemaRegistry.stop(); - } - - /** - * Register a key {@link Schema} for the topic - * - * @param topic topic to register key schema for - * @param schema schema to be registered - * @return Schema id - * @see #registerKeySchema(String, ParsedSchema) - */ - public int registerKeySchema(final String topic, final Schema schema) { - return this.register(getKeySubject(topic), new AvroSchema(schema)); - } - - /** - * Register a value {@link Schema} for the topic - * - * @param topic topic to register value schema for - * @param schema schema to be registered - * @return Schema id - * @see #registerValueSchema(String, ParsedSchema) - */ - public int registerValueSchema(final String topic, final Schema schema) { - return this.register(getValueSubject(topic), new AvroSchema(schema)); - } - - /** - * Register a key {@link ParsedSchema} for the topic using {@link TopicNameStrategy} - * - * @param topic topic to register key schema for - * @param schema schema to be registered - * @return Schema id - */ - public int registerKeySchema(final String topic, final ParsedSchema schema) { - return this.register(getKeySubject(topic), schema); - } - - /** - * Register a value {@link ParsedSchema} for the topic using {@link TopicNameStrategy} - * - * @param topic topic to register value schema for - * @param schema schema to be registered - * @return Schema id - */ - public int registerValueSchema(final String topic, final ParsedSchema schema) { - return this.register(getValueSubject(topic), schema); - } - - /** - * Delete all key schemas associated with the given subject. Uses {@link TopicNameStrategy}. - * - * @param subject subject to delete key schemas for - * @return Ids of deleted schemas - */ - public List deleteKeySchema(final String subject) { - return this.delete(getKeySubject(subject)); - } - - /** - * Delete all value schemas associated with the given subject. Uses {@link TopicNameStrategy}. - * - * @param subject subject to delete value schemas for - * @return Ids of deleted schemas - */ - public List deleteValueSchema(final String subject) { - return this.delete(getValueSubject(subject)); - } - - /** - * Create {@link CachedSchemaRegistryClient} for this mock using no additional config. - * - * @return {@code SchemaRegistryClient} - * @see #getSchemaRegistryClient(Map) - */ - public SchemaRegistryClient getSchemaRegistryClient() { - return this.getSchemaRegistryClient(Collections.emptyMap()); - } - - public String getUrl() { - return "http://localhost:" + this.mockSchemaRegistry.port(); - } - - /** - * Create {@link CachedSchemaRegistryClient} for this mock. - * - * @param config config passed to - * {@link CachedSchemaRegistryClient#CachedSchemaRegistryClient(List, int, List, Map)} - * @return {@code SchemaRegistryClient} - */ - public SchemaRegistryClient getSchemaRegistryClient(final Map config) { - return new CachedSchemaRegistryClient(Collections.singletonList(this.getUrl()), IDENTITY_MAP_CAPACITY, - this.schemaProviders, - config); - } - - int register(final String subject, final ParsedSchema schema) { - try { - final int id = this.client.register(subject, schema); - log.debug("Registered schema {}", id); - // add stubs for the new subject - this.mockSchemaRegistry.stubFor(getSchema(id) - .willReturn(ResponseDefinitionBuilder.okForJson(parsedSchemaToSchemaString(schema)))); - this.mockSchemaRegistry.stubFor(deleteSubject(subject) - .willReturn(WireMock.aResponse().withTransformers(this.deleteSubjectHandler.getName()))); - this.mockSchemaRegistry.stubFor(getSubjectVersions(subject) - .willReturn(WireMock.aResponse().withTransformers(this.listVersionsHandler.getName()))); - this.mockSchemaRegistry.stubFor(getSubject(subject) - .willReturn(WireMock.aResponse().withTransformers(this.getVersionHandler.getName()))); - this.mockSchemaRegistry.stubFor(postSubjectVersion(subject) - .willReturn(WireMock.aResponse().withTransformers(this.getSubjectSchemaVersionHandler.getName()))); - - return id; - } catch (final IOException | RestClientException e) { - throw internalError(e); - } - } - - List delete(final String subject) { - try { - final List ids = this.client.deleteSubject(subject); - - // remove stub for each version as well as stubs for subject - ids.forEach(id -> this.mockSchemaRegistry.removeStub(getSchema(id))); - this.mockSchemaRegistry.removeStub(deleteSubject(subject)); - this.mockSchemaRegistry.removeStub(getSubjectVersions(subject)); - this.mockSchemaRegistry.removeStub(getSubject(subject)); - this.mockSchemaRegistry.removeStub(postSubjectVersion(subject)); - return ids; - } catch (final IOException | RestClientException e) { - throw internalError(e); - } - } - - List listVersions(final String subject) { - log.debug("Listing all versions for subject {}", subject); - try { - return this.client.getAllVersions(subject); - } catch (final IOException | RestClientException e) { - throw internalError(e); - } - } - - io.confluent.kafka.schemaregistry.client.rest.entities.Schema getSchema(final String subject, - final ParsedSchema parsedSchema) { - log.debug("Getting schema version for subject {}", subject); - try { - final int version = this.client.getVersion(subject, parsedSchema); - final int id = this.client.getId(subject, parsedSchema); - return new io.confluent.kafka.schemaregistry.client.rest.entities.Schema(subject, version, id, - parsedSchema.schemaType(), parsedSchema.references(), parsedSchema.canonicalString()); - } catch (final IOException | RestClientException e) { - throw internalError(e); - } - } - - SchemaMetadata getSubjectVersion(final String subject, final Object version) { - log.debug("Requesting version {} for subject {}", version, subject); - try { - if (version instanceof String && "latest".equals(version)) { - return this.client.getLatestSchemaMetadata(subject); - } else if (version instanceof Number) { - return this.client.getSchemaMetadata(subject, ((Number) version).intValue()); - } else { - throw new IllegalArgumentException("Only 'latest' or integer versions are allowed"); - } - } catch (final IOException | RestClientException e) { - throw internalError(e); - } - } - - Collection listAllSubjects() { - try { - return this.client.getAllSubjects(); - } catch (final IOException | RestClientException e) { - throw internalError(e); - } - } - - ParsedSchema parseSchema(final RegisterSchemaRequest registerSchemaRequest) { - final String schemaType = registerSchemaRequest.getSchemaType(); - final String schema = registerSchemaRequest.getSchema(); - final List references = Optional.ofNullable(registerSchemaRequest.getReferences()) - .orElse(Collections.emptyList()); - final Optional schemaOptional = this.client.parseSchema(schemaType, schema, references); - return schemaOptional.orElseThrow(() -> new RuntimeException("Could not parse schema")); - } - - private Extension[] extensions() { - return Stream.of( - this.autoRegistrationHandler, - this.listVersionsHandler, - this.getVersionHandler, - this.getSubjectSchemaVersionHandler, - this.deleteSubjectHandler, - this.allSubjectsHandler - ).map(ErrorResponseTransformer::new).toArray(Extension[]::new); - } - -} diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SubjectsHandler.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SubjectsHandler.java deleted file mode 100644 index 9821f3d..0000000 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SubjectsHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2023 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.schemaregistrymock; - -import com.github.tomakehurst.wiremock.extension.ResponseDefinitionTransformerV2; -import com.github.tomakehurst.wiremock.http.Request; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; - -abstract class SubjectsHandler implements ResponseDefinitionTransformerV2 { - // Expected url pattern /subjects(/.*-value/versions) - protected final Splitter urlSplitter = Splitter.on('/').omitEmptyStrings(); - - @Override - public boolean applyGlobally() { - return false; - } - - protected String getSubject(final Request request) { - return Iterables.get(this.urlSplitter.split(request.getUrl()), 1); - } - - static String removeQueryParameters(final String url) { - final int index = url.indexOf('?'); - return index == -1 ? url : url.substring(0, index); - } -} diff --git a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/ErrorResponseTransformerTest.java b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/ErrorResponseTransformerTest.java deleted file mode 100644 index 356d1f7..0000000 --- a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/ErrorResponseTransformerTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2023 bakdata - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.schemaregistrymock; - -import com.github.tomakehurst.wiremock.client.WireMock; -import com.github.tomakehurst.wiremock.core.WireMockConfiguration; -import com.github.tomakehurst.wiremock.extension.ResponseDefinitionTransformerV2; -import com.github.tomakehurst.wiremock.http.ResponseDefinition; -import com.github.tomakehurst.wiremock.junit5.WireMockExtension; -import com.github.tomakehurst.wiremock.stubbing.ServeEvent; -import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import org.assertj.core.api.SoftAssertions; -import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; -import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.extension.RegisterExtension; - -@ExtendWith(SoftAssertionsExtension.class) -class ErrorResponseTransformerTest { - @InjectSoftAssertions - private SoftAssertions softly; - - static final ErrorResponseTransformer TRANSFORMER = - new ErrorResponseTransformer(new FailingResponseTransformer()); - - @RegisterExtension - static WireMockExtension wireMock = WireMockExtension.newInstance() - .options(WireMockConfiguration.wireMockConfig() - .dynamicPort() - .extensions(TRANSFORMER)) - .build(); - - @Test - void shouldTransformError() { - wireMock.stubFor(WireMock.any(WireMock.anyUrl()) - .willReturn(WireMock.aResponse().withTransformers(TRANSFORMER.getName()))); - - final CachedSchemaRegistryClient cachedSchemaRegistryClient = - new CachedSchemaRegistryClient(wireMock.baseUrl(), 10); - - this.softly.assertThatExceptionOfType(RestClientException.class) - .isThrownBy(cachedSchemaRegistryClient::getAllSubjects) - .satisfies(e -> { - this.softly.assertThat(e.getErrorCode()).isEqualTo(500); - this.softly.assertThat(e.getStatus()).isEqualTo(500); - this.softly.assertThat(e).hasMessageContaining("Test error"); - }); - } - - static class FailingResponseTransformer implements ResponseDefinitionTransformerV2 { - - @Override - public ResponseDefinition transform(final ServeEvent serveEvent) { - throw new RuntimeException("Test error"); - } - - @Override - public String getName() { - return this.getClass().getSimpleName(); - } - } -} diff --git a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/ProtobufRegistryMockTest.java b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/ProtobufRegistryMockTest.java deleted file mode 100644 index c8a0cfd..0000000 --- a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/ProtobufRegistryMockTest.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2023 bakdata GmbH - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package com.bakdata.schemaregistrymock; - -import static java.net.HttpURLConnection.HTTP_NOT_FOUND; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; - -import io.confluent.kafka.schemaregistry.ParsedSchema; -import io.confluent.kafka.schemaregistry.client.SchemaMetadata; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; -import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -class ProtobufRegistryMockTest { - private final SchemaRegistryMock schemaRegistry; - private final ParsedSchema schema; - - ProtobufRegistryMockTest() throws IOException { - this.schemaRegistry = new SchemaRegistryMock(Collections.singletonList(new ProtobufSchemaProvider())); - - try (final InputStream input = ProtobufRegistryMockTest.class.getResourceAsStream("/record.proto"); - final BufferedReader reader = new BufferedReader(new InputStreamReader(input))) { - this.schema = new ProtobufSchema(reader.lines().collect(Collectors.joining("\n"))); - } - } - - @BeforeEach - void start() { - this.schemaRegistry.start(); - } - - @AfterEach - void stop() { - this.schemaRegistry.stop(); - } - - @Test - void shouldRegisterKeySchema() throws IOException, RestClientException { - final int id = this.schemaRegistry.registerKeySchema("test-topic", this.schema); - - final ParsedSchema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema).isEqualTo(this.schema); - } - - @Test - void shouldRegisterValueSchema() throws IOException, RestClientException { - final int id = this.schemaRegistry.registerValueSchema("test-topic", this.schema); - - final ParsedSchema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema).isEqualTo(this.schema); - } - - @Test - void shouldRegisterKeySchemaWithClient() throws IOException, RestClientException { - final int id = this.schemaRegistry.getSchemaRegistryClient().register("test-topic-key", this.schema); - - final ParsedSchema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema).isEqualTo(this.schema); - } - - @Test - void shouldRegisterValueSchemaWithClient() throws IOException, RestClientException { - final int id = this.schemaRegistry.getSchemaRegistryClient().register("test-topic-value", this.schema); - - final ParsedSchema retrievedSchema = this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema).isEqualTo(this.schema); - } - - @Test - void shouldHaveSchemaVersions() throws IOException, RestClientException { - final String topic = "test-topic"; - final int id = this.schemaRegistry.registerValueSchema(topic, this.schema); - - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); - assertThat(versions).hasSize(1); - - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); - assertThat(metadata.getId()).isEqualTo(id); - final String schemaString = metadata.getSchema(); - final ParsedSchema retrievedSchema = new ProtobufSchema(schemaString); - assertThat(retrievedSchema).isEqualTo(this.schema); - } - - @Test - void shouldReturnAllSubjects() throws IOException, RestClientException { - this.schemaRegistry.registerKeySchema("test-topic", this.schema); - this.schemaRegistry.registerValueSchema("test-topic", this.schema); - final Collection allSubjects = this.schemaRegistry.getSchemaRegistryClient().getAllSubjects(); - assertThat(allSubjects).hasSize(2).containsExactly("test-topic-key", "test-topic-value"); - } - - - @Test - void shouldDeleteKeySchema() throws IOException, RestClientException { - this.schemaRegistry.registerKeySchema("test-topic", this.schema); - final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); - final Collection allSubjects = client.getAllSubjects(); - assertThat(allSubjects).hasSize(1).containsExactly("test-topic-key"); - this.schemaRegistry.deleteKeySchema("test-topic"); - final Collection subjectsAfterDeletion = client.getAllSubjects(); - assertThat(subjectsAfterDeletion).isEmpty(); - } - - @Test - void shouldDeleteValueSchema() throws IOException, RestClientException { - final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); - this.schemaRegistry.registerValueSchema("test-topic", this.schema); - final Collection allSubjects = client.getAllSubjects(); - assertThat(allSubjects).hasSize(1).containsExactly("test-topic-value"); - this.schemaRegistry.deleteValueSchema("test-topic"); - final Collection subjectsAfterDeletion = client.getAllSubjects(); - assertThat(subjectsAfterDeletion).isEmpty(); - } - - @Test - void shouldDeleteKeySchemaWithClient() throws IOException, RestClientException { - final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); - this.schemaRegistry.registerKeySchema("test-topic", this.schema); - final Collection allSubjects = client.getAllSubjects(); - assertThat(allSubjects).hasSize(1).containsExactly("test-topic-key"); - client.deleteSubject("test-topic-key"); - final Collection subjectsAfterDeletion = client.getAllSubjects(); - assertThat(subjectsAfterDeletion).isEmpty(); - } - - @Test - void shouldDeleteValueSchemaWithClient() throws IOException, RestClientException { - final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); - this.schemaRegistry.registerValueSchema("test-topic", this.schema); - final Collection allSubjects = client.getAllSubjects(); - assertThat(allSubjects).hasSize(1).containsExactly("test-topic-value"); - client.deleteSubject("test-topic-value"); - final Collection subjectsAfterDeletion = client.getAllSubjects(); - assertThat(subjectsAfterDeletion).isEmpty(); - } - - @Test - void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClientException { - final String topic = "test-topic"; - final int id = this.schemaRegistry.registerValueSchema(topic, this.schema); - - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); - assertThat(versions).hasSize(1); - - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); - assertThat(metadata.getId()).isEqualTo(id); - assertThat(this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value")) - .isNotNull(); - this.schemaRegistry.deleteValueSchema(topic); - assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value")) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient() - .getSchemaMetadata(topic + "-value", versions.get(0))) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - assertThatExceptionOfType(RestClientException.class) - .isThrownBy( - () -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value")) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - } - -} diff --git a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java deleted file mode 100644 index cd4e053..0000000 --- a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2023 bakdata GmbH - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ -package com.bakdata.schemaregistrymock; - -import static java.net.HttpURLConnection.HTTP_NOT_FOUND; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; - -import io.confluent.kafka.schemaregistry.ParsedSchema; -import io.confluent.kafka.schemaregistry.avro.AvroSchema; -import io.confluent.kafka.schemaregistry.client.SchemaMetadata; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import org.apache.avro.Schema; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -class SchemaRegistryMockTest { - private final SchemaRegistryMock schemaRegistry = new SchemaRegistryMock(); - - private static Schema createSchema(final String name) { - return Schema.createRecord(name, "no doc", "", false, Collections.emptyList()); - } - - @BeforeEach - void start() { - this.schemaRegistry.start(); - } - - @AfterEach - void stop() { - this.schemaRegistry.stop(); - } - - @Test - void shouldRegisterKeySchema() throws IOException, RestClientException { - final Schema keySchema = createSchema("key_schema"); - final int id = this.schemaRegistry.registerKeySchema("test-topic", keySchema); - - final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema.rawSchema()).isEqualTo(keySchema); - } - - @Test - void shouldRegisterValueSchema() throws IOException, RestClientException { - final Schema valueSchema = createSchema("value_schema"); - final int id = this.schemaRegistry.registerValueSchema("test-topic", valueSchema); - - final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema.rawSchema()).isEqualTo(valueSchema); - } - - @Test - void shouldRegisterKeySchemaWithClient() throws IOException, RestClientException { - final Schema keySchema = createSchema("key_schema"); - final int id = - this.schemaRegistry.getSchemaRegistryClient().register("test-topic-key", new AvroSchema(keySchema)); - - final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema.rawSchema()).isEqualTo(keySchema); - } - - @Test - void shouldRegisterValueSchemaWithClient() throws IOException, RestClientException { - final Schema valueSchema = createSchema("value_schema"); - final int id = - this.schemaRegistry.getSchemaRegistryClient().register("test-topic-value", new AvroSchema(valueSchema)); - - final AvroSchema retrievedSchema = (AvroSchema) this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id); - assertThat(retrievedSchema.rawSchema()).isEqualTo(valueSchema); - } - - @Test - void shouldHaveSchemaVersions() throws IOException, RestClientException { - final Schema valueSchema = createSchema("value_schema"); - final String topic = "test-topic"; - final int id = this.schemaRegistry.registerValueSchema(topic, valueSchema); - - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); - assertThat(versions).hasSize(1); - - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); - assertThat(metadata.getId()).isEqualTo(id); - final String schemaString = metadata.getSchema(); - final Schema retrievedSchema = new Schema.Parser().parse(schemaString); - assertThat(retrievedSchema).isEqualTo(valueSchema); - } - - @Test - void shouldNotHaveSchemaVersionsForUnknownSubject() { - assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getAllVersions("does_not_exist")) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata("does_not_exist", 0)) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - } - - @Test - void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { - final Schema valueSchema1 = createSchema("value_schema"); - final String topic = "test-topic"; - final int id1 = this.schemaRegistry.registerValueSchema(topic, valueSchema1); - - final List fields = Collections.singletonList( - new Schema.Field("f1", Schema.create(Schema.Type.STRING), "", null)); - final Schema valueSchema2 = Schema.createRecord("value_schema", "no doc", "", false, fields); - final int id2 = this.schemaRegistry.registerValueSchema(topic, valueSchema2); - - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); - assertThat(versions).hasSize(2); - - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); - final int metadataId = metadata.getId(); - assertThat(metadataId) - .isNotEqualTo(id1) - .isEqualTo(id2); - final String schemaString = metadata.getSchema(); - final Schema retrievedSchema = new Schema.Parser().parse(schemaString); - assertThat(retrievedSchema).isEqualTo(valueSchema2); - } - - @Test - void shouldNotHaveLatestSchemaVersionForUnknownSubject() { - assertThatExceptionOfType(RestClientException.class) - .isThrownBy( - () -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata("does_not_exist")) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - } - - @Test - void shouldReturnAllSubjects() throws IOException, RestClientException { - this.schemaRegistry.registerKeySchema("test-topic", createSchema("key_schema")); - this.schemaRegistry.registerValueSchema("test-topic", createSchema("value_schema")); - final Collection allSubjects = this.schemaRegistry.getSchemaRegistryClient().getAllSubjects(); - assertThat(allSubjects).hasSize(2).containsExactly("test-topic-key", "test-topic-value"); - } - - @Test - void shouldReturnEmptyListForNoSubjects() throws IOException, RestClientException { - final Collection allSubjects = this.schemaRegistry.getSchemaRegistryClient().getAllSubjects(); - assertThat(allSubjects).isEmpty(); - } - - @Test - void shouldDeleteKeySchema() throws IOException, RestClientException { - this.schemaRegistry.registerKeySchema("test-topic", createSchema("key_schema")); - final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); - final Collection allSubjects = client.getAllSubjects(); - assertThat(allSubjects).hasSize(1).containsExactly("test-topic-key"); - this.schemaRegistry.deleteKeySchema("test-topic"); - final Collection subjectsAfterDeletion = client.getAllSubjects(); - assertThat(subjectsAfterDeletion).isEmpty(); - } - - @Test - void shouldDeleteValueSchema() throws IOException, RestClientException { - final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); - this.schemaRegistry.registerValueSchema("test-topic", createSchema("value_schema")); - final Collection allSubjects = client.getAllSubjects(); - assertThat(allSubjects).hasSize(1).containsExactly("test-topic-value"); - this.schemaRegistry.deleteValueSchema("test-topic"); - final Collection subjectsAfterDeletion = client.getAllSubjects(); - assertThat(subjectsAfterDeletion).isEmpty(); - } - - @Test - void shouldDeleteKeySchemaWithClient() throws IOException, RestClientException { - final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); - this.schemaRegistry.registerKeySchema("test-topic", createSchema("key_schema")); - final Collection allSubjects = client.getAllSubjects(); - assertThat(allSubjects).hasSize(1).containsExactly("test-topic-key"); - client.deleteSubject("test-topic-key"); - final Collection subjectsAfterDeletion = client.getAllSubjects(); - assertThat(subjectsAfterDeletion).isEmpty(); - } - - @Test - void shouldDeleteValueSchemaWithClient() throws IOException, RestClientException { - final SchemaRegistryClient client = this.schemaRegistry.getSchemaRegistryClient(); - this.schemaRegistry.registerValueSchema("test-topic", createSchema("value_schema")); - final Collection allSubjects = client.getAllSubjects(); - assertThat(allSubjects).hasSize(1).containsExactly("test-topic-value"); - client.deleteSubject("test-topic-value"); - final Collection subjectsAfterDeletion = client.getAllSubjects(); - assertThat(subjectsAfterDeletion).isEmpty(); - } - - @Test - void shouldNotDeleteUnknownSubject() { - assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().deleteSubject("does_not_exist")) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - } - - @Test - void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClientException { - final Schema valueSchema = createSchema("value_schema"); - final String topic = "test-topic"; - final int id = this.schemaRegistry.registerValueSchema(topic, valueSchema); - - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); - assertThat(versions).hasSize(1); - - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); - assertThat(metadata.getId()).isEqualTo(id); - assertThat(this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value")) - .isNotNull(); - this.schemaRegistry.deleteValueSchema(topic); - assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value")) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient() - .getSchemaMetadata(topic + "-value", versions.get(0))) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - assertThatExceptionOfType(RestClientException.class) - .isThrownBy( - () -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value")) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient() - .getVersion(topic + "-value", new AvroSchema(valueSchema))) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getSchemaById(id)) - .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); - } - - @Test - void shouldReturnVersion() throws IOException, RestClientException { - final ParsedSchema keySchema = new AvroSchema(createSchema("key_schema")); - final ParsedSchema valueSchema = new AvroSchema(createSchema("value_schema")); - this.schemaRegistry.registerKeySchema("test-topic", keySchema); - this.schemaRegistry.registerValueSchema("test-topic", valueSchema); - assertThat(this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-key", keySchema)).isEqualTo(1); - assertThat(this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", valueSchema)) - .isEqualTo(1); - } - - @Test - void shouldUpdateVersion() throws IOException, RestClientException { - final ParsedSchema keySchema1 = new AvroSchema(createSchema("key_schema1")); - final ParsedSchema keySchema2 = new AvroSchema(createSchema("key_schema2")); - this.schemaRegistry.registerKeySchema("test-topic", keySchema1); - this.schemaRegistry.registerKeySchema("test-topic", keySchema2); - assertThat(this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-key", keySchema1)).isEqualTo(1); - assertThat(this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-key", keySchema2)).isEqualTo(2); - } -} diff --git a/schema-registry-mock/src/test/resources/log4j2.xml b/schema-registry-mock/src/test/resources/log4j2.xml deleted file mode 100644 index 99533fd..0000000 --- a/schema-registry-mock/src/test/resources/log4j2.xml +++ /dev/null @@ -1,16 +0,0 @@ - - - - - - - - - - - - - - - - diff --git a/schema-registry-mock/src/test/resources/nested.proto b/schema-registry-mock/src/test/resources/nested.proto deleted file mode 100644 index 0593a73..0000000 --- a/schema-registry-mock/src/test/resources/nested.proto +++ /dev/null @@ -1,6 +0,0 @@ -syntax = "proto3"; -package com.bakdata.proto; - -message Nested { - int32 other_id = 1; -} diff --git a/schema-registry-mock/src/test/resources/record.proto b/schema-registry-mock/src/test/resources/record.proto deleted file mode 100644 index e1ee892..0000000 --- a/schema-registry-mock/src/test/resources/record.proto +++ /dev/null @@ -1,9 +0,0 @@ -syntax = "proto3"; -package com.bakdata.proto; - -import "nested.proto"; - -message Record { - string f1 = 1; - Nested f2 = 2; -} diff --git a/settings.gradle b/settings.gradle index a8607ea..c11bd57 100644 --- a/settings.gradle +++ b/settings.gradle @@ -7,5 +7,5 @@ pluginManagement { rootProject.name = 'fluent-kafka-streams-tests' ['', '-junit5', '-junit4'].each { suffix -> - include ":fluent-kafka-streams-tests$suffix", ":schema-registry-mock$suffix" + include ":fluent-kafka-streams-tests$suffix" }