Skip to content

Commit

Permalink
Bulk load CDK: Add test with every single airbyte type (#48370)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Nov 5, 2024
1 parent ddfa537 commit 9a51bce
Show file tree
Hide file tree
Showing 8 changed files with 432 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.test.util.NoopNameMapper
import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest
import io.airbyte.cdk.load.write.Untyped
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test

Expand All @@ -25,6 +26,7 @@ class MockBasicFunctionalityIntegrationTest :
promoteUnionToObject = false,
preserveUndeclaredFields = true,
commitDataIncrementally = false,
allTypesBehavior = Untyped,
) {
@Test
override fun testBasicWrite() {
Expand Down Expand Up @@ -86,4 +88,9 @@ class MockBasicFunctionalityIntegrationTest :
override fun testUnions() {
super.testUnions()
}

@Test
override fun testAllTypes() {
super.testAllTypes()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ sealed interface AirbyteValue {
companion object {
fun from(value: Any?): AirbyteValue =
when (value) {
is AirbyteValue -> value
null -> NullValue
is String -> StringValue(value)
is Boolean -> BooleanValue(value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class AirbyteTypeToJsonSchema {
val timestampNode = ofType("string").put("format", "date-time")
timestampNode.put("airbyte_type", "timestamp_without_timezone")
}
is UnknownType -> throw IllegalArgumentException("Unknown type: $airbyteType")
// In case of unknown type, just return {} (i.e. the accept-all JsonSchema)
is UnknownType -> JsonNodeFactory.instance.objectNode()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.data.json

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.cdk.load.data.*
import io.airbyte.cdk.util.Jsons
import java.math.BigDecimal

/**
Expand Down Expand Up @@ -34,7 +35,7 @@ class JsonToAirbyteValue {
is ObjectType -> toObject(json, schema)
is ObjectTypeWithoutSchema,
is ObjectTypeWithEmptySchema -> toObjectWithoutSchema(json)
is StringType -> StringValue(json.asText())
is StringType -> toString(json)
is TimeTypeWithTimezone,
is TimeTypeWithoutTimezone -> TimeValue(json.asText())
is TimestampTypeWithTimezone,
Expand Down Expand Up @@ -67,6 +68,14 @@ class JsonToAirbyteValue {
return ArrayValue(json.map { fromJson(it) })
}

private fun toString(json: JsonNode): StringValue {
return if (json.isTextual) {
StringValue(json.asText())
} else {
StringValue(Jsons.writeValueAsString(json))
}
}

private fun toBoolean(json: JsonNode): BooleanValue {
val boolVal =
when {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,17 +271,25 @@ class RecordDiffer(
// Handle temporal types specifically, because they require explicit parsing
return when (v1) {
is DateValue ->
LocalDate.parse(v1.value)
.compareTo(LocalDate.parse((v2 as DateValue).value))
try {
LocalDate.parse(v1.value)
.compareTo(LocalDate.parse((v2 as DateValue).value))
} catch (e: Exception) {
v1.value.compareTo((v2 as DateValue).value)
}
is TimeValue -> {
try {
val time1 = LocalTime.parse(v1.value)
val time2 = LocalTime.parse((v2 as TimeValue).value)
time1.compareTo(time2)
} catch (e: Exception) {
val time1 = OffsetTime.parse(v1.value)
val time2 = OffsetTime.parse((v2 as TimeValue).value)
time1.compareTo(time2)
try {
val time1 = OffsetTime.parse(v1.value)
val time2 = OffsetTime.parse((v2 as TimeValue).value)
time1.compareTo(time2)
} catch (e: Exception) {
v1.value.compareTo((v2 as TimeValue).value)
}
}
}
is TimestampValue -> {
Expand All @@ -290,9 +298,13 @@ class RecordDiffer(
val ts2 = LocalDateTime.parse((v2 as TimestampValue).value)
ts1.compareTo(ts2)
} catch (e: Exception) {
val ts1 = OffsetDateTime.parse(v1.value)
val ts2 = OffsetDateTime.parse((v2 as TimestampValue).value)
ts1.compareTo(ts2)
try {
val ts1 = OffsetDateTime.parse(v1.value)
val ts2 = OffsetDateTime.parse((v2 as TimestampValue).value)
ts1.compareTo(ts2)
} catch (e: Exception) {
v1.value.compareTo((v2 as TimestampValue).value)
}
}
}
// otherwise, just be a terrible person.
Expand Down
Loading

0 comments on commit 9a51bce

Please sign in to comment.