From 8f3afd0fd4c0ca071845512809955c1f76d04606 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Tue, 17 Dec 2024 14:54:13 -0800 Subject: [PATCH] fix null equality --- .../MockDestinationBackend.kt | 8 ++- .../cdk/load/test/util/RecordDifferTest.kt | 50 +++++++++++++++- .../cdk/load/test/util/RecordDiffer.kt | 57 ++++++++++++++++--- .../iceberg/v2/IcebergV2WriteTest.kt | 4 +- 4 files changed, 105 insertions(+), 14 deletions(-) diff --git a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationBackend.kt b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationBackend.kt index 85c81a30fd26..8b1718645da3 100644 --- a/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationBackend.kt +++ b/airbyte-cdk/bulk/core/load/src/integrationTest/kotlin/io/airbyte/cdk/load/mock_integration_test/MockDestinationBackend.kt @@ -56,13 +56,17 @@ object MockDestinationBackend { // Assume that in dedup mode, we don't have duplicates - so we can just find the first // record with the same PK as the incoming record val existingRecord = - file.firstOrNull { RecordDiffer.comparePks(incomingPk, getPk(it)) == 0 } + file.firstOrNull { + RecordDiffer.comparePks(incomingPk, getPk(it), nullEqualsUnset = false) == 0 + } if (existingRecord == null) { file.add(incomingRecord) } else { val incomingCursor = getCursor(incomingRecord) val existingCursor = getCursor(existingRecord) - val compare = RecordDiffer.valueComparator.compare(incomingCursor, existingCursor) + val compare = + RecordDiffer.getValueComparator(nullEqualsUnset = false) + .compare(incomingCursor, existingCursor) // If the incoming record has a later cursor, // or the same cursor but a later extractedAt, // then upsert. (otherwise discard the incoming record.) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/RecordDifferTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/RecordDifferTest.kt index e6d9c21cdad4..5f567b00643d 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/RecordDifferTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/test/util/RecordDifferTest.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.test.util import java.time.OffsetDateTime import kotlin.test.assertEquals +import kotlin.test.assertNull import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test @@ -155,7 +156,7 @@ class RecordDifferTest { ), ) ) - assertEquals(null, diff) + assertNull(diff) } /** Verify that the differ can sort records which are identical other than the cursor */ @@ -193,7 +194,7 @@ class RecordDifferTest { ), ), ) - assertEquals(null, diff) + assertNull(diff) } /** Verify that the differ can sort records which are identical other than extractedAt */ @@ -231,6 +232,49 @@ class RecordDifferTest { ), ) ) - assertEquals(null, diff) + assertNull(diff) + } + + @Test + fun testNullEqualsUnset() { + val diff = + RecordDiffer(primaryKey = listOf(listOf("id")), cursor = null, nullEqualsUnset = true) + .diffRecords( + listOf( + OutputRecord( + extractedAt = 1, + generationId = 0, + data = + mapOf( + "id" to 1, + "sub_object" to + mapOf( + "foo" to "bar", + "sub_list" to listOf(mapOf()), + ) + ), + airbyteMeta = null, + ), + ), + listOf( + OutputRecord( + extractedAt = 1, + generationId = 0, + data = + mapOf( + "id" to 1, + "name" to null, + "sub_object" to + mapOf( + "foo" to "bar", + "bar" to null, + "sub_list" to listOf(mapOf("foo" to null)), + ) + ), + airbyteMeta = null, + ), + ), + ) + assertNull(diff) } } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt index 871116815541..49f275c17ead 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/RecordDiffer.kt @@ -5,6 +5,7 @@ package io.airbyte.cdk.load.test.util import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.ArrayValue import io.airbyte.cdk.load.data.DateValue import io.airbyte.cdk.load.data.IntegerValue import io.airbyte.cdk.load.data.NullValue @@ -48,6 +49,8 @@ class RecordDiffer( */ val allowUnexpectedRecord: Boolean = false, ) { + private val valueComparator = getValueComparator(nullEqualsUnset) + private fun extract(data: Map, path: List): AirbyteValue { return when (path.size) { 0 -> throw IllegalArgumentException("Empty path") @@ -87,7 +90,7 @@ class RecordDiffer( ) } - comparePks(pk1, pk2) + comparePks(pk1, pk2, nullEqualsUnset) } /** @@ -276,30 +279,39 @@ class RecordDiffer( } companion object { - val valueComparator: Comparator = - Comparator.nullsFirst { v1, v2 -> compare(v1!!, v2!!) } + fun getValueComparator(nullEqualsUnset: Boolean): Comparator = + Comparator.nullsFirst { v1, v2 -> compare(v1!!, v2!!, nullEqualsUnset) } /** * Compare each PK field in order, until we find a field that the two records differ in. If * all the fields are equal, then these two records have the same PK. */ - fun comparePks(pk1: List, pk2: List) = - (pk1.zip(pk2) - .map { (pk1Field, pk2Field) -> valueComparator.compare(pk1Field, pk2Field) } + fun comparePks( + pk1: List, + pk2: List, + nullEqualsUnset: Boolean, + ): Int { + return (pk1.zip(pk2) + .map { (pk1Field, pk2Field) -> + getValueComparator(nullEqualsUnset).compare(pk1Field, pk2Field) + } .firstOrNull { it != 0 } ?: 0) + } - private fun compare(v1: AirbyteValue, v2: AirbyteValue): Int { + private fun compare(v1: AirbyteValue, v2: AirbyteValue, nullEqualsUnset: Boolean): Int { if (v1 is UnknownValue) { return compare( JsonToAirbyteValue().fromJson(v1.value), v2, + nullEqualsUnset, ) } if (v2 is UnknownValue) { return compare( v1, JsonToAirbyteValue().fromJson(v2.value), + nullEqualsUnset, ) } @@ -348,6 +360,37 @@ class RecordDiffer( } } } + is ObjectValue -> { + fun objComp(a: ObjectValue, b: ObjectValue): Int { + // objects aren't really comparable, so just do an equality check + return if (a == b) 0 else 1 + } + if (nullEqualsUnset) { + // Walk through the airbyte value, removing any NullValue entries + // from ObjectValues. + fun removeObjectNullValues(value: AirbyteValue): AirbyteValue = + when (value) { + is ObjectValue -> + ObjectValue( + value.values + .filterTo(linkedMapOf()) { (_, v) -> + v !is NullValue + } + .mapValuesTo(linkedMapOf()) { (_, v) -> + removeObjectNullValues(v) + } + ) + is ArrayValue -> + ArrayValue(value.values.map { removeObjectNullValues(it) }) + else -> value + } + val filteredV1 = removeObjectNullValues(v1) as ObjectValue + val filteredV2 = removeObjectNullValues(v2) as ObjectValue + objComp(filteredV1, filteredV2) + } else { + objComp(v1, v2 as ObjectValue) + } + } // otherwise, just be a terrible person. // we know these are the same type, so this is safe to do. is Comparable<*> -> diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt index 569b1fe2843f..4ff4dc9b6ab0 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergV2WriteTest.kt @@ -62,7 +62,7 @@ abstract class IcebergV2WriteTest( } @Test -// @Disabled + // @Disabled override fun testContainerTypes() { super.testContainerTypes() } @@ -82,7 +82,7 @@ abstract class IcebergV2WriteTest( } @Test -// @Disabled + // @Disabled override fun testUnions() { super.testUnions() }