
Issue 40: Serializers Implementation for Schema Registry #41

Merged · 100 commits · Jul 21, 2020

Conversation

@shiveshr shiveshr (Contributor) commented Jun 25, 2020

Change log description
Serializers implementation for schema registry.
Refer to https://github.com/pravega/schema-registry/wiki/Sample-Usage to see what usage looks like for Pravega applications.

Purpose of the change
Fixes #40

What the code does
SerializerFactory.java implements several serializers for use in Pravega applications.
It provides both generic and specific deserialization for Avro, Protobuf, and JSON.
It also provides custom serialization support.

Config:
The serializers require a SerializerConfig. At a minimum, the config takes the schema registry client config and a group id.
Users can also build the SerializerConfig with "create group" and "register schema" options, which automatically create the group and register the schema before use.
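A minimal sketch of what building such a config might look like (the builder method names here are assumptions based on this description, not verified against the actual API):

SerializerConfig config = SerializerConfig.builder()
        .groupId("myGroup")                                  // required: group to resolve schemas against
        .registryConfig(SchemaRegistryClientConfig.builder()
                .schemaRegistryUri(URI.create("http://localhost:9092"))
                .build())                                    // required: how to reach the registry service
        .createGroup(SerializationFormat.Avro)               // optional: create the group if it does not exist
        .registerSchema(true)                                // optional: register the schema before use
        .build();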

Serializers:
Each registry-aware serializer or deserializer derives from AbstractSerializer or AbstractDeserializer respectively.
These base classes implement the core logic of talking to the registry service.
All format-specific serializers and deserializers derive from these classes.
We provide specific (typed) and generic deserializers for Protobuf, JSON, and Avro.
JSON additionally has a deserializer that deserializes into a String.
We also provide multiplexed serializers and deserializers for all of the above formats.
There is a provision for users to supply their own custom serializers and deserializers too.
There are also multi-format deserializers, which are capable of deserializing objects serialized with different formats into format-specific generic objects.
Finally, there is a multi-format JSON String deserializer, which deserializes Protobuf, JSON, and Avro into JSON strings.
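For illustration, a rough sketch of using the factory (the method and class names here, including MyEvent, are assumptions based on this description):

// Typed (specific) Avro serializer for MyEvent objects:
Serializer<MyEvent> serializer =
        SerializerFactory.avroSerializer(config, AvroSchema.of(MyEvent.class));

// Generic Avro deserializer: yields a format-specific generic object
// (an Avro GenericRecord) without needing the compiled MyEvent class:
Serializer<GenericRecord> deserializer =
        SerializerFactory.avroGenericDeserializer(config, null);

These would then be handed to Pravega's event writers and readers like any other Serializer.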

Cache:
The serializer factory also supplies an encoding cache to each serializer/deserializer it creates, so that they don't have to talk to the registry service for an encoding id they have already seen.
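Conceptually, the cache is a map from encoding id to resolved encoding info that is consulted before calling the service (a simplified sketch of the idea, not the actual implementation):

private final Map<EncodingId, EncodingInfo> cache = new ConcurrentHashMap<>();

EncodingInfo getEncodingInfo(EncodingId id) {
    // Hit the registry service only on a cache miss.
    return cache.computeIfAbsent(id, key -> registryClient.getEncodingInfo(groupId, key));
}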

Codecs:
A CodecFactory class is included, which provides implementations for Snappy and GZIP codecs.
The serializer config can optionally be built with encoders for serializers and decoders for deserializers.
If an encoder is supplied, every event is encoded with it after serialization.
Zero or more decoders can be supplied to the deserializer. Before creating the deserializer, the serializer factory talks to the registry service to get the list of codec types registered with the service. If the supplied codec types do not match the registered codec types, the "fail on codec type mismatch" config setting determines whether deserializer initialization fails or succeeds.
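A sketch of the wiring described above (builder method names such as encoder, decoder, and failOnCodecMismatch are assumptions, not verified API):

SerializerConfig config = SerializerConfig.builder()
        .groupId("myGroup")
        .registryConfig(registryConfig)
        .encoder(CodecFactory.gzip())       // serializer side: encode every event after serializing
        .decoder(CodecFactory.gzip())       // deserializer side: zero or more decoders
        .decoder(CodecFactory.snappy())
        .failOnCodecMismatch(true)          // governs init when supplied and registered codec types differ
        .build();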

How to verify it
Unit tests added.

shiveshr added 30 commits June 7, 2020 19:23, each signed off by Shivesh Ranjan <[email protected]>
@fpj fpj (Contributor) left a comment

It looks very nice, Shivesh. I have left a few minor comments. The only real concern I have is the split of the serializer into different artifacts.

dependencies {
compile project(':common')
compile project(':client')
compile group: 'org.apache.avro', name: 'avro', version: avroVersion
Contributor

Will it require a change to SerializerFactory? I'm concerned that this could be an API change, and even though we are making this beta, this is an anticipated change, not something that we learned later. If we can do it now, I'd say it is better.

@shrids shrids (Contributor) left a comment

+1 Completed one more pass of the PR. No new comments from my end.


@Override
public void encode(ByteBuffer data, ByteArrayOutputStream bos) throws IOException {
    try (GZIPOutputStream gzipOS = new GZIPOutputStream(bos)) {
Member

This is going to call close on the output stream. I am not sure we want that.
Should these codecs be "stackable"? I.e., could there be a Base64 codec which wraps any one of these compression codecs?
If so, then closing the output stream should be left to the caller.

@shiveshr shiveshr (Contributor, Author) commented Jul 15, 2020

This is going to call close on the output stream. I am not sure we want that. Should these codecs be "stackable"?

The codecs are not stackable.
However, the close is inconsequential here for two reasons:

  1. Encode is the last operation we perform on the output stream.
  2. We use ByteArrayOutputStream, whose close method is a no-op.

So calling close on bos is inconsequential, and the Serializer does not impose any restriction. However, for the sake of the contract, I will update the Javadoc to state explicitly that it is inconsequential if users close the output stream.

Member

Does this mean we are precluding support for Base64?
For Pravega, maybe it never makes sense, because we decode before handing the data to user code (assuming both ends use this client). But is this intended to be used in any other contexts?

@shiveshr shiveshr (Contributor, Author) commented Jul 16, 2020

Does this mean we are precluding support for Base64?

No.
Theoretically, a user could create a codec for encoding and decoding with Base64.

But is this intended to be used in any other contexts?

These serializers are specific to Pravega.

The service currently generates an encoding id for a single codec + schema pair.
If users want to stack multiple encodings, presently they would create a new codec, register its name, and then perform the multiple encodings inside that codec's implementation.

for example:

compressAndBase64Encoder ==>

void encode(byte[] bytes, OutputStream out) throws IOException {
    byte[] compressed = compress(bytes);                  // compress first
    out.write(Base64.getEncoder().encode(compressed));    // then Base64-encode
}

byte[] decode(byte[] bytes) {
    byte[] decoded = Base64.getDecoder().decode(bytes);   // undo Base64 first
    return uncompress(decoded);                           // then decompress
}

If we want, we could change this in the future to allow an ordered list of "codecs" for generating an encoding id. Presently that is not the case, though.
I have created issue #56 to explore the possibilities.

@Ranganaths8 Ranganaths8 requested a review from tkaitchuck July 15, 2020 04:40
dependencies {
compile project(':common')
compile project(':client')
compile group: 'org.apache.avro', name: 'avro', version: avroVersion
Contributor

If applications are supposed to use the specific modules, why do we need the base module? If I understand correctly, the base module is a level of indirection to the specific modules.


shiveshr added 5 commits July 15, 2020 21:34, signed off by Shivesh Ranjan <[email protected]>
shiveshr added 2 commits July 17, 2020 01:06, signed off by Shivesh Ranjan <[email protected]>
@@ -53,6 +53,13 @@ public CodecType getCodecType() {

@Override
public void encode(ByteBuffer data, ByteArrayOutputStream bos) {
    if (data.hasArray()) {
Member

This pattern is repeated in a few places; it may be worth making a utility function.

Contributor (Author)

Created issue #70.

if (this.encodeHeader) {
    SchemaInfo writerSchema = null;
    ByteBuffer decoded;
    if (skipHeaders) {
Member

I am not clear on why one would set encodeHeader to true and skipHeaders to true, as opposed to setting encodeHeader to false. It looks like in both cases writerSchema is null.

Contributor (Author)

skipHeaders is only an optimization flag used during deserialization.
It tells the deserializer to skip the encoded header and not talk to the service to resolve it to the write-time schema; instead, the supplied read-time schema is used to read into.

This is relevant for the Protobuf and JSON typed deserializers, where the user supplies a read-time schema to read into.
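To make the two paths concrete, here is a rough sketch of the branch quoted above (HEADER_SIZE and the helper methods are hypothetical, for illustration only):

if (this.encodeHeader) {
    SchemaInfo writerSchema = null;
    ByteBuffer decoded;
    if (skipHeaders) {
        // Optimization: jump past the header and deserialize directly into the
        // supplied read-time schema, without resolving the write-time schema.
        data.position(data.position() + HEADER_SIZE);
        decoded = data;
    } else {
        // Resolve the encoding id in the header (via the encoding cache or the
        // registry service) to the write-time schema, then decode the payload.
        EncodingInfo encodingInfo = resolveEncoding(data);
        writerSchema = encodingInfo.getSchemaInfo();
        decoded = decode(encodingInfo.getCodecType(), data);
    }
    return deserialize(decoded, writerSchema, readerSchema);
}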
