From 08b31970aaf6299a71f220e6c9f27af6eb16272c Mon Sep 17 00:00:00 2001 From: subodhchaturvedi Date: Tue, 24 Dec 2024 15:28:16 +0530 Subject: [PATCH 1/3] feat: implement logic to determine super types between iceberg types --- .../iceberg/v2/IcebergSuperTypeFinder.kt | 133 ++++++++++++ .../iceberg/v2/IcebergSuperTypeFinderTest.kt | 189 ++++++++++++++++++ 2 files changed, 322 insertions(+) create mode 100644 airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt create mode 100644 airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt new file mode 100644 index 000000000000..d8fc4c0632e6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt @@ -0,0 +1,133 @@ +package io.airbyte.integrations.destination.iceberg.v2 + +import jakarta.inject.Singleton +import org.apache.iceberg.types.Type +import org.apache.iceberg.types.Type.PrimitiveType +import org.apache.iceberg.types.Type.TypeID +import org.apache.iceberg.types.Type.TypeID.* +import org.apache.iceberg.types.TypeUtil +import org.apache.iceberg.types.Types.* + +/** + * A utility class that determines a "supertype" given two Iceberg [Type]s. + * + * The "supertype" is a type to which both input types can safely be promoted without data loss. For + * instance, INT can be promoted to LONG, FLOAT can be promoted to DOUBLE, etc. + * + * @property icebergTypesComparator comparator used to verify deep type equality. 
+ */ +@Singleton +class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesComparator) { + + /** + * Returns a supertype for [existingType] and [incomingType] if one exists. + * - If they are deeply equal (according to [IcebergTypesComparator.typesAreEqual]), returns the + * [existingType] as-is. + * - Otherwise, attempts to combine them into a valid supertype. + * - Throws [IllegalArgumentException] if no valid supertype can be found. + */ + fun findSuperType(existingType: Type, incomingType: Type, columnName: String): Type { + // If the two types are already deeply equal, return one of them (arbitrary). + if (icebergTypesComparator.typesAreEqual(incomingType, existingType)) { + return existingType + } + // Otherwise, attempt to combine them into a valid supertype. + return combineTypes(existingType, incomingType, columnName) + } + + /** + * Combines two top-level [Type]s. If exactly one is primitive and the other is non-primitive, + * no supertype is possible => throws [IllegalArgumentException]. + */ + private fun combineTypes(existingType: Type, incomingType: Type, columnName: String): Type { + if (existingType.isPrimitiveType != incomingType.isPrimitiveType) { + throwIllegalTypeCombination(existingType, incomingType, columnName) + } + + // Both are primitive + if (existingType.isPrimitiveType && incomingType.isPrimitiveType) { + return combinePrimitives( + existingType.asPrimitiveType(), + incomingType.asPrimitiveType(), + columnName + ) + } + + // Both are non-primitive => not currently supported + throwIllegalTypeCombination(existingType, incomingType, columnName) + } + + /** + * Checks whether either type is unsupported or unmapped (e.g. BINARY, DECIMAL, FIXED, etc.). + * + * @throws IllegalArgumentException if either type is unsupported. 
+ */ + private fun validateTypeIds(typeId1: TypeID, typeId2: TypeID) { + val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO) + if (typeId1 in unsupportedTypeIds || typeId2 in unsupportedTypeIds) { + val badTypeId = if (typeId1 in unsupportedTypeIds) typeId1 else typeId2 + throw IllegalArgumentException( + "Unsupported or unmapped Iceberg type: $badTypeId. Please implement handling if needed." + ) + } + } + + /** + * Attempts to combine two [PrimitiveType]s into a valid supertype by using + * [TypeUtil.isPromotionAllowed]. + * + * - If they have the same [TypeID] but are not deeply equal (e.g. TIMESTAMP with vs. without + * zone), returns the existing type only when [TypeUtil.isPromotionAllowed] permits it; + * otherwise throws [IllegalArgumentException] like any other disallowed promotion. + * - If they have different IDs, tries known promotions (INT->LONG, FLOAT->DOUBLE). + * - If promotion is not allowed, throws [IllegalArgumentException]. + */ + private fun combinePrimitives( + existingType: PrimitiveType, + incomingType: PrimitiveType, + columnName: String + ): Type { + val existingTypeId = existingType.typeId() + val incomingTypeId = incomingType.typeId() + + validateTypeIds(existingTypeId, incomingTypeId) + + // If promotion is not allowed by Iceberg, fail fast. + if (!TypeUtil.isPromotionAllowed(existingType, incomingType)) { + throwIllegalTypeCombination(existingType, incomingType, columnName) + } + + // If both are the same type ID, we just use the existing type + if (existingTypeId == incomingTypeId) { + // For timestamps, you'd want to reconcile UTC. This is simplified here. 
+ return existingType + } + + // Otherwise, we attempt known promotions + return when (existingTypeId) { + INTEGER -> + when (incomingTypeId) { + LONG -> LongType.get() + else -> throwIllegalTypeCombination(existingType, incomingType, columnName) + } + FLOAT -> + when (incomingTypeId) { + DOUBLE -> DoubleType.get() + else -> throwIllegalTypeCombination(existingType, incomingType, columnName) + } + else -> throwIllegalTypeCombination(existingType, incomingType, columnName) + } + } + + /** + * Helper function to throw a standardized [IllegalArgumentException] for invalid type combos. + */ + private fun throwIllegalTypeCombination( + existingType: Type, + incomingType: Type, + columnName: String + ): Nothing = + throw IllegalArgumentException( + "Conversion for column \"$columnName\" between $existingType and $incomingType is not allowed." + ) +} diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt new file mode 100644 index 000000000000..c31b700a57bc --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt @@ -0,0 +1,189 @@ +package io.airbyte.integrations.destination.iceberg.v2 + +import org.apache.iceberg.types.Type +import org.apache.iceberg.types.Type.TypeID.DOUBLE +import org.apache.iceberg.types.Type.TypeID.LONG +import org.apache.iceberg.types.Type.TypeID.TIMESTAMP_NANO +import org.apache.iceberg.types.Types +import org.apache.iceberg.types.Types.TimestampType +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.jupiter.api.Test + +/** Comprehensive test suite for [IcebergSuperTypeFinder]. 
*/ +class IcebergSuperTypeFinderTest { + + private val superTypeFinder = IcebergSuperTypeFinder(IcebergTypesComparator()) + + @Test + fun testIdenticalPrimitiveTypes() { + val intType = Types.IntegerType.get() + val result = superTypeFinder.findSuperType(intType, intType, "column_name") + + // They are "equal" => expect the existing type to be returned + assertThat(result).isSameAs(intType) + } + + @Test + fun testIdenticalTimestampTypesWithZone() { + val tsWithZone = TimestampType.withZone() + val result = superTypeFinder.findSuperType(tsWithZone, tsWithZone, "column_name") + + assertThat(result).isSameAs(tsWithZone) + } + + @Test + fun testDifferentTimestampZoneThrows() { + val tsWithZone = TimestampType.withZone() + val tsWithoutZone = TimestampType.withoutZone() + + assertThatThrownBy { + superTypeFinder.findSuperType(tsWithZone, tsWithoutZone, "column_name") + } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining("Conversion for column \"column_name\" between") + } + + @Test + fun testIntToLongPromotion() { + val intType = Types.IntegerType.get() + val longType = Types.LongType.get() + + val result = superTypeFinder.findSuperType(intType, longType, "column_name") + // INT -> LONG => LONG is the supertype + assertThat(result.typeId()).isEqualTo(LONG) + } + + @Test + fun testFloatToDoublePromotion() { + val floatType = Types.FloatType.get() + val doubleType = Types.DoubleType.get() + + val result = superTypeFinder.findSuperType(floatType, doubleType, "column_name") + assertThat(result.typeId()).isEqualTo(DOUBLE) + } + + @Test + fun testIntToDoubleIsNotAllowed() { + val intType = Types.IntegerType.get() + val doubleType = Types.DoubleType.get() + + // By default, TypeUtil.isPromotionAllowed(int, double) returns false + assertThatThrownBy { superTypeFinder.findSuperType(intType, doubleType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between int 
and double is not allowed." + ) + } + + @Test + fun testPrimitiveToNonPrimitiveThrows() { + val intType = Types.IntegerType.get() + val structType = + Types.StructType.of( + Types.NestedField.optional(1, "field", Types.StringType.get()), + ) + + // Attempting to combine int (primitive) with struct (non-primitive) => error + assertThatThrownBy { superTypeFinder.findSuperType(intType, structType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between int and struct<1: field: optional string> is not allowed." + ) + } + + @Test + fun testNonPrimitiveToPrimitiveThrows() { + val structType = + Types.StructType.of( + Types.NestedField.optional(1, "field", Types.StringType.get()), + ) + val intType = Types.IntegerType.get() + + assertThatThrownBy { superTypeFinder.findSuperType(structType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between struct<1: field: optional string> and int is not allowed." 
+ ) + } + + @Test + fun testBinaryIsUnsupported() { + val binaryType = Types.BinaryType.get() + val intType = Types.IntegerType.get() + + // Fails in validateTypeIds => BINARY is not supported + assertThatThrownBy { superTypeFinder.findSuperType(binaryType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining("Unsupported or unmapped Iceberg type: BINARY") + } + + @Test + fun testDecimalIsUnsupported() { + val decimalType = Types.DecimalType.of(10, 2) + val intType = Types.IntegerType.get() + + // Fails in validateTypeIds => DECIMAL is not supported + assertThatThrownBy { superTypeFinder.findSuperType(decimalType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining("Unsupported or unmapped Iceberg type: DECIMAL") + } + + @Test + fun testFixedIsUnsupported() { + val fixedType = Types.FixedType.ofLength(16) + val intType = Types.IntegerType.get() + + // Fails in validateTypeIds => FIXED is not supported + assertThatThrownBy { superTypeFinder.findSuperType(fixedType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining("Unsupported or unmapped Iceberg type: FIXED") + } + + @Test + fun testUUIDIsUnsupported() { + val uuidType = Types.UUIDType.get() + val intType = Types.IntegerType.get() + + // Fails in validateTypeIds => UUID is not supported + assertThatThrownBy { superTypeFinder.findSuperType(uuidType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining("Unsupported or unmapped Iceberg type: UUID") + } + + @Test + fun testTimestampNanoIsUnsupported() { + // TIMESTAMP_NANO is in IcebergSuperTypeFinder's set of unsupported type IDs, so combining + // it with another primitive type must be rejected with an IllegalArgumentException. + // The test does not depend on a concrete nano-timestamp class; it only needs a primitive + // type whose typeId() reports TIMESTAMP_NANO. 
+ + // We'll mimic that with a custom type object for demonstration: + val nanoTimestamp = + object : Type.PrimitiveType() { + override fun typeId() = TIMESTAMP_NANO + override fun isPrimitiveType() = true + } + val normalTimestamp = TimestampType.withoutZone() + + assertThatThrownBy { + superTypeFinder.findSuperType(nanoTimestamp, normalTimestamp, "column_name") + } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining("Unsupported or unmapped Iceberg type: TIMESTAMP_NANO") + } + + @Test + fun testPromotionIsNotAllowedByIceberg() { + // Suppose the user tries to do INT -> FLOAT + val intType = Types.IntegerType.get() + val floatType = Types.FloatType.get() + + // By default, TypeUtil.isPromotionAllowed(int, float) is false + assertThatThrownBy { superTypeFinder.findSuperType(intType, floatType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between int and float is not allowed." 
+ ) + } +} From 4249facbd701a2ae6ca24d87d8f4c6c25c27c922 Mon Sep 17 00:00:00 2001 From: subodhchaturvedi Date: Tue, 24 Dec 2024 15:29:19 +0530 Subject: [PATCH 2/3] format --- .../destination/iceberg/v2/IcebergSuperTypeFinder.kt | 4 ++++ .../destination/iceberg/v2/IcebergSuperTypeFinderTest.kt | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt index d8fc4c0632e6..b315e41f4b67 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.iceberg.v2 import jakarta.inject.Singleton diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt index c31b700a57bc..71db8a87f6a1 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + package io.airbyte.integrations.destination.iceberg.v2 import org.apache.iceberg.types.Type From 3b39576f124e307f85eef2ba0c2abea76d8e152b Mon Sep 17 00:00:00 2001 From: subodhchaturvedi Date: Wed, 25 Dec 2024 10:18:05 +0530 Subject: [PATCH 3/3] address review comments --- .../iceberg/v2/IcebergSuperTypeFinder.kt | 15 ++++++++------- .../iceberg/v2/IcebergSuperTypeFinderTest.kt | 17 ++++++++++++----- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt index b315e41f4b67..94c0b5cfd2b3 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt @@ -22,6 +22,7 @@ import org.apache.iceberg.types.Types.* */ @Singleton class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesComparator) { + private val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO) /** * Returns a supertype for [existingType] and [incomingType] if one exists. @@ -67,11 +68,12 @@ class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesCom * @throws IllegalArgumentException if either type is unsupported. 
*/ private fun validateTypeIds(typeId1: TypeID, typeId2: TypeID) { - val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO) - if (typeId1 in unsupportedTypeIds || typeId2 in unsupportedTypeIds) { - val badTypeId = if (typeId1 in unsupportedTypeIds) typeId1 else typeId2 + val providedTypes = listOf(typeId1, typeId2) + val foundUnsupported = providedTypes.filter { it in unsupportedTypeIds } + + if (foundUnsupported.isNotEmpty()) { throw IllegalArgumentException( - "Unsupported or unmapped Iceberg type: $badTypeId. Please implement handling if needed." + "Unsupported or unmapped Iceberg type(s): ${foundUnsupported.joinToString()}. Please implement handling if needed." ) } } @@ -93,14 +95,13 @@ class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesCom ): Type { val existingTypeId = existingType.typeId() val incomingTypeId = incomingType.typeId() - - validateTypeIds(existingTypeId, incomingTypeId) - // If promotion is not allowed by Iceberg, fail fast. if (!TypeUtil.isPromotionAllowed(existingType, incomingType)) { throwIllegalTypeCombination(existingType, incomingType, columnName) } + validateTypeIds(existingTypeId, incomingTypeId) + // If both are the same type ID, we just use the existing type if (existingTypeId == incomingTypeId) { // For timestamps, you'd want to reconcile UTC. This is simplified here. 
diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt index 71db8a87f6a1..80f0bdca6405 100644 --- a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt @@ -119,7 +119,9 @@ class IcebergSuperTypeFinderTest { // Fails in validateTypeIds => BINARY is not supported assertThatThrownBy { superTypeFinder.findSuperType(binaryType, intType, "column_name") } .isInstanceOf(IllegalArgumentException::class.java) - .hasMessageContaining("Unsupported or unmapped Iceberg type: BINARY") + .hasMessageContaining( + "Conversion for column \"column_name\" between binary and int is not allowed." + ) } @Test @@ -130,7 +132,9 @@ class IcebergSuperTypeFinderTest { // Fails in validateTypeIds => DECIMAL is not supported assertThatThrownBy { superTypeFinder.findSuperType(decimalType, intType, "column_name") } .isInstanceOf(IllegalArgumentException::class.java) - .hasMessageContaining("Unsupported or unmapped Iceberg type: DECIMAL") + .hasMessageContaining( + "Conversion for column \"column_name\" between decimal(10, 2) and int is not allowed." + ) } @Test @@ -141,7 +145,9 @@ class IcebergSuperTypeFinderTest { // Fails in validateTypeIds => FIXED is not supported assertThatThrownBy { superTypeFinder.findSuperType(fixedType, intType, "column_name") } .isInstanceOf(IllegalArgumentException::class.java) - .hasMessageContaining("Unsupported or unmapped Iceberg type: FIXED") + .hasMessageContaining( + "Conversion for column \"column_name\" between fixed[16] and int is not allowed." 
+ ) } @Test @@ -152,7 +158,9 @@ class IcebergSuperTypeFinderTest { // Fails in validateTypeIds => UUID is not supported assertThatThrownBy { superTypeFinder.findSuperType(uuidType, intType, "column_name") } .isInstanceOf(IllegalArgumentException::class.java) - .hasMessageContaining("Unsupported or unmapped Iceberg type: UUID") + .hasMessageContaining( + "Conversion for column \"column_name\" between uuid and int is not allowed." + ) } @Test @@ -174,7 +182,6 @@ class IcebergSuperTypeFinderTest { superTypeFinder.findSuperType(nanoTimestamp, normalTimestamp, "column_name") } .isInstanceOf(IllegalArgumentException::class.java) - .hasMessageContaining("Unsupported or unmapped Iceberg type: TIMESTAMP_NANO") } @Test