Serialize and deserialize CloudEvents, integrated with Schema Registry.
Supports:
- Java 11+
- Apache Kafka® 2.6.0+
- CloudEvents Spec v1.0.1
- CloudEvents Binary Content Mode
- Dependency
- Gradle

    repositories {
        // ...
        maven { url = uri('http://packages.confluent.io/maven/') }
        maven { url 'https://jitpack.io' }
    }

    dependencies {
        implementation 'com.github.kattlo:cloudevents-kafka-avro-serializer:v0.11.0'
    }

- Apache Maven®

    <repositories>
        <repository>
            <id>jitpack.io</id>
            <url>https://jitpack.io</url>
        </repository>
    </repositories>

    <dependency>
        <groupId>com.github.kattlo</groupId>
        <artifactId>cloudevents-kafka-avro-serializer</artifactId>
        <version>v0.11.0</version>
    </dependency>

- Configure
- Serializer

    cloudevents.serializer.encoding=BINARY
    schema.registry.url=http://configure.me:8081
    auto.register.schemas=true
    value.serializer=io.github.kattlo.cloudevents.KafkaAvroCloudEventSerializer

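The serializer settings can also be assembled in code. The following is a minimal sketch using only `java.util.Properties`; the class name `SerializerConfigSketch`, the helper `serializerProperties`, the `bootstrap.servers` value, and the choice of `StringSerializer` for keys are assumptions for illustration and are not part of this library.

```java
import java.util.Properties;

public class SerializerConfigSketch {

    // Hypothetical helper: collects the producer settings into a Properties object.
    public static Properties serializerProperties(String bootstrapServers, String schemaRegistryUrl) {
        Properties props = new Properties();
        // Assumption: standard Kafka client settings, not taken from this README.
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Settings taken from the serializer configuration in this README.
        props.setProperty("cloudevents.serializer.encoding", "BINARY");
        props.setProperty("schema.registry.url", schemaRegistryUrl);
        props.setProperty("auto.register.schemas", "true");
        props.setProperty("value.serializer", "io.github.kattlo.cloudevents.KafkaAvroCloudEventSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = serializerProperties("localhost:9092", "http://configure.me:8081");
        // Print the resulting configuration for inspection.
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With `kafka-clients` on the classpath, the resulting `Properties` would typically be passed to the `KafkaProducer` constructor.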
- Deserializer

    # use GenericRecord data
    specific.avro.reader=false
    # use strongly typed data
    #specific.avro.reader=true
    schema.registry.url=http://configure.me:8081
    value.deserializer=io.github.kattlo.cloudevents.KafkaAvroCloudEventDeserializer

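The deserializer settings can be built the same way, with `specific.avro.reader` switched by a flag. This is a sketch under assumptions: the class name `DeserializerConfigSketch`, the helper `deserializerProperties`, the `bootstrap.servers` and `group.id` values, and the use of `StringDeserializer` for keys are illustrative and do not come from this library.

```java
import java.util.Properties;

public class DeserializerConfigSketch {

    // Hypothetical helper: collects the consumer settings into a Properties object.
    public static Properties deserializerProperties(String bootstrapServers,
                                                    String groupId,
                                                    String schemaRegistryUrl,
                                                    boolean specificReader) {
        Properties props = new Properties();
        // Assumption: standard Kafka client settings, not taken from this README.
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Settings taken from the deserializer configuration in this README.
        // false: payload is read as GenericRecord; true: payload is read as a generated, strongly typed class.
        props.setProperty("specific.avro.reader", Boolean.toString(specificReader));
        props.setProperty("schema.registry.url", schemaRegistryUrl);
        props.setProperty("value.deserializer", "io.github.kattlo.cloudevents.KafkaAvroCloudEventDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = deserializerProperties(
            "localhost:9092", "example-group", "http://configure.me:8081", false);
        // Print the resulting configuration for inspection.
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With `kafka-clients` on the classpath, the resulting `Properties` would typically be passed to the `KafkaConsumer` constructor.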
- Use
- Serialization

    import java.net.URI;
    import java.time.OffsetDateTime;
    import java.util.UUID;

    import io.cloudevents.core.builder.CloudEventBuilder;
    import io.github.kattlo.cloudevents.AvroCloudEventData;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // . . .

    var event = CloudEventBuilder
        .v1()
        .withId(UUID.randomUUID().toString())
        .withSource(URI.create("/example"))
        .withType("type.example")
        .withTime(OffsetDateTime.now())
        .withData(AvroCloudEventData.MIME_TYPE, data)
        .build();

    var record = new ProducerRecord<>("my-topic", event);

    // --- create KafkaProducer with Serializer configurations ---
    // producer.send(record);

- Deserialization

    import io.github.kattlo.cloudevents.AvroCloudEventData;
    import io.cloudevents.CloudEvent;
    import org.apache.avro.generic.GenericRecord;

    // --- create KafkaConsumer with Deserializer configurations ---
    // consumer.subscribe(...)
    // var records = consumer.poll(...)

    records.forEach(record -> {
        // Get the CloudEvent instance
        CloudEvent event = record.value();

        // when specific.avro.reader=false
        GenericRecord data = AvroCloudEventData.dataOf(event);

        // when specific.avro.reader=true, use this line instead
        // YourType data = AvroCloudEventData.dataOf(event);
    });