diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapper.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapper.kt new file mode 100644 index 0000000000000..c91b8afb36013 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapper.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data + +interface AirbyteSchemaIdentityMapper { + fun map(schema: AirbyteType): AirbyteType = + when (schema) { + is NullType -> mapNull(schema) + is StringType -> mapString(schema) + is BooleanType -> mapBoolean(schema) + is IntegerType -> mapInteger(schema) + is NumberType -> mapNumber(schema) + is ArrayType -> mapArray(schema) + is ArrayTypeWithoutSchema -> mapArrayWithoutSchema(schema) + is ObjectType -> mapObject(schema) + is ObjectTypeWithoutSchema -> mapObjectWithoutSchema(schema) + is ObjectTypeWithEmptySchema -> mapObjectWithEmptySchema(schema) + is UnionType -> mapUnion(schema) + is DateType -> mapDate(schema) + is TimeTypeWithTimezone -> mapTimeTypeWithTimezone(schema) + is TimeTypeWithoutTimezone -> mapTimeTypeWithoutTimezone(schema) + is TimestampTypeWithTimezone -> mapTimestampTypeWithTimezone(schema) + is TimestampTypeWithoutTimezone -> mapTimestampTypeWithoutTimezone(schema) + is UnknownType -> mapUnknown(schema) + } + + fun mapNull(schema: NullType): AirbyteType = schema + fun mapString(schema: StringType): AirbyteType = schema + fun mapBoolean(schema: BooleanType): AirbyteType = schema + fun mapInteger(schema: IntegerType): AirbyteType = schema + fun mapNumber(schema: NumberType): AirbyteType = schema + fun mapArray(schema: ArrayType): AirbyteType { + return ArrayType(mapField(schema.items)) + } + fun mapArrayWithoutSchema(schema: ArrayTypeWithoutSchema): AirbyteType = schema + fun mapObject(schema: ObjectType): AirbyteType { + val properties = LinkedHashMap<String, FieldType>() + schema.properties.forEach { (name, field) -> properties[name] = mapField(field) } + return ObjectType(properties) + } + fun mapObjectWithoutSchema(schema: ObjectTypeWithoutSchema): AirbyteType = schema + fun mapObjectWithEmptySchema(schema: ObjectTypeWithEmptySchema): AirbyteType = schema + fun mapUnion(schema: UnionType): AirbyteType { + return UnionType(schema.options.map { map(it) }) + } + fun mapDate(schema: DateType): AirbyteType = schema + fun mapTimeTypeWithTimezone(schema: TimeTypeWithTimezone): AirbyteType = schema + fun mapTimeTypeWithoutTimezone(schema: TimeTypeWithoutTimezone): AirbyteType = schema + fun mapTimestampTypeWithTimezone(schema: TimestampTypeWithTimezone): AirbyteType = schema + fun mapTimestampTypeWithoutTimezone(schema: TimestampTypeWithoutTimezone): AirbyteType = schema + fun mapUnknown(schema: UnknownType): AirbyteType = schema + fun mapField(field: FieldType): FieldType = FieldType(map(field.type), field.nullable) +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteType.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteType.kt index 7d0ff0ec224c5..5093f05970772 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteType.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteType.kt @@ -18,9 +18,13 @@ data object NumberType : AirbyteType data object DateType : AirbyteType -data class TimestampType(val hasTimezone: Boolean) : AirbyteType +data object TimestampTypeWithTimezone : AirbyteType -data class TimeType(val hasTimezone: Boolean) : AirbyteType +data object TimestampTypeWithoutTimezone : AirbyteType + +data object TimeTypeWithTimezone : AirbyteType + +data object TimeTypeWithoutTimezone : AirbyteType data class ArrayType(val items: FieldType) : AirbyteType diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToJsonSchema.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToJsonSchema.kt index 93f2c5b2aa06e..a13be8556b668 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToJsonSchema.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteTypeToJsonSchema.kt @@ -47,21 +47,21 @@ class AirbyteTypeToJsonSchema { unionNode } is DateType -> ofType("string").put("format", "date") - is TimeType -> { + is TimeTypeWithTimezone -> { val timeNode = ofType("string").put("format", "time") - if (airbyteType.hasTimezone) { - timeNode.put("airbyte_type", "time_with_timezone") - } else { - timeNode.put("airbyte_type", "time_without_timezone") - } + timeNode.put("airbyte_type", "time_with_timezone") + } + is TimeTypeWithoutTimezone -> { + val timeNode = ofType("string").put("format", "time") + timeNode.put("airbyte_type", "time_without_timezone") } - is TimestampType -> { + is TimestampTypeWithTimezone -> { val timestampNode = ofType("string").put("format", "date-time") - if (airbyteType.hasTimezone) { - timestampNode.put("airbyte_type", "timestamp_with_timezone") - } else { - timestampNode.put("airbyte_type", "timestamp_without_timezone") - } + timestampNode.put("airbyte_type", "timestamp_with_timezone") + } + is TimestampTypeWithoutTimezone -> { + val timestampNode = ofType("string").put("format", "date-time") + timestampNode.put("airbyte_type", "timestamp_without_timezone") } else -> throw IllegalArgumentException("Unknown type: $airbyteType") } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/JsonSchemaToAirbyteType.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/JsonSchemaToAirbyteType.kt index b6c67a6ee8a74..2814fc9ffadac 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/JsonSchemaToAirbyteType.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/JsonSchemaToAirbyteType.kt @@ -62,14 +62,17 @@ class JsonSchemaToAirbyteType { when (schema.get("format")?.asText()) { "date" -> DateType "time" -> - TimeType( - hasTimezone = schema.get("airbyte_type")?.asText() != "time_without_timezone" - ) + if (schema.get("airbyte_type")?.asText() == "time_without_timezone") { + TimeTypeWithoutTimezone + } else { + TimeTypeWithTimezone + } "date-time" -> - TimestampType( - hasTimezone = - schema.get("airbyte_type")?.asText() != "timestamp_without_timezone" - ) + if (schema.get("airbyte_type")?.asText() == "timestamp_without_timezone") { + TimestampTypeWithoutTimezone + } else { + TimestampTypeWithTimezone + } null -> StringType else -> throw IllegalArgumentException( diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/JsonToAirbyteValue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/JsonToAirbyteValue.kt index d7389dc1ee69a..d84f6b6136fb2 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/JsonToAirbyteValue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/JsonToAirbyteValue.kt @@ -31,8 +31,10 @@ class JsonToAirbyteValue { is ObjectTypeWithoutSchema, is ObjectTypeWithEmptySchema -> toObjectWithoutSchema(json) is StringType -> StringValue(json.asText()) - is TimeType -> TimeValue(json.asText()) - is TimestampType -> TimestampValue(json.asText()) + is TimeTypeWithTimezone, + is TimeTypeWithoutTimezone -> TimeValue(json.asText()) + is TimestampTypeWithTimezone, + is TimestampTypeWithoutTimezone -> TimestampValue(json.asText()) is UnionType -> toUnion(json, schema.options) is UnknownType -> UnknownValue("From $schema: $json") } @@ -172,8 +174,10 @@ class JsonToAirbyteValue { is ObjectTypeWithoutSchema, is ObjectTypeWithEmptySchema -> json.isObject is StringType -> json.isTextual - is TimeType -> json.isTextual - is TimestampType -> json.isTextual + is TimeTypeWithTimezone, + is TimeTypeWithoutTimezone, + is TimestampTypeWithTimezone, + is TimestampTypeWithoutTimezone -> json.isTextual is UnionType -> schema.options.any { matchesStrictly(it, json) } is UnknownType -> false } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapperTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapperTest.kt new file mode 100644 index 0000000000000..b9a4c680dd752 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteSchemaIdentityMapperTest.kt @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data + +import io.airbyte.cdk.load.test.util.SchemaTestBuilder +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test + +class AirbyteSchemaIdentityMapperTest { + @Test + fun testIdMapping() { + val (inputSchema, expectedOutput) = + SchemaTestBuilder() + .with(DateType) + .with(StringType) + .with(IntegerType) + .with(BooleanType) + .with(NumberType) + .with(NullType) + .with(ArrayType(FieldType(StringType, true))) + .with(UnionType(listOf(StringType, IntegerType, NullType))) + .withRecord() + .with(TimeTypeWithTimezone) + .with(TimeTypeWithoutTimezone) + .with(TimestampTypeWithTimezone) + .with(TimestampTypeWithoutTimezone) + .withRecord() + .with(ObjectTypeWithoutSchema) + .with(ObjectTypeWithEmptySchema) + .with(ArrayTypeWithoutSchema) + .endRecord() + .endRecord() + .with(NullType) + .build() + + val mapper = object : AirbyteSchemaIdentityMapper {} + Assertions.assertEquals(expectedOutput, mapper.map(inputSchema)) + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueToJsonTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueToJsonTest.kt index e74922f2430f2..a35942a55da68 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueToJsonTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/AirbyteValueToJsonTest.kt @@ -74,10 +74,10 @@ class AirbyteValueToJsonTest { true ), "date" to FieldType(DateType, false), - "time" to FieldType(TimeType(false), false), - "timestamp" to FieldType(TimestampType(false), false), - "time_without_timezone" to FieldType(TimeType(true), false), - "timestamp_without_timezone" to FieldType(TimestampType(true), false) + "time" to FieldType(TimeTypeWithoutTimezone, false), + "timestamp" to FieldType(TimestampTypeWithoutTimezone, false), + "time_without_timezone" to FieldType(TimeTypeWithTimezone, false), + "timestamp_without_timezone" to FieldType(TimestampTypeWithTimezone, false) ) ) val jsonValue = AirbyteValueToJson().convert(airbyteValue) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/JsonSchemaToAirbyteSchemaTypeTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/JsonSchemaToAirbyteSchemaTypeTest.kt index 24847b151e0c6..749463ddbd485 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/JsonSchemaToAirbyteSchemaTypeTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/JsonSchemaToAirbyteSchemaTypeTest.kt @@ -64,26 +64,26 @@ class JsonSchemaToAirbyteSchemaTypeTest { fun testStringTime() { val stringType = ofType("string").put("format", "time") val airbyteType = JsonSchemaToAirbyteType().convert(stringType) - Assertions.assertEquals(airbyteType, TimeType(hasTimezone = true)) + Assertions.assertEquals(airbyteType, TimeTypeWithTimezone) stringType.put("airbyte_type", "time_without_timezone") val airbyteType2 = JsonSchemaToAirbyteType().convert(stringType) - Assertions.assertEquals(airbyteType2, TimeType(hasTimezone = false)) + Assertions.assertEquals(airbyteType2, TimeTypeWithoutTimezone) stringType.put("airbyte_type", "time_with_timezone") val airbyteType3 = JsonSchemaToAirbyteType().convert(stringType) - Assertions.assertEquals(airbyteType3, TimeType(hasTimezone = true)) + Assertions.assertEquals(airbyteType3, TimeTypeWithTimezone) } @Test fun testStringTimestamp() { val stringType = ofType("string").put("format", "date-time") val airbyteType = JsonSchemaToAirbyteType().convert(stringType) - Assertions.assertEquals(airbyteType, TimestampType(hasTimezone = true)) + Assertions.assertEquals(airbyteType, TimestampTypeWithTimezone) stringType.put("airbyte_type", "timestamp_without_timezone") val airbyteType2 = JsonSchemaToAirbyteType().convert(stringType) - Assertions.assertEquals(airbyteType2, TimestampType(hasTimezone = false)) + Assertions.assertEquals(airbyteType2, TimestampTypeWithoutTimezone) stringType.put("airbyte_type", "timestamp_with_timezone") val airbyteType3 = JsonSchemaToAirbyteType().convert(stringType) - Assertions.assertEquals(airbyteType3, TimestampType(hasTimezone = true)) + Assertions.assertEquals(airbyteType3, TimestampTypeWithTimezone) } @Test diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/JsonToAirbyteValueTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/JsonToAirbyteValueTest.kt index 225b8d12720bf..58019e3c24316 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/JsonToAirbyteValueTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/data/JsonToAirbyteValueTest.kt @@ -146,7 +146,7 @@ class JsonToAirbyteValueTest { JsonToAirbyteValue() .convert( JsonNodeFactory.instance.textNode("2021-01-01T00:00:00Z"), - TimestampType(true) + TimestampTypeWithTimezone ) Assertions.assertTrue(value is TimestampValue) Assertions.assertEquals("2021-01-01T00:00:00Z", (value as TimestampValue).value) @@ -156,7 +156,7 @@ class JsonToAirbyteValueTest { fun testTime() { val value = JsonToAirbyteValue() - .convert(JsonNodeFactory.instance.textNode("00:00:00"), TimeType(true)) + .convert(JsonNodeFactory.instance.textNode("00:00:00"), TimeTypeWithTimezone) Assertions.assertTrue(value is TimeValue) Assertions.assertEquals("00:00:00", (value as TimeValue).value) } diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/SchemaTestBuilder.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/SchemaTestBuilder.kt new file mode 100644 index 0000000000000..f0dcb87e46115 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/SchemaTestBuilder.kt @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.test.util + +import io.airbyte.cdk.load.data.AirbyteType +import io.airbyte.cdk.load.data.FieldType +import io.airbyte.cdk.load.data.ObjectType +import java.util.* +import kotlin.collections.LinkedHashMap + +class SchemaTestBuilder( + val inputSchema: ObjectType = ObjectType(properties = LinkedHashMap()), + val expectedSchema: ObjectType = ObjectType(properties = LinkedHashMap()), + val parent: SchemaTestBuilder? = null +) { + + fun with(given: FieldType, expected: FieldType = given): SchemaTestBuilder { + val name = UUID.randomUUID().toString() + inputSchema.properties[name] = given + expectedSchema.properties[name] = expected + return this + } + + fun with(given: AirbyteType, expected: AirbyteType = given): SchemaTestBuilder { + return with(FieldType(given, false), FieldType(expected, false)) + } + + fun withRecord(nullable: Boolean = false): SchemaTestBuilder { + val name = UUID.randomUUID().toString() + val inputRecord = ObjectType(properties = LinkedHashMap()) + val outputRecord = ObjectType(properties = LinkedHashMap()) + inputSchema.properties[name] = FieldType(inputRecord, nullable = nullable) + expectedSchema.properties[name] = FieldType(outputRecord, nullable = nullable) + return SchemaTestBuilder( + inputSchema = inputRecord, + expectedSchema = outputRecord, + parent = this + ) + } + + fun endRecord(): SchemaTestBuilder { + if (parent == null) { + throw IllegalStateException("Cannot end record without parent") + } + return parent + } + + fun build(): Pair<ObjectType, ObjectType> { + if (parent != null) { + throw IllegalStateException("Cannot build nested schema") + } + return Pair(inputSchema, expectedSchema) + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt index c2b3eddfc6f75..6f1e733a62976 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AirbyteTypeToAvroSchema.kt @@ -17,8 +17,10 @@ import io.airbyte.cdk.load.data.ObjectType import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema import io.airbyte.cdk.load.data.StringType -import io.airbyte.cdk.load.data.TimeType -import io.airbyte.cdk.load.data.TimestampType +import io.airbyte.cdk.load.data.TimeTypeWithTimezone +import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone import io.airbyte.cdk.load.data.UnionType import io.airbyte.cdk.load.data.UnknownType import org.apache.avro.Schema @@ -57,9 +59,11 @@ class AirbyteTypeToAvroSchema { is ObjectTypeWithoutSchema -> throw IllegalArgumentException("Object type without schema is not supported") is StringType -> return SchemaBuilder.builder().stringType() - is TimeType -> + is TimeTypeWithTimezone, + is TimeTypeWithoutTimezone -> throw IllegalArgumentException("String-based time types are not supported") - is TimestampType -> + is TimestampTypeWithTimezone, + is TimestampTypeWithoutTimezone -> throw IllegalArgumentException("String-based timestamp types are not supported") is UnionType -> return Schema.createUnion(airbyteSchema.options.map { convert(it, path) }) diff --git a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt index 64aaef992a624..0cd0ba1f680d2 100644 --- a/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt +++ b/airbyte-cdk/bulk/toolkits/load-avro/src/main/kotlin/io/airbyte/cdk/load/data/avro/AvroRecordToAirbyteValue.kt @@ -24,8 +24,10 @@ import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.StringType import io.airbyte.cdk.load.data.StringValue -import io.airbyte.cdk.load.data.TimeType -import io.airbyte.cdk.load.data.TimestampType +import io.airbyte.cdk.load.data.TimeTypeWithTimezone +import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone import io.airbyte.cdk.load.data.UnionType import io.airbyte.cdk.load.data.UnknownType import org.apache.avro.generic.GenericArray @@ -68,8 +70,11 @@ class AvroRecordToAirbyteValue { throw IllegalArgumentException("Unsupported string type: $avroValue") } ) - is TimeType -> throw UnsupportedOperationException("TimeType is not supported") - is TimestampType -> + is TimeTypeWithoutTimezone, + is TimeTypeWithTimezone -> + throw UnsupportedOperationException("TimeType is not supported") + is TimestampTypeWithoutTimezone, + is TimestampTypeWithTimezone -> throw UnsupportedOperationException("TimestampType is not supported") is UnionType -> return tryConvertUnion(avroValue, schema) is UnknownType -> throw UnsupportedOperationException("UnknownType is not supported")