Skip to content

Commit

Permalink
Support loading schemas from another JPMS module (#485)
Browse files Browse the repository at this point in the history
* Support loading schemas from another JPMS module

JPMS introduces issues as resources, like classes, must be exposed from _unique_ packages.
This has necessitated changing where the schemas are generated, from a flat `schema/json` directory within the jar, to a directory tree that matches the package directories, i.e. generated schema end up in the same package as their source Java type.

Main change required in this repo is in the `LocalSchemaLoader`, which needs to call `type.getClassLoader().getResource()` if its to get resources from other modules.
  • Loading branch information
big-andy-coates authored Jan 31, 2024
1 parent 6faf50a commit 8a18482
Show file tree
Hide file tree
Showing 36 changed files with 221 additions and 73 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ See [CreekService.org](https://www.creekservice.org/creek-kafka) for more info o
* **[test-serde-java-eight](test-serde-java-eight)**: test-only serde extension implementation that only registers itself
using the Java 8 `META-INF/services` method.
* **[test-service-native](test-service-native)**: test-only microservice implementation using inbuilt/native Kafka serde.
* **[test-service-json](test-service-json)**: test-only microservice implementation using [json-serde](json-serde).

### Docs:
* **[docs](docs)**: doc site accessible on https://www.creekservice.org/creek-kafka.
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ plugins {
`creek-sonatype-publishing-convention`
id("pl.allegro.tech.build.axion-release") version "1.16.1" // https://plugins.gradle.org/plugin/pl.allegro.tech.build.axion-release
id("com.bmuschko.docker-remote-api") version "9.4.0" apply false
id("org.creekservice.schema.json") version "0.4.1" apply false
id("org.creekservice.schema.json") version "0.4.2-SNAPSHOT" apply false
}

project.version = scmVersion.version
Expand Down
54 changes: 54 additions & 0 deletions docs/_docs/home.md
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ for more information on _why_ only producers register schemas in the Schema Regi
It is recommended that schemas are generated from Java classes using the [Creek JSON Schema Gradle plugin](https://github.com/creek-service/creek-json-schema-gradle-plugin).
This plugin will, by default, create the closed content model JSON schemas that this serde requires.
#### Confluent compatability
Note, the JSON serde is not currently compatible with Confluent's own JSON serde, as Confluent's serde prefixes
the serialized JSON with the schema-id. This is not necessary with Creek's JSON format.
However, there is a task to track [optionally enabling Confluent JSON serde compatability](https://github.com/creek-service/creek-kafka/issues/455)
Expand All @@ -512,6 +514,56 @@ See [this article series](https://www.creekservice.org/articles/2024/01/08/json-
and how Creek implements schema compatibility checks for JSON.
{: .notice--warning}
In its current form, the JSON serde does not persist the schema id used to serialize the key or value in the Kafka record.
This is because the schema id is not needed, as there are checks to ensure all consuming schemas are backwards compatible
with producing schemas, i.e. all consumers can consume all date produced by producers.
Why did we choose to not use the Confluent JSON schema serde?
In [our view](https://www.creekservice.org/articles/2024/01/08/json-schema-evolution-part-1.html) the current Confluent's
current JSON schema serde is not fit for purpose. Hence, coming up with our own.

Let's look at the pros and cons between the two:
| | Confluent Serde | Creek Serde |
|-----|------------------------------------------|-----------------------------------|
| 1. | Broken schema evolution | Usable schema evolution. |
| 2. | Generates schema at runtime. | Generates schema at compile-time. |
| 3. | Schemas published on first use. | Schemas published on startup. |
| 4. | Supports per-record & per-topic schemas. | Supports only per-topic schemas. |
| 5. | Compatible with Confluent UI | Unsure if compatible with UI |
| 6. | Hard to evolve a key schema | Key schemas can be evolved. |
Let's look at each of these in more detail:

1. Probably the biggest difference is how the two serde handle schema compatability.
In [our view](https://www.creekservice.org/articles/2024/01/08/json-schema-evolution-part-1.html) Confluent's
currently model just doesn't work, and we think ours is better.
2. Generating schemas at compile-time reduces service startup times,
and allows engineers the freedom to inspect schemas, and even test they are as expected or don't change unexpectedly, if they wish
3. Publishing schemas on first use has a few downsides, especially on a topic that doesn't see much traffic.
1. Schema changes that break evolvability rules are not detected on startup.
In contrast, publishing & validating schemas on service startup ensures services fail eagerly if there are issues.
2. The set of schema versions for a topic become less deterministic across environments,
as service needs to have started _and_ produced messages.
In contrast, publishing on start-up allows the schema versions in an environment to be derived from the versions of the services deployed.
4. Per-record schemas is, in our opinion, hard to manage in organisations and doesn't lend itself to have self-service data-products in Kafka.
Publishing a new record schema to a topic isn't a compatible change and can break downstream consumers if things aren't managed correctly.
Yet, with per-record schemas its very easy to publish a message with a new schema.
For these reasons, we see per-record schemas as an anti-pattern, and therefore only support per-topic schemas.
Defining the explicit type or types that can be found in a topic defines a clear _contract_ with users.
Multiple types can be better supported and polymorphism can be achieved via subtyping and JSON schema's [`anyOf`][anyOf].
5. Obviously, the Confluent JSON serde is compatible with Confluent's own UIs and therefore likely other UIs built by others
around the schema store. We've not actually checked, but it's certainly possible the UI _expects_ JSON key and values to be
prefixed with the schema id, and balks if that's not the case.
Personally, we prefer the payload being actual JSON, though we've a [planned enhancement](https://github.com/creek-service/creek-kafka/issues/455)
to support Confluent's format to allow interoperability.
6. One of the implications of prefixing the payload with the schema id, as Confluent's serde do, is that its
impossible to evolve the schema of a topic's key, unless using a custom partitioning strategy.
This is because the schema id forms part of the binary key. Evolving the schema means a new schema id,
which changes the serialised form of a specific key, meaning it may be produced to a different partition.
By not prefixing with the schema id, the Creek serde allows the key schema to be evolved. For example,
there's no reason why a new optional property can't be added.

#### Dependencies

The `creek-kafka-json-serde.jar` module has dependencies not stored in maven central.
Expand Down Expand Up @@ -574,6 +626,7 @@ public final class ServiceMain {
}
}
```
[todo]: http://Convert to snippet once released.

Manually registering subtypes is only necessary when this information is not available to Jackson already,
i.e. when a base type is annotated with `@JsonTypeInfo`, but not with `@JsonSubTypes`.
Expand Down Expand Up @@ -704,5 +757,6 @@ The `creek-kafka-serde-test` jar contains a test utility that will test a serial
[ksTest]: https://kafka.apache.org/documentation/streams/developer-guide/testing.html
[systemTest]: https://github.com/creek-service/creek-system-test
[gradle-system-test-plugin]: https://github.com/creek-service/creek-system-test-gradle-plugin
[anyOf]:https://json-schema.org/understanding-json-schema/reference/combining.html#anyof
[serviceLoader]: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/ServiceLoader.html
[todo]: http://update links above once doccs migrated to creekservice.org
6 changes: 6 additions & 0 deletions json-serde/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dependencies {

testImplementation(project(":client-extension"))
testImplementation(project(":serde-test"))
testImplementation(project(":test-service-json"))
testImplementation("com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion")
testImplementation("org.testcontainers:junit-jupiter:$testContainersVersion")
testImplementation("org.testcontainers:kafka:$testContainersVersion")
Expand All @@ -58,3 +59,8 @@ dependencies {

// Patch Kafka Testcontainers jar into main test containers module to avoid split packages:
modularity.patchModule("testcontainers", "kafka-$testContainersVersion.jar")

creek.schema.json {
typeScanning.packageWhiteList("org.creekservice.api.kafka.serde.json", "org.creekservice.internal.kafka.serde.json")
subTypeScanning.packageWhiteList("org.creekservice.api.kafka.serde.json", "org.creekservice.internal.kafka.serde.json")
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,32 @@
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import org.creekservice.api.base.annotation.VisibleForTesting;
import org.creekservice.api.base.type.schema.GeneratedSchemas;
import org.creekservice.api.kafka.serde.json.schema.ProducerSchema;

public final class LocalSchemaLoader {

private static final String SCHEMA_DIRECTORY = "/schema/json/";

private LocalSchemaLoader() {}

public static ProducerSchema loadFromClasspath(final Class<?> type) {
final Path schemaFile =
GeneratedSchemas.schemaFileName(type, GeneratedSchemas.yamlExtension());
return loadFromClasspath(schemaFile);
}

public static ProducerSchema loadFromClasspath(final Path schemaFile) {
final URL resource = findResource(schemaFile);
final URL resource = findResource(type);
return load(resource);
}

private static URL findResource(final Path schemaFile) {
final String path = SCHEMA_DIRECTORY + schemaFile;
final URL resource = LocalSchemaLoader.class.getResource(path);
private static URL findResource(final Class<?> type) {
final String schemaFile =
GeneratedSchemas.schemaFileName(type, GeneratedSchemas.yamlExtension());

// Load from current module file:
URL resource = type.getResource("/" + schemaFile);
if (resource == null) {
throw new SchemaResourceNotFoundException(
"Failed to load schema resource: " + path + ". Resource not found.");
// Then try loading from other modules:
resource = type.getClassLoader().getResource(schemaFile);
if (resource == null) {
throw new SchemaResourceNotFoundException(
"Failed to load schema resource: " + schemaFile + ". Resource not found.");
}
}

return resource;
Expand Down
4 changes: 2 additions & 2 deletions json-serde/src/test/java/module-info.test
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
--add-modules
org.junitpioneer,org.hamcrest,creek.observability.logging.fixtures,guava.testlib,creek.test.util,creek.test.hamcrest,creek.test.conformity,creek.kafka.serde.test,com.fasterxml.jackson.annotation,creek.kafka.clients.extension
org.junitpioneer,org.hamcrest,creek.observability.logging.fixtures,guava.testlib,creek.test.util,creek.test.hamcrest,creek.test.conformity,creek.kafka.serde.test,com.fasterxml.jackson.annotation,creek.kafka.clients.extension,creek.kafka.test.service.json

--add-reads
creek.kafka.serde.json.schema=org.junitpioneer,org.hamcrest,guava.testlib,creek.test.util,creek.test.hamcrest,creek.test.conformity,creek.kafka.serde.test,com.fasterxml.jackson.annotation,creek.kafka.clients.extension
creek.kafka.serde.json.schema=org.junitpioneer,org.hamcrest,guava.testlib,creek.test.util,creek.test.hamcrest,creek.test.conformity,creek.kafka.serde.test,com.fasterxml.jackson.annotation,creek.kafka.clients.extension,creek.kafka.test.service.json

--add-reads
creek.kafka.serde.json.schema=creek.observability.logging.fixtures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ void shouldThrowIfTypeHasNoSchema() {
e.getMessage(),
containsString(
"Failed to load schema resource:"
+ " /schema/json/org.creekservice.internal.kafka.serde.json.json_schema_serde_functional_test.yml."
+ " org/creekservice/internal/kafka/serde/json/JsonSchemaSerdeFunctionalTest.yml."
+ " Resource not found."));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@
package org.creekservice.internal.kafka.serde.json.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject;
import org.creekservice.api.base.annotation.schema.GeneratesSchema;

@GeneratesSchema
public final class WithAmbiguousFloat {

private final Object number;
private final Number number;

public WithAmbiguousFloat(@JsonProperty("number") final Object number) {
public WithAmbiguousFloat(@JsonProperty("number") final Number number) {
this.number = number;
}

@JsonSchemaInject(json = "{\"type\": \"number\"}")
public Object getNumber() {
public Number getNumber() {
return number;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.creekservice.api.kafka.serde.json.schema.ProducerSchema;
import org.creekservice.api.kafka.test.service.json.model.OutputValue;
import org.creekservice.api.test.util.TestPaths;
import org.junit.jupiter.api.Test;

Expand All @@ -40,31 +39,21 @@ public void shouldLoadTypeSchemaFromClasspath() {
assertThat(schema.toString(), containsString("$id: test_model.yml"));
}

@Test
public void shouldLoadPathSchemaFromClasspath() {
// When:
final ProducerSchema schema =
LocalSchemaLoader.loadFromClasspath(Paths.get("test-schema.yml"));

// Then:
assertThat(schema.toString(), containsString("$id: test-schema.yml"));
}

@SuppressFBWarnings("PATH_TRAVERSAL_IN")
@Test
public void shouldThrowIfSchemaNoFound() {
// When:
final Exception e =
assertThrows(
LocalSchemaLoader.SchemaResourceNotFoundException.class,
() -> LocalSchemaLoader.loadFromClasspath(Paths.get("u_wont_find_me.yml")));
() -> LocalSchemaLoader.loadFromClasspath(LocalSchemaLoaderTest.class));

// Then:
assertThat(
e.getMessage(),
is(
"Failed to load schema resource: /schema/json/u_wont_find_me.yml."
+ " Resource not found."));
"Failed to load schema resource:"
+ " org/creekservice/internal/kafka/serde/json/schema/LocalSchemaLoaderTest.yml."
+ " Resource not found."));
}

@Test
Expand All @@ -83,5 +72,14 @@ public void shouldLoadSchemaFromWithJar() throws Exception {
assertThat(schema.toString(), containsString("$id: test-schema.yml"));
}

@Test
void shouldBeAbleToLoadSchemaForAnotherModule() {
// Given:
final ProducerSchema schema = LocalSchemaLoader.loadFromClasspath(OutputValue.class);

// Then:
assertThat(schema.toString(), containsString("title: Output Value"));
}

private static final class TestModel {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
This directory contains schema generated using the creek-json-schema-gradle-plugin.

Unfortunately, until https://github.com/java9-modularity/gradle-modules-plugin/pull/228 is fixed,
the schema generated automatically by the `generateTestJsonSchema` task aren't available during testing.

So for now, they are copied into here.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ additionalProperties: false
properties:
inner:
oneOf:
- $ref: '#/definitions/ImplicitlyNamed'
- $ref: '#/definitions/the-explicit-name'
- $ref: '#/definitions/ImplicitlyNamed'
- $ref: '#/definitions/the-explicit-name'
definitions:
ImplicitlyNamed:
type: object
Expand All @@ -17,25 +17,25 @@ definitions:
'@type':
type: string
enum:
- TypeWithImplicitPolymorphism$ImplicitlyNamed
- TypeWithImplicitPolymorphism$ImplicitlyNamed
default: TypeWithImplicitPolymorphism$ImplicitlyNamed
age:
type: integer
title: TypeWithImplicitPolymorphism$ImplicitlyNamed
required:
- '@type'
- age
- '@type'
- age
the-explicit-name:
type: object
additionalProperties: false
properties:
'@type':
type: string
enum:
- the-explicit-name
- the-explicit-name
default: the-explicit-name
text:
type: string
title: the-explicit-name
required:
- '@type'
- '@type'
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
---
# timestamp=1706025251767
# timestamp=1706479369935
$schema: http://json-schema.org/draft-07/schema#
title: With Ambiguous Float
type: object
additionalProperties: false
properties:
number:
type: number
type: number
23 changes: 0 additions & 23 deletions json-serde/src/test/resources/schema/json/test-schema.yml

This file was deleted.

3 changes: 2 additions & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ include(
"test-java-nine",
"test-serde",
"test-serde-java-eight",
"test-service-native"
"test-service-native",
"test-service-json"
)
3 changes: 3 additions & 0 deletions test-service-json/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Creek Kafka - Test Service

A module that will soon define a test microservice used by this repo for testing.
Loading

0 comments on commit 8a18482

Please sign in to comment.