diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java index cabcf7b9df7c1..8374dd24bf7de 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaConnectData.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericData; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; @@ -34,16 +35,13 @@ import org.apache.kafka.connect.errors.DataException; import org.apache.pulsar.client.api.schema.GenericRecord; +@Slf4j public class KafkaConnectData { public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) { if (kafkaSchema == null) { return nativeObject; } - if (nativeObject == null) { - return defaultOrThrow(kafkaSchema); - } - if (nativeObject instanceof JsonNode) { JsonNode node = (JsonNode) nativeObject; return jsonAsConnectData(node, kafkaSchema); @@ -56,6 +54,73 @@ public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema); } + return castToKafkaSchema(nativeObject, kafkaSchema); + } + + public static Object castToKafkaSchema(Object nativeObject, Schema kafkaSchema) { + if (nativeObject == null) { + return defaultOrThrow(kafkaSchema); + } + + if (nativeObject instanceof Number) { + // This is needed in case + // jackson decided to fit value into some other type internally + // (e.g. Double instead of Float). + // Kafka's ConnectSchema expects exact type + // https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L47-L71 + Number num = (Number) nativeObject; + switch (kafkaSchema.type()) { + case INT8: + if (!(nativeObject instanceof Byte)) { + if (log.isDebugEnabled()) { + log.debug("nativeObject of type {} converted to Byte", nativeObject.getClass()); + } + return num.byteValue(); + } + break; + case INT16: + if (!(nativeObject instanceof Short)) { + if (log.isDebugEnabled()) { + log.debug("nativeObject of type {} converted to Short", nativeObject.getClass()); + } + return num.shortValue(); + } + break; + case INT32: + if (!(nativeObject instanceof Integer)) { + if (log.isDebugEnabled()) { + log.debug("nativeObject of type {} converted to Integer", nativeObject.getClass()); + } + return num.intValue(); + } + break; + case INT64: + if (!(nativeObject instanceof Long)) { + if (log.isDebugEnabled()) { + log.debug("nativeObject of type {} converted to Long", nativeObject.getClass()); + } + return num.longValue(); + } + break; + case FLOAT32: + if (!(nativeObject instanceof Float)) { + if (log.isDebugEnabled()) { + log.debug("nativeObject of type {} converted to Float", nativeObject.getClass()); + } + return num.floatValue(); + } + break; + case FLOAT64: + if (!(nativeObject instanceof Double)) { + if (log.isDebugEnabled()) { + log.debug("nativeObject of type {} converted to Double", nativeObject.getClass()); + } + return num.doubleValue(); + } + break; + } + } + return nativeObject; } @@ -106,9 +171,9 @@ static Object jsonAsConnectData(JsonNode jsonNode, Schema kafkaSchema) { case BOOLEAN: return jsonNode.booleanValue(); case NUMBER: - jsonNode.doubleValue(); + return jsonNode.doubleValue(); case STRING: - jsonNode.textValue(); + return jsonNode.textValue(); default: throw new DataException("Don't know how to convert " + jsonNode + " to Connect data (schema is null)."); diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index c0d150e6b051f..1b767e22b5216 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -337,8 +337,28 @@ private SinkRecord recordSchemaTest(Object value, Schema schema, Object expected assertEquals(expectedKeySchema, result.get("keySchema")); assertEquals(expectedSchema, result.get("valueSchema")); - SinkRecord sinkRecord = sink.toSinkRecord(record); - return sinkRecord; + if (schema.getSchemaInfo().getType().isPrimitive()) { + // to test cast of primitive values + Message msgOut = mock(MessageImpl.class); + when(msgOut.getValue()).thenReturn(getGenericRecord(result.get("value"), schema)); + when(msgOut.getKey()).thenReturn(result.get("key").toString()); + when(msgOut.hasKey()).thenReturn(true); + when(msgOut.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0)); + + Record recordOut = PulsarRecord.builder() + .topicName("fake-topic") + .message(msgOut) + .schema(schema) + .ackFunction(status::incrementAndGet) + .failFunction(status::decrementAndGet) + .build(); + + SinkRecord sinkRecord = sink.toSinkRecord(recordOut); + return sinkRecord; + } else { + SinkRecord sinkRecord = sink.toSinkRecord(record); + return sinkRecord; + } } private GenericRecord getGenericRecord(Object value, Schema schema) { @@ -364,71 +384,135 @@ private GenericRecord getGenericRecord(Object value, Schema schema) { return rec; } + + @Test + public void genericRecordCastTest() throws Exception { + props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName()); + + KafkaConnectSink sink = new KafkaConnectSink(); + sink.open(props, context); + + AvroSchema pulsarAvroSchema + = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class); + + final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema()); + // schema type INT32 + obj.put("field1", (byte)10); + // schema type STRING + obj.put("field2", "test"); + // schema type INT64 + obj.put("field3", (short)100); + + final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema); + Message msg = mock(MessageImpl.class); + when(msg.getValue()).thenReturn(rec); + when(msg.getKey()).thenReturn("key"); + when(msg.hasKey()).thenReturn(true); + when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0)); + + final AtomicInteger status = new AtomicInteger(0); + Record record = PulsarRecord.builder() + .topicName("fake-topic") + .message(msg) + .schema(pulsarAvroSchema) + .ackFunction(status::incrementAndGet) + .failFunction(status::decrementAndGet) + .build(); + + SinkRecord sinkRecord = sink.toSinkRecord(record); + + Struct out = (Struct) sinkRecord.value(); + Assert.assertEquals(out.get("field1").getClass(), Integer.class); + Assert.assertEquals(out.get("field2").getClass(), String.class); + Assert.assertEquals(out.get("field3").getClass(), Long.class); + + Assert.assertEquals(out.get("field1"), 10); + Assert.assertEquals(out.get("field2"), "test"); + Assert.assertEquals(out.get("field3"), 100L); + + sink.close(); + } + @Test public void bytesRecordSchemaTest() throws Exception { byte[] in = "val".getBytes(StandardCharsets.US_ASCII); SinkRecord sinkRecord = recordSchemaTest(in, Schema.BYTES, "val", "BYTES"); - byte[] out = (byte[]) sinkRecord.value(); - Assert.assertEquals(out, in); + // test/mock writes it as string + Assert.assertEquals(sinkRecord.value(), "val"); } @Test public void stringRecordSchemaTest() throws Exception { SinkRecord sinkRecord = recordSchemaTest("val", Schema.STRING, "val", "STRING"); - String out = (String) sinkRecord.value(); - Assert.assertEquals(out, "val"); + Assert.assertEquals(sinkRecord.value().getClass(), String.class); + Assert.assertEquals(sinkRecord.value(), "val"); } @Test public void booleanRecordSchemaTest() throws Exception { SinkRecord sinkRecord = recordSchemaTest(true, Schema.BOOL, true, "BOOLEAN"); - boolean out = (boolean) sinkRecord.value(); - Assert.assertEquals(out, true); + Assert.assertEquals(sinkRecord.value().getClass(), Boolean.class); + Assert.assertEquals(sinkRecord.value(), true); } @Test public void byteRecordSchemaTest() throws Exception { // int 1 is coming back from ObjectMapper SinkRecord sinkRecord = recordSchemaTest((byte)1, Schema.INT8, 1, "INT8"); - byte out = (byte) sinkRecord.value(); - Assert.assertEquals(out, 1); + Assert.assertEquals(sinkRecord.value().getClass(), Byte.class); + Assert.assertEquals(sinkRecord.value(), (byte)1); } @Test public void shortRecordSchemaTest() throws Exception { // int 1 is coming back from ObjectMapper SinkRecord sinkRecord = recordSchemaTest((short)1, Schema.INT16, 1, "INT16"); - short out = (short) sinkRecord.value(); - Assert.assertEquals(out, 1); + Assert.assertEquals(sinkRecord.value().getClass(), Short.class); + Assert.assertEquals(sinkRecord.value(), (short)1); } @Test public void integerRecordSchemaTest() throws Exception { SinkRecord sinkRecord = recordSchemaTest(Integer.MAX_VALUE, Schema.INT32, Integer.MAX_VALUE, "INT32"); - int out = (int) sinkRecord.value(); - Assert.assertEquals(out, Integer.MAX_VALUE); + Assert.assertEquals(sinkRecord.value().getClass(), Integer.class); + Assert.assertEquals(sinkRecord.value(), Integer.MAX_VALUE); } @Test public void longRecordSchemaTest() throws Exception { SinkRecord sinkRecord = recordSchemaTest(Long.MAX_VALUE, Schema.INT64, Long.MAX_VALUE, "INT64"); - long out = (long) sinkRecord.value(); - Assert.assertEquals(out, Long.MAX_VALUE); + Assert.assertEquals(sinkRecord.value().getClass(), Long.class); + Assert.assertEquals(sinkRecord.value(), Long.MAX_VALUE); + } + + @Test + public void longRecordSchemaTestCast() throws Exception { + // int 1 is coming from ObjectMapper, expect Long (as in schema) from sinkRecord + SinkRecord sinkRecord = recordSchemaTest(1L, Schema.INT64, 1, "INT64"); + Assert.assertEquals(sinkRecord.value().getClass(), Long.class); + Assert.assertEquals(sinkRecord.value(), 1L); } @Test public void floatRecordSchemaTest() throws Exception { - // 1.0d is coming back from ObjectMapper + // 1.0d is coming back from ObjectMapper, expect Float (as in schema) from sinkRecord SinkRecord sinkRecord = recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, "FLOAT32"); - float out = (float) sinkRecord.value(); - Assert.assertEquals(out, 1.0d); + Assert.assertEquals(sinkRecord.value().getClass(), Float.class); + Assert.assertEquals(sinkRecord.value(), 1.0f); } @Test public void doubleRecordSchemaTest() throws Exception { SinkRecord sinkRecord = recordSchemaTest(Double.MAX_VALUE, Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64"); - double out = (double) sinkRecord.value(); - Assert.assertEquals(out, Double.MAX_VALUE); + Assert.assertEquals(sinkRecord.value().getClass(), Double.class); + Assert.assertEquals(sinkRecord.value(), Double.MAX_VALUE); + } + + @Test + public void doubleRecordSchemaTestCast() throws Exception { + SinkRecord sinkRecord = recordSchemaTest(1.0d, Schema.DOUBLE, 1.0d, "FLOAT64"); + Assert.assertEquals(sinkRecord.value().getClass(), Double.class); + Assert.assertEquals(sinkRecord.value(), 1.0d); } @Test @@ -455,9 +539,12 @@ public void jsonSchemaTest() throws Exception { SinkRecord sinkRecord = recordSchemaTest(jsonNode, jsonSchema, expected, "STRUCT"); Struct out = (Struct) sinkRecord.value(); - Assert.assertEquals((int)out.get("field1"), 10); - Assert.assertEquals((String)out.get("field2"), "test"); - Assert.assertEquals((long)out.get("field3"), 100L); + Assert.assertEquals(out.get("field1").getClass(), Integer.class); + Assert.assertEquals(out.get("field1"), 10); + Assert.assertEquals(out.get("field2").getClass(), String.class); + Assert.assertEquals(out.get("field2"), "test"); + Assert.assertEquals(out.get("field3").getClass(), Long.class); + Assert.assertEquals(out.get("field3"), 100L); } @Test @@ -479,9 +566,9 @@ public void avroSchemaTest() throws Exception { SinkRecord sinkRecord = recordSchemaTest(obj, pulsarAvroSchema, expected, "STRUCT"); Struct out = (Struct) sinkRecord.value(); - Assert.assertEquals((int)out.get("field1"), 10); - Assert.assertEquals((String)out.get("field2"), "test"); - Assert.assertEquals((long)out.get("field3"), 100L); + Assert.assertEquals(out.get("field1"), 10); + Assert.assertEquals(out.get("field2"), "test"); + Assert.assertEquals(out.get("field3"), 100L); } @Test diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java index 9075dd9c3d3bd..60caa2bbe81d8 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java @@ -29,9 +29,11 @@ import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; +import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData; import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema; import org.testng.annotations.Test; +import java.math.BigInteger; import java.util.List; import static org.testng.Assert.assertEquals; @@ -167,6 +169,37 @@ public void jsonSchemaTest() { } } + @Test + public void castToKafkaSchemaTest() { + assertEquals(Byte.class, + KafkaConnectData.castToKafkaSchema(100L, + org.apache.kafka.connect.data.Schema.INT8_SCHEMA).getClass()); + + assertEquals(Short.class, + KafkaConnectData.castToKafkaSchema(100.0d, + org.apache.kafka.connect.data.Schema.INT16_SCHEMA).getClass()); + + assertEquals(Integer.class, + KafkaConnectData.castToKafkaSchema((byte)5, + org.apache.kafka.connect.data.Schema.INT32_SCHEMA).getClass()); + + assertEquals(Long.class, + KafkaConnectData.castToKafkaSchema((short)5, + org.apache.kafka.connect.data.Schema.INT64_SCHEMA).getClass()); + + assertEquals(Float.class, + KafkaConnectData.castToKafkaSchema(1.0d, + org.apache.kafka.connect.data.Schema.FLOAT32_SCHEMA).getClass()); + + assertEquals(Double.class, + KafkaConnectData.castToKafkaSchema(1.5f, + org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass()); + + assertEquals(Double.class, + KafkaConnectData.castToKafkaSchema(new BigInteger("100"), + org.apache.kafka.connect.data.Schema.FLOAT64_SCHEMA).getClass()); + } + @Test public void dateSchemaTest() { org.apache.kafka.connect.data.Schema kafkaSchema =