Skip to content

Commit

Permalink
Destination S3V2: Only fail on bad types for avro and parquet (parity… (
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt authored Dec 23, 2024
1 parent 8688072 commit 666e847
Show file tree
Hide file tree
Showing 21 changed files with 334 additions and 171 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,36 @@ package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.data.json.toJson
import io.airbyte.cdk.load.util.serializeToString

/**
* Intended for Avro and Parquet Conversions and similar use cases.
*
* The contract is to serialize the values of schemaless and unknown types to a json string.
*
* Because there is no JsonBlob `AirbyteType`, we leave the types as-is and just serialize them. It
* is expected that the serializer will know to expect strings for each type.
*
* This means there's no need for a type mapper, unless you also want to support some subset of the
* Unknown types.
*
* For example, [FailOnAllUnknownTypesExceptNull] is used to add support for `{ "type": "null" }`
*/
class FailOnAllUnknownTypesExceptNull : AirbyteSchemaIdentityMapper {
override fun mapUnknown(schema: UnknownType) =
if (
schema.schema.isObject &&
((schema.schema.get("type").isTextual &&
schema.schema.get("type").textValue() == "null") ||
(schema.schema.get("type").isArray &&
schema.schema.get("type").elements().asSequence().all {
it.isTextual && it.textValue() == "null"
}))
) {
schema
} else {
throw IllegalStateException("Unknown type: $schema")
}
}

class SchemalessValuesToJsonString : AirbyteValueIdentityMapper() {
override fun mapObjectWithoutSchema(
value: AirbyteValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class UnionTypeToDisjointRecord : AirbyteSchemaIdentityMapper {
is ObjectType -> "object"
is ArrayTypeWithoutSchema,
is ObjectTypeWithoutSchema,
is ObjectTypeWithEmptySchema -> "string"
is ObjectTypeWithEmptySchema -> "object"
is UnionType -> "union"
is UnknownType -> "unknown"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,7 @@ class JsonSchemaToAirbyteType {
"array" -> fromArray(schema)
"object" -> fromObject(schema)
"null" -> null
else ->
throw IllegalArgumentException(
"Unknown type: ${
schema.get("type").asText()
}"
)
else -> UnknownType(schema)
}
} else if (schemaType.isArray) {
// {"type": [...], ...}
Expand Down Expand Up @@ -92,12 +87,7 @@ class JsonSchemaToAirbyteType {
TimestampTypeWithTimezone
}
null -> StringType
else ->
throw IllegalArgumentException(
"Unknown string format: ${
schema.get("format").asText()
}"
)
else -> UnknownType(schema)
}

private fun fromNumber(schema: ObjectNode): AirbyteType =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,24 +392,30 @@ class DestinationMessageFactory(
name = message.record.stream,
)
if (fileTransferEnabled) {
@Suppress("UNCHECKED_CAST")
val fileMessage =
message.record.additionalProperties["file"] as Map<String, Any>

DestinationFile(
stream = stream.descriptor,
emittedAtMs = message.record.emittedAt,
serialized = serialized,
fileMessage =
DestinationFile.AirbyteRecordMessageFile(
fileUrl = fileMessage["file_url"] as String?,
bytes = toLong(fileMessage["bytes"], "message.record.bytes"),
fileRelativePath = fileMessage["file_relative_path"] as String?,
modified =
toLong(fileMessage["modified"], "message.record.modified"),
sourceFileUrl = fileMessage["source_file_url"] as String?
)
)
try {
@Suppress("UNCHECKED_CAST")
val fileMessage =
message.record.additionalProperties["file"] as Map<String, Any>

DestinationFile(
stream = stream.descriptor,
emittedAtMs = message.record.emittedAt,
serialized = serialized,
fileMessage =
DestinationFile.AirbyteRecordMessageFile(
fileUrl = fileMessage["file_url"] as String?,
bytes = toLong(fileMessage["bytes"], "message.record.bytes"),
fileRelativePath = fileMessage["file_relative_path"] as String?,
modified =
toLong(fileMessage["modified"], "message.record.modified"),
sourceFileUrl = fileMessage["source_file_url"] as String?
)
)
} catch (e: Exception) {
throw IllegalArgumentException(
"Failed to construct file message: ${e.message}"
)
}
} else {
DestinationRecord(stream.descriptor, message, serialized, stream.schema)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import com.fasterxml.jackson.databind.node.JsonNodeFactory
import io.airbyte.cdk.load.test.util.Root
import io.airbyte.cdk.load.test.util.SchemaRecordBuilder
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

class FailOnAllUnknownTypesExceptNullTest {
@Test
fun testBasicTypeBehavior() {
val nullType = JsonNodeFactory.instance.objectNode().put("type", "null")
val (inputSchema, expectedOutput) =
SchemaRecordBuilder<Root>()
.with(UnknownType(nullType))
.with(
UnknownType(
JsonNodeFactory.instance
.objectNode()
.set(
"type",
JsonNodeFactory.instance.arrayNode().add("null").add("null")
)
)
)
.build()
FailOnAllUnknownTypesExceptNull().map(inputSchema).let {
Assertions.assertEquals(expectedOutput, it)
}
}

@Test
fun `test throws on non-null unknown types`() {
val (inputSchema, _) =
SchemaRecordBuilder<Root>()
.with(UnknownType(JsonNodeFactory.instance.objectNode().put("type", "whatever")))
.build()
Assertions.assertThrows(IllegalStateException::class.java) {
FailOnAllUnknownTypesExceptNull().map(inputSchema)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class UnionTypeToDisjointRecordTest {
fun testUnionOfTypesWithSameNameThrows() {
val (inputSchema, _) =
SchemaRecordBuilder<Root>()
// Both `StringType` and `ObjectWithoutSchema` are mapped to `string`
.with(UnionType.of(StringType, ObjectTypeWithoutSchema))
// Both are mapped to `string`
.with(UnionType.of(ObjectTypeWithEmptySchema, ObjectTypeWithoutSchema))
.build()
Assertions.assertThrows(IllegalArgumentException::class.java) {
UnionTypeToDisjointRecord().map(inputSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import org.junit.jupiter.api.Assumptions.assumeTrue
import org.junit.jupiter.api.Disabled
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll
import org.junit.jupiter.api.assertThrows

sealed interface AllTypesBehavior

Expand Down Expand Up @@ -126,6 +127,7 @@ abstract class BasicFunctionalityIntegrationTest(
*/
val commitDataIncrementally: Boolean,
val allTypesBehavior: AllTypesBehavior,
val failOnUnknownTypes: Boolean = false,
nullEqualsUnset: Boolean = false,
) : IntegrationTest(dataDumper, destinationCleaner, recordMangler, nameMapper, nullEqualsUnset) {
val parsedConfig = ValidatedJsonUtils.parseOne(configSpecClass, configContents)
Expand Down Expand Up @@ -1795,11 +1797,6 @@ abstract class BasicFunctionalityIntegrationTest(
"schemaless_object" to FieldType(ObjectTypeWithoutSchema, nullable = true),
"schematized_array" to FieldType(ArrayType(intType), nullable = true),
"schemaless_array" to FieldType(ArrayTypeWithoutSchema, nullable = true),
"unknown" to
FieldType(
UnknownType(JsonNodeFactory.instance.textNode("test")),
nullable = true
),
),
),
generationId = 42,
Expand All @@ -1820,8 +1817,7 @@ abstract class BasicFunctionalityIntegrationTest(
"empty_object": {},
"schemaless_object": { "uuid": "38F52396-736D-4B23-B5B4-F504D8894B97", "probability": 1.5 },
"schematized_array": [10, null],
"schemaless_array": [ 10, "foo", null, { "bar": "qua" } ],
"unknown": {"foo": "bar"}
"schemaless_array": [ 10, "foo", null, { "bar": "qua" } ]
}""".trimIndent(),
emittedAtMs = 1602637589100,
),
Expand All @@ -1835,8 +1831,7 @@ abstract class BasicFunctionalityIntegrationTest(
"empty_object": {"extra": "stuff"},
"schemaless_object": { "address": { "street": "113 Hickey Rd", "zip": "37932" }, "flags": [ true, false, false ] },
"schematized_array": [],
"schemaless_array": [],
"unknown": {}
"schemaless_array": []
}""".trimIndent(),
emittedAtMs = 1602637589200,
),
Expand All @@ -1850,8 +1845,7 @@ abstract class BasicFunctionalityIntegrationTest(
"empty_object": null,
"schemaless_object": null,
"schematized_array": null,
"schemaless_array": null,
"unknown": null
"schemaless_array": null
}""".trimIndent(),
emittedAtMs = 1602637589300,
),
Expand Down Expand Up @@ -1885,12 +1879,6 @@ abstract class BasicFunctionalityIntegrationTest(
} else {
listOf(10, "foo", null, mapOf("bar" to "qua"))
},
"unknown" to
if (stringifySchemalessObjects) {
"""{"foo":"bar"}"""
} else {
mapOf("foo" to "bar")
},
),
airbyteMeta = OutputRecord.Meta(syncId = 42),
),
Expand Down Expand Up @@ -1927,12 +1915,6 @@ abstract class BasicFunctionalityIntegrationTest(
} else {
emptyList<Any>()
},
"unknown" to
if (stringifySchemalessObjects) {
"""{}"""
} else {
emptyMap<String, Any?>()
},
),
airbyteMeta = OutputRecord.Meta(syncId = 42),
),
Expand All @@ -1947,7 +1929,6 @@ abstract class BasicFunctionalityIntegrationTest(
"schemaless_object" to null,
"schematized_array" to null,
"schemaless_array" to null,
"unknown" to null,
),
airbyteMeta = OutputRecord.Meta(syncId = 42),
),
Expand All @@ -1962,6 +1943,74 @@ abstract class BasicFunctionalityIntegrationTest(
)
}

@Test
open fun testUnknownTypes() {
val stream =
DestinationStream(
DestinationStream.Descriptor(randomizedNamespace, "problematic_types"),
Append,
ObjectType(
linkedMapOf(
"id" to
FieldType(
UnknownType(
JsonNodeFactory.instance.objectNode().put("type", "whatever")
),
nullable = true
),
),
),
generationId = 42,
minimumGenerationId = 0,
syncId = 42,
)
runSync(
configContents,
stream,
listOf(
InputRecord(
randomizedNamespace,
"problematic_types",
"""
{
"id": "ex falso quodlibet"
}""".trimIndent(),
emittedAtMs = 1602637589100,
)
)
)

val expectedRecords: List<OutputRecord> =
listOf(
OutputRecord(
extractedAt = 1602637589100,
generationId = 42,
data =
mapOf(
"id" to "ex falso quodlibet",
),
airbyteMeta = OutputRecord.Meta(syncId = 42),
),
)

val dumpBlock = {
dumpAndDiffRecords(
parsedConfig,
expectedRecords,
stream,
primaryKey = listOf(listOf("id")),
cursor = null,
)
}
if (failOnUnknownTypes) {
// Note: this will not catch assertion errors against data
// if the destination actually succeeds (by design).
assertThrows<Exception> { dumpBlock() }
} else {
dumpBlock()
}
}

/**
* This test verifies that destinations handle unions correctly.
*
Expand Down
Loading

0 comments on commit 666e847

Please sign in to comment.