Skip to content

Commit

Permalink
Issue 40: Serializers Implementation for Schema Registry (#41)
Browse files Browse the repository at this point in the history
Signed-off-by: Shivesh Ranjan <[email protected]>
  • Loading branch information
shiveshr authored Jul 21, 2020
1 parent a82a400 commit 9aac414
Show file tree
Hide file tree
Showing 62 changed files with 8,922 additions and 2 deletions.
44 changes: 44 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,40 @@ def getProjectVersion() {
return ver
}

// Build configuration for the 'serializers' sub-project.
project('serializers') {
dependencies {
// Sibling projects of this build.
compile project(':common')
compile project(':client')
// Third-party libraries; versions come from properties defined outside this block.
compile group: 'org.apache.avro', name: 'avro', version: avroVersion
compile group: 'com.google.protobuf', name: 'protobuf-java', version: protobufProtocVersion
compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: protobufUtilVersion
compile group: 'io.pravega', name: 'pravega-client', version: pravegaVersion
compile group: 'org.xerial.snappy', name: 'snappy-java', version: snappyVersion
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-jsonSchema', version: jacksonVersion
compile group: 'com.github.everit-org.json-schema', name: 'org.everit.json.schema', version: everitVersion
// Test-only dependencies.
testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
}

// Javadoc is generated from the delombok output rather than the raw sources.
javadoc {
title = "Serializers"
dependsOn delombok
source = delombok.outputDir
// A javadoc error fails the build.
failOnError = true
// Anything under an 'impl' directory is left out of the generated documentation.
exclude "**/impl/**";
// Enable all doclint checks except the 'reference' group.
options.addBooleanOption("Xdoclint:all,-reference", true)
}

jar {
manifest {}

// Merge every entry of the compile configuration into this jar: directories are added as-is,
// archives are unpacked via zipTree.
from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
}
}

project('server') {
sourceSets {
main.resources.srcDirs += "$projectDir/src/conf"
Expand Down Expand Up @@ -311,6 +345,15 @@ distributions {
from 'NOTICE'
}
}
// Distribution named "schema-registry-serializers" for the ':serializers' project.
serializers {
baseName = "schema-registry-serializers"
contents {
// Files of the project's runtime configuration.
from { project(":serializers").configurations.runtime }
// Artifacts attached to the project's runtime configuration.
from { project(":serializers").configurations.runtime.allArtifacts.files }
// Legal files taken from the directory these relative paths resolve against.
from 'LICENSE'
from 'NOTICE'
}
}
}

task sourceCopy(type: Copy) {
Expand Down Expand Up @@ -350,6 +393,7 @@ task publishAllJars() {
dependsOn ':common:publish'
dependsOn ':contract:publish'
dependsOn ':server:publish'
dependsOn ':serializers:publish'
}

task prepareRegistryImage(type: Copy) {
Expand Down
3 changes: 3 additions & 0 deletions checkstyle/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
<Match> <!-- generated code -->
<Package name="io.pravega.schemaregistry.test.integrationtest.generated" />
</Match>
<Match> <!-- generated code -->
<Package name="io.pravega.schemaregistry.testobjs.generated" />
</Match>
<Match> <!-- does not work well with futures -->
<Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
</Match>
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ gradleLombokPluginVersion=3.2.0
gradleSshPluginVersion=2.9.0
guavaVersion=28.1-jre
javaxServletApiVersion=4.0.0
jacksonVersion=2.10.3
jacksonVersion=2.11.1
everitVersion=1.12.1
javaxwsrsApiVersion=2.1
jaxbVersion=2.3.0
Expand All @@ -50,7 +50,7 @@ pravegaVersion=0.8.0-2591.37c5082-SNAPSHOT
pravegaKeyCloakVersion=0.7.0

# Version and base tags can be overridden at build time
schemaregistryVersion=0.0.1-SNAPSHOT
schemaregistryVersion=0.1.0-SNAPSHOT
schemaregistryBaseTag=pravega/schemaregistry

# Pravega Signing Key
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.schemaregistry.codec;

import io.pravega.schemaregistry.contract.data.CodecType;
import io.pravega.schemaregistry.contract.data.EncodingInfo;

/**
* Codec interface that combines the {@link Encoder} and {@link Decoder} interfaces, so a single object can both
* encode and decode data. The encoder side exposes a codec type and an encoding function; the decoder side exposes
* a decoding function. This interface adds a name that identifies the codec.
*/
public interface Codec extends Encoder, Decoder {
/**
* Name identifying the Codec Type.
* This name should be same as the {@link CodecType#getName()} that is registered for the group in schema registry
* service.
* The deserializers will find the decoder for the encoded data from {@link EncodingInfo#getCodecType()} by matching
* the name.
*
* @return Name of the codec.
*/
String getName();
}
146 changes: 146 additions & 0 deletions serializers/src/main/java/io/pravega/schemaregistry/codec/Codecs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.schemaregistry.codec;

import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import io.pravega.schemaregistry.contract.data.CodecType;
import lombok.Getter;
import org.apache.commons.io.IOUtils;
import org.xerial.snappy.Snappy;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
 * Built-in {@link Codec} implementations: pass-through (none), gzip and snappy.
 *
 * Each enum constant wraps one shared codec instance that is obtained through {@code getCodec()}.
 * All codecs encode or decode the remaining bytes of the supplied buffer (from its position to its limit).
 */
public enum Codecs {
    /** Pass-through codec that copies the data unchanged. */
    None(Constants.NOOP),
    /** Codec that compresses with gzip. */
    GzipCompressor(Constants.GZIP_CODEC),
    /** Codec that compresses with snappy. */
    SnappyCompressor(Constants.SNAPPY_CODEC);

    @Getter
    private final Codec codec;

    Codecs(Codec codec) {
        this.codec = codec;
    }

    /**
     * Copies the remaining bytes of the buffer into a new array.
     *
     * The copy is read through a duplicate so that the position of the caller's buffer is not advanced.
     * Callers in this class read {@code data.remaining()} after the copy, and the array-backed path
     * does not move the position either, so both paths now behave the same.
     *
     * @param data buffer to copy from.
     * @return new array holding exactly the remaining bytes of {@code data}.
     */
    private static byte[] getBytes(ByteBuffer data) {
        byte[] b = new byte[data.remaining()];
        data.duplicate().get(b);
        return b;
    }

    /**
     * Codec that performs no transformation.
     */
    private static class Noop implements Codec {
        private static final CodecType CODEC_TYPE_NONE = new CodecType(Constants.NONE);

        @Override
        public String getName() {
            return CODEC_TYPE_NONE.getName();
        }

        @Override
        public CodecType getCodecType() {
            return CODEC_TYPE_NONE;
        }

        /**
         * Writes the remaining bytes of {@code data} to {@code bos} unchanged.
         */
        @Override
        public void encode(ByteBuffer data, ByteArrayOutputStream bos) {
            if (data.hasArray()) {
                // Write straight from the backing array to avoid a copy.
                bos.write(data.array(), data.arrayOffset() + data.position(), data.remaining());
            } else {
                byte[] b = getBytes(data);
                bos.write(b, 0, b.length);
            }
        }

        /**
         * Returns the supplied buffer itself; {@code codecProperties} is not used.
         */
        @Override
        public ByteBuffer decode(ByteBuffer data, Map<String, String> codecProperties) {
            return data;
        }
    }

    /**
     * Codec backed by {@link GZIPOutputStream} and {@link GZIPInputStream}.
     */
    private static class GZipCodec implements Codec {
        private static final CodecType CODEC_TYPE_GZIP = new CodecType(Constants.APPLICATION_X_GZIP);

        @Override
        public String getName() {
            return CODEC_TYPE_GZIP.getName();
        }

        @Override
        public CodecType getCodecType() {
            return CODEC_TYPE_GZIP;
        }

        /**
         * Gzip-compresses the remaining bytes of {@code data} into {@code bos}.
         *
         * @throws IOException if the gzip stream fails while writing.
         */
        @Override
        public void encode(ByteBuffer data, ByteArrayOutputStream bos) throws IOException {
            // Capture the length once, before touching the buffer, so it is right for both paths below.
            int length = data.remaining();
            byte[] b;
            int offset;
            if (data.hasArray()) {
                b = data.array();
                offset = data.arrayOffset() + data.position();
            } else {
                b = getBytes(data);
                offset = 0;
            }
            // Closing the gzip stream writes the trailer; it also closes bos.
            try (GZIPOutputStream gzipOS = new GZIPOutputStream(bos)) {
                gzipOS.write(b, offset, length);
            }
        }

        /**
         * Decompresses the remaining bytes of {@code data}; {@code codecProperties} is not used.
         *
         * @return new buffer wrapping the decompressed bytes.
         * @throws IOException if the data cannot be read as gzip.
         */
        @Override
        public ByteBuffer decode(ByteBuffer data, Map<String, String> codecProperties) throws IOException {
            // try-with-resources releases the inflater held by GZIPInputStream as soon as decoding ends.
            try (InputStream gzipIS = new GZIPInputStream(new ByteBufferBackedInputStream(data))) {
                return ByteBuffer.wrap(IOUtils.toByteArray(gzipIS));
            }
        }
    }

    /**
     * Codec backed by the {@link Snappy} byte-array API.
     *
     * NOTE(review): the registered name is "application/x-snappy-framed", while the code calls
     * {@code Snappy.compress}/{@code Snappy.uncompress} directly - confirm this pairing is intended.
     */
    private static class SnappyCodec implements Codec {
        private static final CodecType CODEC_TYPE_SNAPPY = new CodecType(Constants.APPLICATION_X_SNAPPY_FRAMED);

        @Override
        public String getName() {
            return CODEC_TYPE_SNAPPY.getName();
        }

        @Override
        public CodecType getCodecType() {
            return CODEC_TYPE_SNAPPY;
        }

        /**
         * Snappy-compresses the remaining bytes of {@code data} into {@code bos}.
         *
         * @throws IOException if compression fails.
         */
        @Override
        public void encode(ByteBuffer data, ByteArrayOutputStream bos) throws IOException {
            // Capture the length once, before touching the buffer, so it is right for both paths below.
            int length = data.remaining();
            byte[] encoded = new byte[Snappy.maxCompressedLength(length)];

            byte[] b;
            int offset;
            if (data.hasArray()) {
                b = data.array();
                offset = data.arrayOffset() + data.position();
            } else {
                b = getBytes(data);
                offset = 0;
            }
            int size = Snappy.compress(b, offset, length, encoded, 0);
            // Only the first 'size' bytes of the worst-case sized array hold compressed data.
            bos.write(encoded, 0, size);
        }

        /**
         * Decompresses the remaining bytes of {@code data}; {@code codecProperties} is not used.
         *
         * @return new buffer sized to the uncompressed length, positioned at the start of the decoded data.
         * @throws IOException if the data cannot be uncompressed.
         */
        @Override
        public ByteBuffer decode(ByteBuffer data, Map<String, String> codecProperties) throws IOException {
            // Capture the length once, before touching the buffer, so it is right for both paths below.
            int length = data.remaining();
            byte[] b;
            int offset;
            if (data.hasArray()) {
                b = data.array();
                offset = data.arrayOffset() + data.position();
            } else {
                b = getBytes(data);
                offset = 0;
            }

            ByteBuffer decoded = ByteBuffer.allocate(Snappy.uncompressedLength(b, offset, length));
            Snappy.uncompress(b, offset, length, decoded.array(), 0);
            return decoded;
        }
    }

    /**
     * Shared codec instances and the codec type names they are registered under.
     */
    static class Constants {
        static final Noop NOOP = new Noop();
        static final GZipCodec GZIP_CODEC = new GZipCodec();
        static final SnappyCodec SNAPPY_CODEC = new SnappyCodec();
        static final String NONE = "";
        static final String APPLICATION_X_GZIP = "application/x-gzip";
        static final String APPLICATION_X_SNAPPY_FRAMED = "application/x-snappy-framed";
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.schemaregistry.codec;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

/**
* Decoder interface that defines the single method used to decode data.
* It is a functional interface, so a lambda or method reference can be supplied as a decoder.
*/
@FunctionalInterface
public interface Decoder {
/**
* Implementation should decode the remaining bytes in the buffer and return a new ByteBuffer that includes
* the decoded data at its current position.
*
* @param data encoded ByteBuffer to decode.
* @param codecProperties codec properties.
* @return decoded ByteBuffer with position set to the start of decoded data.
* @throws IOException can be thrown while reading from or writing to byte buffers.
*/
ByteBuffer decode(ByteBuffer data, Map<String, String> codecProperties) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.schemaregistry.codec;

import io.pravega.schemaregistry.contract.data.CodecType;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

/**
* Encoder interface that defines the method to encode data and exposes the codec type of the encoder.
*/
public interface Encoder {
/**
* Codec type for the encoder.
*
* @return Codec Type for the encoder.
*/
CodecType getCodecType();

/**
* Implementation should encode the remaining bytes in the buffer and write the encoded data to the supplied
* output stream. The method returns nothing; the output stream carries the result.
*
* The implementation can optionally call flush or close on outputstream with no consequence.
*
* @param data ByteBuffer to encode.
* @param outputStream ByteArrayOutputStream where the encoded data should be written.
* @throws IOException IOException can be thrown while reading from or writing to byte buffers.
*/
void encode(ByteBuffer data, ByteArrayOutputStream outputStream) throws IOException;
}
Loading

0 comments on commit 9aac414

Please sign in to comment.