-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Issue 40: Serializers Implementation for Schema Registry #41
Changes from 96 commits
76d99a3
b634867
a0876cc
05cc270
8ee1fef
193776d
600ff74
be5115e
409c106
88af2e7
b1a8894
1961ba1
58e7f65
026b725
7027d88
4be3f40
cc223ec
a0a2db7
8a8c872
3f64c11
fba77f6
a449b4b
22a0a0d
9a50464
8cc3091
d063e7c
5a7d639
a9cca3a
0b6230a
911f793
7ff74af
c9f9d3f
5d91574
7c30b4f
98a9c6b
dcff47c
28660e4
53b8988
8e2c704
ce1aa98
151b808
bffd0f7
26e34f1
049fb6d
910f870
d690a21
a44c0fb
a82ee56
7b34571
48da360
72919bb
48e54e9
7c9ba8e
214ab54
2241865
456e6bd
9741bd5
6f7f4d5
87c9ebc
f8fd8ad
c98ee7b
c3789ca
32933ba
d8b0675
322d3c0
a586f4f
cdc0c0c
8693357
223df77
8c69b3f
572f34f
cf82e07
c4a732b
5daaeb4
eedee88
b63a8a4
23e6470
4925759
1dafddb
53b8f9c
a744198
c6a02c8
bf0dfb4
d70ba7e
4117557
0f770fa
9823974
7ee6fe0
ff47289
6c3fd87
2d6711d
ade09af
9cbaaae
c0e57be
174c4e6
fe0974f
d464b6b
9137aa6
ed740ea
a8fb0dd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/** | ||
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
package io.pravega.schemaregistry.codec; | ||
|
||
import io.pravega.schemaregistry.contract.data.CodecType; | ||
import io.pravega.schemaregistry.contract.data.EncodingInfo; | ||
|
||
/** | ||
* Codec interface extends {@link Encoder} and {@link Decoder} interfaces that defines methods to encode and decode | ||
* data. Encoder interface takes a codec type and encoding function. Decoder interface defines a decoding function. | ||
*/ | ||
public interface Codec extends Encoder, Decoder { | ||
/** | ||
* Name identifying the Codec Type. | ||
* This name should be same as the {@link CodecType#getName()} that is registered for the group in schema registry | ||
* service. | ||
* The deserializers will find the decoder for the encoded data from {@link EncodingInfo#getCodecType()} by matching | ||
* the name. | ||
* | ||
* @return Name of the codec. | ||
*/ | ||
String getName(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
/** | ||
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
package io.pravega.schemaregistry.codec; | ||
|
||
import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; | ||
import io.pravega.schemaregistry.contract.data.CodecType; | ||
import lombok.Getter; | ||
import org.apache.commons.io.IOUtils; | ||
import org.xerial.snappy.Snappy; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.nio.ByteBuffer; | ||
import java.util.Map; | ||
import java.util.zip.GZIPInputStream; | ||
import java.util.zip.GZIPOutputStream; | ||
|
||
/** | ||
* Utility class for creating codecs for none, snappy or gzip. | ||
*/ | ||
public enum Codecs { | ||
fpj marked this conversation as resolved.
Show resolved
Hide resolved
|
||
None(Constants.NOOP), | ||
GzipCompressor(Constants.GZIP_CODEC), | ||
SnappyCompressor(Constants.SNAPPY_CODEC); | ||
|
||
@Getter | ||
private final Codec codec; | ||
|
||
Codecs(Codec codec) { | ||
this.codec = codec; | ||
} | ||
|
||
private static class Noop implements Codec { | ||
private static final CodecType CODEC_TYPE_NONE = new CodecType(Constants.NONE); | ||
|
||
@Override | ||
public String getName() { | ||
return CODEC_TYPE_NONE.getName(); | ||
} | ||
|
||
@Override | ||
public CodecType getCodecType() { | ||
return CODEC_TYPE_NONE; | ||
} | ||
|
||
@Override | ||
public void encode(ByteBuffer data, ByteArrayOutputStream bos) { | ||
shiveshr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (data.hasArray()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This pattern is repeated in a few places, it may be worth making a utility function. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. created issue #70 |
||
bos.write(data.array(), data.arrayOffset() + data.position(), data.remaining()); | ||
} else { | ||
byte[] b = new byte[data.remaining()]; | ||
data.get(b); | ||
bos.write(b, 0, b.length); | ||
} | ||
} | ||
|
||
@Override | ||
public ByteBuffer decode(ByteBuffer data, Map<String, String> codecProperties) { | ||
return data; | ||
} | ||
} | ||
|
||
private static class GZipCodec implements Codec { | ||
private static final CodecType CODEC_TYPE_GZIP = new CodecType(Constants.APPLICATION_X_GZIP); | ||
@Override | ||
public String getName() { | ||
return CODEC_TYPE_GZIP.getName(); | ||
} | ||
|
||
@Override | ||
public CodecType getCodecType() { | ||
return CODEC_TYPE_GZIP; | ||
} | ||
|
||
@Override | ||
public void encode(ByteBuffer data, ByteArrayOutputStream bos) throws IOException { | ||
byte[] b = data.hasArray() ? data.array() : getBytes(data); | ||
int offset = data.hasArray() ? data.arrayOffset() + data.position() : 0; | ||
try (GZIPOutputStream gzipOS = new GZIPOutputStream(bos)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is going to call close on the output stream. I am not sure we want that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The codecs are not stackable.
So calling close on bos is inconsequential and the Serializer does not impose any restriction. However, just from the sanctity of contract perspective, i will update the javadoc to illustrate explicitly that it is inconsequential if users close the outputstream. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this me we are precluding support for base64? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No.
Service currently generates encoding id for a single codec + schema pair. for example:
If we want we could change this in future to allow for an ordered list of "codecs" for generating an encoding id. Presently that is not the case though. |
||
gzipOS.write(b, offset, data.remaining()); | ||
} | ||
} | ||
|
||
@Override | ||
public ByteBuffer decode(ByteBuffer data, Map<String, String> codecProperties) throws IOException { | ||
InputStream bis = new ByteBufferBackedInputStream(data); | ||
shiveshr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return ByteBuffer.wrap(IOUtils.toByteArray(new GZIPInputStream(bis))); | ||
} | ||
} | ||
|
||
private static byte[] getBytes(ByteBuffer data) { | ||
byte[] b = new byte[data.remaining()]; | ||
data.get(b); | ||
return b; | ||
} | ||
|
||
private static class SnappyCodec implements Codec { | ||
private static final CodecType CODEC_TYPE_SNAPPY = new CodecType(Constants.APPLICATION_X_SNAPPY_FRAMED); | ||
@Override | ||
public String getName() { | ||
return CODEC_TYPE_SNAPPY.getName(); | ||
} | ||
|
||
@Override | ||
public CodecType getCodecType() { | ||
return CODEC_TYPE_SNAPPY; | ||
} | ||
|
||
@Override | ||
public void encode(ByteBuffer data, ByteArrayOutputStream bos) throws IOException { | ||
int capacity = Snappy.maxCompressedLength(data.remaining()); | ||
byte[] encoded = new byte[capacity]; | ||
|
||
byte[] b = data.hasArray() ? data.array() : getBytes(data); | ||
int offset = data.hasArray() ? data.arrayOffset() + data.position() : 0; | ||
int size = Snappy.compress(b, offset, data.remaining(), encoded, 0); | ||
bos.write(encoded, 0, size); | ||
} | ||
|
||
@Override | ||
public ByteBuffer decode(ByteBuffer data, Map<String, String> codecProperties) throws IOException { | ||
byte[] b = data.hasArray() ? data.array() : getBytes(data); | ||
int offset = data.hasArray() ? data.arrayOffset() + data.position() : 0; | ||
|
||
ByteBuffer decoded = ByteBuffer.allocate(Snappy.uncompressedLength(b, offset, data.remaining())); | ||
Snappy.uncompress(b, offset, data.remaining(), decoded.array(), 0); | ||
return decoded; | ||
} | ||
} | ||
|
||
static class Constants { | ||
static final Noop NOOP = new Noop(); | ||
static final GZipCodec GZIP_CODEC = new GZipCodec(); | ||
static final SnappyCodec SNAPPY_CODEC = new SnappyCodec(); | ||
static final String NONE = ""; | ||
static final String APPLICATION_X_GZIP = "application/x-gzip"; | ||
static final String APPLICATION_X_SNAPPY_FRAMED = "application/x-snappy-framed"; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/** | ||
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
package io.pravega.schemaregistry.codec; | ||
|
||
import java.io.IOException; | ||
import java.nio.ByteBuffer; | ||
import java.util.Map; | ||
|
||
/** | ||
* Decoder interface that defines method to decode data. | ||
*/ | ||
@FunctionalInterface | ||
public interface Decoder { | ||
/** | ||
* Implementation should decode the remaining bytes in the buffer and return a new ByteBuffer that includes | ||
* the decoded data at its current position. | ||
* | ||
* @param data encoded ByteBuffer to decode. | ||
* @param codecProperties codec properties. | ||
* @return decoded ByteBuffer with position set to the start of decoded data. | ||
* @throws IOException can be thrown while reading from or writing to byte buffers. | ||
*/ | ||
ByteBuffer decode(ByteBuffer data, Map<String, String> codecProperties) throws IOException; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/** | ||
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
package io.pravega.schemaregistry.codec; | ||
|
||
import io.pravega.schemaregistry.contract.data.CodecType; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.nio.ByteBuffer; | ||
|
||
/** | ||
* Defines method to encode data. | ||
*/ | ||
public interface Encoder { | ||
/** | ||
* Codec type for the encoder. | ||
* | ||
* @return Codec Type for the encoder. | ||
*/ | ||
CodecType getCodecType(); | ||
|
||
/** | ||
* Implementation should encode the remaining bytes in the buffer and return a new ByteBuffer that includes | ||
* the encoded data at its current position. | ||
* | ||
* The implementation can optionally call flush or close on outputstream with no consequence. | ||
* | ||
* @param data ByteBuffer to encode. | ||
* @param outputStream ByteArrayOutputStream where the encoded data should be written. | ||
* @throws IOException IOException can be thrown while reading from or writing to byte buffers. | ||
*/ | ||
void encode(ByteBuffer data, ByteArrayOutputStream outputStream) throws IOException; | ||
shiveshr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should probably be separated into individual targets so that users don't need to depend on more than what they actually need.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created issue #45 to tackle this later as it would require considerable refactoring at this time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will it require a change to
SerializerFactory
? I'm concerned that this could be an API change, and even though we are making this beta, this is an anticipated change, it is not something that we learned later. If we can do it now, I'd say it is better.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way i am thinking is as follows:
have format specific serializers implemented in format specific module.
So one for protobuf, one for json, one for avro. Each such module will have a factory with only serializers/deserializers for that format.
Then we will have a consolidated one, which is this library and factory class. This is because here we have serializers which are multi format.
So this way users can take dependency on one common module (all type 1 apps that want to work for multiple formats can depend on this one module).
So the user contract, which is the SerializerFactory, doesnt change.
Only the serializer implementations would move internal to their respective modules and this base module would take dependence on those.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If applications are supposed to use the specific modules, why do we need the base module? If I understand correctly, the base module is a level of indirection to the specific modules.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it serves as two purposes:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with this direction, @tkaitchuck needs to resolve this thread if he is satisfied.