diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt new file mode 100644 index 000000000000..94c0b5cfd2b3 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.v2 + +import jakarta.inject.Singleton +import org.apache.iceberg.types.Type +import org.apache.iceberg.types.Type.PrimitiveType +import org.apache.iceberg.types.Type.TypeID +import org.apache.iceberg.types.Type.TypeID.* +import org.apache.iceberg.types.TypeUtil +import org.apache.iceberg.types.Types.* + +/** + * A utility class that determines a "supertype" given two Iceberg [Type]s. + * + * The "supertype" is a type to which both input types can safely be promoted without data loss. For + * instance, INT can be promoted to LONG, FLOAT can be promoted to DOUBLE, etc. + * + * @property icebergTypesComparator comparator used to verify deep type equality. + */ +@Singleton +class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesComparator) { + private val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO) + + /** + * Returns a supertype for [existingType] and [incomingType] if one exists. + * - If they are deeply equal (according to [IcebergTypesComparator.typesAreEqual]), returns the + * [existingType] as-is. + * - Otherwise, attempts to combine them into a valid supertype. + * - Throws [IllegalArgumentException] if no valid supertype can be found. + */ + fun findSuperType(existingType: Type, incomingType: Type, columnName: String): Type { + // If the two types are already deeply equal, return one of them (arbitrary). + if (icebergTypesComparator.typesAreEqual(incomingType, existingType)) { + return existingType + } + // Otherwise, attempt to combine them into a valid supertype. + return combineTypes(existingType, incomingType, columnName) + } + + /** + * Combines two top-level [Type]s. If exactly one is primitive and the other is non-primitive, + * no supertype is possible => throws [IllegalArgumentException]. + */ + private fun combineTypes(existingType: Type, incomingType: Type, columnName: String): Type { + if (existingType.isPrimitiveType != incomingType.isPrimitiveType) { + throwIllegalTypeCombination(existingType, incomingType, columnName) + } + + // Both are primitive + if (existingType.isPrimitiveType && incomingType.isPrimitiveType) { + return combinePrimitives( + existingType.asPrimitiveType(), + incomingType.asPrimitiveType(), + columnName + ) + } + + // Both are non-primitive => not currently supported + throwIllegalTypeCombination(existingType, incomingType, columnName) + } + + /** + * Checks whether either type is unsupported or unmapped (e.g. BINARY, DECIMAL, FIXED, etc.). + * + * @throws IllegalArgumentException if either type is unsupported. + */ + private fun validateTypeIds(typeId1: TypeID, typeId2: TypeID) { + val providedTypes = listOf(typeId1, typeId2) + val foundUnsupported = providedTypes.filter { it in unsupportedTypeIds } + + if (foundUnsupported.isNotEmpty()) { + throw IllegalArgumentException( + "Unsupported or unmapped Iceberg type(s): ${foundUnsupported.joinToString()}. Please implement handling if needed." + ) + } + } + + /** + * Attempts to combine two [PrimitiveType]s into a valid supertype by using + * [TypeUtil.isPromotionAllowed]. + * + * - If they have the same [TypeID], just returns the existing type (since they’re not deeply + * equal, but the top-level ID is the same. You may want to consider e.g. TIMESTAMP with/without + * UTC). + * - If they have different IDs, tries known promotions (INT->LONG, FLOAT->DOUBLE). + * - If promotion is not allowed, throws [IllegalArgumentException]. + */ + private fun combinePrimitives( + existingType: PrimitiveType, + incomingType: PrimitiveType, + columnName: String + ): Type { + val existingTypeId = existingType.typeId() + val incomingTypeId = incomingType.typeId() + // If promotion is not allowed by Iceberg, fail fast. + if (!TypeUtil.isPromotionAllowed(existingType, incomingType)) { + throwIllegalTypeCombination(existingType, incomingType, columnName) + } + + validateTypeIds(existingTypeId, incomingTypeId) + + // If both are the same type ID, we just use the existing type + if (existingTypeId == incomingTypeId) { + // For timestamps, you'd want to reconcile UTC. This is simplified here. + return existingType + } + + // Otherwise, we attempt known promotions + return when (existingTypeId) { + INTEGER -> + when (incomingTypeId) { + LONG -> LongType.get() + else -> throwIllegalTypeCombination(existingType, incomingType, columnName) + } + FLOAT -> + when (incomingTypeId) { + DOUBLE -> DoubleType.get() + else -> throwIllegalTypeCombination(existingType, incomingType, columnName) + } + else -> throwIllegalTypeCombination(existingType, incomingType, columnName) + } + } + + /** + * Helper function to throw a standardized [IllegalArgumentException] for invalid type combos. + */ + private fun throwIllegalTypeCombination( + existingType: Type, + incomingType: Type, + columnName: String + ): Nothing = + throw IllegalArgumentException( + "Conversion for column \"$columnName\" between $existingType and $incomingType is not allowed." + ) +} diff --git a/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt new file mode 100644 index 000000000000..80f0bdca6405 --- /dev/null +++ b/airbyte-integrations/connectors/destination-iceberg-v2/src/test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.iceberg.v2 + +import org.apache.iceberg.types.Type +import org.apache.iceberg.types.Type.TypeID.DOUBLE +import org.apache.iceberg.types.Type.TypeID.LONG +import org.apache.iceberg.types.Type.TypeID.TIMESTAMP_NANO +import org.apache.iceberg.types.Types +import org.apache.iceberg.types.Types.TimestampType +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.jupiter.api.Test + +/** Comprehensive test suite for [IcebergSuperTypeFinder]. */ +class IcebergSuperTypeFinderTest { + + private val superTypeFinder = IcebergSuperTypeFinder(IcebergTypesComparator()) + + @Test + fun testIdenticalPrimitiveTypes() { + val intType = Types.IntegerType.get() + val result = superTypeFinder.findSuperType(intType, intType, "column_name") + + // They are "equal" => expect the existing type to be returned + assertThat(result).isSameAs(intType) + } + + @Test + fun testIdenticalTimestampTypesWithZone() { + val tsWithZone = TimestampType.withZone() + val result = superTypeFinder.findSuperType(tsWithZone, tsWithZone, "column_name") + + assertThat(result).isSameAs(tsWithZone) + } + + @Test + fun testDifferentTimestampZoneThrows() { + val tsWithZone = TimestampType.withZone() + val tsWithoutZone = TimestampType.withoutZone() + + assertThatThrownBy { + superTypeFinder.findSuperType(tsWithZone, tsWithoutZone, "column_name") + } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining("Conversion for column \"column_name\" between") + } + + @Test + fun testIntToLongPromotion() { + val intType = Types.IntegerType.get() + val longType = Types.LongType.get() + + val result = superTypeFinder.findSuperType(intType, longType, "column_name") + // INT -> LONG => LONG is the supertype + assertThat(result.typeId()).isEqualTo(LONG) + } + + @Test + fun testFloatToDoublePromotion() { + val floatType = Types.FloatType.get() + val doubleType = Types.DoubleType.get() + + val result = superTypeFinder.findSuperType(floatType, doubleType, "column_name") + assertThat(result.typeId()).isEqualTo(DOUBLE) + } + + @Test + fun testIntToDoubleIsNotAllowed() { + val intType = Types.IntegerType.get() + val doubleType = Types.DoubleType.get() + + // By default, TypeUtil.isPromotionAllowed(int, double) returns false + assertThatThrownBy { superTypeFinder.findSuperType(intType, doubleType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between int and double is not allowed." + ) + } + + @Test + fun testPrimitiveToNonPrimitiveThrows() { + val intType = Types.IntegerType.get() + val structType = + Types.StructType.of( + Types.NestedField.optional(1, "field", Types.StringType.get()), + ) + + // Attempting to combine int (primitive) with struct (non-primitive) => error + assertThatThrownBy { superTypeFinder.findSuperType(intType, structType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between int and struct<1: field: optional string> is not allowed." + ) + } + + @Test + fun testNonPrimitiveToPrimitiveThrows() { + val structType = + Types.StructType.of( + Types.NestedField.optional(1, "field", Types.StringType.get()), + ) + val intType = Types.IntegerType.get() + + assertThatThrownBy { superTypeFinder.findSuperType(structType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between struct<1: field: optional string> and int is not allowed." + ) + } + + @Test + fun testBinaryIsUnsupported() { + val binaryType = Types.BinaryType.get() + val intType = Types.IntegerType.get() + + // Fails in validateTypeIds => BINARY is not supported + assertThatThrownBy { superTypeFinder.findSuperType(binaryType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between binary and int is not allowed." + ) + } + + @Test + fun testDecimalIsUnsupported() { + val decimalType = Types.DecimalType.of(10, 2) + val intType = Types.IntegerType.get() + + // Fails in validateTypeIds => DECIMAL is not supported + assertThatThrownBy { superTypeFinder.findSuperType(decimalType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between decimal(10, 2) and int is not allowed." + ) + } + + @Test + fun testFixedIsUnsupported() { + val fixedType = Types.FixedType.ofLength(16) + val intType = Types.IntegerType.get() + + // Fails in validateTypeIds => FIXED is not supported + assertThatThrownBy { superTypeFinder.findSuperType(fixedType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between fixed[16] and int is not allowed." + ) + } + + @Test + fun testUUIDIsUnsupported() { + val uuidType = Types.UUIDType.get() + val intType = Types.IntegerType.get() + + // Fails in validateTypeIds => UUID is not supported + assertThatThrownBy { superTypeFinder.findSuperType(uuidType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between uuid and int is not allowed." + ) + } + + @Test + fun testTimestampNanoIsUnsupported() { + // For illustration, let's assume TypeID.TIMESTAMP_NANO is an unsupported extension. + // This is a hypothetical scenario in some Iceberg versions. + // We'll fake it with reflection or a custom type (if needed). + // Alternatively, just a conceptual test that TIMESTAMP_NANO is not allowed. + + // We'll mimic that with a custom type object for demonstration: + val nanoTimestamp = + object : Type.PrimitiveType() { + override fun typeId() = TIMESTAMP_NANO + override fun isPrimitiveType() = true + } + val normalTimestamp = TimestampType.withoutZone() + + assertThatThrownBy { + superTypeFinder.findSuperType(nanoTimestamp, normalTimestamp, "column_name") + } + .isInstanceOf(IllegalArgumentException::class.java) + } + + @Test + fun testPromotionIsNotAllowedByIceberg() { + // Suppose the user tries to do INT -> FLOAT + val intType = Types.IntegerType.get() + val floatType = Types.FloatType.get() + + // By default, TypeUtil.isPromotionAllowed(int, float) is false + assertThatThrownBy { superTypeFinder.findSuperType(intType, floatType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between int and float is not allowed." + ) + } +}