A collection of Spring Boot applications to demonstrate schema evolution using Spring Cloud Stream and Confluent Schema Registry.
All components (producers and consumers) in this sample are configured to use native encoding[1], which ensures they use the Confluent-provided Avro serializers rather than the Spring Cloud Stream-provided Avro serializers. The Confluent serializers leverage the Confluent Schema Registry to validate schemas during record production. An additional benefit is that these components are cross-compatible with external tools that use the Confluent Avro serializers, such as the out-of-the-box kafka-avro-console-consumer and kafka-avro-console-producer tools that ship with the Confluent Schema Registry.
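For reference, native encoding and decoding are enabled through Spring Cloud Stream binding properties along the lines of the following sketch (the binding names process-out-0 and process-in-0 are illustrative assumptions, not necessarily the names used by these apps):
spring:
  cloud:
    stream:
      bindings:
        process-out-0:            # hypothetical producer binding name
          producer:
            useNativeEncoding: true
        process-in-0:             # hypothetical consumer binding name
          consumer:
            useNativeDecoding: true
      kafka:
        binder:
          producerProperties:
            # delegate serialization to the Confluent Avro serializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: http://localhost:8081
          consumerProperties:
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema.registry.url: http://localhost:8081
            # deserialize into the generated Sensor class rather than GenericRecord
            specific.avro.reader: true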
As a developer, I’d like to design my consumer to be resilient to differing payload schemas, and I want to use Confluent Schema Registry.
There is plenty of literature online about schema evolution, so we will not define it here. For this sample, we simply assume there are two producers producing events with different payload schemas and one consumer that is able to consume both payloads.
To build the applications, execute the following command from the confluent-schema-registry-integration directory:
./mvnw clean install
Note: The apps can be built and run from within an IDE (such as IntelliJ), but you will need to invoke the Maven package goal and then refresh the project, because the Avro Maven plugin must execute in order to generate the required model classes; otherwise you will see compile failures around the missing Sensor class.
- The components have all been built by following the Building the applications steps.
- Apache Kafka broker available at localhost:9092
- Confluent Schema Registry available at localhost:8081
Tip: The included Kafka tools can be used to easily start a broker and schema registry locally on the required coordinates.
Make sure the above prerequisites are satisfied, ensure you are in the confluent-schema-registry-integration directory, and follow the steps below.
Execute the following command to set the schema compatibility mode to NONE on the Confluent Schema Registry:
curl -X PUT http://localhost:8081/config -d '{"compatibility": "NONE"}' -H "Content-Type:application/json"
{"compatibility":"NONE"}
- Start the consumer in a new terminal session:
cd confluent-schema-registry-integration-consumer
java -jar target/confluent-schema-registry-integration-consumer-4.0.0-SNAPSHOT.jar
- Start producer1 in another terminal session:
cd confluent-schema-registry-integration-producer1
java -jar target/confluent-schema-registry-integration-producer1-4.0.0-SNAPSHOT.jar
- Start producer2 in another terminal session:
cd confluent-schema-registry-integration-producer2
java -jar target/confluent-schema-registry-integration-producer2-4.0.0-SNAPSHOT.jar
The producer apps each expose a REST endpoint which sends a sample Kafka event when invoked. Execute the following commands to send some sample data.
curl -X POST http://localhost:9009/randomMessage
curl -X POST http://localhost:9010/randomMessage
curl -X POST http://localhost:9009/randomMessage
curl -X POST http://localhost:9009/randomMessage
curl -X POST http://localhost:9010/randomMessage
The consumer should log results similar to the following:
{"id": "10fd4598-0a12-4d9b-9512-3e2257d9b713-v1", "internalTemperature": 28.674625, "externalTemperature": 0.0, "acceleration": 5.3196855, "velocity": 41.38155}
{"id": "cc3555b0-0a52-44ef-9a06-89ffda361e0b-v2", "internalTemperature": 8.026814, "externalTemperature": 0.0, "acceleration": 7.5858965, "velocity": 79.71516}
{"id": "5a23d687-093f-4b13-a42f-19842041a96f-v1", "internalTemperature": 31.853527, "externalTemperature": 0.0, "acceleration": 6.006965, "velocity": 19.520967}
{"id": "3fe53513-def6-4376-81b9-a5b332d54fc2-v1", "internalTemperature": 33.353485, "externalTemperature": 0.0, "acceleration": 5.429616, "velocity": 82.439064}
{"id": "5813a495-748c-4485-ac85-cd6ad28a839c-v2", "internalTemperature": 6.988123, "externalTemperature": 0.0, "acceleration": 1.4945298, "velocity": 51.230377}
Note: The id value is suffixed with -v1 or -v2, indicating whether the record was sent from producer1 or producer2, respectively.
Alternatively, the Confluent command line tools can be used to view the output with the following command:
<confluent-tools-install-dir>/bin/kafka-avro-console-consumer \
--topic sensor-topic \
--bootstrap-server localhost:9092 \
--from-beginning
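Depending on your tools version and setup, you may also need to point the console consumer at the registry by appending the following property to the command above:
--property schema.registry.url=http://localhost:8081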
The schema evolved on the temperature field: that field is now split into two separate fields, internalTemperature and externalTemperature. producer1 produces payloads with only temperature, whereas producer2 produces payloads with internalTemperature and externalTemperature.
The consumer is coded against a base schema that includes the split fields. The consumer app can happily deserialize a payload with the internalTemperature and externalTemperature fields. However, when a producer1 payload arrives (which includes only the single temperature field), schema evolution and the compatibility check are applied automatically. Because each record also carries the identity of the schema it was written with, the consumer, with the help of the Schema Registry and Avro, performs the schema resolution behind the scenes: the temperature field is automatically mapped to the internalTemperature field.
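That mapping is typically expressed in the consumer's Avro reader schema with a field alias plus default values. The following is a minimal sketch of what such a schema could look like; the namespace, field types, and defaults here are assumptions inferred from the logged output above, not the actual schema file shipped with the sample:
{
  "namespace": "com.example",
  "type": "record",
  "name": "Sensor",
  "doc": "Illustrative sketch only; namespace, types, and defaults are assumed",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "internalTemperature", "type": "float", "default": 0.0, "aliases": ["temperature"]},
    {"name": "externalTemperature", "type": "float", "default": 0.0},
    {"name": "acceleration", "type": "float", "default": 0.0},
    {"name": "velocity", "type": "float", "default": 0.0}
  ]
}
With a reader schema of this shape, a producer1 record containing only temperature resolves cleanly: the alias routes the writer's temperature value into internalTemperature, and externalTemperature falls back to its default of 0.0, which matches the logged output above.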
Confluent provides the Confluent Control Center UI, a web-based tool for managing and monitoring Apache Kafka®.
Tip: The included Kafka tools can be used to easily start a Control Center UI locally.
Once the UI is running locally and you have followed the Running the applications section:
- Open the Control Center at http://localhost:9021 and click on the provided cluster
- From the vertical menu, select the Topics tab
- From the list of topics, select sensor-topic (created by the samples)
- Click on the Schema tab to view the Sensor schema.
The Confluent Schema Registry also provides a REST API for schema-related operations. For example, the http://localhost:8081/subjects endpoint lists the schema names (i.e. subjects) currently defined in the registry. After you have run the samples, you should see a subject named sensor-topic-value.
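For example, assuming the samples have been run against a local registry:
curl http://localhost:8081/subjects
["sensor-topic-value"]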
By default, Confluent uses the TopicNameStrategy to derive the subject name for the message payload schema: the subject is your topic name (e.g. the spring.cloud.stream.bindings.<channel>.destination value) with -value appended.
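You can also fetch the latest schema registered under that subject; the response includes the subject name, version, schema id, and the Avro schema itself:
curl http://localhost:8081/subjects/sensor-topic-value/versions/latest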
This means that by default you can use a single schema per topic. However, the subject naming strategy can be changed to RecordNameStrategy (the subject is the fully qualified record name) or TopicRecordNameStrategy (the subject is the topic name followed by the fully qualified record name) by setting the following properties:
On the consumer side:
spring:
  cloud:
    stream:
      kafka:
        binder:
          consumerProperties:
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy

On the producer side:
spring:
  cloud:
    stream:
      kafka:
        binder:
          producerProperties:
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
Note: Currently the Control Center seems to recognize only subjects created with the TopicNameStrategy. If you configure the RecordNameStrategy, the schema will not appear in the UI.