diff --git a/build.gradle b/build.gradle
index 85fbef6c4..ab723b7dd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -215,6 +215,40 @@ def getProjectVersion() {
return ver
}
+project('serializers') {
+ dependencies {
+ compile project(':common')
+ compile project(':client')
+ compile group: 'org.apache.avro', name: 'avro', version: avroVersion
+ compile group: 'com.google.protobuf', name: 'protobuf-java', version: protobufProtocVersion
+ compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: protobufUtilVersion
+ compile group: 'io.pravega', name: 'pravega-client', version: pravegaVersion
+ compile group: 'org.xerial.snappy', name: 'snappy-java', version: snappyVersion
+ compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-jsonSchema', version: jacksonVersion
+ compile group: 'com.github.everit-org.json-schema', name: 'org.everit.json.schema', version: everitVersion
+ testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
+ testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
+ testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
+ }
+
+ javadoc {
+ title = "Serializers"
+ dependsOn delombok
+ source = delombok.outputDir
+ failOnError = true
+ exclude "**/impl/**";
+ options.addBooleanOption("Xdoclint:all,-reference", true)
+ }
+
+ jar {
+ manifest {}
+
+ from {
+ configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
+ }
+ }
+}
+
project('server') {
sourceSets {
main.resources.srcDirs += "$projectDir/src/conf"
@@ -311,6 +345,15 @@ distributions {
from 'NOTICE'
}
}
+ serializers {
+ baseName = "schema-registry-serializers"
+ contents {
+ from { project(":serializers").configurations.runtime }
+ from { project(":serializers").configurations.runtime.allArtifacts.files }
+ from 'LICENSE'
+ from 'NOTICE'
+ }
+ }
}
task sourceCopy(type: Copy) {
@@ -350,6 +393,7 @@ task publishAllJars() {
dependsOn ':common:publish'
dependsOn ':contract:publish'
dependsOn ':server:publish'
+ dependsOn ':serializers:publish'
}
task prepareRegistryImage(type: Copy) {
diff --git a/checkstyle/spotbugs-exclude.xml b/checkstyle/spotbugs-exclude.xml
index 4500e3dec..31efb43d4 100644
--- a/checkstyle/spotbugs-exclude.xml
+++ b/checkstyle/spotbugs-exclude.xml
@@ -5,6 +5,9 @@
+
+
+
diff --git a/gradle.properties b/gradle.properties
index eb0c2841f..3493130ec 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -26,7 +26,7 @@ gradleLombokPluginVersion=3.2.0
gradleSshPluginVersion=2.9.0
guavaVersion=28.1-jre
javaxServletApiVersion=4.0.0
-jacksonVersion=2.10.3
+jacksonVersion=2.11.1
everitVersion=1.12.1
javaxwsrsApiVersion=2.1
jaxbVersion=2.3.0
@@ -50,7 +50,7 @@ pravegaVersion=0.8.0-2591.37c5082-SNAPSHOT
pravegaKeyCloakVersion=0.7.0
# Version and base tags can be overridden at build time
-schemaregistryVersion=0.0.1-SNAPSHOT
+schemaregistryVersion=0.1.0-SNAPSHOT
schemaregistryBaseTag=pravega/schemaregistry
# Pravega Signing Key
diff --git a/serializers/src/main/java/io/pravega/schemaregistry/codec/Codec.java b/serializers/src/main/java/io/pravega/schemaregistry/codec/Codec.java
new file mode 100644
index 000000000..b2e1f26ff
--- /dev/null
+++ b/serializers/src/main/java/io/pravega/schemaregistry/codec/Codec.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.schemaregistry.codec;
+
+import io.pravega.schemaregistry.contract.data.CodecType;
+import io.pravega.schemaregistry.contract.data.EncodingInfo;
+
+/**
+ * Codec interface extends the {@link Encoder} and {@link Decoder} interfaces, which define methods to encode and
+ * decode data. The Encoder interface supplies a codec type and an encoding function; the Decoder a decoding function.
+ */
+public interface Codec extends Encoder, Decoder {
+ /**
+ * Name identifying the Codec Type.
+     * This name should be the same as the {@link CodecType#getName()} that is registered for the group in schema registry
+ * service.
+ * The deserializers will find the decoder for the encoded data from {@link EncodingInfo#getCodecType()} by matching
+ * the name.
+ *
+ * @return Name of the codec.
+ */
+ String getName();
+}
diff --git a/serializers/src/main/java/io/pravega/schemaregistry/codec/Codecs.java b/serializers/src/main/java/io/pravega/schemaregistry/codec/Codecs.java
new file mode 100644
index 000000000..5f5155c05
--- /dev/null
+++ b/serializers/src/main/java/io/pravega/schemaregistry/codec/Codecs.java
@@ -0,0 +1,146 @@
+/**
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.schemaregistry.codec;
+
+import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
+import io.pravega.schemaregistry.contract.data.CodecType;
+import lombok.Getter;
+import org.apache.commons.io.IOUtils;
+import org.xerial.snappy.Snappy;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ * Utility enum providing codecs for none, snappy or gzip.
+ */
+public enum Codecs {
+ None(Constants.NOOP),
+ GzipCompressor(Constants.GZIP_CODEC),
+ SnappyCompressor(Constants.SNAPPY_CODEC);
+
+ @Getter
+ private final Codec codec;
+
+ Codecs(Codec codec) {
+ this.codec = codec;
+ }
+
+ private static class Noop implements Codec {
+ private static final CodecType CODEC_TYPE_NONE = new CodecType(Constants.NONE);
+
+ @Override
+ public String getName() {
+ return CODEC_TYPE_NONE.getName();
+ }
+
+ @Override
+ public CodecType getCodecType() {
+ return CODEC_TYPE_NONE;
+ }
+
+ @Override
+ public void encode(ByteBuffer data, ByteArrayOutputStream bos) {
+ if (data.hasArray()) {
+ bos.write(data.array(), data.arrayOffset() + data.position(), data.remaining());
+ } else {
+ byte[] b = getBytes(data);
+ bos.write(b, 0, b.length);
+ }
+ }
+
+ @Override
+        public ByteBuffer decode(ByteBuffer data, Map<String, Object> codecProperties) {
+ return data;
+ }
+ }
+
+ private static class GZipCodec implements Codec {
+ private static final CodecType CODEC_TYPE_GZIP = new CodecType(Constants.APPLICATION_X_GZIP);
+ @Override
+ public String getName() {
+ return CODEC_TYPE_GZIP.getName();
+ }
+
+ @Override
+ public CodecType getCodecType() {
+ return CODEC_TYPE_GZIP;
+ }
+
+ @Override
+ public void encode(ByteBuffer data, ByteArrayOutputStream bos) throws IOException {
+ byte[] b = data.hasArray() ? data.array() : getBytes(data);
+ int offset = data.hasArray() ? data.arrayOffset() + data.position() : 0;
+ try (GZIPOutputStream gzipOS = new GZIPOutputStream(bos)) {
+ gzipOS.write(b, offset, data.remaining());
+ }
+ }
+
+ @Override
+        public ByteBuffer decode(ByteBuffer data, Map<String, Object> codecProperties) throws IOException {
+ InputStream bis = new ByteBufferBackedInputStream(data);
+ return ByteBuffer.wrap(IOUtils.toByteArray(new GZIPInputStream(bis)));
+ }
+ }
+
+ private static byte[] getBytes(ByteBuffer data) {
+ byte[] b = new byte[data.remaining()];
+ data.get(b);
+ return b;
+ }
+
+ private static class SnappyCodec implements Codec {
+ private static final CodecType CODEC_TYPE_SNAPPY = new CodecType(Constants.APPLICATION_X_SNAPPY_FRAMED);
+ @Override
+ public String getName() {
+ return CODEC_TYPE_SNAPPY.getName();
+ }
+
+ @Override
+ public CodecType getCodecType() {
+ return CODEC_TYPE_SNAPPY;
+ }
+
+ @Override
+ public void encode(ByteBuffer data, ByteArrayOutputStream bos) throws IOException {
+ int capacity = Snappy.maxCompressedLength(data.remaining());
+ byte[] encoded = new byte[capacity];
+
+ byte[] b = data.hasArray() ? data.array() : getBytes(data);
+ int offset = data.hasArray() ? data.arrayOffset() + data.position() : 0;
+ int size = Snappy.compress(b, offset, data.remaining(), encoded, 0);
+ bos.write(encoded, 0, size);
+ }
+
+ @Override
+        public ByteBuffer decode(ByteBuffer data, Map<String, Object> codecProperties) throws IOException {
+ byte[] b = data.hasArray() ? data.array() : getBytes(data);
+ int offset = data.hasArray() ? data.arrayOffset() + data.position() : 0;
+
+ ByteBuffer decoded = ByteBuffer.allocate(Snappy.uncompressedLength(b, offset, data.remaining()));
+ Snappy.uncompress(b, offset, data.remaining(), decoded.array(), 0);
+ return decoded;
+ }
+ }
+
+ static class Constants {
+ static final Noop NOOP = new Noop();
+ static final GZipCodec GZIP_CODEC = new GZipCodec();
+ static final SnappyCodec SNAPPY_CODEC = new SnappyCodec();
+ static final String NONE = "";
+ static final String APPLICATION_X_GZIP = "application/x-gzip";
+ static final String APPLICATION_X_SNAPPY_FRAMED = "application/x-snappy-framed";
+ }
+}
diff --git a/serializers/src/main/java/io/pravega/schemaregistry/codec/Decoder.java b/serializers/src/main/java/io/pravega/schemaregistry/codec/Decoder.java
new file mode 100644
index 000000000..5c0d7f3a8
--- /dev/null
+++ b/serializers/src/main/java/io/pravega/schemaregistry/codec/Decoder.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.schemaregistry.codec;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * Decoder interface that defines a method to decode data.
+ */
+@FunctionalInterface
+public interface Decoder {
+ /**
+     * Implementation should decode the remaining bytes in the buffer and return a ByteBuffer that includes
+     * the decoded data at its current position (it may be the input buffer itself if no decoding is needed).
+ *
+ * @param data encoded ByteBuffer to decode.
+ * @param codecProperties codec properties.
+ * @return decoded ByteBuffer with position set to the start of decoded data.
+ * @throws IOException can be thrown while reading from or writing to byte buffers.
+ */
+    ByteBuffer decode(ByteBuffer data, Map<String, Object> codecProperties) throws IOException;
+}
diff --git a/serializers/src/main/java/io/pravega/schemaregistry/codec/Encoder.java b/serializers/src/main/java/io/pravega/schemaregistry/codec/Encoder.java
new file mode 100644
index 000000000..e6e9764e8
--- /dev/null
+++ b/serializers/src/main/java/io/pravega/schemaregistry/codec/Encoder.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.schemaregistry.codec;
+
+import io.pravega.schemaregistry.contract.data.CodecType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Defines a method to encode data.
+ */
+public interface Encoder {
+ /**
+ * Codec type for the encoder.
+ *
+ * @return Codec Type for the encoder.
+ */
+ CodecType getCodecType();
+
+ /**
+     * Implementation should encode the remaining bytes in the buffer and write the encoded bytes
+     * into the supplied output stream.
+ *
+     * The implementation can optionally call flush or close on the output stream with no consequence.
+ *
+ * @param data ByteBuffer to encode.
+ * @param outputStream ByteArrayOutputStream where the encoded data should be written.
+     * @throws IOException if an error occurs while reading from or writing to the byte buffers.
+ */
+ void encode(ByteBuffer data, ByteArrayOutputStream outputStream) throws IOException;
+}
diff --git a/serializers/src/main/java/io/pravega/schemaregistry/schemas/AvroSchema.java b/serializers/src/main/java/io/pravega/schemaregistry/schemas/AvroSchema.java
new file mode 100644
index 000000000..4fccf058b
--- /dev/null
+++ b/serializers/src/main/java/io/pravega/schemaregistry/schemas/AvroSchema.java
@@ -0,0 +1,141 @@
+/**
+ * Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.schemaregistry.schemas;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import io.pravega.schemaregistry.contract.data.SchemaInfo;
+import io.pravega.schemaregistry.contract.data.SerializationFormat;
+import lombok.Getter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Container class for Avro Schema.
+ *
+ * @param <T> Type of element.
+ */
+public class AvroSchema<T> implements Schema<T> {
+ @Getter
+ private final org.apache.avro.Schema schema;
+ private final SchemaInfo schemaInfo;
+ @Getter
+    private final Class<T> tClass;
+
+    private AvroSchema(org.apache.avro.Schema schema, Class<T> tClass) {
+ this.schema = schema;
+ this.schemaInfo = new SchemaInfo(schema.getFullName(),
+ SerializationFormat.Avro, getSchemaBytes(), ImmutableMap.of());
+ this.tClass = tClass;
+ }
+
+ private AvroSchema(SchemaInfo schemaInfo) {
+ String schemaString = new String(schemaInfo.getSchemaData().array(), Charsets.UTF_8);
+ this.schema = new org.apache.avro.Schema.Parser().parse(schemaString);
+ this.schemaInfo = schemaInfo;
+ this.tClass = null;
+ }
+
+ /**
+ * Method to create a typed AvroSchema for the given class. It extracts the avro schema from the class.
+ * For Avro generated classes, the schema is retrieved from the class.
+ * For POJOs the schema is extracted using avro's {@link ReflectData}.
+ *
+ * @param tClass Class whose object's schema is used.
+     * @param <T> Type of the Java class.
+ * @return {@link AvroSchema} with generic type T that extracts and captures the avro schema.
+ */
+    public static <T> AvroSchema<T> of(Class<T> tClass) {
+ org.apache.avro.Schema schema;
+ if (SpecificRecordBase.class.isAssignableFrom(tClass)) {
+ schema = SpecificData.get().getSchema(tClass);
+ } else {
+ schema = ReflectData.get().getSchema(tClass);
+ }
+ return new AvroSchema<>(schema, tClass);
+ }
+
+ /**
+ * Method to create a typed AvroSchema of type {@link Object} from the given schema.
+ * This schema can be used to express any non record schema.
+ *
+ * @param schema Schema to use.
+ * @return Returns an AvroSchema with {@link Object} type.
+ */
+ public static AvroSchema