From 43e927923af26b5499dbb633de7998718f26cd56 Mon Sep 17 00:00:00 2001 From: Torben Meyer Date: Sat, 16 Nov 2019 10:59:14 +0100 Subject: [PATCH 1/6] Add schema version handler --- .../SchemaRegistryMock.java | 56 +++++++++++++++++-- .../SchemaRegistryMockTest.java | 46 +++++++++++++-- 2 files changed, 90 insertions(+), 12 deletions(-) diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java index cfba321..760c3c0 100644 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java +++ b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java @@ -23,6 +23,7 @@ */ package com.bakdata.schemaregistrymock; +import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import com.github.tomakehurst.wiremock.WireMockServer; @@ -42,6 +43,7 @@ import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage; import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest; import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse; @@ -96,6 +98,7 @@ public class SchemaRegistryMock { private static final String ALL_SUBJECT_PATTERN = "/subjects"; private static final String SCHEMA_PATH_PATTERN = "/subjects/[^/]+/versions"; private static final String SCHEMA_BY_ID_PATTERN = "/schemas/ids/"; + private static final String SCHEMA_VERSION_PATTERN = "/subjects/[^/]+\\?deleted=true"; private static final int IDENTITY_MAP_CAPACITY = 1000; private final ListVersionsHandler listVersionsHandler = new ListVersionsHandler(); @@ -103,10 +106,11 @@ public class SchemaRegistryMock { private final AutoRegistrationHandler autoRegistrationHandler = new AutoRegistrationHandler(); private final DeleteSubjectHandler deleteSubjectHandler = new DeleteSubjectHandler(); private final AllSubjectsHandler allSubjectsHandler = new AllSubjectsHandler(); + private final SchemaVersionHandler schemaVersionHandler = new SchemaVersionHandler(); private final WireMockServer mockSchemaRegistry = new WireMockServer( WireMockConfiguration.wireMockConfig().dynamicPort() .extensions(this.autoRegistrationHandler, this.listVersionsHandler, this.getVersionHandler, - this.deleteSubjectHandler, this.allSubjectsHandler)); + this.deleteSubjectHandler, this.allSubjectsHandler, this.schemaVersionHandler)); private final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient(); private static UrlPattern getSchemaPattern(final Integer id) { @@ -139,6 +143,8 @@ public void start() { .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN)) .willReturn(WireMock.aResponse().withTransformers(this.allSubjectsHandler.getName()))); + this.mockSchemaRegistry.stubFor(WireMock.post(WireMock.urlMatching(SCHEMA_VERSION_PATTERN)) + .willReturn(WireMock.aResponse().withTransformers(this.schemaVersionHandler.getName()))); } public void stop() { @@ -251,7 +257,7 @@ private class AutoRegistrationHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final String subject = Iterables.get(this.urlSplitter.split(request.getUrl()), 1); try { final int id = SchemaRegistryMock.this.register(subject, @@ -275,7 +281,7 @@ private class ListVersionsHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final List versions = SchemaRegistryMock.this.listVersions(this.getSubject(request)); log.debug("Got versions {}", versions); return ResponseDefinitionBuilder.jsonResponse(versions); @@ -291,7 +297,7 @@ private class GetVersionHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final String versionStr = Iterables.get(this.urlSplitter.split(request.getUrl()), 3); final SchemaMetadata metadata; if (versionStr.equals("latest")) { @@ -312,7 +318,7 @@ public String getName() { private class DeleteSubjectHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final List ids = SchemaRegistryMock.this.delete(this.getSubject(request)); return ResponseDefinitionBuilder.jsonResponse(ids); } @@ -326,7 +332,7 @@ public String getName() { private class AllSubjectsHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final Collection body = SchemaRegistryMock.this.listAllSubjects(); return ResponseDefinitionBuilder.jsonResponse(body); } @@ -336,4 +342,42 @@ public String getName() { return AllSubjectsHandler.class.getSimpleName(); } } + + private class SchemaVersionHandler extends SubjectsHandler { + @Override + public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, + final FileSource files, final Parameters parameters) { + try { + final Schema schema = new Schema.Parser() + .parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema()); + final String subject = this.getSubject(request); + final int schemaVersion = SchemaRegistryMock.this.schemaRegistryClient.getVersion(subject, schema); + final int schemaId = SchemaRegistryMock.this.schemaRegistryClient.getId(subject, schema); + + return ResponseDefinitionBuilder + .jsonResponse(new io.confluent.kafka.schemaregistry.client.rest.entities.Schema( + subject, schemaVersion, schemaId, schema.toString() + )); + } catch (final IOException | RestClientException e) { + final ErrorMessage error = new ErrorMessage(HTTP_BAD_REQUEST, "Cannot fetch schema version"); + return ResponseDefinitionBuilder.jsonResponse(error, HTTP_BAD_REQUEST); + } + } + + @Override + public String getName() { + return SchemaVersionHandler.class.getSimpleName(); + } + + @Override + protected String getSubject(final Request request) { + String subject = super.getSubject(request); + // remove request parameters + if (subject.contains("?")) { + subject = subject.split("\\?")[0]; + } + return subject; + } + } + } diff --git a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java index f2a1c9f..ddc7082 100644 --- a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java +++ b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java @@ -26,6 +26,7 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.confluent.kafka.schemaregistry.client.SchemaMetadata; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; @@ -97,7 +98,8 @@ void shouldHaveSchemaVersions() throws IOException, RestClientException { final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); assertThat(versions.size()).isOne(); - final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); + final SchemaMetadata metadata = + this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); assertThat(metadata.getId()).isEqualTo(id); final String schemaString = metadata.getSchema(); final Schema retrievedSchema = new Schema.Parser().parse(schemaString); @@ -128,7 +130,8 @@ void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); assertThat(versions.size()).isEqualTo(2); - final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); + final SchemaMetadata metadata = + this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); final int metadataId = metadata.getId(); assertThat(metadataId).isNotEqualTo(id1); assertThat(metadataId).isEqualTo(id2); @@ -140,7 +143,8 @@ void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { @Test void shouldNotHaveLatestSchemaVersionForUnknownSubject() { assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata("does_not_exist")) + .isThrownBy( + () -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata("does_not_exist")) .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); } @@ -221,7 +225,8 @@ void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClie final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); assertThat(versions.size()).isOne(); - final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); + final SchemaMetadata metadata = + this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); assertThat(metadata.getId()).isEqualTo(id); assertThat(this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value")) .isNotNull(); @@ -230,13 +235,42 @@ void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClie .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value")) .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0))) + .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient() + .getSchemaMetadata(topic + "-value", versions.get(0))) .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value")) + .isThrownBy( + () -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value")) .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); } + @Test + void shouldReturnValueVersion() throws IOException, RestClientException { + final Schema valueSchema = createSchema("value_schema"); + this.schemaRegistry.registerValueSchema("test-topic", valueSchema); + + final Integer version = + this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", valueSchema); + assertThat(version).isEqualTo(1); + } + + @Test + void shouldReturnKeyVersion() throws IOException, RestClientException { + final Schema valueSchema = createSchema("value_schema"); + this.schemaRegistry.registerKeySchema("test-topic", valueSchema); + + final Integer version = this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-key", valueSchema); + assertThat(version).isEqualTo(1); + } + + @Test + void shouldNotReturnVersionForNonExistingSchema() { + final Schema test = createSchema("test"); + assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic", test)) + .isInstanceOf(RestClientException.class) + .hasMessage("Cannot fetch schema version; error code: 400"); + } + private static Schema createSchema(final String name) { return Schema.createRecord(name, "no doc", "", false, Collections.emptyList()); } From d60f28166d32b3ac32f3bdee77c48519f52fc680 Mon Sep 17 00:00:00 2001 From: Torben Meyer Date: Sat, 16 Nov 2019 14:47:25 +0100 Subject: [PATCH 2/6] Clean up --- .../SchemaRegistryMock.java | 9 ++++-- .../SchemaRegistryMockTest.java | 30 ++++++++----------- 2 files changed, 18 insertions(+), 21 deletions(-) diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java index 760c3c0..d0a0cdc 100644 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java +++ b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java @@ -56,11 +56,14 @@ /** *

The schema registry mock implements a few basic HTTP endpoints that are used by the Avro serdes.

- * In particular, + * In particular, you can *
    - *
  • you can register a schema
  • - *
  • retrieve a schema by id.
  • + *
  • register a schema
  • + *
  • retrieve a schema by id
  • *
  • list and get schema versions of a subject
  • + *
  • list all subjects
  • + *
  • delete a schema
  • + *
  • retrieve the version of a schema
  • *
* *

If you use the TestTopology of the fluent Kafka Streams test, you don't have to interact with this class at diff --git a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java index ddc7082..ae2ed70 100644 --- a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java +++ b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java @@ -98,8 +98,8 @@ void shouldHaveSchemaVersions() throws IOException, RestClientException { final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); assertThat(versions.size()).isOne(); - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); + final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient() + .getSchemaMetadata(topic + "-value", versions.get(0)); assertThat(metadata.getId()).isEqualTo(id); final String schemaString = metadata.getSchema(); final Schema retrievedSchema = new Schema.Parser().parse(schemaString); @@ -130,8 +130,7 @@ void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); assertThat(versions.size()).isEqualTo(2); - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); + final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); final int metadataId = metadata.getId(); assertThat(metadataId).isNotEqualTo(id1); assertThat(metadataId).isEqualTo(id2); @@ -143,8 +142,7 @@ void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { @Test void shouldNotHaveLatestSchemaVersionForUnknownSubject() { assertThatExceptionOfType(RestClientException.class) - .isThrownBy( - () -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata("does_not_exist")) + .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata("does_not_exist")) .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); } @@ -215,32 +213,29 @@ void shouldNotDeleteUnknownSubject() { .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); } - @Test void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClientException { final Schema valueSchema = createSchema("value_schema"); final String topic = "test-topic"; final int id = this.schemaRegistry.registerValueSchema(topic, valueSchema); - final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); + final SchemaRegistryClient schemaRegistryClient = this.schemaRegistry.getSchemaRegistryClient(); + final List versions = schemaRegistryClient.getAllVersions(topic + "-value"); assertThat(versions.size()).isOne(); - final SchemaMetadata metadata = - this.schemaRegistry.getSchemaRegistryClient().getSchemaMetadata(topic + "-value", versions.get(0)); + final SchemaMetadata metadata = schemaRegistryClient.getSchemaMetadata(topic + "-value", versions.get(0)); assertThat(metadata.getId()).isEqualTo(id); - assertThat(this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value")) + assertThat(schemaRegistryClient.getLatestSchemaMetadata(topic + "-value")) .isNotNull(); this.schemaRegistry.deleteValueSchema(topic); assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value")) + .isThrownBy(() -> schemaRegistryClient.getAllVersions(topic + "-value")) .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient() - .getSchemaMetadata(topic + "-value", versions.get(0))) + .isThrownBy(() -> schemaRegistryClient.getSchemaMetadata(topic + "-value", versions.get(0))) .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); assertThatExceptionOfType(RestClientException.class) - .isThrownBy( - () -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value")) + .isThrownBy(() -> schemaRegistryClient.getLatestSchemaMetadata(topic + "-value")) .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); } @@ -249,8 +244,7 @@ void shouldReturnValueVersion() throws IOException, RestClientException { final Schema valueSchema = createSchema("value_schema"); this.schemaRegistry.registerValueSchema("test-topic", valueSchema); - final Integer version = - this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", valueSchema); + final Integer version = this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", valueSchema); assertThat(version).isEqualTo(1); } From e1876462d862213090698b27060480f90eb2dafe Mon Sep 17 00:00:00 2001 From: Torben Meyer Date: Wed, 20 Nov 2019 07:07:49 +0100 Subject: [PATCH 3/6] Remove alignment --- .../schemaregistrymock/SchemaRegistryMock.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java index d0a0cdc..8476d09 100644 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java +++ b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java @@ -260,7 +260,7 @@ private class AutoRegistrationHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final String subject = Iterables.get(this.urlSplitter.split(request.getUrl()), 1); try { final int id = SchemaRegistryMock.this.register(subject, @@ -284,7 +284,7 @@ private class ListVersionsHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final List versions = SchemaRegistryMock.this.listVersions(this.getSubject(request)); log.debug("Got versions {}", versions); return ResponseDefinitionBuilder.jsonResponse(versions); @@ -300,7 +300,7 @@ private class GetVersionHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final String versionStr = Iterables.get(this.urlSplitter.split(request.getUrl()), 3); final SchemaMetadata metadata; if (versionStr.equals("latest")) { @@ -321,7 +321,7 @@ public String getName() { private class DeleteSubjectHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final List ids = SchemaRegistryMock.this.delete(this.getSubject(request)); return ResponseDefinitionBuilder.jsonResponse(ids); } @@ -335,7 +335,7 @@ public String getName() { private class AllSubjectsHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { final Collection body = SchemaRegistryMock.this.listAllSubjects(); return ResponseDefinitionBuilder.jsonResponse(body); } @@ -349,7 +349,7 @@ public String getName() { private class SchemaVersionHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, - final FileSource files, final Parameters parameters) { + final FileSource files, final Parameters parameters) { try { final Schema schema = new Schema.Parser() .parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema()); From d6919ad1fc50b50fec9016cb2e3f4a34846cceee Mon Sep 17 00:00:00 2001 From: Torben Meyer Date: Wed, 20 Nov 2019 15:42:41 +0100 Subject: [PATCH 4/6] Remove deleted parameter --- .../SchemaRegistryMock.java | 87 ++++++++++----- .../SchemaRegistryMockTest.java | 102 ++++++++++++++++-- 2 files changed, 152 insertions(+), 37 deletions(-) diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java index 8476d09..913f032 100644 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java +++ b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java @@ -23,7 +23,7 @@ */ package com.bakdata.schemaregistrymock; -import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import com.github.tomakehurst.wiremock.WireMockServer; @@ -37,6 +37,7 @@ import com.github.tomakehurst.wiremock.http.ResponseDefinition; import com.github.tomakehurst.wiremock.matching.UrlPathPattern; import com.github.tomakehurst.wiremock.matching.UrlPattern; +import com.google.common.base.CharMatcher; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; @@ -63,7 +64,7 @@ *

  • list and get schema versions of a subject
  • *
  • list all subjects
  • *
  • delete a schema
  • - *
  • retrieve the version of a schema
  • + *
  • retrieve version and subject of a schema
  • * * *

    If you use the TestTopology of the fluent Kafka Streams test, you don't have to interact with this class at @@ -101,7 +102,7 @@ public class SchemaRegistryMock { private static final String ALL_SUBJECT_PATTERN = "/subjects"; private static final String SCHEMA_PATH_PATTERN = "/subjects/[^/]+/versions"; private static final String SCHEMA_BY_ID_PATTERN = "/schemas/ids/"; - private static final String SCHEMA_VERSION_PATTERN = "/subjects/[^/]+\\?deleted=true"; + private static final String SPECIFIC_SCHEMA_PATTERN = "/subjects/[^/]+"; private static final int IDENTITY_MAP_CAPACITY = 1000; private final ListVersionsHandler listVersionsHandler = new ListVersionsHandler(); @@ -109,11 +110,11 @@ public class SchemaRegistryMock { private final AutoRegistrationHandler autoRegistrationHandler = new AutoRegistrationHandler(); private final DeleteSubjectHandler deleteSubjectHandler = new DeleteSubjectHandler(); private final AllSubjectsHandler allSubjectsHandler = new AllSubjectsHandler(); - private final SchemaVersionHandler schemaVersionHandler = new SchemaVersionHandler(); + private final SchemaHandler schemaHandler = new SchemaHandler(); private final WireMockServer mockSchemaRegistry = new WireMockServer( WireMockConfiguration.wireMockConfig().dynamicPort() .extensions(this.autoRegistrationHandler, this.listVersionsHandler, this.getVersionHandler, - this.deleteSubjectHandler, this.allSubjectsHandler, this.schemaVersionHandler)); + this.deleteSubjectHandler, this.allSubjectsHandler, this.schemaHandler)); private final SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient(); private static UrlPattern getSchemaPattern(final Integer id) { @@ -146,8 +147,8 @@ public void start() { .willReturn(WireMock.aResponse().withStatus(HTTP_NOT_FOUND))); this.mockSchemaRegistry.stubFor(WireMock.get(WireMock.urlPathMatching(ALL_SUBJECT_PATTERN)) .willReturn(WireMock.aResponse().withTransformers(this.allSubjectsHandler.getName()))); - this.mockSchemaRegistry.stubFor(WireMock.post(WireMock.urlMatching(SCHEMA_VERSION_PATTERN)) - .willReturn(WireMock.aResponse().withTransformers(this.schemaVersionHandler.getName()))); + this.mockSchemaRegistry.stubFor(WireMock.post(WireMock.urlPathMatching(SPECIFIC_SCHEMA_PATTERN)) + .willReturn(WireMock.aResponse().withTransformers(this.schemaHandler.getName()))); } public void stop() { @@ -243,8 +244,8 @@ private Collection listAllSubjects() { } private abstract static class SubjectsHandler extends ResponseDefinitionTransformer { - // Expected url pattern /subjects(/.*-value/versions) - protected final Splitter urlSplitter = Splitter.on('/').omitEmptyStrings(); + // Expected url pattern /subjects(/.*-value/(versions|?param)) + protected final Splitter urlSplitter = Splitter.on(CharMatcher.anyOf("/?")).omitEmptyStrings(); @Override public boolean applyGlobally() { @@ -346,41 +347,69 @@ public String getName() { } } - private class SchemaVersionHandler extends SubjectsHandler { + private class SchemaHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, final FileSource files, final Parameters parameters) { + final String requestSubject = this.getSubject(request); + final boolean deletedAllowed = this.isDeletedAllowed(request); + // Check if requestSubject exists. This is required because the mock always throws an exception + // with 'Schema not found' + final boolean subjectExists = SchemaRegistryMock.this.listAllSubjects().stream() + .anyMatch(subject -> subject.equals(requestSubject)); + + if (!subjectExists) { + final ErrorMessage error = new ErrorMessage(40401, "Subject not found"); + return ResponseDefinitionBuilder.jsonResponse(error, HTTP_NOT_FOUND); + } + + final Schema schema; try { - final Schema schema = new Schema.Parser() + schema = new Schema.Parser() .parse(RegisterSchemaRequest.fromJson(request.getBodyAsString()).getSchema()); - final String subject = this.getSubject(request); - final int schemaVersion = SchemaRegistryMock.this.schemaRegistryClient.getVersion(subject, schema); - final int schemaId = SchemaRegistryMock.this.schemaRegistryClient.getId(subject, schema); + } catch (final IOException e) { + final ErrorMessage error = + new ErrorMessage(HTTP_INTERNAL_ERROR, "Error while looking up schema under subject topic"); + return ResponseDefinitionBuilder.jsonResponse(error, HTTP_INTERNAL_ERROR); + } + try { + final int schemaId = SchemaRegistryMock.this.schemaRegistryClient.getId(requestSubject, schema); + final int schemaVersion = this.getSchemaVersion(requestSubject, deletedAllowed, schema); return ResponseDefinitionBuilder .jsonResponse(new io.confluent.kafka.schemaregistry.client.rest.entities.Schema( - subject, schemaVersion, schemaId, schema.toString() - )); - } catch (final IOException | RestClientException e) { - final ErrorMessage error = new ErrorMessage(HTTP_BAD_REQUEST, "Cannot fetch schema version"); - return ResponseDefinitionBuilder.jsonResponse(error, HTTP_BAD_REQUEST); + requestSubject, schemaVersion, schemaId, schema.toString())); + } catch (final RestClientException | IOException e) { + final ErrorMessage error = new ErrorMessage(40403, "Schema not found"); + return ResponseDefinitionBuilder.jsonResponse(error, HTTP_NOT_FOUND); } } - @Override - public String getName() { - return SchemaVersionHandler.class.getSimpleName(); + private int getSchemaVersion(final String requestSubject, final boolean deletedAllowed, final Schema schema) + throws IOException, RestClientException { + final int schemaVersion; + if (deletedAllowed) { + schemaVersion = SchemaRegistryMock.this.schemaRegistryClient.getVersion(requestSubject, schema); + } else { + // throws an exception if schema was deleted + schemaVersion = SchemaRegistryMock.this.getSchemaRegistryClient().getVersion(requestSubject, schema); + } + return schemaVersion; } - @Override - protected String getSubject(final Request request) { - String subject = super.getSubject(request); - // remove request parameters - if (subject.contains("?")) { - subject = subject.split("\\?")[0]; + private boolean isDeletedAllowed(final Request request) { + boolean deletedAllowed = false; + if (request.getUrl().contains("?deleted=")) { + deletedAllowed = Boolean.parseBoolean(request.getUrl().split("=")[1]); } - return subject; + return deletedAllowed; + } + + @Override + public String getName() { + return SchemaHandler.class.getSimpleName(); } + } } diff --git a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java index ae2ed70..d6a17e6 100644 --- a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java +++ b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java @@ -130,7 +130,8 @@ void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { final List versions = this.schemaRegistry.getSchemaRegistryClient().getAllVersions(topic + "-value"); assertThat(versions.size()).isEqualTo(2); - final SchemaMetadata metadata = this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); + final SchemaMetadata metadata = + this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata(topic + "-value"); final int metadataId = metadata.getId(); assertThat(metadataId).isNotEqualTo(id1); assertThat(metadataId).isEqualTo(id2); @@ -142,7 +143,8 @@ void shouldHaveLatestSchemaVersion() throws IOException, RestClientException { @Test void shouldNotHaveLatestSchemaVersionForUnknownSubject() { assertThatExceptionOfType(RestClientException.class) - .isThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata("does_not_exist")) + .isThrownBy( + () -> this.schemaRegistry.getSchemaRegistryClient().getLatestSchemaMetadata("does_not_exist")) .satisfies(e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)); } @@ -240,16 +242,17 @@ void shouldNotHaveSchemaVersionsForDeletedSubject() throws IOException, RestClie } @Test - void shouldReturnValueVersion() throws IOException, RestClientException { + void shouldReturnValueSchemaVersion() throws IOException, RestClientException { final Schema valueSchema = createSchema("value_schema"); this.schemaRegistry.registerValueSchema("test-topic", valueSchema); - final Integer version = this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", valueSchema); + final Integer version = + this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", valueSchema); assertThat(version).isEqualTo(1); } @Test - void shouldReturnKeyVersion() throws IOException, RestClientException { + void shouldReturnKeySchemaVersion() throws IOException, RestClientException { final Schema valueSchema = createSchema("value_schema"); this.schemaRegistry.registerKeySchema("test-topic", valueSchema); @@ -257,12 +260,95 @@ void shouldReturnKeyVersion() throws IOException, RestClientException { assertThat(version).isEqualTo(1); } + @Test + void shouldReturnValueSchemaId() throws IOException, RestClientException { + final Schema valueSchema = createSchema("value_schema"); + this.schemaRegistry.registerValueSchema("test-topic", valueSchema); + + final Integer id = this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-value", valueSchema); + assertThat(id).isEqualTo(1); + } + + @Test + void shouldReturnKeySchemaId() throws IOException, RestClientException { + final Schema valueSchema = createSchema("value_schema"); + this.schemaRegistry.registerKeySchema("test-topic", valueSchema); + + final Integer id = this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-key", valueSchema); + assertThat(id).isEqualTo(1); + } + @Test void shouldNotReturnVersionForNonExistingSchema() { final Schema test = createSchema("test"); - assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic", test)) - .isInstanceOf(RestClientException.class) - .hasMessage("Cannot fetch schema version; error code: 400"); + final Schema other = createSchema("other"); + this.schemaRegistry.registerValueSchema("test-topic", other); + assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", test)) + .isInstanceOfSatisfying(RestClientException.class, + e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)) + .hasMessage("Schema not found; error code: 40403"); + } + + @Test + void shouldNotReturnIdForNonExistingSchema() { + final Schema test = createSchema("test"); + final Schema other = createSchema("other"); + this.schemaRegistry.registerValueSchema("test-topic", other); + assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-value", test)) + .isInstanceOfSatisfying(RestClientException.class, + e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)) + .hasMessage("Schema not found; error code: 40403"); + } + + @Test + void shouldNotReturnVersionForNonExistingSubject() { + final Schema test = createSchema("test"); + assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-value", test)) + .isInstanceOfSatisfying(RestClientException.class, + e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)) + .hasMessage("Subject not found; error code: 40401"); + } + + @Test + void shouldNotReturnIdForNonExistingSubject() { + final Schema test = createSchema("test"); + assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-value", test)) + .isInstanceOfSatisfying(RestClientException.class, + e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)) + .hasMessage("Subject not found; error code: 40401"); + } + + @Test + void shouldReturnVersionForDeletedSchema() throws IOException, RestClientException { + final Schema testSchema = createSchema("value_schema"); + this.schemaRegistry.registerKeySchema("test-topic", testSchema); + + final int version = this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-key", testSchema); + assertThat(version).isEqualTo(1); + + this.schemaRegistry.deleteValueSchema("test-topic"); + this.schemaRegistry.registerValueSchema("test-topic", createSchema("new_schema")); + + final Integer versionAfterDeletion = + this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-key", testSchema); + assertThat(versionAfterDeletion).isEqualTo(1); + } + + @Test + void shouldNotReturnIdForDeletedSchema() throws IOException, RestClientException { + final Schema testSchema = createSchema("value_schema"); + this.schemaRegistry.registerKeySchema("test-topic", testSchema); + + final Integer version = this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-key", testSchema); + assertThat(version).isEqualTo(1); + + this.schemaRegistry.deleteKeySchema("test-topic"); + this.schemaRegistry.registerKeySchema("test-topic", createSchema("new_schema")); + + assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-key", testSchema)) + .isInstanceOfSatisfying(RestClientException.class, + e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)) + .hasMessage("Schema not found; error code: 40403"); } private static Schema createSchema(final String name) { From 30ba507507fb1a1ddc641eb2aec8c7d46cb194f6 Mon Sep 17 00:00:00 2001 From: Torben Meyer Date: Thu, 21 Nov 2019 10:57:20 +0100 Subject: [PATCH 5/6] Use query parameter --- .../schemaregistrymock/SchemaRegistryMock.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java index 913f032..7c7caf2 100644 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java +++ b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java @@ -33,6 +33,7 @@ import com.github.tomakehurst.wiremock.core.WireMockConfiguration; import com.github.tomakehurst.wiremock.extension.Parameters; import com.github.tomakehurst.wiremock.extension.ResponseDefinitionTransformer; +import com.github.tomakehurst.wiremock.http.QueryParameter; import com.github.tomakehurst.wiremock.http.Request; import com.github.tomakehurst.wiremock.http.ResponseDefinition; import com.github.tomakehurst.wiremock.matching.UrlPathPattern; @@ -59,12 +60,12 @@ *

    The schema registry mock implements a few basic HTTP endpoints that are used by the Avro serdes.

    * In particular, you can *
      - *
    • register a schema
    • + *
    • register a schema
    • *
    • retrieve a schema by id
    • *
    • list and get schema versions of a subject
    • *
    • list all subjects
    • - *
    • delete a schema
    • - *
    • retrieve version and subject of a schema
    • + *
    • delete a subject
    • + *
    • retrieve version and id of a schema
    • *
    * *

    If you use the TestTopology of the fluent Kafka Streams test, you don't have to interact with this class at @@ -399,8 +400,9 @@ private int getSchemaVersion(final String requestSubject, final boolean deletedA private boolean isDeletedAllowed(final Request request) { boolean deletedAllowed = false; - if (request.getUrl().contains("?deleted=")) { - deletedAllowed = Boolean.parseBoolean(request.getUrl().split("=")[1]); + final QueryParameter deleted = request.queryParameter("deleted"); + if (deleted.isPresent()) { + deletedAllowed = Boolean.parseBoolean(deleted.firstValue()); } return deletedAllowed; } From e17912596e49742de8d440981f5aefca9a62a651 Mon Sep 17 00:00:00 2001 From: Torben Meyer Date: Thu, 21 Nov 2019 12:02:59 +0100 Subject: [PATCH 6/6] Revert attempts to handle deleted schema --- .../SchemaRegistryMock.java | 32 +++++------------- .../SchemaRegistryMockTest.java | 33 ------------------- 2 files changed, 8 insertions(+), 57 deletions(-) diff --git a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java index 7c7caf2..5c14d10 100644 --- a/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java +++ b/schema-registry-mock/src/main/java/com/bakdata/schemaregistrymock/SchemaRegistryMock.java @@ -33,7 +33,6 @@ import com.github.tomakehurst.wiremock.core.WireMockConfiguration; import com.github.tomakehurst.wiremock.extension.Parameters; import com.github.tomakehurst.wiremock.extension.ResponseDefinitionTransformer; -import com.github.tomakehurst.wiremock.http.QueryParameter; import com.github.tomakehurst.wiremock.http.Request; import com.github.tomakehurst.wiremock.http.ResponseDefinition; import com.github.tomakehurst.wiremock.matching.UrlPathPattern; @@ -348,12 +347,17 @@ public String getName() { } } + /** + * The SchemaHandler returns version and id for a given schema. + * + * Note: It returns "Schema not found, Error code 40403" for a deleted schema even if the request parameter + * 'deleted' is set to true. The MockSchemaRegistryClient does not save the version of a deleted schema. + */ private class SchemaHandler extends SubjectsHandler { @Override public ResponseDefinition transform(final Request request, final ResponseDefinition responseDefinition, final FileSource files, final Parameters parameters) { final String requestSubject = this.getSubject(request); - final boolean deletedAllowed = this.isDeletedAllowed(request); // Check if requestSubject exists. This is required because the mock always throws an exception // with 'Schema not found' final boolean subjectExists = SchemaRegistryMock.this.listAllSubjects().stream() @@ -376,7 +380,8 @@ public ResponseDefinition transform(final Request request, final ResponseDefinit try { final int schemaId = SchemaRegistryMock.this.schemaRegistryClient.getId(requestSubject, schema); - final int schemaVersion = this.getSchemaVersion(requestSubject, deletedAllowed, schema); + final int schemaVersion = + SchemaRegistryMock.this.schemaRegistryClient.getVersion(requestSubject, schema); return ResponseDefinitionBuilder .jsonResponse(new io.confluent.kafka.schemaregistry.client.rest.entities.Schema( requestSubject, schemaVersion, schemaId, schema.toString())); @@ -386,27 +391,6 @@ public ResponseDefinition transform(final Request request, final ResponseDefinit } } - private int getSchemaVersion(final String requestSubject, final boolean deletedAllowed, final Schema schema) - throws IOException, RestClientException { - final int schemaVersion; - if (deletedAllowed) { - schemaVersion = SchemaRegistryMock.this.schemaRegistryClient.getVersion(requestSubject, schema); - } else { - // throws an exception if schema was deleted - schemaVersion = SchemaRegistryMock.this.getSchemaRegistryClient().getVersion(requestSubject, schema); - } - return schemaVersion; - } - - private boolean isDeletedAllowed(final Request request) { - boolean deletedAllowed = false; - final QueryParameter deleted = request.queryParameter("deleted"); - if (deleted.isPresent()) { - deletedAllowed = Boolean.parseBoolean(deleted.firstValue()); - } - return deletedAllowed; - } - @Override public String getName() { return SchemaHandler.class.getSimpleName(); diff --git a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java index d6a17e6..b98cebe 100644 --- a/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java +++ b/schema-registry-mock/src/test/java/com/bakdata/schemaregistrymock/SchemaRegistryMockTest.java @@ -318,39 +318,6 @@ void shouldNotReturnIdForNonExistingSubject() { .hasMessage("Subject not found; error code: 40401"); } - @Test - void shouldReturnVersionForDeletedSchema() throws IOException, RestClientException { - final Schema testSchema = createSchema("value_schema"); - this.schemaRegistry.registerKeySchema("test-topic", testSchema); - - final int version = this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-key", testSchema); - assertThat(version).isEqualTo(1); - - this.schemaRegistry.deleteValueSchema("test-topic"); - this.schemaRegistry.registerValueSchema("test-topic", createSchema("new_schema")); - - final Integer versionAfterDeletion = - this.schemaRegistry.getSchemaRegistryClient().getVersion("test-topic-key", testSchema); - assertThat(versionAfterDeletion).isEqualTo(1); - } - - @Test - void shouldNotReturnIdForDeletedSchema() throws IOException, RestClientException { - final Schema testSchema = createSchema("value_schema"); - this.schemaRegistry.registerKeySchema("test-topic", testSchema); - - final Integer version = this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-key", testSchema); - assertThat(version).isEqualTo(1); - - this.schemaRegistry.deleteKeySchema("test-topic"); - this.schemaRegistry.registerKeySchema("test-topic", createSchema("new_schema")); - - assertThatThrownBy(() -> this.schemaRegistry.getSchemaRegistryClient().getId("test-topic-key", testSchema)) - .isInstanceOfSatisfying(RestClientException.class, - e -> assertThat(e.getStatus()).isEqualTo(HTTP_NOT_FOUND)) - .hasMessage("Schema not found; error code: 40403"); - } - private static Schema createSchema(final String name) { return Schema.createRecord(name, "no doc", "", false, Collections.emptyList()); }