From 9a51bce6f63e099a49f0f24dee9fd9dedf06ef29 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Tue, 5 Nov 2024 13:16:36 -0800 Subject: [PATCH] Bulk load CDK: Add test with every single airbyte type (#48370) --- .../MockBasicFunctionalityIntegrationTest.kt | 7 + .../io/airbyte/cdk/load/data/AirbyteValue.kt | 1 + .../load/data/json/AirbyteTypeToJsonSchema.kt | 3 +- .../cdk/load/data/json/JsonToAirbyteValue.kt | 11 +- .../cdk/load/test/util/RecordDiffer.kt | 28 +- .../BasicFunctionalityIntegrationTest.kt | 355 +++++++++++++++++- ...evNullBasicFunctionalityIntegrationTest.kt | 2 + .../destination/s3_v2/S3V2WriteTest.kt | 38 ++ 8 files changed, 432 insertions(+), 13 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt index a633183deed4..874489a1d463 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockBasicFunctionalityIntegrationTest.kt @@ -8,6 +8,7 @@ import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.test.util.NoopNameMapper import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest +import io.airbyte.cdk.load.write.Untyped import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test @@ -25,6 +26,7 @@ class MockBasicFunctionalityIntegrationTest : promoteUnionToObject = false, preserveUndeclaredFields = true, commitDataIncrementally = false, + allTypesBehavior = Untyped, ) { @Test override fun testBasicWrite() { @@ -86,4 +88,9 @@ class MockBasicFunctionalityIntegrationTest : override fun testUnions() { super.testUnions() } + + @Test + override fun testAllTypes() { + super.testAllTypes() + } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt index 65143aaf5eac..fb7d7657a5ca 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt @@ -16,6 +16,7 @@ sealed interface AirbyteValue { companion object { fun from(value: Any?): AirbyteValue = when (value) { + is AirbyteValue -> value null -> NullValue is String -> StringValue(value) is Boolean -> BooleanValue(value) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/AirbyteTypeToJsonSchema.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/AirbyteTypeToJsonSchema.kt index 101f9344ba41..c1f290a6f894 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/AirbyteTypeToJsonSchema.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/AirbyteTypeToJsonSchema.kt @@ -64,7 +64,8 @@ class AirbyteTypeToJsonSchema { val timestampNode = ofType("string").put("format", "date-time") timestampNode.put("airbyte_type", "timestamp_without_timezone") } - is UnknownType -> throw IllegalArgumentException("Unknown type: $airbyteType") + // In case of unknown type, just return {} (i.e. the accept-all JsonSchema) + is UnknownType -> JsonNodeFactory.instance.objectNode() } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt index 6145f09e0fb8..aada67af68ee 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/json/JsonToAirbyteValue.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.data.json import com.fasterxml.jackson.databind.JsonNode import io.airbyte.cdk.load.data.* +import io.airbyte.cdk.util.Jsons import java.math.BigDecimal /** @@ -34,7 +35,7 @@ class JsonToAirbyteValue { is ObjectType -> toObject(json, schema) is ObjectTypeWithoutSchema, is ObjectTypeWithEmptySchema -> toObjectWithoutSchema(json) - is StringType -> StringValue(json.asText()) + is StringType -> toString(json) is TimeTypeWithTimezone, is TimeTypeWithoutTimezone -> TimeValue(json.asText()) is TimestampTypeWithTimezone, @@ -67,6 +68,14 @@ class JsonToAirbyteValue { return ArrayValue(json.map { fromJson(it) }) } + private fun toString(json: JsonNode): StringValue { + return if (json.isTextual) { + StringValue(json.asText()) + } else { + StringValue(Jsons.writeValueAsString(json)) + } + } + private fun toBoolean(json: JsonNode): BooleanValue { val boolVal = when { diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt index 2dfe4035b657..c9310efe66f1 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt @@ -271,17 +271,25 @@ class RecordDiffer( // Handle temporal types specifically, because they require explicit parsing return when (v1) { is DateValue -> - LocalDate.parse(v1.value) - .compareTo(LocalDate.parse((v2 as DateValue).value)) + try { + LocalDate.parse(v1.value) + .compareTo(LocalDate.parse((v2 as DateValue).value)) + } catch (e: Exception) { + v1.value.compareTo((v2 as DateValue).value) + } is TimeValue -> { try { val time1 = LocalTime.parse(v1.value) val time2 = LocalTime.parse((v2 as TimeValue).value) time1.compareTo(time2) } catch (e: Exception) { - val time1 = OffsetTime.parse(v1.value) - val time2 = OffsetTime.parse((v2 as TimeValue).value) - time1.compareTo(time2) + try { + val time1 = OffsetTime.parse(v1.value) + val time2 = OffsetTime.parse((v2 as TimeValue).value) + time1.compareTo(time2) + } catch (e: Exception) { + v1.value.compareTo((v2 as TimeValue).value) + } } } is TimestampValue -> { @@ -290,9 +298,13 @@ class RecordDiffer( val ts2 = LocalDateTime.parse((v2 as TimestampValue).value) ts1.compareTo(ts2) } catch (e: Exception) { - val ts1 = OffsetDateTime.parse(v1.value) - val ts2 = OffsetDateTime.parse((v2 as TimestampValue).value) - ts1.compareTo(ts2) + try { + val ts1 = OffsetDateTime.parse(v1.value) + val ts2 = OffsetDateTime.parse((v2 as TimestampValue).value) + ts1.compareTo(ts2) + } catch (e: Exception) { + v1.value.compareTo((v2 as TimestampValue).value) + } } } // otherwise, just be a terrible person. diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt index aa4f64483137..f8cbbab5005c 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/write/BasicFunctionalityIntegrationTest.kt @@ -11,20 +11,31 @@ import io.airbyte.cdk.load.command.Dedupe import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.ArrayType import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema import io.airbyte.cdk.load.data.BooleanType +import io.airbyte.cdk.load.data.DateType +import io.airbyte.cdk.load.data.DateValue import io.airbyte.cdk.load.data.FieldType import io.airbyte.cdk.load.data.IntegerType import io.airbyte.cdk.load.data.IntegerValue +import io.airbyte.cdk.load.data.NumberType import io.airbyte.cdk.load.data.ObjectType import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema import io.airbyte.cdk.load.data.ObjectValue import io.airbyte.cdk.load.data.StringType import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimeTypeWithTimezone +import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimeValue import io.airbyte.cdk.load.data.TimestampTypeWithTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimestampValue import io.airbyte.cdk.load.data.UnionType +import io.airbyte.cdk.load.data.UnknownType import io.airbyte.cdk.load.message.DestinationRecord +import io.airbyte.cdk.load.message.DestinationRecord.Change import io.airbyte.cdk.load.message.StreamCheckpoint import io.airbyte.cdk.load.test.util.DestinationCleaner import io.airbyte.cdk.load.test.util.DestinationDataDumper @@ -39,7 +50,12 @@ import io.airbyte.cdk.util.Jsons import io.airbyte.protocol.models.v0.AirbyteMessage import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange import io.airbyte.protocol.models.v0.AirbyteStateMessage +import java.math.BigDecimal +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime import java.time.OffsetDateTime +import java.time.OffsetTime import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.delay import kotlinx.coroutines.launch @@ -53,6 +69,23 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.junit.jupiter.api.assertThrows +sealed interface AllTypesBehavior + +data class StronglyTyped( + /** + * Whether the destination can cast any value to string. E.g. given a StringType column, if a + * record contains `{"the_column": {"foo": "bar"}}`, does the connector treat this as a type + * error, or does it persist a serialized JSON string? + */ + val convertAllValuesToString: Boolean = true, + /** Whether top-level fields are represented as float64, or as fixed-point values */ + val topLevelFloatLosesPrecision: Boolean = true, + /** Whether floats nested inside objects/arrays are represented as float64. */ + val nestedFloatLosesPrecision: Boolean = true, +) : AllTypesBehavior + +data object Untyped : AllTypesBehavior + abstract class BasicFunctionalityIntegrationTest( /** The config to pass into the connector, as a serialized JSON blob */ val configContents: String, @@ -89,6 +122,7 @@ abstract class BasicFunctionalityIntegrationTest( * would set this parameter to `true`. */ val commitDataIncrementally: Boolean, + val allTypesBehavior: AllTypesBehavior, ) : IntegrationTest(dataDumper, destinationCleaner, recordMangler, nameMapper) { val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configContents) @@ -115,7 +149,7 @@ abstract class BasicFunctionalityIntegrationTest( emittedAtMs = 1234, changes = mutableListOf( - DestinationRecord.Change( + Change( field = "foo", change = AirbyteRecordMessageMetaChange.Change.NULLED, reason = @@ -171,7 +205,7 @@ abstract class BasicFunctionalityIntegrationTest( OutputRecord.Meta( changes = mutableListOf( - DestinationRecord.Change( + Change( field = "foo", change = AirbyteRecordMessageMetaChange.Change @@ -1436,7 +1470,321 @@ abstract class BasicFunctionalityIntegrationTest( assertDoesNotThrow { runSync(configContents, DestinationCatalog(streams), messages) } } - // TODO basic allTypes() test + /** A basic test that we handle all supported data types in a reasonable way. */ + // Depending on how future connector development goes - we might need to do something similar to + // BaseSqlGeneratorIntegrationTest, where we split out tests for connectors that do/don't + // support safe_cast. (or, we move fully to in-connector typing, and we stop worrying about + // per-destination safe_cast support). + @Test + open fun testAllTypes() { + assumeTrue(verifyDataWriting) + val stream = + DestinationStream( + DestinationStream.Descriptor(randomizedNamespace, "test_stream"), + Append, + ObjectType( + linkedMapOf( + "id" to intType, + "struct" to + FieldType( + ObjectType(linkedMapOf("foo" to numberType)), + nullable = false + ), + "struct_schemaless" to + FieldType(ObjectTypeWithEmptySchema, nullable = false), + "struct_empty" to FieldType(ObjectTypeWithEmptySchema, nullable = false), + "array" to FieldType(ArrayType(numberType), nullable = false), + "array_schemaless" to FieldType(ArrayTypeWithoutSchema, nullable = false), + "string" to FieldType(StringType, nullable = false), + "number" to FieldType(NumberType, nullable = false), + "boolean" to FieldType(BooleanType, nullable = false), + "timestamp_with_timezone" to + FieldType(TimestampTypeWithTimezone, nullable = false), + "timestamp_without_timezone" to + FieldType(TimestampTypeWithoutTimezone, nullable = false), + "time_with_timezone" to FieldType(TimeTypeWithTimezone, nullable = false), + "time_without_timezone" to + FieldType(TimeTypeWithoutTimezone, nullable = false), + "date" to FieldType(DateType, nullable = false), + "unknown" to FieldType(UnknownType("test"), nullable = false), + ) + ), + generationId = 42, + minimumGenerationId = 0, + syncId = 42, + ) + fun makeRecord(data: String) = + DestinationRecord( + randomizedNamespace, + "test_stream", + data, + emittedAtMs = 100, + ) + runSync( + configContents, + stream, + listOf( + // A record with valid values for all fields + makeRecord( + """ + { + "id": 1, + "struct": {"foo": 1.0}, + "struct_schemaless": {"foo": 1.0}, + "struct_empty": {"foo": 1.0}, + "array": [1.0], + "array_schemaless": [1.0], + "string": "foo", + "number": 42.1, + "integer": 42, + "boolean": true, + "timestamp_with_timezone": "2023-01-23T12:34:56Z", + "timestamp_without_timezone": "2023-01-23T12:34:56", + "time_with_timezone": "12:34:56Z", + "time_without_timezone": "12:34:56", + "date": "2023-01-23", + "unknown": {} + } + """.trimIndent() + ), + // A record with null for all fields + makeRecord( + """ + { + "id": 2, + "struct": null, + "struct_schemaless": null, + "struct_empty": null, + "array": null, + "array_schemaless": null, + "string": null, + "number": null, + "integer": null, + "boolean": null, + "timestamp_with_timezone": null, + "timestamp_without_timezone": null, + "time_with_timezone": null, + "time_without_timezone": null, + "date": null, + "unknown": null + } + """.trimIndent() + ), + // A record with all fields unset + makeRecord("""{"id": 3}"""), + // A record that verifies floating-point behavior. + // 67.174118 cannot be represented as a standard float64 + // (it turns into 67.17411800000001). + makeRecord( + """ + { + "id": 4, + "struct": {"foo": 67.174118}, + "struct_schemaless": {"foo": 67.174118}, + "struct_empty": {"foo": 67.174118}, + "array": [67.174118], + "array_schemaless": [67.174118], + "number": 67.174118 + } + """.trimIndent(), + ), + // A record with invalid values for all fields + makeRecord( + """ + { + "id": 5, + "struct": "foo", + "struct_schemaless": "foo", + "struct_empty": "foo", + "array": "foo", + "array_schemaless": "foo", + "string": {}, + "number": "foo", + "integer": "foo", + "boolean": "foo", + "timestamp_with_timezone": "foo", + "timestamp_without_timezone": "foo", + "time_with_timezone": "foo", + "time_without_timezone": "foo", + "date": "foo" + } + """.trimIndent() + ), + ), + ) + + val nestedFloat: BigDecimal + val topLevelFloat: BigDecimal + val badValuesData: Map + val badValuesChanges: MutableList + when (allTypesBehavior) { + is StronglyTyped -> { + nestedFloat = + if (allTypesBehavior.nestedFloatLosesPrecision) { + BigDecimal("67.17411800000001") + } else { + BigDecimal("67.174118") + } + topLevelFloat = + if (allTypesBehavior.topLevelFloatLosesPrecision) { + BigDecimal("67.17411800000001") + } else { + BigDecimal("67.174118") + } + badValuesData = + mapOf( + "id" to 5, + "struct" to null, + "struct_schemaless" to null, + "struct_empty" to null, + "array" to null, + "array_schemaless" to null, + "string" to + if (allTypesBehavior.convertAllValuesToString) { + "{}" + } else { + null + }, + "number" to null, + "integer" to null, + "boolean" to null, + "timestamp_with_timezone" to null, + "timestamp_without_timezone" to null, + "time_with_timezone" to null, + "time_without_timezone" to null, + "date" to null, + ) + badValuesChanges = + (stream.schema as ObjectType) + .properties + .keys + .map { key -> + Change( + key, + AirbyteRecordMessageMetaChange.Change.NULLED, + AirbyteRecordMessageMetaChange.Reason + .DESTINATION_SERIALIZATION_ERROR, + ) + } + .filter { + !allTypesBehavior.convertAllValuesToString || it.field != "string" + } + .toMutableList() + } + Untyped -> { + nestedFloat = BigDecimal("67.174118") + topLevelFloat = BigDecimal("67.174118") + badValuesData = + mapOf( + "id" to 5, + "struct" to "foo", + "struct_schemaless" to "foo", + "struct_empty" to "foo", + "array" to "foo", + "array_schemaless" to "foo", + "string" to StringValue("{}"), + "number" to "foo", + "integer" to "foo", + "boolean" to "foo", + // TODO this probably indicates that we should + // 1. actually parse time types + // 2. and just rely on the fallback to JsonToAirbyteValue.fromJson to return + // a StringValue + "timestamp_with_timezone" to TimestampValue("foo"), + "timestamp_without_timezone" to TimestampValue("foo"), + "time_with_timezone" to TimeValue("foo"), + "time_without_timezone" to TimeValue("foo"), + "date" to DateValue("foo"), + ) + badValuesChanges = mutableListOf() + } + } + dumpAndDiffRecords( + parsedConfig, + listOf( + OutputRecord( + extractedAt = 100, + generationId = 42, + data = + mapOf( + "id" to 1, + "struct" to mapOf("foo" to 1.0), + "struct_schemaless" to mapOf("foo" to 1.0), + "struct_empty" to mapOf("foo" to 1.0), + "array" to listOf(1.0), + "array_schemaless" to listOf(1.0), + "string" to "foo", + "number" to 42.1, + "integer" to 42, + "boolean" to true, + "timestamp_with_timezone" to + OffsetDateTime.parse("2023-01-23T12:34:56Z"), + "timestamp_without_timezone" to + LocalDateTime.parse("2023-01-23T12:34:56"), + "time_with_timezone" to OffsetTime.parse("12:34:56Z"), + "time_without_timezone" to LocalTime.parse("12:34:56"), + "date" to LocalDate.parse("2023-01-23"), + "unknown" to mapOf(), + ), + airbyteMeta = OutputRecord.Meta(syncId = 42), + ), + OutputRecord( + extractedAt = 100, + generationId = 42, + data = + mapOf( + "id" to 2, + "struct" to null, + "struct_schemaless" to null, + "struct_empty" to null, + "array" to null, + "array_schemaless" to null, + "string" to null, + "number" to null, + "integer" to null, + "boolean" to null, + "timestamp_with_timezone" to null, + "timestamp_without_timezone" to null, + "time_with_timezone" to null, + "time_without_timezone" to null, + "date" to null, + "unknown" to null, + ), + airbyteMeta = OutputRecord.Meta(syncId = 42), + ), + OutputRecord( + extractedAt = 100, + generationId = 42, + data = mapOf("id" to 3), + airbyteMeta = OutputRecord.Meta(syncId = 42), + ), + OutputRecord( + extractedAt = 100, + generationId = 42, + data = + mapOf( + "id" to 4, + "struct" to mapOf("foo" to nestedFloat), + "struct_schemaless" to mapOf("foo" to nestedFloat), + "struct_empty" to mapOf("foo" to nestedFloat), + "array" to listOf(nestedFloat), + "array_schemaless" to listOf(nestedFloat), + "number" to topLevelFloat, + ), + airbyteMeta = OutputRecord.Meta(syncId = 42), + ), + OutputRecord( + extractedAt = 100, + generationId = 42, + data = badValuesData, + airbyteMeta = OutputRecord.Meta(syncId = 42, changes = badValuesChanges), + ), + ), + stream, + primaryKey = listOf(listOf("id")), + cursor = null, + ) + } /** * Some types (object/array) are expected to contain other types. Verify that we handle them @@ -1858,6 +2206,7 @@ abstract class BasicFunctionalityIntegrationTest( companion object { private val intType = FieldType(IntegerType, nullable = true) + private val numberType = FieldType(NumberType, nullable = true) private val stringType = FieldType(StringType, nullable = true) private val timestamptzType = FieldType(TimestampTypeWithTimezone, nullable = true) } diff --git a/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt b/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt index 1d316df778f6..f154649416fb 100644 --- a/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt +++ b/airbyte-integrations/connectors/destination-dev-null/src/test-integration/kotlin/io/airbyte/integrations/destination/dev_null/DevNullBasicFunctionalityIntegrationTest.kt @@ -7,6 +7,7 @@ package io.airbyte.integrations.destination.dev_null import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest +import io.airbyte.cdk.load.write.Untyped import org.junit.jupiter.api.Test class DevNullBasicFunctionalityIntegrationTest : @@ -23,6 +24,7 @@ class DevNullBasicFunctionalityIntegrationTest : promoteUnionToObject = false, preserveUndeclaredFields = false, commitDataIncrementally = false, + allTypesBehavior = Untyped, ) { @Test override fun testBasicWrite() { diff --git a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt index 452648757bf3..cebb81db2e8a 100644 --- a/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-s3-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2WriteTest.kt @@ -6,7 +6,10 @@ package io.airbyte.integrations.destination.s3_v2 import io.airbyte.cdk.load.test.util.NoopDestinationCleaner import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper +import io.airbyte.cdk.load.write.AllTypesBehavior import io.airbyte.cdk.load.write.BasicFunctionalityIntegrationTest +import io.airbyte.cdk.load.write.StronglyTyped +import io.airbyte.cdk.load.write.Untyped import org.junit.jupiter.api.Disabled import org.junit.jupiter.api.Test @@ -17,6 +20,7 @@ abstract class S3V2WriteTest( preserveUndeclaredFields: Boolean, /** This is false for staging mode, and true for non-staging mode. */ commitDataIncrementally: Boolean = false, + allTypesBehavior: AllTypesBehavior, ) : BasicFunctionalityIntegrationTest( S3V2TestUtils.getConfig(path), @@ -30,6 +34,7 @@ abstract class S3V2WriteTest( promoteUnionToObject = promoteUnionToObject, preserveUndeclaredFields = preserveUndeclaredFields, commitDataIncrementally = commitDataIncrementally, + allTypesBehavior = allTypesBehavior, ) { @Test override fun testBasicWrite() { @@ -98,6 +103,7 @@ class S3V2WriteTestJsonUncompressed : stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = true, + allTypesBehavior = Untyped, ) class S3V2WriteTestJsonStaging : @@ -106,6 +112,7 @@ class S3V2WriteTestJsonStaging : stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = true, + allTypesBehavior = Untyped, ) class S3V2WriteTestJsonGzip : @@ -114,6 +121,7 @@ class S3V2WriteTestJsonGzip : stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = true, + allTypesBehavior = Untyped, ) class S3V2WriteTestCsvUncompressed : @@ -122,6 +130,7 @@ class S3V2WriteTestCsvUncompressed : stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = true, + allTypesBehavior = Untyped, ) class S3V2WriteTestCsvGzip : @@ -130,6 +139,7 @@ class S3V2WriteTestCsvGzip : stringifySchemalessObjects = false, promoteUnionToObject = false, preserveUndeclaredFields = true, + allTypesBehavior = Untyped, ) class S3V2WriteTestAvroUncompressed : @@ -138,6 +148,7 @@ class S3V2WriteTestAvroUncompressed : stringifySchemalessObjects = true, promoteUnionToObject = false, preserveUndeclaredFields = false, + allTypesBehavior = StronglyTyped(), ) { @Disabled("Not yet working") @Test @@ -150,6 +161,12 @@ class S3V2WriteTestAvroUncompressed : override fun testUnions() { super.testUnions() } + + @Disabled("Not yet working") + @Test + override fun testAllTypes() { + super.testAllTypes() + } } class S3V2WriteTestAvroBzip2 : @@ -158,6 +175,7 @@ class S3V2WriteTestAvroBzip2 : stringifySchemalessObjects = true, promoteUnionToObject = false, preserveUndeclaredFields = false, + allTypesBehavior = StronglyTyped(), ) { @Disabled("Not yet working") @Test @@ -170,6 +188,12 @@ class S3V2WriteTestAvroBzip2 : override fun testUnions() { super.testUnions() } + + @Disabled("Not yet working") + @Test + override fun testAllTypes() { + super.testAllTypes() + } } class S3V2WriteTestParquetUncompressed : @@ -178,6 +202,7 @@ class S3V2WriteTestParquetUncompressed : stringifySchemalessObjects = true, promoteUnionToObject = true, preserveUndeclaredFields = false, + allTypesBehavior = StronglyTyped(), ) { @Disabled("Not yet working") @Test @@ -190,6 +215,12 @@ class S3V2WriteTestParquetUncompressed : override fun testUnions() { super.testUnions() } + + @Disabled("Not yet working") + @Test + override fun testAllTypes() { + super.testAllTypes() + } } class S3V2WriteTestParquetSnappy : @@ -198,6 +229,7 @@ class S3V2WriteTestParquetSnappy : stringifySchemalessObjects = true, promoteUnionToObject = true, preserveUndeclaredFields = false, + allTypesBehavior = StronglyTyped(), ) { @Disabled("Not yet working") @Test @@ -210,4 +242,10 @@ class S3V2WriteTestParquetSnappy : override fun testUnions() { super.testUnions() } + + @Disabled("Not yet working") + @Test + override fun testAllTypes() { + super.testAllTypes() + } }