Skip to content

Commit

Permalink
Bulk Load CDK: Airbyte Type cleanup, Basic ID Schema Mapper (#47190)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Oct 22, 2024
1 parent f5bf624 commit fb8063b
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

/**
 * Schema mapper whose default hooks reproduce the schema they are given.
 *
 * [map] dispatches on the concrete [AirbyteType] to a per-type hook. Every hook has a default
 * body, so an implementation may override only the hooks it needs. Leaf hooks return their
 * argument as-is; the array, object and union hooks build a new container whose children have
 * been passed back through [mapField] / [map].
 */
interface AirbyteSchemaIdentityMapper {
    /**
     * Routes [schema] to the hook for its concrete type and returns that hook's result.
     *
     * The `when` has no `else` branch, so the listed types must be exhaustive for this to compile.
     */
    fun map(schema: AirbyteType): AirbyteType =
        when (schema) {
            is NullType -> mapNull(schema)
            is StringType -> mapString(schema)
            is BooleanType -> mapBoolean(schema)
            is IntegerType -> mapInteger(schema)
            is NumberType -> mapNumber(schema)
            is ArrayType -> mapArray(schema)
            is ArrayTypeWithoutSchema -> mapArrayWithoutSchema(schema)
            is ObjectType -> mapObject(schema)
            is ObjectTypeWithoutSchema -> mapObjectWithoutSchema(schema)
            is ObjectTypeWithEmptySchema -> mapObjectWithEmptySchema(schema)
            is UnionType -> mapUnion(schema)
            is DateType -> mapDate(schema)
            is TimeTypeWithTimezone -> mapTimeTypeWithTimezone(schema)
            is TimeTypeWithoutTimezone -> mapTimeTypeWithoutTimezone(schema)
            is TimestampTypeWithTimezone -> mapTimestampTypeWithTimezone(schema)
            is TimestampTypeWithoutTimezone -> mapTimestampTypeWithoutTimezone(schema)
            is UnknownType -> mapUnknown(schema)
        }

    // --- Leaf hooks: each default returns its argument unchanged. ---

    fun mapNull(schema: NullType): AirbyteType = schema

    fun mapString(schema: StringType): AirbyteType = schema

    fun mapBoolean(schema: BooleanType): AirbyteType = schema

    fun mapInteger(schema: IntegerType): AirbyteType = schema

    fun mapNumber(schema: NumberType): AirbyteType = schema

    fun mapDate(schema: DateType): AirbyteType = schema

    fun mapTimeTypeWithTimezone(schema: TimeTypeWithTimezone): AirbyteType = schema

    fun mapTimeTypeWithoutTimezone(schema: TimeTypeWithoutTimezone): AirbyteType = schema

    fun mapTimestampTypeWithTimezone(schema: TimestampTypeWithTimezone): AirbyteType = schema

    fun mapTimestampTypeWithoutTimezone(schema: TimestampTypeWithoutTimezone): AirbyteType = schema

    fun mapUnknown(schema: UnknownType): AirbyteType = schema

    fun mapArrayWithoutSchema(schema: ArrayTypeWithoutSchema): AirbyteType = schema

    fun mapObjectWithoutSchema(schema: ObjectTypeWithoutSchema): AirbyteType = schema

    fun mapObjectWithEmptySchema(schema: ObjectTypeWithEmptySchema): AirbyteType = schema

    // --- Container hooks: each default builds a new container from mapped children. ---

    /** Returns a new [ArrayType] whose item field is the result of [mapField] on the original. */
    fun mapArray(schema: ArrayType): AirbyteType = ArrayType(mapField(schema.items))

    /**
     * Returns a new [ObjectType] backed by a fresh [LinkedHashMap]: same property names, visited
     * in the source map's iteration order, each value passed through [mapField].
     */
    fun mapObject(schema: ObjectType): AirbyteType =
        ObjectType(
            schema.properties.mapValuesTo(LinkedHashMap<String, FieldType>()) { (_, field) ->
                mapField(field)
            }
        )

    /** Returns a new [UnionType] whose options are the originals passed through [map], in order. */
    fun mapUnion(schema: UnionType): AirbyteType =
        UnionType(schema.options.map { option -> map(option) })

    /** Maps the wrapped type through [map] and carries the field's `nullable` flag over as-is. */
    fun mapField(field: FieldType): FieldType = FieldType(map(field.type), field.nullable)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ data object NumberType : AirbyteType

data object DateType : AirbyteType

data class TimestampType(val hasTimezone: Boolean) : AirbyteType
data object TimestampTypeWithTimezone : AirbyteType

data class TimeType(val hasTimezone: Boolean) : AirbyteType
data object TimestampTypeWithoutTimezone : AirbyteType

data object TimeTypeWithTimezone : AirbyteType

data object TimeTypeWithoutTimezone : AirbyteType

data class ArrayType(val items: FieldType) : AirbyteType

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,21 @@ class AirbyteTypeToJsonSchema {
unionNode
}
is DateType -> ofType("string").put("format", "date")
is TimeType -> {
is TimeTypeWithTimezone -> {
val timeNode = ofType("string").put("format", "time")
if (airbyteType.hasTimezone) {
timeNode.put("airbyte_type", "time_with_timezone")
} else {
timeNode.put("airbyte_type", "time_without_timezone")
}
timeNode.put("airbyte_type", "time_with_timezone")
}
is TimeTypeWithoutTimezone -> {
val timeNode = ofType("string").put("format", "time")
timeNode.put("airbyte_type", "time_without_timezone")
}
is TimestampType -> {
is TimestampTypeWithTimezone -> {
val timestampNode = ofType("string").put("format", "date-time")
if (airbyteType.hasTimezone) {
timestampNode.put("airbyte_type", "timestamp_with_timezone")
} else {
timestampNode.put("airbyte_type", "timestamp_without_timezone")
}
timestampNode.put("airbyte_type", "timestamp_with_timezone")
}
is TimestampTypeWithoutTimezone -> {
val timestampNode = ofType("string").put("format", "date-time")
timestampNode.put("airbyte_type", "timestamp_without_timezone")
}
else -> throw IllegalArgumentException("Unknown type: $airbyteType")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,17 @@ class JsonSchemaToAirbyteType {
when (schema.get("format")?.asText()) {
"date" -> DateType
"time" ->
TimeType(
hasTimezone = schema.get("airbyte_type")?.asText() != "time_without_timezone"
)
if (schema.get("airbyte_type")?.asText() == "time_without_timezone") {
TimeTypeWithoutTimezone
} else {
TimeTypeWithTimezone
}
"date-time" ->
TimestampType(
hasTimezone =
schema.get("airbyte_type")?.asText() != "timestamp_without_timezone"
)
if (schema.get("airbyte_type")?.asText() == "timestamp_without_timezone") {
TimestampTypeWithoutTimezone
} else {
TimestampTypeWithTimezone
}
null -> StringType
else ->
throw IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ class JsonToAirbyteValue {
is ObjectTypeWithoutSchema,
is ObjectTypeWithEmptySchema -> toObjectWithoutSchema(json)
is StringType -> StringValue(json.asText())
is TimeType -> TimeValue(json.asText())
is TimestampType -> TimestampValue(json.asText())
is TimeTypeWithTimezone,
is TimeTypeWithoutTimezone -> TimeValue(json.asText())
is TimestampTypeWithTimezone,
is TimestampTypeWithoutTimezone -> TimestampValue(json.asText())
is UnionType -> toUnion(json, schema.options)
is UnknownType -> UnknownValue("From $schema: $json")
}
Expand Down Expand Up @@ -172,8 +174,10 @@ class JsonToAirbyteValue {
is ObjectTypeWithoutSchema,
is ObjectTypeWithEmptySchema -> json.isObject
is StringType -> json.isTextual
is TimeType -> json.isTextual
is TimestampType -> json.isTextual
is TimeTypeWithTimezone,
is TimeTypeWithoutTimezone,
is TimestampTypeWithTimezone,
is TimestampTypeWithoutTimezone -> json.isTextual
is UnionType -> schema.options.any { matchesStrictly(it, json) }
is UnknownType -> false
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.test.util.SchemaTestBuilder
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

/** Checks that an [AirbyteSchemaIdentityMapper] with no overridden hooks maps a schema to an equal one. */
class AirbyteSchemaIdentityMapperTest {
    @Test
    fun testIdMapping() {
        // Arrange: every `with(...)` call relies on the builder's default `expected = given`,
        // so the expected schema is built with the same fields as the input schema.
        val rootBuilder =
            SchemaTestBuilder()
                .with(DateType)
                .with(StringType)
                .with(IntegerType)
                .with(BooleanType)
                .with(NumberType)
                .with(NullType)
                .with(ArrayType(FieldType(StringType, true)))
                .with(UnionType(listOf(StringType, IntegerType, NullType)))
                // First nested record: the time and timestamp variants.
                .withRecord()
                .with(TimeTypeWithTimezone)
                .with(TimeTypeWithoutTimezone)
                .with(TimestampTypeWithTimezone)
                .with(TimestampTypeWithoutTimezone)
                // Second nested record (inside the first): the schemaless variants.
                .withRecord()
                .with(ObjectTypeWithoutSchema)
                .with(ObjectTypeWithEmptySchema)
                .with(ArrayTypeWithoutSchema)
                .endRecord()
                .endRecord()
                .with(NullType)
        val (givenSchema, wantedSchema) = rootBuilder.build()

        // Act: an anonymous implementation that overrides nothing.
        val identityMapper = object : AirbyteSchemaIdentityMapper {}
        val mappedSchema = identityMapper.map(givenSchema)

        // Assert
        Assertions.assertEquals(wantedSchema, mappedSchema)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ class AirbyteValueToJsonTest {
true
),
"date" to FieldType(DateType, false),
"time" to FieldType(TimeType(false), false),
"timestamp" to FieldType(TimestampType(false), false),
"time_without_timezone" to FieldType(TimeType(true), false),
"timestamp_without_timezone" to FieldType(TimestampType(true), false)
"time" to FieldType(TimeTypeWithoutTimezone, false),
"timestamp" to FieldType(TimestampTypeWithoutTimezone, false),
"time_without_timezone" to FieldType(TimeTypeWithTimezone, false),
"timestamp_without_timezone" to FieldType(TimestampTypeWithTimezone, false)
)
)
val jsonValue = AirbyteValueToJson().convert(airbyteValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,26 @@ class JsonSchemaToAirbyteSchemaTypeTest {
fun testStringTime() {
val stringType = ofType("string").put("format", "time")
val airbyteType = JsonSchemaToAirbyteType().convert(stringType)
Assertions.assertEquals(airbyteType, TimeType(hasTimezone = true))
Assertions.assertEquals(airbyteType, TimeTypeWithTimezone)
stringType.put("airbyte_type", "time_without_timezone")
val airbyteType2 = JsonSchemaToAirbyteType().convert(stringType)
Assertions.assertEquals(airbyteType2, TimeType(hasTimezone = false))
Assertions.assertEquals(airbyteType2, TimeTypeWithoutTimezone)
stringType.put("airbyte_type", "time_with_timezone")
val airbyteType3 = JsonSchemaToAirbyteType().convert(stringType)
Assertions.assertEquals(airbyteType3, TimeType(hasTimezone = true))
Assertions.assertEquals(airbyteType3, TimeTypeWithTimezone)
}

@Test
fun testStringTimestamp() {
val stringType = ofType("string").put("format", "date-time")
val airbyteType = JsonSchemaToAirbyteType().convert(stringType)
Assertions.assertEquals(airbyteType, TimestampType(hasTimezone = true))
Assertions.assertEquals(airbyteType, TimestampTypeWithTimezone)
stringType.put("airbyte_type", "timestamp_without_timezone")
val airbyteType2 = JsonSchemaToAirbyteType().convert(stringType)
Assertions.assertEquals(airbyteType2, TimestampType(hasTimezone = false))
Assertions.assertEquals(airbyteType2, TimestampTypeWithoutTimezone)
stringType.put("airbyte_type", "timestamp_with_timezone")
val airbyteType3 = JsonSchemaToAirbyteType().convert(stringType)
Assertions.assertEquals(airbyteType3, TimestampType(hasTimezone = true))
Assertions.assertEquals(airbyteType3, TimestampTypeWithTimezone)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class JsonToAirbyteValueTest {
JsonToAirbyteValue()
.convert(
JsonNodeFactory.instance.textNode("2021-01-01T00:00:00Z"),
TimestampType(true)
TimestampTypeWithTimezone
)
Assertions.assertTrue(value is TimestampValue)
Assertions.assertEquals("2021-01-01T00:00:00Z", (value as TimestampValue).value)
Expand All @@ -156,7 +156,7 @@ class JsonToAirbyteValueTest {
fun testTime() {
val value =
JsonToAirbyteValue()
.convert(JsonNodeFactory.instance.textNode("00:00:00"), TimeType(true))
.convert(JsonNodeFactory.instance.textNode("00:00:00"), TimeTypeWithTimezone)
Assertions.assertTrue(value is TimeValue)
Assertions.assertEquals("00:00:00", (value as TimeValue).value)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.test.util

import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.ObjectType
import java.util.*
import kotlin.collections.LinkedHashMap

/**
 * Test helper that grows two object schemas side by side: the schema handed to the code under
 * test ([inputSchema]) and the schema that code is expected to produce ([expectedSchema]).
 *
 * Each added field is stored under the same freshly generated random UUID name in both schemas.
 * Nested records are built through a child builder obtained from [withRecord] and closed with
 * [endRecord]; [parent] is `null` only for the root builder.
 */
class SchemaTestBuilder(
    val inputSchema: ObjectType = ObjectType(properties = LinkedHashMap()),
    val expectedSchema: ObjectType = ObjectType(properties = LinkedHashMap()),
    val parent: SchemaTestBuilder? = null
) {

    /**
     * Adds [given] to the input schema and [expected] to the expected schema under one shared
     * random name, then returns this builder for chaining.
     */
    fun with(given: FieldType, expected: FieldType = given): SchemaTestBuilder = apply {
        val fieldName = UUID.randomUUID().toString()
        inputSchema.properties[fieldName] = given
        expectedSchema.properties[fieldName] = expected
    }

    /** Same as the [FieldType] overload, wrapping both types as non-nullable fields. */
    fun with(given: AirbyteType, expected: AirbyteType = given): SchemaTestBuilder =
        with(FieldType(given, nullable = false), FieldType(expected, nullable = false))

    /**
     * Opens a nested record: registers a pair of empty object schemas under one shared random
     * name (with the requested nullability) and returns a child builder that fills them in.
     */
    fun withRecord(nullable: Boolean = false): SchemaTestBuilder {
        // The constructor defaults give the child its own pair of empty object schemas.
        val child = SchemaTestBuilder(parent = this)
        val fieldName = UUID.randomUUID().toString()
        inputSchema.properties[fieldName] = FieldType(child.inputSchema, nullable = nullable)
        expectedSchema.properties[fieldName] = FieldType(child.expectedSchema, nullable = nullable)
        return child
    }

    /**
     * Closes the current nested record and returns the enclosing builder.
     *
     * @throws IllegalStateException if this is the root builder.
     */
    fun endRecord(): SchemaTestBuilder =
        checkNotNull(parent) { "Cannot end record without parent" }

    /**
     * Returns the `(input, expected)` schema pair.
     *
     * @throws IllegalStateException if called on a child builder, i.e. a record is still open.
     */
    fun build(): Pair<ObjectType, ObjectType> {
        check(parent == null) { "Cannot build nested schema" }
        return inputSchema to expectedSchema
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.TimeType
import io.airbyte.cdk.load.data.TimestampType
import io.airbyte.cdk.load.data.TimeTypeWithTimezone
import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import org.apache.avro.Schema
Expand Down Expand Up @@ -57,9 +59,11 @@ class AirbyteTypeToAvroSchema {
is ObjectTypeWithoutSchema ->
throw IllegalArgumentException("Object type without schema is not supported")
is StringType -> return SchemaBuilder.builder().stringType()
is TimeType ->
is TimeTypeWithTimezone,
is TimeTypeWithoutTimezone ->
throw IllegalArgumentException("String-based time types are not supported")
is TimestampType ->
is TimestampTypeWithTimezone,
is TimestampTypeWithoutTimezone ->
throw IllegalArgumentException("String-based timestamp types are not supported")
is UnionType ->
return Schema.createUnion(airbyteSchema.options.map { convert(it, path) })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
import io.airbyte.cdk.load.data.ObjectValue
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.data.TimeType
import io.airbyte.cdk.load.data.TimestampType
import io.airbyte.cdk.load.data.TimeTypeWithTimezone
import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone
import io.airbyte.cdk.load.data.UnionType
import io.airbyte.cdk.load.data.UnknownType
import org.apache.avro.generic.GenericArray
Expand Down Expand Up @@ -68,8 +70,11 @@ class AvroRecordToAirbyteValue {
throw IllegalArgumentException("Unsupported string type: $avroValue")
}
)
is TimeType -> throw UnsupportedOperationException("TimeType is not supported")
is TimestampType ->
is TimeTypeWithoutTimezone,
is TimeTypeWithTimezone ->
throw UnsupportedOperationException("TimeType is not supported")
is TimestampTypeWithoutTimezone,
is TimestampTypeWithTimezone ->
throw UnsupportedOperationException("TimestampType is not supported")
is UnionType -> return tryConvertUnion(avroValue, schema)
is UnknownType -> throw UnsupportedOperationException("UnknownType is not supported")
Expand Down

0 comments on commit fb8063b

Please sign in to comment.