
Spring Cloud Stream and Schema Evolution in Action with Kafka Binder

This is a set of Spring Boot applications that demonstrate schema evolution using Spring Cloud Stream with the Kafka binder. Producer V1 (producer1), Producer V2 (producer2), and a Consumer (consumer) are included in this project.

Requirement

As a developer, I’d like to design my consumer to be resilient to differing payload schemas.

Assumptions

For this demonstration, we simply assume there are two producers producing events with different payload schemas. A consumer that consumes both payload versions is designed to adapt to the evolving schema.

Both producers and the consumer interact with the Spring Cloud Stream (SCSt) schema registry to register and evolve the schema.
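
To make this concrete, below is a hypothetical sketch of the two payload schemas. The field names are taken from the sample output shown later in this document; the actual .avsc files shipped with the producers may differ in namespace and defaults.

The v1 schema (producer1) carries a single temperature field:

{
  "namespace": "com.example",
  "type": "record",
  "name": "Sensor",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "temperature", "type": "float", "default": 0.0},
    {"name": "acceleration", "type": "float", "default": 0.0},
    {"name": "velocity", "type": "float", "default": 0.0}
  ]
}

The v2 schema (producer2) splits the temperature reading into two fields:

{
  "namespace": "com.example",
  "type": "record",
  "name": "Sensor",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "internalTemperature", "type": "float", "default": 0.0},
    {"name": "externalTemperature", "type": "float", "default": 0.0},
    {"name": "acceleration", "type": "float", "default": 0.0},
    {"name": "velocity", "type": "float", "default": 0.0}
  ]
}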

Building

Note
It is expected that this repo has been checked out locally and that all commands are executed from this sample's directory, spring-cloud-stream-schema-registry-integration, unless otherwise noted.

Build the apps

To build the applications, simply execute the following command:

./mvnw clean install
Note
The apps can be built and run from within an IDE (such as IntelliJ), but you will need to invoke the Maven package goal and then refresh the project, because the Avro Maven plugin must execute in order to generate the required model classes. Otherwise you will see compile failures around the missing Sensor class.
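
For reference, the Avro Maven plugin wiring in each application's pom.xml typically looks something like the following sketch (the paths shown are illustrative assumptions, not copied from this repo):

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <!-- .avsc schema files in, generated Java model classes (e.g. Sensor) out -->
        <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>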

Running

Note
It is expected that this repo has been checked out locally and that all commands are executed from this sample's directory, spring-cloud-stream-schema-registry-integration, unless otherwise noted.

Pre-requisites

  • The components have all been built by following the Building steps.

  • Apache Kafka broker available at localhost:9092

Tip
The included Kafka tools can be used to easily start a broker locally on the required coordinates.
  • By default, the schema registry is backed by an H2 database.

    • To instead use Postgres, it must be available at localhost:5432.

    • To instead use MySQL, it must be available at localhost:3306.

Tip
Docker Compose files are provided for both Postgres and MySQL. You can simply run docker-compose -f <postgres.yml|mysql.yml> <up|down> to start/stop the database server.

Steps

Make sure the above pre-requisites are satisfied and follow the steps below.

Start Schema Registry

Download the Spring Cloud Stream Schema Registry server (note that the version below is used as an example; please update it to the latest GA release).

Start the Schema Registry server (adjust the commands accordingly if you are not on a Unix-like platform). If you downloaded the Schema Registry server to a different location, update the path below when you start the server.

java -jar ./spring-cloud-stream-schema-registry-server-4.0.4.jar

By default, the schema registry starts with a local H2 database.

To use a Postgres database instead of H2, additional properties must be specified when starting the server:

java -jar ./spring-cloud-stream-schema-registry-server-4.0.4.jar \
  --spring.datasource.url=jdbc:postgresql://localhost:5432/registry \
  --spring.datasource.username=root \
  --spring.datasource.password=rootpw \
  --spring.datasource.driver-class-name=org.postgresql.Driver \
  --spring.jpa.database-platform=org.hibernate.dialect.PostgreSQLDialect \
  --spring.jpa.hibernate.ddl-auto-create=true \
  --spring.jpa.hibernate.ddl-auto=update \
  --spring.jpa.generate-ddl=true
To use a MySQL database instead of H2, additional properties must be specified when starting the server:

java -jar ./spring-cloud-stream-schema-registry-server-4.0.4.jar \
  --spring.datasource.url=jdbc:mariadb://localhost:3306/registry \
  --spring.datasource.username=root \
  --spring.datasource.password=rootpw \
  --spring.datasource.driver-class-name=org.mariadb.jdbc.Driver \
  --spring.jpa.database-platform=org.hibernate.dialect.MariaDB53Dialect \
  --spring.jpa.hibernate.ddl-auto-create=true \
  --spring.jpa.hibernate.ddl-auto=update \
  --spring.jpa.generate-ddl=true
Start consumer

Start the consumer in another terminal session (or run it from an IDE):

java -jar schema-registry-consumer-kafka/target/schema-registry-consumer-kafka-0.0.1-SNAPSHOT.jar
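
Internally, the consumer is a functional Spring Cloud Stream application. A minimal sketch (assuming the Avro-generated Sensor class and a function binding named process; the actual source may differ) looks like:

import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    // Logs each incoming Sensor payload; both v1 and v2 payloads are
    // deserialized into the consumer's base schema before arriving here.
    @Bean
    public Consumer<Sensor> process() {
        return sensor -> System.out.println(sensor);
    }
}
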
Start V1 producer

Start producer1 in another terminal session (or run it from an IDE):

java -jar schema-registry-producer1-kafka/target/schema-registry-producer1-kafka-0.0.1-SNAPSHOT.jar
Start V2 producer

Start producer2 in another terminal session (or run it from an IDE):

java -jar schema-registry-producer2-kafka/target/schema-registry-producer2-kafka-0.0.1-SNAPSHOT.jar

Sample Data

Both producers in the demonstration are also REST controllers: producer1 listens on port 9009 and producer2 on port 9010. We will POST to the /randomMessage endpoint on each producer to publish sample data.

Example:

curl -X POST http://localhost:9009/randomMessage
curl -X POST http://localhost:9010/randomMessage
curl -X POST http://localhost:9009/randomMessage
curl -X POST http://localhost:9009/randomMessage
curl -X POST http://localhost:9010/randomMessage
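
For reference, each producer's /randomMessage endpoint is conceptually something like the sketch below (class and binding names are hypothetical; Sensor is the Avro-generated model class):

import java.util.Random;
import java.util.UUID;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    private final StreamBridge streamBridge;
    private final Random random = new Random();

    public MessageController(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    // POST /randomMessage: build a random Sensor payload and publish it to the output binding.
    @PostMapping("/randomMessage")
    public String sendRandomMessage() {
        Sensor sensor = new Sensor();
        // producer1 appends -v1 and sets only temperature; producer2 appends -v2
        // and sets internalTemperature and externalTemperature instead.
        sensor.setId(UUID.randomUUID() + "-v1");
        sensor.setTemperature(random.nextFloat() * 50.0f);
        sensor.setAcceleration(random.nextFloat() * 10.0f);
        sensor.setVelocity(random.nextFloat() * 100.0f);
        streamBridge.send("supplier-out-0", sensor); // binding name is an assumption
        return "ok";
    }
}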

Output

The consumer should log the results.

{"id": "d5657e55-c2cd-48f0-a22e-d28d1ef10873-v1", "internalTemperature": 19.534815, "externalTemperature": 0.0, "acceleration": 5.286502, "velocity": 25.349945}
{"id": "6a6de265-997c-4bf9-8eae-97accccb78e9-v2", "internalTemperature": 39.443855, "externalTemperature": 40.365253, "acceleration": 1.8879288, "velocity": 2.5296867}
{"id": "f09defad-828f-43ae-93a4-e777754cf57a-v1", "internalTemperature": 15.895501, "externalTemperature": 0.0, "acceleration": 1.9341749, "velocity": 52.868507}
{"id": "b39b8c73-eec3-4abd-b8d2-cc405eb39bd7-v1", "internalTemperature": 44.90698, "externalTemperature": 0.0, "acceleration": 1.5393275, "velocity": 87.0358}
{"id": "19d5c20e-ec18-4b35-a82a-c8322d7fea27-v2", "internalTemperature": 19.203693, "externalTemperature": 47.290142, "acceleration": 1.125809, "velocity": 11.153614}
Note
Note the suffix in each payload's id field. Each id is appended with -v1 or -v2, indicating whether it came from producer1 or producer2, respectively.

What just happened?

The schema evolved on the temperature field: it is now split into two separate fields, internalTemperature and externalTemperature. producer1 produces payloads containing only the temperature field, whereas producer2 produces payloads containing both the internalTemperature and externalTemperature fields.

The consumer is coded against a base schema that includes the split fields.

The consumer app can happily deserialize a payload with internalTemperature and externalTemperature fields. However, when a producer1 payload arrives (which includes only the temperature field), the schema evolution and compatibility check are automatically applied.

Because each payload also includes its schema version in the message header, Spring Cloud Stream, with the help of the Schema Registry server and Avro, performs the schema evolution behind the scenes. The temperature field is automatically mapped to internalTemperature, since that is the field on which the alias is defined.
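
For illustration, the alias declaration in the consumer's base schema might look like the following fragment (hypothetical; the actual schema file in the consumer module may differ):

{"name": "internalTemperature", "type": "float", "default": 0.0, "aliases": ["temperature"]},
{"name": "externalTemperature", "type": "float", "default": 0.0}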

Using Confluent Schema Registry with Spring Cloud Stream Schema Registry AVRO Converter Clients

In the examples above, we used the Spring Cloud Stream Schema Registry server with the AVRO converter clients in Spring Cloud Stream. What if you want to use these converters, but against a Confluent Schema Registry server? To make that work, you need to provide a custom SchemaRegistryClient bean in your applications that knows how to interact with the Confluent Schema Registry.

// Import locations assume the spring-cloud-stream-schema-registry-client module.
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.schema.registry.client.ConfluentSchemaRegistryClient;
import org.springframework.cloud.stream.schema.registry.client.SchemaRegistryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfluentSchemaRegistryConfiguration {

    @Bean
    public SchemaRegistryClient schemaRegistryClient(
            // Falls back to the default Confluent endpoint if the property is not set.
            @Value("${spring.cloud.stream.schema-registry-client.endpoint:http://localhost:8081}") String endpoint) {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endpoint);
        return client;
    }
}

As you can see, we are using a specific implementation of SchemaRegistryClient: ConfluentSchemaRegistryClient.

You need to add this bean to both producer applications and the consumer application.

If you started the Confluent Schema Registry server at a non-default host/port (the default is localhost:8081), then you need to provide the endpoint using the following property:

spring.cloud.stream.schema-registry-client.endpoint
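
For example, in each application's application.properties (the host shown below is a placeholder):

spring.cloud.stream.schema-registry-client.endpoint=http://my-registry-host:8081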

That's all there is to it. The same applications that previously interacted with the Spring Cloud Stream Schema Registry server now interact with the Confluent Schema Registry server, using the same set of AVRO message converters provided by Spring Cloud Stream Schema Registry.