fix: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type... (apache#15598)

(cherry picked from commit f90ef9c)
dlg99 authored and nicoloboschi committed May 16, 2022
1 parent 9ff755a commit b0a2a5e
Showing 3 changed files with 218 additions and 33 deletions.
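For context, a minimal sketch (not part of this commit) of the failure mode the change addresses: JSON decoding hands Kafka Connect a Double where the Connect schema declares FLOAT32, and Kafka's strict per-type validation rejects it. The class and field names below are hypothetical.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class InvalidJavaObjectRepro {
    public static void main(String[] args) {
        // A struct schema with a single FLOAT32 field.
        Schema schema = SchemaBuilder.struct()
                .field("ratio", Schema.FLOAT32_SCHEMA)
                .build();
        Struct struct = new Struct(schema);
        // Struct.put() validates the value against the field schema and throws
        // org.apache.kafka.connect.errors.DataException along the lines of
        // "Invalid Java object for schema with type FLOAT32: class java.lang.Double".
        struct.put("ratio", 1.0d);
    }
}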
KafkaConnectData.java

@@ -27,23 +27,21 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.pulsar.client.api.schema.GenericRecord;

+@Slf4j
public class KafkaConnectData {
public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
if (kafkaSchema == null) {
return nativeObject;
}

-if (nativeObject == null) {
-return defaultOrThrow(kafkaSchema);
-}
-
if (nativeObject instanceof JsonNode) {
JsonNode node = (JsonNode) nativeObject;
return jsonAsConnectData(node, kafkaSchema);
@@ -56,6 +54,73 @@ public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema
return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
}

+return castToKafkaSchema(nativeObject, kafkaSchema);
+}
+
+public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) {
+if (nativeObject == null) {
+return defaultOrThrow(kafkaSchema);
+}
+
+if (nativeObject instanceof Number) {
+// This is needed in case
+// Jackson decided to fit the value into some other type internally
+// (e.g. Double instead of Float).
+// Kafka's ConnectSchema expects the exact type
+// https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L47-L71
+Number num = (Number) nativeObject;
+switch (kafkaSchema.type()) {
+case INT8:
+if (!(nativeObject instanceof Byte)) {
+if (log.isDebugEnabled()) {
+log.debug("nativeObject of type {} converted to Byte", nativeObject.getClass());
+}
+return num.byteValue();
+}
+break;
+case INT16:
+if (!(nativeObject instanceof Short)) {
+if (log.isDebugEnabled()) {
+log.debug("nativeObject of type {} converted to Short", nativeObject.getClass());
+}
+return num.shortValue();
+}
+break;
+case INT32:
+if (!(nativeObject instanceof Integer)) {
+if (log.isDebugEnabled()) {
+log.debug("nativeObject of type {} converted to Integer", nativeObject.getClass());
+}
+return num.intValue();
+}
+break;
+case INT64:
+if (!(nativeObject instanceof Long)) {
+if (log.isDebugEnabled()) {
+log.debug("nativeObject of type {} converted to Long", nativeObject.getClass());
+}
+return num.longValue();
+}
+break;
+case FLOAT32:
+if (!(nativeObject instanceof Float)) {
+if (log.isDebugEnabled()) {
+log.debug("nativeObject of type {} converted to Float", nativeObject.getClass());
+}
+return num.floatValue();
+}
+break;
+case FLOAT64:
+if (!(nativeObject instanceof Double)) {
+if (log.isDebugEnabled()) {
+log.debug("nativeObject of type {} converted to Double", nativeObject.getClass());
+}
+return num.doubleValue();
+}
+break;
+}
+}
+
return nativeObject;
}

@@ -106,9 +171,9 @@ static Object jsonAsConnectData(JsonNode jsonNode, Schema kafkaSchema) {
case BOOLEAN:
return jsonNode.booleanValue();
case NUMBER:
-jsonNode.doubleValue();
+return jsonNode.doubleValue();
case STRING:
-jsonNode.textValue();
+return jsonNode.textValue();
default:
throw new DataException("Don't know how to convert " + jsonNode
+ " to Connect data (schema is null).");
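A short usage sketch of the new castToKafkaSchema above (illustrative, not part of the commit): the helper coerces any java.lang.Number to the exact boxed type the Connect schema declares, after which Kafka's own validation accepts the value. The wrapper class name is hypothetical; ConnectSchema.validateValue is the Kafka static check referenced in the comment above.

import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;

public class CastToKafkaSchemaSketch {
    public static void main(String[] args) {
        // Jackson may hand back a Double even when the schema says FLOAT32.
        Object coerced = KafkaConnectData.castToKafkaSchema(1.0d, Schema.FLOAT32_SCHEMA);
        System.out.println(coerced.getClass()); // class java.lang.Float
        // With the exact boxed type, strict validation now passes instead of throwing.
        ConnectSchema.validateValue(Schema.FLOAT32_SCHEMA, coerced);
    }
}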
KafkaConnectSinkTest.java

@@ -337,8 +337,28 @@ private SinkRecord recordSchemaTest(Object value, Schema schema, Object expected
assertEquals(expectedKeySchema, result.get("keySchema"));
assertEquals(expectedSchema, result.get("valueSchema"));

-SinkRecord sinkRecord = sink.toSinkRecord(record);
-return sinkRecord;
+if (schema.getSchemaInfo().getType().isPrimitive()) {
+// to test cast of primitive values
+Message msgOut = mock(MessageImpl.class);
+when(msgOut.getValue()).thenReturn(getGenericRecord(result.get("value"), schema));
+when(msgOut.getKey()).thenReturn(result.get("key").toString());
+when(msgOut.hasKey()).thenReturn(true);
+when(msgOut.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+Record<GenericObject> recordOut = PulsarRecord.<String>builder()
+.topicName("fake-topic")
+.message(msgOut)
+.schema(schema)
+.ackFunction(status::incrementAndGet)
+.failFunction(status::decrementAndGet)
+.build();
+
+SinkRecord sinkRecord = sink.toSinkRecord(recordOut);
+return sinkRecord;
+} else {
+SinkRecord sinkRecord = sink.toSinkRecord(record);
+return sinkRecord;
+}
}

private GenericRecord getGenericRecord(Object value, Schema schema) {
@@ -364,71 +384,135 @@ private GenericRecord getGenericRecord(Object value, Schema schema) {
return rec;
}


+@Test
+public void genericRecordCastTest() throws Exception {
+props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+
+KafkaConnectSink sink = new KafkaConnectSink();
+sink.open(props, context);
+
+AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+= AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+// schema type INT32
+obj.put("field1", (byte)10);
+// schema type STRING
+obj.put("field2", "test");
+// schema type INT64
+obj.put("field3", (short)100);
+
+final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
+Message msg = mock(MessageImpl.class);
+when(msg.getValue()).thenReturn(rec);
+when(msg.getKey()).thenReturn("key");
+when(msg.hasKey()).thenReturn(true);
+when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+final AtomicInteger status = new AtomicInteger(0);
+Record<GenericObject> record = PulsarRecord.<String>builder()
+.topicName("fake-topic")
+.message(msg)
+.schema(pulsarAvroSchema)
+.ackFunction(status::incrementAndGet)
+.failFunction(status::decrementAndGet)
+.build();
+
+SinkRecord sinkRecord = sink.toSinkRecord(record);
+
+Struct out = (Struct) sinkRecord.value();
+Assert.assertEquals(out.get("field1").getClass(), Integer.class);
+Assert.assertEquals(out.get("field2").getClass(), String.class);
+Assert.assertEquals(out.get("field3").getClass(), Long.class);
+
+Assert.assertEquals(out.get("field1"), 10);
+Assert.assertEquals(out.get("field2"), "test");
+Assert.assertEquals(out.get("field3"), 100L);
+
+sink.close();
+}

@Test
public void bytesRecordSchemaTest() throws Exception {
byte[] in = "val".getBytes(StandardCharsets.US_ASCII);
SinkRecord sinkRecord = recordSchemaTest(in, Schema.BYTES, "val", "BYTES");
-byte[] out = (byte[]) sinkRecord.value();
-Assert.assertEquals(out, in);
+// test/mock writes it as string
+Assert.assertEquals(sinkRecord.value(), "val");
}

@Test
public void stringRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest("val", Schema.STRING, "val", "STRING");
-String out = (String) sinkRecord.value();
-Assert.assertEquals(out, "val");
+Assert.assertEquals(sinkRecord.value().getClass(), String.class);
+Assert.assertEquals(sinkRecord.value(), "val");
}

@Test
public void booleanRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(true, Schema.BOOL, true, "BOOLEAN");
-boolean out = (boolean) sinkRecord.value();
-Assert.assertEquals(out, true);
+Assert.assertEquals(sinkRecord.value().getClass(), Boolean.class);
+Assert.assertEquals(sinkRecord.value(), true);
}

@Test
public void byteRecordSchemaTest() throws Exception {
// int 1 is coming back from ObjectMapper
SinkRecord sinkRecord = recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
-byte out = (byte) sinkRecord.value();
-Assert.assertEquals(out, 1);
+Assert.assertEquals(sinkRecord.value().getClass(), Byte.class);
+Assert.assertEquals(sinkRecord.value(), (byte)1);
}

@Test
public void shortRecordSchemaTest() throws Exception {
// int 1 is coming back from ObjectMapper
SinkRecord sinkRecord = recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
-short out = (short) sinkRecord.value();
-Assert.assertEquals(out, 1);
+Assert.assertEquals(sinkRecord.value().getClass(), Short.class);
+Assert.assertEquals(sinkRecord.value(), (short)1);
}

@Test
public void integerRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(Integer.MAX_VALUE, Schema.INT32, Integer.MAX_VALUE, "INT32");
-int out = (int) sinkRecord.value();
-Assert.assertEquals(out, Integer.MAX_VALUE);
+Assert.assertEquals(sinkRecord.value().getClass(), Integer.class);
+Assert.assertEquals(sinkRecord.value(), Integer.MAX_VALUE);
}

@Test
public void longRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(Long.MAX_VALUE, Schema.INT64, Long.MAX_VALUE, "INT64");
-long out = (long) sinkRecord.value();
-Assert.assertEquals(out, Long.MAX_VALUE);
+Assert.assertEquals(sinkRecord.value().getClass(), Long.class);
+Assert.assertEquals(sinkRecord.value(), Long.MAX_VALUE);
}

+@Test
+public void longRecordSchemaTestCast() throws Exception {
+// int 1 is coming from ObjectMapper, expect Long (as in schema) from sinkRecord
+SinkRecord sinkRecord = recordSchemaTest(1L, Schema.INT64, 1, "INT64");
+Assert.assertEquals(sinkRecord.value().getClass(), Long.class);
+Assert.assertEquals(sinkRecord.value(), 1L);
+}

@Test
public void floatRecordSchemaTest() throws Exception {
-// 1.0d is coming back from ObjectMapper
+// 1.0d is coming back from ObjectMapper, expect Float (as in schema) from sinkRecord
SinkRecord sinkRecord = recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, "FLOAT32");
-float out = (float) sinkRecord.value();
-Assert.assertEquals(out, 1.0d);
+Assert.assertEquals(sinkRecord.value().getClass(), Float.class);
+Assert.assertEquals(sinkRecord.value(), 1.0f);
}

@Test
public void doubleRecordSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(Double.MAX_VALUE, Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64");
-double out = (double) sinkRecord.value();
-Assert.assertEquals(out, Double.MAX_VALUE);
+Assert.assertEquals(sinkRecord.value().getClass(), Double.class);
+Assert.assertEquals(sinkRecord.value(), Double.MAX_VALUE);
}

+@Test
+public void doubleRecordSchemaTestCast() throws Exception {
+SinkRecord sinkRecord = recordSchemaTest(1.0d, Schema.DOUBLE, 1.0d, "FLOAT64");
+Assert.assertEquals(sinkRecord.value().getClass(), Double.class);
+Assert.assertEquals(sinkRecord.value(), 1.0d);
+}

@Test
@@ -455,9 +539,12 @@ public void jsonSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, expected, "STRUCT");

Struct out = (Struct) sinkRecord.value();
-Assert.assertEquals((int)out.get("field1"), 10);
-Assert.assertEquals((String)out.get("field2"), "test");
-Assert.assertEquals((long)out.get("field3"), 100L);
+Assert.assertEquals(out.get("field1").getClass(), Integer.class);
+Assert.assertEquals(out.get("field1"), 10);
+Assert.assertEquals(out.get("field2").getClass(), String.class);
+Assert.assertEquals(out.get("field2"), "test");
+Assert.assertEquals(out.get("field3").getClass(), Long.class);
+Assert.assertEquals(out.get("field3"), 100L);
}

@Test
Expand All @@ -479,9 +566,9 @@ public void avroSchemaTest() throws Exception {
SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, expected, "STRUCT");

Struct out = (Struct) sinkRecord.value();
-Assert.assertEquals((int)out.get("field1"), 10);
-Assert.assertEquals((String)out.get("field2"), "test");
-Assert.assertEquals((long)out.get("field3"), 100L);
+Assert.assertEquals(out.get("field1"), 10);
+Assert.assertEquals(out.get("field2"), "test");
+Assert.assertEquals(out.get("field3"), 100L);
}

@Test
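The "coming back from ObjectMapper" comments in the tests above refer to Jackson's default number mapping, which is what castToKafkaSchema compensates for. A standalone illustration (assuming plain Jackson defaults; no Pulsar or Kafka involved, and the class name is hypothetical):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonNumberTypes {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Integral JSON literals parse to Integer, floating-point literals to Double,
        // regardless of the type the destination schema declares.
        JsonNode intNode = mapper.readTree("1");
        JsonNode floatNode = mapper.readTree("1.0");
        System.out.println(intNode.numberValue().getClass());   // class java.lang.Integer
        System.out.println(floatNode.numberValue().getClass()); // class java.lang.Double
    }
}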
PulsarSchemaToKafkaSchemaTest.java

@@ -29,9 +29,11 @@
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
import org.testng.annotations.Test;

+import java.math.BigInteger;
import java.util.List;

import static org.testng.Assert.assertEquals;
@@ -167,6 +169,37 @@ public void jsonSchemaTest() {
}
}

+@Test
+public void castToKafkaSchemaTest() {
+assertEquals(Byte.class,
+KafkaConnectData.castToKafkaSchema(100L,
+org.apache.kafka.connect.data.Schema.INT8_SCHEMA).getClass());
+
+assertEquals(Short.class,
+KafkaConnectData.castToKafkaSchema(100.0d,
+org.apache.kafka.connect.data.Schema.INT16_SCHEMA).getClass());
+
+assertEquals(Integer.class,
+KafkaConnectData.castToKafkaSchema((byte)5,
+org.apache.kafka.connect.data.Schema.INT32_SCHEMA).getClass());
+
+assertEquals(Long.class,
+KafkaConnectData.castToKafkaSchema((short)5,
+org.apache.kafka.connect.data.Schema.INT64_SCHEMA).getClass());
+
+assertEquals(Float.class,
+KafkaConnectData.castToKafkaSchema(1.0d,
+org.apache.kafka.connect.data.Schema.FLOAT32_SCHEMA).getClass());
+
+assertEquals(Double.class,
+KafkaConnectData.castToKafkaSchema(1.5f,
+org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass());
+
+assertEquals(Double.class,
+KafkaConnectData.castToKafkaSchema(new BigInteger("100"),
+org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass());
+}

@Test
public void dateSchemaTest() {
org.apache.kafka.connect.data.Schema kafkaSchema =
