Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 40: Serializers Implementation for Schema Registry #41

Merged
merged 100 commits into from
Jul 21, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
100 commits
Select commit Hold shift + click to select a range
76d99a3
copying over changes to new fork
shiveshr Jun 8, 2020
b634867
copying over to new fork
shiveshr Jun 8, 2020
a0876cc
serializers copying over
shiveshr Jun 8, 2020
05cc270
copying over
shiveshr Jun 8, 2020
8ee1fef
contract
shiveshr Jun 8, 2020
193776d
Merge branch 'contract' into serializers
shiveshr Jun 8, 2020
600ff74
remove </p>
shiveshr Jun 8, 2020
be5115e
Merge branch 'master' into contract
shiveshr Jun 9, 2020
409c106
merge with master
shiveshr Jun 9, 2020
88af2e7
remove </p>
shiveshr Jun 10, 2020
b1a8894
Merge branch 'contract' into serializers
shiveshr Jun 10, 2020
1961ba1
add name util and hash util
shiveshr Jun 10, 2020
58e7f65
removing all unwanted auto generated swagger files
shiveshr Jun 11, 2020
026b725
Merge branch 'contract' into serializers
shiveshr Jun 11, 2020
7027d88
rename
shiveshr Jun 11, 2020
4be3f40
PR comment
shiveshr Jun 12, 2020
cc223ec
Merge branch 'contract' into serializers
shiveshr Jun 12, 2020
a0a2db7
marking interfaces as beta
shiveshr Jun 15, 2020
8a8c872
Merge branch 'contract' into serializers
shiveshr Jun 15, 2020
3f64c11
validation rules of list
shiveshr Jun 16, 2020
fba77f6
Merge branch 'contract' into serializers
shiveshr Jun 16, 2020
a449b4b
remove Validation rules of list method
shiveshr Jun 16, 2020
22a0a0d
Merge branch 'contract' into serializers
shiveshr Jun 16, 2020
9a50464
PR comment
shiveshr Jun 16, 2020
8cc3091
PR comment
shiveshr Jun 17, 2020
d063e7c
Merge branch 'contract' into serializers
shiveshr Jun 17, 2020
5a7d639
Removing schema validation rules
shiveshr Jun 18, 2020
a9cca3a
Merge branch 'contract' into serializers
shiveshr Jun 18, 2020
0b6230a
license
shiveshr Jun 18, 2020
911f793
merge with contract
shiveshr Jun 18, 2020
7ff74af
Merge branch 'contract' into serializers
shiveshr Jun 18, 2020
c9f9d3f
license
shiveshr Jun 18, 2020
5d91574
javadoc statement
shiveshr Jun 18, 2020
7c30b4f
Merge branch 'contract' into serializers
shiveshr Jun 18, 2020
98a9c6b
javadoc
shiveshr Jun 18, 2020
dcff47c
Merge branch 'contract' into serializers
shiveshr Jun 18, 2020
28660e4
deserializerAsJson name fix
shiveshr Jun 18, 2020
53b8988
compatibility enum reversal
shiveshr Jun 19, 2020
8e2c704
Merge branch 'contract' into serializers
shiveshr Jun 19, 2020
ce1aa98
addschema
shiveshr Jun 19, 2020
151b808
Merge branch 'contract' into serializers
shiveshr Jun 19, 2020
bffd0f7
javadoc
shiveshr Jun 22, 2020
26e34f1
Merge branch 'contract' into serializers
shiveshr Jun 22, 2020
049fb6d
group id generator
shiveshr Jun 22, 2020
910f870
add error message in model helper
shiveshr Jun 22, 2020
d690a21
Merge branch 'contract' into serializers
shiveshr Jun 22, 2020
a44c0fb
required
shiveshr Jun 22, 2020
a82ee56
Merge branch 'contract' into serializers
shiveshr Jun 22, 2020
7b34571
Merge branch 'serializers' of https://github.com/shiveshr/schema-regi…
shiveshr Jun 22, 2020
48da360
add new deserializer factory method
shiveshr Jun 23, 2020
72919bb
PR comments
shiveshr Jun 24, 2020
48e54e9
Merge branch 'contract' into serializers
shiveshr Jun 24, 2020
7c9ba8e
avro reflect deserializer
shiveshr Jun 25, 2020
214ab54
add test for avro reflect deserializer
shiveshr Jun 25, 2020
2241865
javadoc
shiveshr Jun 25, 2020
456e6bd
Merge branch 'contract' into serializers
shiveshr Jun 25, 2020
9741bd5
Merge branch 'master' into contract
shiveshr Jun 25, 2020
6f7f4d5
Merge branch 'contract' into serializers
shiveshr Jun 25, 2020
87c9ebc
make cache package private
shiveshr Jun 25, 2020
f8fd8ad
javadocs
shiveshr Jun 25, 2020
c98ee7b
remove the hashutil from this branch
shiveshr Jun 25, 2020
c3789ca
minor imp
shiveshr Jun 25, 2020
32933ba
use full name
shiveshr Jun 28, 2020
d8b0675
ofRecord in avro schema
shiveshr Jun 28, 2020
322d3c0
exact type to message match for protobuf generic deserializer
shiveshr Jun 29, 2020
a586f4f
generalize the multiformat deserialization
shiveshr Jun 30, 2020
cdc0c0c
revert rename of json generic object
shiveshr Jun 30, 2020
8693357
refactoring serializer factory into multiple classes for readability
shiveshr Jul 2, 2020
223df77
bug fixes
shiveshr Jul 3, 2020
8c69b3f
PR comments
shiveshr Jul 3, 2020
572f34f
javadoc
shiveshr Jul 5, 2020
cf82e07
encode header as a config param
shiveshr Jul 8, 2020
c4a732b
review comments
shiveshr Jul 8, 2020
5daaeb4
temp
shiveshr Jul 8, 2020
eedee88
merge with issue 46
shiveshr Jul 8, 2020
b63a8a4
merge with master
shiveshr Jul 8, 2020
23e6470
rename variable
shiveshr Jul 8, 2020
4925759
remove validate object
shiveshr Jul 8, 2020
1dafddb
ProtobufSchema
shiveshr Jul 8, 2020
53b8f9c
minor refactoring
shiveshr Jul 10, 2020
a744198
PR comments
shiveshr Jul 10, 2020
c6a02c8
fix
shiveshr Jul 10, 2020
bf0dfb4
PR comments
shiveshr Jul 12, 2020
d70ba7e
merge with master
shiveshr Jul 13, 2020
4117557
PR comment
shiveshr Jul 13, 2020
0f770fa
Merge branch 'master' into serializers
shiveshr Jul 13, 2020
9823974
PR comments
shiveshr Jul 13, 2020
7ee6fe0
PR comments
shiveshr Jul 14, 2020
ff47289
PR comments
shiveshr Jul 14, 2020
6c3fd87
PR comments
shiveshr Jul 15, 2020
2d6711d
adding avro protobuf and json creator methods in WithSchema
shiveshr Jul 15, 2020
ade09af
PR comments, javadoc fix, and return schema string instead of JsonSch…
shiveshr Jul 16, 2020
9cbaaae
checkstyle
shiveshr Jul 16, 2020
c0e57be
PR comment
shiveshr Jul 16, 2020
174c4e6
PR comments
shiveshr Jul 16, 2020
fe0974f
revert rename of registry serializer factory
shiveshr Jul 16, 2020
d464b6b
PR comment - generic T on JSONSchema
shiveshr Jul 17, 2020
9137aa6
Merge with master, use everit json schema
shiveshr Jul 17, 2020
ed740ea
Merge with master
shiveshr Jul 17, 2020
a8fb0dd
PR comments, javadoc corrections, test addition
shiveshr Jul 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ project('serializers') {
compile group: 'io.pravega', name: 'pravega-client', version: pravegaVersion
compile group: 'org.xerial.snappy', name: 'snappy-java', version: snappyVersion
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-jsonSchema', version: jacksonVersion
compile group: 'com.github.everit-org.json-schema', name: 'org.everit.json.schema', version: everitVersion
testCompile group: 'org.slf4j', name: 'log4j-over-slf4j', version: slf4jApiVersion
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: qosLogbackVersion
testCompile group: 'io.pravega', name: 'pravega-test-testcommon', version: pravegaVersion
Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ gradleSshPluginVersion=2.9.0
guavaVersion=28.1-jre
javaxServletApiVersion=4.0.0
jacksonVersion=2.11.1
everitVersion=1.12.1
javaxwsrsApiVersion=2.1
jaxbVersion=2.3.0
javaxAnnotationVersion=1.3.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ public CodecType getCodecType() {

@Override
public void encode(ByteBuffer data, ByteArrayOutputStream bos) {
shiveshr marked this conversation as resolved.
Show resolved Hide resolved
if (data.hasArray()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern is repeated in a few places, it may be worth making a utility function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created issue #70

bos.write(data.array(), data.arrayOffset() + data.position(), data.remaining());
} else {
byte[] b = new byte[data.remaining()];
data.get(b);
bos.write(b, 0, b.length);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package io.pravega.schemaregistry.schemas;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
Expand All @@ -21,6 +22,10 @@
import io.pravega.schemaregistry.contract.data.SerializationFormat;
import lombok.Getter;
import org.apache.avro.specific.SpecificRecordBase;
import org.everit.json.schema.loader.SchemaLoader;
import org.everit.json.schema.loader.SpecificationVersion;
import org.json.JSONObject;
import org.json.JSONTokener;

import java.nio.ByteBuffer;

Expand All @@ -37,30 +42,30 @@ public class JSONSchema<T> implements Schema<T> {
private final Class<T> base;
@Getter
private final Class<? extends T> derived;

@Getter
private final JsonSchema schema;
private final org.everit.json.schema.Schema schema;

private final SchemaInfo schemaInfo;

private JSONSchema(JsonSchema schema, String name, String schemaString, Class<T> derived) {
this(schema, name, schemaString, derived, derived);
private JSONSchema(String name, String schemaString, Class<T> derived) {
this(name, schemaString, derived, derived);
}

private JSONSchema(JsonSchema schema, String name, String schemaString, Class<T> base, Class<? extends T> derived) {
private JSONSchema(String name, String schemaString, Class<T> base, Class<? extends T> derived) {
this.schemaString = schemaString;
this.schemaInfo = new SchemaInfo(name, SerializationFormat.Json, getSchemaBytes(), ImmutableMap.of());
this.base = base;
this.derived = derived;
this.schema = schema;
this.schema = getSchemaObj(schemaString);
}

private JSONSchema(SchemaInfo schemaInfo, JsonSchema schema, String schemaString, Class<T> derived) {
private JSONSchema(SchemaInfo schemaInfo, String schemaString, Class<T> derived) {
this.schemaString = schemaString;
this.schemaInfo = schemaInfo;
this.base = derived;
this.derived = derived;
this.schema = schema;
this.schema = getSchemaObj(schemaString);
}

/**
Expand All @@ -77,8 +82,7 @@ public static <T> JSONSchema<T> of(Class<T> tClass) {
JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(OBJECT_MAPPER);
JsonSchema schema = schemaGen.generateSchema(tClass);
shiveshr marked this conversation as resolved.
Show resolved Hide resolved
String schemaString = OBJECT_MAPPER.writeValueAsString(schema);

return new JSONSchema<>(schema, tClass.getName(), schemaString, tClass);
return new JSONSchema<>(tClass.getName(), schemaString, tClass);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Unable to get json schema from the class", e);
}
Expand All @@ -90,36 +94,35 @@ public static <T> JSONSchema<T> of(Class<T> tClass) {
*
* @param type type of object identified by {@link SchemaInfo#getType()}.
* @param schema Schema to use.
* @param tClass class for the type of object
* @param <T> Type of object
* @return Returns an JSONSchema with {@link Object} type.
*/
public static JSONSchema<Object> of(String type, JsonSchema schema) {
public static <T> JSONSchema<T> of(String type, JsonSchema schema, Class<T> tClass) {
Preconditions.checkNotNull(type);
Preconditions.checkNotNull(schema);
try {
String schemaString = OBJECT_MAPPER.writeValueAsString(schema);

return new JSONSchema<>(schema, type, schemaString, Object.class);
return new JSONSchema<>(type, schemaString, tClass);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Unable to get json schema string from the JsonSchema object", e);
}
}

/**
* Method to create a typed JSONSchema of type {@link Object} from the given schema string.
* Method to create a typed JSONSchema of type T from the given schema string.
*
* @param type type of object identified by {@link SchemaInfo#getType()}.
* @param schemaString Schema string to use.
* @param tClass class for the type of object
* @param <T> Type of object
* @return Returns an JSONSchema with {@link Object} type.
*/
public static JSONSchema<Object> of(String type, String schemaString) {
public static <T> JSONSchema<T> of(String type, String schemaString, Class<T> tClass) {
Preconditions.checkNotNull(type, "Type cannot be null.");
Preconditions.checkArgument(!Strings.isNullOrEmpty(schemaString), "Schema String cannot be null or empty.");
try {
JsonSchema schema = OBJECT_MAPPER.readValue(schemaString, JsonSchema.class);
return new JSONSchema<>(schema, type, schemaString, Object.class);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Unable to parse schema string", e);
}
return new JSONSchema<>(type, schemaString, tClass);
}

/**
Expand All @@ -137,33 +140,43 @@ public static <T> JSONSchema<T> ofBaseType(Class<? extends T> tDerived, Class<T>
Preconditions.checkNotNull(tBase);
try {
JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(OBJECT_MAPPER);
JsonSchema schema = schemaGen.generateSchema(tDerived);
String schemaString = OBJECT_MAPPER.writeValueAsString(schema);
JsonSchema jsonSchema = schemaGen.generateSchema(tDerived);
String schemaString = OBJECT_MAPPER.writeValueAsString(jsonSchema);

return new JSONSchema<>(schema, tDerived.getName(), schemaString, tBase, tDerived);
return new JSONSchema<>(tDerived.getName(), schemaString, tBase, tDerived);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Unable to get json schema from the class", e);
}
}

/**
* Method to create a typed JSONSchema of type {@link Object} from the given schema.
* Method to create a typed JSONSchema of type {@link JsonNode} from the given schema.
*
* @param schemaInfo Schema info to translate into json schema.
* @return Returns an JSONSchema with {@link Object} type.
* @return Returns an JSONSchema with {@link JsonNode} type.
*/
public static JSONSchema<Object> from(SchemaInfo schemaInfo) {
public static JSONSchema<JsonNode> from(SchemaInfo schemaInfo) {
Preconditions.checkNotNull(schemaInfo);
try {
String schemaString = new String(schemaInfo.getSchemaData().array(), Charsets.UTF_8);
String schemaString = new String(schemaInfo.getSchemaData().array(), Charsets.UTF_8);

JsonSchema schema = OBJECT_MAPPER.readValue(schemaString, JsonSchema.class);
return new JSONSchema<>(schemaInfo, schema, schemaString, Object.class);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Unable to get json schema from schema info", e);
}
return new JSONSchema<>(schemaInfo, schemaString, JsonNode.class);
}

private static org.everit.json.schema.Schema getSchemaObj(String schemaString) {
JSONObject rawSchema = new JSONObject(new JSONTokener(schemaString));
// It will check if the schema has "id" then it is definitely version 4.
// if $schema draft is specified, the schemaloader will automatically use the correct specification version
// however, $schema is not mandatory. So we will check with presence of id and if id is specified with draft 4
// specification, then we use draft 4, else we will use draft 7 as other keywords are added in draft 7.
if (rawSchema.has(SpecificationVersion.DRAFT_4.idKeyword())) {
return SchemaLoader.builder().useDefaults(true).schemaJson(rawSchema)
.build().load().build();
} else {
return SchemaLoader.builder().useDefaults(true).schemaJson(rawSchema).draftV7Support()
.build().load().build();
}
}

private ByteBuffer getSchemaBytes() {
return ByteBuffer.wrap(schemaString.getBytes(Charsets.UTF_8));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,26 @@ public class ProtobufSchema<T extends Message> implements Schema<T> {
@Getter
private final Class<T> tClass;
@Getter
private final FileDescriptorSet descriptorProto;
private final FileDescriptorSet fileDescriptorSet;

private final SchemaInfo schemaInfo;

private ProtobufSchema(String name, Parser<T> parser, Class<T> tClass, FileDescriptorSet fileDescriptorSet) {
this.parser = parser;
this.tClass = tClass;
this.descriptorProto = fileDescriptorSet;
this.fileDescriptorSet = fileDescriptorSet;
this.schemaInfo = new SchemaInfo(name, SerializationFormat.Protobuf, getSchemaBytes(), ImmutableMap.of());
}

private ProtobufSchema(FileDescriptorSet fileDescriptorSet, SchemaInfo schemaInfo, Class<T> tClass) {
this.parser = null;
this.tClass = null;
this.descriptorProto = fileDescriptorSet;
this.fileDescriptorSet = fileDescriptorSet;
this.schemaInfo = schemaInfo;
}

private ByteBuffer getSchemaBytes() {
return ByteBuffer.wrap(descriptorProto.toByteArray());
return ByteBuffer.wrap(fileDescriptorSet.toByteArray());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

abstract class AbstractSerializer<T> extends BaseSerializer<T> {
private static final byte PROTOCOL = 0x1;
private static final int HEADER_LENGTH = Byte.BYTES + Integer.BYTES;

private final String groupId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ class JsonGenericDeserializer extends AbstractDeserializer<WithSchema<JsonNode>>
super(groupId, client, null, false, decoders, encodingCache, encodeHeader);
this.objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
objectMapper.setVisibility(PropertyAccessor.CREATOR, JsonAutoDetect.Visibility.ANY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ class JsonSerializer<T> extends AbstractSerializer<T> {
super(groupId, client, schema, encoder, registerSchema, encodeHeader);
objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
objectMapper.setVisibility(PropertyAccessor.CREATOR, JsonAutoDetect.Visibility.ANY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,13 @@ private static AbstractSerializer<Object> getPravegaSerializer(
return new AvroSerializer<>(groupId, schemaRegistryClient,
AvroSchema.from(schemaInfo), config.getEncoder(), config.isRegisterSchema());
case Protobuf:
ProtobufSerializer<?> m = new ProtobufSerializer<>(groupId, schemaRegistryClient,
ProtobufSerializer<?> pSerializer = new ProtobufSerializer<>(groupId, schemaRegistryClient,
ProtobufSchema.from(schemaInfo), config.getEncoder(), config.isRegisterSchema(), config.isWriteEncodingHeader());
return (AbstractSerializer<Object>) m;
return (AbstractSerializer<Object>) pSerializer;
case Json:
return new JsonSerializer<>(groupId, schemaRegistryClient, JSONSchema.from(schemaInfo),
JsonSerializer<?> jsonSerializer = new JsonSerializer<>(groupId, schemaRegistryClient, JSONSchema.from(schemaInfo),
config.getEncoder(), config.isRegisterSchema(), config.isWriteEncodingHeader());
return (AbstractSerializer<Object>) jsonSerializer;
case Custom:
return getCustomSerializer(config, customSerializers, schemaRegistryClient, groupId, schemaInfo);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected DynamicMessage deserialize(InputStream inputStream, SchemaInfo writerS
}

private Descriptors.Descriptor parseSchema(SchemaInfo schemaToUse) {
DescriptorProtos.FileDescriptorSet descriptorSet = ProtobufSchema.from(schemaToUse).getDescriptorProto();
DescriptorProtos.FileDescriptorSet descriptorSet = ProtobufSchema.from(schemaToUse).getFileDescriptorSet();

int count = descriptorSet.getFileCount();
String[] tokens = NameUtil.extractNameAndQualifier(schemaToUse.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ public class SerializerConfig {
private final Decoders decoders;
/**
* Tells the deserializer that if supplied decoder codecTypes do not match group codecTypes then fail and exit upfront.
* This is important when the writers have used a custom codec for which reader should be instantiated with a corresponding
* decoder otherwise it would fail to decode and read the data.
* As an example, if writer applications had implemented a custom encryption encoder which encrypted the data after
* serializing it, then the data will include an encoding id that will be resolved to the schema and the codec type name
* for the encryption codec. If the readers are not provided with a decoder for all data encoded with that codec type,
* it would fail to decode that data. This flag ensures that the readers check retrieve all the registered codec types
* with the registry service and fail if they are not instantiated with decoders for all the registered codec types.
*
* The default value for this is true.
*/
private final boolean failOnCodecMismatch;
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@

@Slf4j
public class SerializerFactory {
public static final String PRAVEGA_EVENT_HEADER = "PravegaEventHeader";

// region avro
/**
* Creates a typed avro serializer for the Schema. The serializer implementation returned from this method is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.google.common.base.Preconditions;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.GeneratedMessageV3;
Expand Down Expand Up @@ -127,7 +127,7 @@ public boolean hasProtobufSchema() {
*/
@SuppressWarnings("unchecked")
public DescriptorProtos.FileDescriptorSet getProtobufSchema() {
return ((ProtobufSchema<DynamicMessage>) schema).getDescriptorProto();
return ((ProtobufSchema<DynamicMessage>) schema).getFileDescriptorSet();
}

/**
Expand All @@ -143,11 +143,11 @@ public boolean hasJsonSchema() {
* Json Schema for the underlying deserialized object. This is available if {@link WithSchema#hasJsonSchema()} returns true.
* This means underlying object was serialized as Json.
*
* @return Protobuf {@link JsonSchema} representing the schema for the object.
* @return Json schema String representing the schema for the object.
*/
@SuppressWarnings("unchecked")
public JsonSchema getJsonSchema() {
return ((JSONSchema<Object>) schema).getSchema();
public String getJsonSchema() {
return ((JSONSchema<Object>) schema).getSchemaString();
}

/**
Expand Down Expand Up @@ -213,6 +213,8 @@ private static String toJsonString(SerializationFormat format, Object deserializ
* @return A WithSchema object which has Avro Schema and the corresponding object.
*/
public static <T> WithSchema<T> avro(T object, AvroSchema<T> avroSchema) {
Preconditions.checkNotNull(object, "object cannot be null");
Preconditions.checkNotNull(avroSchema, "schema cannot be null");
return new WithSchema<>(avroSchema.getSchemaInfo(), object, (x, y) -> object);
}

Expand All @@ -225,6 +227,8 @@ public static <T> WithSchema<T> avro(T object, AvroSchema<T> avroSchema) {
* @return A WithSchema object which has Protobuf Schema and the corresponding object.
*/
public static <T extends GeneratedMessageV3> WithSchema<T> proto(T object, ProtobufSchema<T> protobufSchema) {
Preconditions.checkNotNull(object, "object cannot be null");
Preconditions.checkNotNull(protobufSchema, "schema cannot be null");
return new WithSchema<>(protobufSchema.getSchemaInfo(), object, (x, y) -> object);
}

Expand All @@ -237,6 +241,8 @@ public static <T extends GeneratedMessageV3> WithSchema<T> proto(T object, Proto
* @return A WithSchema object which has Json schema and the corresponding object.
*/
public static <T> WithSchema<T> json(T object, JSONSchema<T> jsonSchema) {
Preconditions.checkNotNull(object, "object cannot be null");
Preconditions.checkNotNull(jsonSchema, "schema cannot be null");
return new WithSchema<>(jsonSchema.getSchemaInfo(), object, (x, y) -> object);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public void testCodec() throws IOException {
Codec none = Codecs.None.getCodec();
assertEquals(none.getCodecType(), Codecs.None.getCodec().getCodecType());
none.encode(ByteBuffer.wrap(testStringBytes), byteArrayOutputStream);
byteArrayOutputStream.write(testStringBytes);
encoded = ByteBuffer.wrap(byteArrayOutputStream.getData().array(), 0, byteArrayOutputStream.getData().getLength());
assertEquals(encoded.remaining(), testStringBytes.length);
decoded = none.decode(encoded, ImmutableMap.of());
Expand Down
Loading