Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 40: Serializers Implementation for Schema Registry #41

Merged
merged 100 commits into from
Jul 21, 2020
Merged
Show file tree
Hide file tree
Changes from 89 commits
Commits
Show all changes
100 commits
Select commit Hold shift + click to select a range
76d99a3
copying over changes to new fork
shiveshr Jun 8, 2020
b634867
copying over to new fork
shiveshr Jun 8, 2020
a0876cc
serializers copying over
shiveshr Jun 8, 2020
05cc270
copying over
shiveshr Jun 8, 2020
8ee1fef
contract
shiveshr Jun 8, 2020
193776d
Merge branch 'contract' into serializers
shiveshr Jun 8, 2020
600ff74
remove </p>
shiveshr Jun 8, 2020
be5115e
Merge branch 'master' into contract
shiveshr Jun 9, 2020
409c106
merge with master
shiveshr Jun 9, 2020
88af2e7
remove </p>
shiveshr Jun 10, 2020
b1a8894
Merge branch 'contract' into serializers
shiveshr Jun 10, 2020
1961ba1
add name util and hash util
shiveshr Jun 10, 2020
58e7f65
removing all unwanted auto generated swagger files
shiveshr Jun 11, 2020
026b725
Merge branch 'contract' into serializers
shiveshr Jun 11, 2020
7027d88
rename
shiveshr Jun 11, 2020
4be3f40
PR comment
shiveshr Jun 12, 2020
cc223ec
Merge branch 'contract' into serializers
shiveshr Jun 12, 2020
a0a2db7
marking interfaces as beta
shiveshr Jun 15, 2020
8a8c872
Merge branch 'contract' into serializers
shiveshr Jun 15, 2020
3f64c11
validation rules of list
shiveshr Jun 16, 2020
fba77f6
Merge branch 'contract' into serializers
shiveshr Jun 16, 2020
a449b4b
remove Validation rules of list method
shiveshr Jun 16, 2020
22a0a0d
Merge branch 'contract' into serializers
shiveshr Jun 16, 2020
9a50464
PR comment
shiveshr Jun 16, 2020
8cc3091
PR comment
shiveshr Jun 17, 2020
d063e7c
Merge branch 'contract' into serializers
shiveshr Jun 17, 2020
5a7d639
Removing schema validation rules
shiveshr Jun 18, 2020
a9cca3a
Merge branch 'contract' into serializers
shiveshr Jun 18, 2020
0b6230a
license
shiveshr Jun 18, 2020
911f793
merge with contract
shiveshr Jun 18, 2020
7ff74af
Merge branch 'contract' into serializers
shiveshr Jun 18, 2020
c9f9d3f
license
shiveshr Jun 18, 2020
5d91574
javadoc statement
shiveshr Jun 18, 2020
7c30b4f
Merge branch 'contract' into serializers
shiveshr Jun 18, 2020
98a9c6b
javadoc
shiveshr Jun 18, 2020
dcff47c
Merge branch 'contract' into serializers
shiveshr Jun 18, 2020
28660e4
deserializerAsJson name fix
shiveshr Jun 18, 2020
53b8988
compatibility enum reversal
shiveshr Jun 19, 2020
8e2c704
Merge branch 'contract' into serializers
shiveshr Jun 19, 2020
ce1aa98
addschema
shiveshr Jun 19, 2020
151b808
Merge branch 'contract' into serializers
shiveshr Jun 19, 2020
bffd0f7
javadoc
shiveshr Jun 22, 2020
26e34f1
Merge branch 'contract' into serializers
shiveshr Jun 22, 2020
049fb6d
group id generator
shiveshr Jun 22, 2020
910f870
add error message in model helper
shiveshr Jun 22, 2020
d690a21
Merge branch 'contract' into serializers
shiveshr Jun 22, 2020
a44c0fb
required
shiveshr Jun 22, 2020
a82ee56
Merge branch 'contract' into serializers
shiveshr Jun 22, 2020
7b34571
Merge branch 'serializers' of https://github.com/shiveshr/schema-regi…
shiveshr Jun 22, 2020
48da360
add new deserializer factory method
shiveshr Jun 23, 2020
72919bb
PR comments
shiveshr Jun 24, 2020
48e54e9
Merge branch 'contract' into serializers
shiveshr Jun 24, 2020
7c9ba8e
avro reflect deserializer
shiveshr Jun 25, 2020
214ab54
add test for avro reflect deserializer
shiveshr Jun 25, 2020
2241865
javadoc
shiveshr Jun 25, 2020
456e6bd
Merge branch 'contract' into serializers
shiveshr Jun 25, 2020
9741bd5
Merge branch 'master' into contract
shiveshr Jun 25, 2020
6f7f4d5
Merge branch 'contract' into serializers
shiveshr Jun 25, 2020
87c9ebc
make cache package private
shiveshr Jun 25, 2020
f8fd8ad
javadocs
shiveshr Jun 25, 2020
c98ee7b
remove the hashutil from this branch
shiveshr Jun 25, 2020
c3789ca
minor imp
shiveshr Jun 25, 2020
32933ba
use full name
shiveshr Jun 28, 2020
d8b0675
ofRecord in avro schema
shiveshr Jun 28, 2020
322d3c0
exact type to message match for protobuf generic deserializer
shiveshr Jun 29, 2020
a586f4f
generalize the multiformat deserialization
shiveshr Jun 30, 2020
cdc0c0c
revert rename of json generic object
shiveshr Jun 30, 2020
8693357
refactoring serializer factory into multiple classes for readability
shiveshr Jul 2, 2020
223df77
bug fixes
shiveshr Jul 3, 2020
8c69b3f
PR comments
shiveshr Jul 3, 2020
572f34f
javadoc
shiveshr Jul 5, 2020
cf82e07
encode header as a config param
shiveshr Jul 8, 2020
c4a732b
review comments
shiveshr Jul 8, 2020
5daaeb4
temp
shiveshr Jul 8, 2020
eedee88
merge with issue 46
shiveshr Jul 8, 2020
b63a8a4
merge with master
shiveshr Jul 8, 2020
23e6470
rename variable
shiveshr Jul 8, 2020
4925759
remove validate object
shiveshr Jul 8, 2020
1dafddb
ProtobufSchema
shiveshr Jul 8, 2020
53b8f9c
minor refactoring
shiveshr Jul 10, 2020
a744198
PR comments
shiveshr Jul 10, 2020
c6a02c8
fix
shiveshr Jul 10, 2020
bf0dfb4
PR comments
shiveshr Jul 12, 2020
d70ba7e
merge with master
shiveshr Jul 13, 2020
4117557
PR comment
shiveshr Jul 13, 2020
0f770fa
Merge branch 'master' into serializers
shiveshr Jul 13, 2020
9823974
PR comments
shiveshr Jul 13, 2020
7ee6fe0
PR comments
shiveshr Jul 14, 2020
ff47289
PR comments
shiveshr Jul 14, 2020
6c3fd87
PR comments
shiveshr Jul 15, 2020
2d6711d
adding avro protobuf and json creator methods in WithSchema
shiveshr Jul 15, 2020
ade09af
PR comments, javadoc fix, and return schema string instead of JsonSch…
shiveshr Jul 16, 2020
9cbaaae
checkstyle
shiveshr Jul 16, 2020
c0e57be
PR comment
shiveshr Jul 16, 2020
174c4e6
PR comments
shiveshr Jul 16, 2020
fe0974f
revert rename of registry serializer factory
shiveshr Jul 16, 2020
d464b6b
PR comment - generic T on JSONSchema
shiveshr Jul 17, 2020
9137aa6
Merge with master, use everit json schema
shiveshr Jul 17, 2020
ed740ea
Merge with master
shiveshr Jul 17, 2020
a8fb0dd
PR comments, javadoc corrections, test addition
shiveshr Jul 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,39 @@ def getProjectVersion() {
return ver
}

project('serializers') {
dependencies {
compile project(':common')
compile project(':client')
compile group: 'org.apache.avro', name: 'avro', version: avroVersion
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These should probably be separated into individual targets so that users don't need to depend on more than what they actually need.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created issue #45 to tackle this later as it would require considerable refactoring at this time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it require a change to SerializerFactory? I'm concerned that this could be an API change, and even though we are making this beta, this is an anticipated change, it is not something that we learned later. If we can do it now, I'd say it is better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way i am thinking is as follows:
have format specific serializers implemented in format specific module.
So one for protobuf, one for json, one for avro. Each such module will have a factory with only serializers/deserializers for that format.

Then we will have a consolidated one, which is this library and factory class. This is because here we have serializers which are multi format.

So this way users can take dependency on one common module (all type 1 apps that want to work for multiple formats can depend on this one module).

So the user contract, which is the SerializerFactory, doesnt change.

Only the serializer implementations would move internal to their respective modules and this base module would take dependence on those.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If applications are supposed to use the specific modules, why do we need the base module? If I understand correctly, the base module is a level of indirection to the specific modules.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it serves as two purposes:

  1. level of indirection
  2. allows for multi-format serializers and deserializers

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with this direction, @tkaitchuck needs to resolve this thread if he is satisfied.

compile group: 'com.google.protobuf', name: 'protobuf-java', version: protobufProtocVersion
compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: protobufUtilVersion
compile group: 'io.pravega', name: 'pravega-client', version: pravegaVersion
compile group: 'org.xerial.snappy', name: 'snappy-java', version: snappyVersion
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-jsonSchema', version: jacksonVersion
testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
}

javadoc {
title = "Serializers"
dependsOn delombok
source = delombok.outputDir
failOnError = true
exclude "**/impl/**";
options.addBooleanOption("Xdoclint:all,-reference", true)
}

jar {
manifest {}

from {
configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}
}
}

project('server') {
sourceSets {
main.resources.srcDirs += "$projectDir/src/conf"
Expand Down Expand Up @@ -307,6 +340,15 @@ distributions {
from 'NOTICE'
}
}
serializers {
baseName = "schema-registry-serializers"
contents {
from { project(":serializers").configurations.runtime }
from { project(":serializers").configurations.runtime.allArtifacts.files }
from 'LICENSE'
from 'NOTICE'
}
}
}

task sourceCopy(type: Copy) {
Expand Down Expand Up @@ -346,6 +388,7 @@ task publishAllJars() {
dependsOn ':common:publish'
dependsOn ':contract:publish'
dependsOn ':server:publish'
dependsOn ':serializers:publish'
}

task prepareRegistryImage(type: Copy) {
Expand Down
3 changes: 3 additions & 0 deletions checkstyle/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
<Match> <!-- generated code -->
<Package name="io.pravega.schemaregistry.test.integrationtest.generated" />
</Match>
<Match> <!-- generated code -->
<Package name="io.pravega.schemaregistry.testobjs.generated" />
</Match>
<Match> <!-- does not work well with futures -->
<Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
</Match>
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ gradleLombokPluginVersion=3.2.0
gradleSshPluginVersion=2.9.0
guavaVersion=28.1-jre
javaxServletApiVersion=4.0.0
jacksonVersion=2.10.3
jacksonVersion=2.11.1
javaxwsrsApiVersion=2.1
jaxbVersion=2.3.0
javaxAnnotationVersion=1.3.2
Expand All @@ -49,7 +49,7 @@ pravegaVersion=0.7.0
pravegaKeyCloakVersion=0.7.0

# Version and base tags can be overridden at build time
schemaregistryVersion=0.0.1-SNAPSHOT
schemaregistryVersion=0.1.0-SNAPSHOT
schemaregistryBaseTag=pravega/schemaregistry

# Pravega Signing Key
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.schemaregistry.codec;

import io.pravega.schemaregistry.contract.data.CodecType;
import io.pravega.schemaregistry.contract.data.EncodingInfo;

/**
* Codec interface extends {@link Encoder} and {@link Decoder} interfaces that defines methods to encode and decode
* data. Encoder interface takes a codec type and encoding function. Decoder interface defines a decoding function.
*/
public interface Codec extends Encoder, Decoder {
/**
* Name identifying the Codec Type.
* This name should be same as the {@link CodecType#getName()} that is registered for the group in schema registry
* service.
* The deserializers will find the decoder for the encoded data from {@link EncodingInfo#getCodecType()} by matching
* the name.
*
* @return Name of the codec.
*/
String getName();
}
130 changes: 130 additions & 0 deletions serializers/src/main/java/io/pravega/schemaregistry/codec/Codecs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.schemaregistry.codec;

import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import io.pravega.schemaregistry.contract.data.CodecType;
import lombok.Getter;
import org.apache.commons.io.IOUtils;
import org.xerial.snappy.Snappy;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
* Utility class for creating codecs for none, snappy or gzip.
*/
public enum Codecs {
fpj marked this conversation as resolved.
Show resolved Hide resolved
None(Constants.NOOP),
GzipCompressor(Constants.GZIP_CODEC),
SnappyCompressor(Constants.SNAPPY_CODEC);

@Getter
private final Codec codec;

Codecs(Codec codec) {
this.codec = codec;
}

private static class Noop implements Codec {
private static final CodecType CODEC_TYPE_NONE = new CodecType(Constants.NONE);

@Override
public String getName() {
return CODEC_TYPE_NONE.getName();
}

@Override
public CodecType getCodecType() {
return CODEC_TYPE_NONE;
}

@Override
public void encode(ByteBuffer data, ByteArrayOutputStream bos) {
shiveshr marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public ByteBuffer decode(ByteBuffer data, Map<String, String> codecProperties) {
return data;
}
}

private static class GZipCodec implements Codec {
private static final CodecType CODEC_TYPE_GZIP = new CodecType(Constants.APPLICATION_X_GZIP);
@Override
public String getName() {
return CODEC_TYPE_GZIP.getName();
}

@Override
public CodecType getCodecType() {
return CODEC_TYPE_GZIP;
}

@Override
public void encode(ByteBuffer data, ByteArrayOutputStream bos) throws IOException {
try (GZIPOutputStream gzipOS = new GZIPOutputStream(bos)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to call close on the output stream. I am not sure we want that.
Should these codecs be "stackable"? IE could there be a Base64 codec which wrapps any one of these compression codecs.
If so then closing the output stream should be left to the caller.

Copy link
Contributor Author

@shiveshr shiveshr Jul 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to call close on the output stream. I am not sure we want that. Should these codecs be "stackable"?

The codecs are not stackable.
However, why the close is inconsequential here is for two reasons:

  1. Encode is the last operation we do on the outputstream.
  2. we use ByteArrayOutputStream where close method is a no-op.

So calling close on bos is inconsequential and the Serializer does not impose any restriction. However, just from the sanctity of contract perspective, i will update the javadoc to illustrate explicitly that it is inconsequential if users close the outputstream.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this me we are precluding support for base64?
for pravega, maybe it never makes sense, because we do the decode before handing the data to user code (assuming both ends use this client). But is this intended to be used in any other contexts?

Copy link
Contributor Author

@shiveshr shiveshr Jul 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this me we are precluding support for base64?

No.
theoretically a user could create a codec for encoding and decoding with base64.

But is this intended to be used in any other contexts?
these serializers are specific to pravega.

Service currently generates encoding id for a single codec + schema pair.
If users want to stack multiple encodings, presently they would create a new codec, register its name, and then in the implementation for that codec they would perform multiple encodings.

for example:

compressAndBase64Encoder ==>
void encode(bytes, outputstream) {
   b1 = compress(bytes)
   b2 = base64encode(b1)
   outputstream.write(b2)
}

bytes decode(bytes) {
   b1 = base64decode(bytes)
   b2 = uncompress(b1)
   return b2
}

If we want we could change this in future to allow for an ordered list of "codecs" for generating an encoding id. Presently that is not the case though.
I have created an issue to explore the possibilities #56

gzipOS.write(data.array(), data.arrayOffset() + data.position(), data.remaining());
shiveshr marked this conversation as resolved.
Show resolved Hide resolved
}
}

@Override
public ByteBuffer decode(ByteBuffer data, Map<String, String> codecProperties) throws IOException {
InputStream bis = new ByteBufferBackedInputStream(data);
shiveshr marked this conversation as resolved.
Show resolved Hide resolved
return ByteBuffer.wrap(IOUtils.toByteArray(new GZIPInputStream(bis)));
}
}

private static class SnappyCodec implements Codec {
private static final CodecType CODEC_TYPE_SNAPPY = new CodecType(Constants.APPLICATION_X_SNAPPY_FRAMED);
@Override
public String getName() {
return CODEC_TYPE_SNAPPY.getName();
}

@Override
public CodecType getCodecType() {
return CODEC_TYPE_SNAPPY;
}

@Override
public void encode(ByteBuffer data, ByteArrayOutputStream bos) throws IOException {
int capacity = Snappy.maxCompressedLength(data.remaining());
byte[] encoded = new byte[capacity];

int size = Snappy.compress(data.array(), data.arrayOffset() + data.position(),
shrids marked this conversation as resolved.
Show resolved Hide resolved
shiveshr marked this conversation as resolved.
Show resolved Hide resolved
data.remaining(), encoded, 0);
bos.write(encoded, 0, size);
}

@Override
public ByteBuffer decode(ByteBuffer data, Map<String, String> codecProperties) throws IOException {
ByteBuffer decoded = ByteBuffer.allocate(Snappy.uncompressedLength(data.array(), data.arrayOffset() + data.position(),
shiveshr marked this conversation as resolved.
Show resolved Hide resolved
data.remaining()));
Snappy.uncompress(data.array(), data.arrayOffset() + data.position(),
data.remaining(), decoded.array(), 0);
return decoded;
}
}

static class Constants {
static final Noop NOOP = new Noop();
static final GZipCodec GZIP_CODEC = new GZipCodec();
static final SnappyCodec SNAPPY_CODEC = new SnappyCodec();
static final String NONE = "";
static final String APPLICATION_X_GZIP = "application/x-gzip";
static final String APPLICATION_X_SNAPPY_FRAMED = "application/x-snappy-framed";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.schemaregistry.codec;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;

/**
* Decoder interface that defines method to decode data.
*/
@FunctionalInterface
public interface Decoder {
/**
* Implementation should decode the remaining bytes in the buffer and return a new ByteBuffer that includes
* the decoded data at its current position.
*
* @param data encoded ByteBuffer to decode.
* @param codecProperties codec properties.
* @return decoded ByteBuffer with position set to the start of decoded data.
* @throws IOException can be thrown while reading from or writing to byte buffers.
*/
ByteBuffer decode(ByteBuffer data, Map<String, String> codecProperties) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.schemaregistry.codec;

import io.pravega.schemaregistry.contract.data.CodecType;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

/**
* Defines method to encode data.
*/
public interface Encoder {
/**
* Codec type for the encoder.
*
* @return Codec Type for the encoder.
*/
CodecType getCodecType();

/**
* Implementation should encode the remaining bytes in the buffer and return a new ByteBuffer that includes
* the encoded data at its current position.
*
* @param data ByteBuffer to encode.
* @param outputStream ByteArrayOutputStream where the encoded data should be written.
* @throws IOException IOException can be thrown while reading from or writing to byte buffers.
*/
void encode(ByteBuffer data, ByteArrayOutputStream outputStream) throws IOException;
shiveshr marked this conversation as resolved.
Show resolved Hide resolved
}
Loading