Destination Iceberg: Fix object type handling #49848

Draft · wants to merge 7 commits into base: edgao/iceberg_glue_integration_test

@@ -93,6 +93,4 @@ class MockBasicFunctionalityIntegrationTest :
     override fun testBasicTypes() {
         super.testBasicTypes()
     }
-
-    @Test @Disabled override fun testBasicWriteFile() {}
 }

@@ -56,13 +56,17 @@ object MockDestinationBackend {
         // Assume that in dedup mode, we don't have duplicates - so we can just find the first
        // record with the same PK as the incoming record
         val existingRecord =
-            file.firstOrNull { RecordDiffer.comparePks(incomingPk, getPk(it)) == 0 }
+            file.firstOrNull {
+                RecordDiffer.comparePks(incomingPk, getPk(it), nullEqualsUnset = false) == 0
+            }
         if (existingRecord == null) {
             file.add(incomingRecord)
         } else {
             val incomingCursor = getCursor(incomingRecord)
             val existingCursor = getCursor(existingRecord)
-            val compare = RecordDiffer.valueComparator.compare(incomingCursor, existingCursor)
+            val compare =
+                RecordDiffer.getValueComparator(nullEqualsUnset = false)
+                    .compare(incomingCursor, existingCursor)
             // If the incoming record has a later cursor,
             // or the same cursor but a later extractedAt,
             // then upsert. (otherwise discard the incoming record.)
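
Note: the mock backend pins `nullEqualsUnset = false` in both calls so that PK matching and cursor comparison stay strict. The surrounding upsert rule is unchanged; here is a minimal sketch of that rule using plain Kotlin types as stand-ins for `AirbyteValue` (a hypothetical simplification, not the CDK types):

```kotlin
// Plain-Kotlin sketch of the dedup upsert rule above (stand-in types).
data class Rec(val pk: List<Any?>, val cursor: Long, val extractedAt: Long)

fun upsert(file: MutableList<Rec>, incoming: Rec) {
    // Dedup mode assumes at most one existing record per PK.
    val existing = file.firstOrNull { it.pk == incoming.pk }
    when {
        existing == null -> file.add(incoming)
        incoming.cursor > existing.cursor ||
            (incoming.cursor == existing.cursor &&
                incoming.extractedAt > existing.extractedAt) -> {
            // Later cursor, or same cursor with later extractedAt: upsert.
            file.remove(existing)
            file.add(incoming)
        }
        // Otherwise discard the incoming record.
    }
}
```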

@@ -6,6 +6,7 @@ package io.airbyte.cdk.load.test.util

 import java.time.OffsetDateTime
 import kotlin.test.assertEquals
+import kotlin.test.assertNull
 import org.junit.jupiter.api.Assertions
 import org.junit.jupiter.api.Test
@@ -155,7 +156,7 @@ class RecordDifferTest {
                 ),
             )
         )
-        assertEquals(null, diff)
+        assertNull(diff)
     }

     /** Verify that the differ can sort records which are identical other than the cursor */
@@ -193,7 +194,7 @@ class RecordDifferTest {
                 ),
             ),
         )
-        assertEquals(null, diff)
+        assertNull(diff)
     }

     /** Verify that the differ can sort records which are identical other than extractedAt */
@@ -231,6 +232,49 @@ class RecordDifferTest {
                 ),
             )
         )
-        assertEquals(null, diff)
+        assertNull(diff)
+    }
+
+    @Test
+    fun testNullEqualsUnset() {
+        val diff =
+            RecordDiffer(primaryKey = listOf(listOf("id")), cursor = null, nullEqualsUnset = true)
+                .diffRecords(
+                    listOf(
+                        OutputRecord(
+                            extractedAt = 1,
+                            generationId = 0,
+                            data =
+                                mapOf(
+                                    "id" to 1,
+                                    "sub_object" to
+                                        mapOf(
+                                            "foo" to "bar",
+                                            "sub_list" to listOf(mapOf<String, Any?>()),
+                                        )
+                                ),
+                            airbyteMeta = null,
+                        ),
+                    ),
+                    listOf(
+                        OutputRecord(
+                            extractedAt = 1,
+                            generationId = 0,
+                            data =
+                                mapOf(
+                                    "id" to 1,
+                                    "name" to null,
+                                    "sub_object" to
+                                        mapOf(
+                                            "foo" to "bar",
+                                            "bar" to null,
+                                            "sub_list" to listOf(mapOf("foo" to null)),
+                                        )
+                                ),
+                            airbyteMeta = null,
+                        ),
+                    ),
+                )
+        assertNull(diff)
+    }
 }
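
Note: the new test pins down the `nullEqualsUnset` contract: an explicitly null field compares equal to an absent field, recursively through nested objects and arrays (object entries only; null array elements still count as values). A self-contained sketch of that equivalence over plain maps and lists, separate from the CDK implementation:

```kotlin
// Recursively drop explicit nulls from map entries; lists are traversed,
// but a null list *element* is kept, matching the object-only rule.
fun stripNulls(value: Any?): Any? = when (value) {
    is Map<*, *> ->
        value.filterValues { it != null }.mapValues { (_, v) -> stripNulls(v) }
    is List<*> -> value.map { stripNulls(it) }
    else -> value
}

fun main() {
    val expected = mapOf(
        "id" to 1,
        "sub_object" to mapOf("foo" to "bar", "sub_list" to listOf(emptyMap<String, Any?>())),
    )
    val actual = mapOf(
        "id" to 1,
        "name" to null,
        "sub_object" to
            mapOf("foo" to "bar", "bar" to null, "sub_list" to listOf(mapOf("foo" to null))),
    )
    check(stripNulls(expected) == stripNulls(actual)) // passes: null equals unset
}
```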

@@ -4,10 +4,13 @@

 package io.airbyte.cdk.load.test.util

+import io.airbyte.cdk.load.data.AirbyteType
+
 fun interface ExpectedRecordMapper {
-    fun mapRecord(expectedRecord: OutputRecord): OutputRecord
+    fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord
 }

 object NoopExpectedRecordMapper : ExpectedRecordMapper {
-    override fun mapRecord(expectedRecord: OutputRecord): OutputRecord = expectedRecord
+    override fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord =
+        expectedRecord
 }
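
Note: threading the schema through `mapRecord` lets a mapper make type-driven decisions per field. A self-contained sketch of the pattern with simplified stand-in types (the real signature takes `OutputRecord` and `AirbyteType`; the type tags below are hypothetical):

```kotlin
// Simplified stand-ins: a record is a map, a schema maps field names to type tags.
fun interface Mapper {
    fun mapRecord(record: Map<String, Any?>, schema: Map<String, String>): Map<String, Any?>
}

// A schema-aware mapper: stringify values whose declared type is schemaless.
val schemalessToString = Mapper { record, schema ->
    record.mapValues { (name, value) ->
        if (schema[name] == "schemaless_object") value.toString() else value
    }
}

fun main() {
    val schema = mapOf("id" to "integer", "blob" to "schemaless_object")
    val mapped = schemalessToString.mapRecord(mapOf("id" to 1, "blob" to mapOf("a" to 1)), schema)
    println(mapped) // {id=1, blob={a=1}} -- but "blob" is now the String "{a=1}"
}
```

The Iceberg mapper further down uses exactly this hook to serialize schemaless values to JSON strings before diffing.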

@@ -103,7 +103,7 @@ abstract class IntegrationTest(
     ) {
         val actualRecords: List<OutputRecord> = dataDumper.dumpRecords(config, stream)
         val expectedRecords: List<OutputRecord> =
-            canonicalExpectedRecords.map { recordMangler.mapRecord(it) }
+            canonicalExpectedRecords.map { recordMangler.mapRecord(it, stream.schema) }

         RecordDiffer(
             primaryKey = primaryKey.map { nameMapper.mapFieldName(it) },

@@ -5,6 +5,7 @@
 package io.airbyte.cdk.load.test.util

 import io.airbyte.cdk.load.data.AirbyteValue
+import io.airbyte.cdk.load.data.ArrayValue
 import io.airbyte.cdk.load.data.DateValue
 import io.airbyte.cdk.load.data.IntegerValue
 import io.airbyte.cdk.load.data.NullValue
@@ -48,6 +49,8 @@ class RecordDiffer(
      */
     val allowUnexpectedRecord: Boolean = false,
 ) {
+    private val valueComparator = getValueComparator(nullEqualsUnset)
+
     private fun extract(data: Map<String, AirbyteValue>, path: List<String>): AirbyteValue {
         return when (path.size) {
             0 -> throw IllegalArgumentException("Empty path")
@@ -87,7 +90,7 @@ class RecordDiffer(
             )
         }

-        comparePks(pk1, pk2)
+        comparePks(pk1, pk2, nullEqualsUnset)
     }

     /**
@@ -276,30 +279,39 @@ class RecordDiffer(
     }

     companion object {
-        val valueComparator: Comparator<AirbyteValue> =
-            Comparator.nullsFirst { v1, v2 -> compare(v1!!, v2!!) }
+        fun getValueComparator(nullEqualsUnset: Boolean): Comparator<AirbyteValue> =
+            Comparator.nullsFirst { v1, v2 -> compare(v1!!, v2!!, nullEqualsUnset) }

         /**
          * Compare each PK field in order, until we find a field that the two records differ in. If
          * all the fields are equal, then these two records have the same PK.
          */
-        fun comparePks(pk1: List<AirbyteValue?>, pk2: List<AirbyteValue?>) =
-            (pk1.zip(pk2)
-                .map { (pk1Field, pk2Field) -> valueComparator.compare(pk1Field, pk2Field) }
+        fun comparePks(
+            pk1: List<AirbyteValue?>,
+            pk2: List<AirbyteValue?>,
+            nullEqualsUnset: Boolean,
+        ): Int {
+            return (pk1.zip(pk2)
+                .map { (pk1Field, pk2Field) ->
+                    getValueComparator(nullEqualsUnset).compare(pk1Field, pk2Field)
+                }
                 .firstOrNull { it != 0 }
                 ?: 0)
+        }

-        private fun compare(v1: AirbyteValue, v2: AirbyteValue): Int {
+        private fun compare(v1: AirbyteValue, v2: AirbyteValue, nullEqualsUnset: Boolean): Int {
             if (v1 is UnknownValue) {
                 return compare(
                     JsonToAirbyteValue().fromJson(v1.value),
                     v2,
+                    nullEqualsUnset,
                 )
             }
             if (v2 is UnknownValue) {
                 return compare(
                     v1,
                     JsonToAirbyteValue().fromJson(v2.value),
+                    nullEqualsUnset,
                 )
             }

@@ -348,6 +360,37 @@ class RecordDiffer(
                     }
                 }
             }
+            is ObjectValue -> {
+                fun objComp(a: ObjectValue, b: ObjectValue): Int {
+                    // objects aren't really comparable, so just do an equality check
+                    return if (a == b) 0 else 1
+                }
+                if (nullEqualsUnset) {
+                    // Walk through the airbyte value, removing any NullValue entries
+                    // from ObjectValues.
+                    fun removeObjectNullValues(value: AirbyteValue): AirbyteValue =
+                        when (value) {
+                            is ObjectValue ->
+                                ObjectValue(
+                                    value.values
+                                        .filterTo(linkedMapOf()) { (_, v) ->
+                                            v !is NullValue
+                                        }
+                                        .mapValuesTo(linkedMapOf()) { (_, v) ->
+                                            removeObjectNullValues(v)
+                                        }
+                                )
+                            is ArrayValue ->
+                                ArrayValue(value.values.map { removeObjectNullValues(it) })
+                            else -> value
+                        }
+                    val filteredV1 = removeObjectNullValues(v1) as ObjectValue
+                    val filteredV2 = removeObjectNullValues(v2) as ObjectValue
+                    objComp(filteredV1, filteredV2)
+                } else {
+                    objComp(v1, v2 as ObjectValue)
+                }
+            }
             // otherwise, just be a terrible person.
             // we know these are the same type, so this is safe to do.
             is Comparable<*> ->
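
Note: `getValueComparator(nullEqualsUnset)` replaces the old static `valueComparator` so the flag reaches every recursive `compare` call. The two building blocks, sketched standalone with `Int?` standing in for `AirbyteValue`:

```kotlin
// nullsFirst makes two Kotlin-level nulls compare equal and sorts a null
// before any non-null value, which is why the real code can safely use v1!!/v2!!.
val cmp: Comparator<Int?> = nullsFirst(naturalOrder<Int>())

// comparePks: compare corresponding PK fields in order; the first non-zero
// result wins, and an all-zero zip means the two PKs are equal.
fun comparePks(pk1: List<Int?>, pk2: List<Int?>): Int =
    pk1.zip(pk2).map { (a, b) -> cmp.compare(a, b) }.firstOrNull { it != 0 } ?: 0

fun main() {
    println(cmp.compare(null, null))                      // 0
    println(comparePks(listOf(1, null), listOf(1, null))) // 0: same PK
    println(comparePks(listOf(1, 2), listOf(1, 3)))       // -1: differs on the second field
}
```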

@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.iceberg.v2
+
+import io.airbyte.cdk.load.data.AirbyteType
+import io.airbyte.cdk.load.data.ObjectValue
+import io.airbyte.cdk.load.data.SchemalessValuesToJsonString
+import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
+import io.airbyte.cdk.load.test.util.OutputRecord
+
+object IcebergRecordMapper : ExpectedRecordMapper {
+    override fun mapRecord(expectedRecord: OutputRecord, schema: AirbyteType): OutputRecord {
+        val (mappedData, _) = SchemalessValuesToJsonString().map(expectedRecord.data, schema)
+        return expectedRecord.copy(data = mappedData as ObjectValue)
+    }
+}
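
Note: Iceberg has no column type for a schemaless object, so the destination writes such values as JSON strings; this mapper applies the same transformation to the expected records so the differ compares like with like. A rough standalone illustration of the idea (the real serialization is done by the CDK's `SchemalessValuesToJsonString`; this naive serializer ignores escaping and schema lookup):

```kotlin
// Naive JSON-ish serializer, for illustration only.
fun toJsonString(value: Any?): String = when (value) {
    null -> "null"
    is Map<*, *> ->
        value.entries.joinToString(",", "{", "}") { (k, v) -> "\"$k\":${toJsonString(v)}" }
    is List<*> -> value.joinToString(",", "[", "]") { toJsonString(it) }
    is String -> "\"$value\""
    else -> value.toString()
}

fun main() {
    // A schemaless sub-object in an expected record, before and after mapping:
    println(toJsonString(mapOf("a" to 1, "b" to listOf("x")))) // {"a":1,"b":["x"]}
}
```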

@@ -7,12 +7,10 @@ package io.airbyte.integrations.destination.iceberg.v2

 import io.airbyte.cdk.command.ConfigurationSpecification
 import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.data.*
-import io.airbyte.cdk.load.data.parquet.ParquetMapperPipelineFactory
 import io.airbyte.cdk.load.message.DestinationRecord
 import io.airbyte.cdk.load.test.util.DestinationDataDumper
 import io.airbyte.cdk.load.test.util.OutputRecord
 import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
-import java.math.BigDecimal
 import java.time.Instant
 import java.util.LinkedHashMap
 import java.util.UUID
@@ -21,28 +19,22 @@ import org.apache.iceberg.data.Record

 object IcebergV2DataDumper : DestinationDataDumper {

-    private fun convert(value: Any?, type: AirbyteType): AirbyteValue {
-        return if (value == null) {
-            NullValue
-        } else {
-            when (type) {
-                StringType -> StringValue(value as String)
-                is ArrayType -> ArrayValue((value as List<*>).map { convert(it, type.items.type) })
-                BooleanType -> BooleanValue(value as Boolean)
-                IntegerType -> IntegerValue(value as Long)
-                NumberType -> NumberValue(BigDecimal(value as Double))
-                else ->
-                    throw IllegalArgumentException("Object type with empty schema is not supported")
-            }
-        }
-    }
-
-    private fun getCastedData(schema: ObjectType, record: Record): ObjectValue {
+    private fun toAirbyteValue(record: Record): ObjectValue {
         return ObjectValue(
             LinkedHashMap(
-                schema.properties
-                    .map { (name, field) -> name to convert(record.getField(name), field.type) }
-                    .toMap()
+                record
+                    .struct()
+                    .fields()
+                    .filterNot { DestinationRecord.Meta.COLUMN_NAMES.contains(it.name()) }
+                    .associate { field ->
+                        val name = field.name()
+                        val airbyteValue =
+                            when (val value = record.getField(field.name())) {
+                                is Record -> toAirbyteValue(value)
+                                else -> AirbyteValue.from(value)
+                            }
+                        name to airbyteValue
+                    }
             )
         )
     }
@@ -80,8 +72,6 @@ object IcebergV2DataDumper : DestinationDataDumper {
         stream: DestinationStream
     ): List<OutputRecord> {
         val config = IcebergV2TestUtil.getConfig(spec)
-        val pipeline = ParquetMapperPipelineFactory().create(stream)
-        val schema = pipeline.finalSchema as ObjectType
         val catalog = IcebergV2TestUtil.getCatalog(config)
         val table =
             catalog.loadTable(
@@ -108,7 +98,7 @@ object IcebergV2DataDumper : DestinationDataDumper {
                     generationId =
                         record.getField(DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID)
                             as Long,
-                    data = getCastedData(schema, record),
+                    data = toAirbyteValue(record),
                     airbyteMeta = getMetaData(record)
                 )
             )
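
Note: the dumper no longer casts fields against a precomputed Airbyte schema (the deleted `convert`/`getCastedData` path, which threw on object types with empty schemas); it now walks the Iceberg record's own struct, skips the Airbyte meta columns, and recurses into nested `Record`s. The shape of that walk, sketched over plain maps; the meta column names below are assumptions standing in for `DestinationRecord.Meta.COLUMN_NAMES`:

```kotlin
// Stand-in for DestinationRecord.Meta.COLUMN_NAMES (assumed values).
val metaColumns = setOf(
    "_airbyte_raw_id", "_airbyte_extracted_at", "_airbyte_meta", "_airbyte_generation_id",
)

// Walk the record's own structure instead of an external schema: drop meta
// columns at each struct level and recurse into nested structs; scalar
// leaves map to AirbyteValue.from(value) in the real code.
fun dump(record: Map<String, Any?>): Map<String, Any?> =
    record.filterKeys { it !in metaColumns }.mapValues { (_, v) ->
        when (v) {
            is Map<*, *> -> dump(v.mapKeys { (k, _) -> k.toString() })
            else -> v
        }
    }
```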