From d5be4b75f3caea0b176f583b9f10fdb415d3bd35 Mon Sep 17 00:00:00 2001 From: Subodh Kant Chaturvedi Date: Tue, 14 Jan 2025 11:16:26 +0530 Subject: [PATCH] feat: implement logic to determine super types between iceberg types (#50412) Co-authored-by: Octavia Squidington III Co-authored-by: Francis Genet --- .../destination-s3-data-lake/metadata.yaml | 2 +- .../s3_data_lake/S3DataLakeSuperTypeFinder.kt | 138 ++++++++++++ .../S3DataLakeSuperTypeFinderTest.kt | 200 ++++++++++++++++++ .../integrations/destinations/s3-data-lake.md | 19 +- 4 files changed, 349 insertions(+), 10 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinder.kt create mode 100644 airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinderTest.kt diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml b/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml index 284c9dec54ad..2e654790784e 100644 --- a/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3-data-lake/metadata.yaml @@ -26,7 +26,7 @@ data: alias: airbyte-connector-testing-secret-store connectorType: destination definitionId: 716ca874-520b-4902-9f80-9fad66754b89 - dockerImageTag: 0.2.12 + dockerImageTag: 0.2.13 dockerRepository: airbyte/destination-s3-data-lake documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake githubIssueLabel: destination-s3-data-lake diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinder.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinder.kt new file mode 100644 
index 000000000000..7153e75686cd --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinder.kt @@ -0,0 +1,138 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3_data_lake + +import jakarta.inject.Singleton +import org.apache.iceberg.types.Type +import org.apache.iceberg.types.Type.PrimitiveType +import org.apache.iceberg.types.Type.TypeID +import org.apache.iceberg.types.Type.TypeID.* +import org.apache.iceberg.types.TypeUtil +import org.apache.iceberg.types.Types.* + +/** + * A utility class that determines a "supertype" given two Iceberg [Type]s. + * + * The "supertype" is a type to which both input types can safely be promoted without data loss. For + * instance, INT can be promoted to LONG, FLOAT can be promoted to DOUBLE, etc. + * + * @property s3DataLakeTypesComparator comparator used to verify deep type equality. + */ +@Singleton +class S3DataLakeSuperTypeFinder(private val s3DataLakeTypesComparator: S3DataLakeTypesComparator) { + private val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO) + + /** + * Returns a supertype for [existingType] and [incomingType] if one exists. + * - If they are deeply equal (according to [S3DataLakeTypesComparator.typesAreEqual]), returns + * the [existingType] as-is. + * - Otherwise, attempts to combine them into a valid supertype. + * - Throws [IllegalArgumentException] if no valid supertype can be found. + */ + fun findSuperType(existingType: Type, incomingType: Type, columnName: String): Type { + // If the two types are already deeply equal, return one of them (arbitrary). + if (s3DataLakeTypesComparator.typesAreEqual(incomingType, existingType)) { + return existingType + } + // Otherwise, attempt to combine them into a valid supertype. 
+ return combineTypes(existingType, incomingType, columnName) + } + + /** + * Combines two top-level [Type]s. If exactly one is primitive and the other is non-primitive, + * no supertype is possible => throws [IllegalArgumentException]. + */ + private fun combineTypes(existingType: Type, incomingType: Type, columnName: String): Type { + if (existingType.isPrimitiveType != incomingType.isPrimitiveType) { + throwIllegalTypeCombination(existingType, incomingType, columnName) + } + + // Both are primitive + if (existingType.isPrimitiveType && incomingType.isPrimitiveType) { + return combinePrimitives( + existingType.asPrimitiveType(), + incomingType.asPrimitiveType(), + columnName + ) + } + + // Both are non-primitive => not currently supported + throwIllegalTypeCombination(existingType, incomingType, columnName) + } + + /** + * Checks whether either type is unsupported or unmapped (e.g. BINARY, DECIMAL, FIXED, etc.). + * + * @throws IllegalArgumentException if either type is unsupported. + */ + private fun validateTypeIds(typeId1: TypeID, typeId2: TypeID) { + val providedTypes = listOf(typeId1, typeId2) + val foundUnsupported = providedTypes.filter { it in unsupportedTypeIds } + + if (foundUnsupported.isNotEmpty()) { + throw IllegalArgumentException( + "Unsupported or unmapped Iceberg type(s): ${foundUnsupported.joinToString()}. Please implement handling if needed." + ) + } + } + + /** + * Attempts to combine two [PrimitiveType]s into a valid supertype by using + * [TypeUtil.isPromotionAllowed]. + * + * - If they have the same [TypeID], just returns the existing type (since they’re not deeply + * equal, but the top-level ID is the same. You may want to consider e.g. TIMESTAMP with/without + * UTC). + * - If they have different IDs, tries known promotions (INT->LONG, FLOAT->DOUBLE). + * - If promotion is not allowed, throws [IllegalArgumentException]. 
+ */ + private fun combinePrimitives( + existingType: PrimitiveType, + incomingType: PrimitiveType, + columnName: String + ): Type { + val existingTypeId = existingType.typeId() + val incomingTypeId = incomingType.typeId() + // If promotion is not allowed by Iceberg, fail fast. + if (!TypeUtil.isPromotionAllowed(existingType, incomingType)) { + throwIllegalTypeCombination(existingType, incomingType, columnName) + } + + validateTypeIds(existingTypeId, incomingTypeId) + + // If both are the same type ID, we just use the existing type + if (existingTypeId == incomingTypeId) { + // For timestamps, you'd want to reconcile UTC. This is simplified here. + return existingType + } + + // Otherwise, we attempt known promotions + return when (existingTypeId) { + INTEGER -> + when (incomingTypeId) { + LONG -> LongType.get() + else -> throwIllegalTypeCombination(existingType, incomingType, columnName) + } + FLOAT -> + when (incomingTypeId) { + DOUBLE -> DoubleType.get() + else -> throwIllegalTypeCombination(existingType, incomingType, columnName) + } + else -> throwIllegalTypeCombination(existingType, incomingType, columnName) + } + } + + /** + * Helper function to throw a standardized [IllegalArgumentException] for invalid type combos. + */ + private fun throwIllegalTypeCombination( + existingType: Type, + incomingType: Type, + columnName: String + ): Nothing = + throw IllegalArgumentException( + "Conversion for column \"$columnName\" between $existingType and $incomingType is not allowed." 
+ ) +} diff --git a/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinderTest.kt b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinderTest.kt new file mode 100644 index 000000000000..7d6e06fde42f --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3-data-lake/src/test/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeSuperTypeFinderTest.kt @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3_data_lake + +import org.apache.iceberg.types.Type +import org.apache.iceberg.types.Type.TypeID.DOUBLE +import org.apache.iceberg.types.Type.TypeID.LONG +import org.apache.iceberg.types.Type.TypeID.TIMESTAMP_NANO +import org.apache.iceberg.types.Types +import org.apache.iceberg.types.Types.TimestampType +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.jupiter.api.Test + +/** Comprehensive test suite for [S3DataLakeSuperTypeFinder]. 
*/ +class S3DataLakeSuperTypeFinderTest { + + private val superTypeFinder = S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator()) + + @Test + fun testIdenticalPrimitiveTypes() { + val intType = Types.IntegerType.get() + val result = superTypeFinder.findSuperType(intType, intType, "column_name") + + // They are "equal" => expect the existing type to be returned + assertThat(result).isSameAs(intType) + } + + @Test + fun testIdenticalTimestampTypesWithZone() { + val tsWithZone = TimestampType.withZone() + val result = superTypeFinder.findSuperType(tsWithZone, tsWithZone, "column_name") + + assertThat(result).isSameAs(tsWithZone) + } + + @Test + fun testDifferentTimestampZoneThrows() { + val tsWithZone = TimestampType.withZone() + val tsWithoutZone = TimestampType.withoutZone() + + assertThatThrownBy { + superTypeFinder.findSuperType(tsWithZone, tsWithoutZone, "column_name") + } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining("Conversion for column \"column_name\" between") + } + + @Test + fun testIntToLongPromotion() { + val intType = Types.IntegerType.get() + val longType = Types.LongType.get() + + val result = superTypeFinder.findSuperType(intType, longType, "column_name") + // INT -> LONG => LONG is the supertype + assertThat(result.typeId()).isEqualTo(LONG) + } + + @Test + fun testFloatToDoublePromotion() { + val floatType = Types.FloatType.get() + val doubleType = Types.DoubleType.get() + + val result = superTypeFinder.findSuperType(floatType, doubleType, "column_name") + assertThat(result.typeId()).isEqualTo(DOUBLE) + } + + @Test + fun testIntToDoubleIsNotAllowed() { + val intType = Types.IntegerType.get() + val doubleType = Types.DoubleType.get() + + // By default, TypeUtil.isPromotionAllowed(int, double) returns false + assertThatThrownBy { superTypeFinder.findSuperType(intType, doubleType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" 
between int and double is not allowed." + ) + } + + @Test + fun testPrimitiveToNonPrimitiveThrows() { + val intType = Types.IntegerType.get() + val structType = + Types.StructType.of( + Types.NestedField.optional(1, "field", Types.StringType.get()), + ) + + // Attempting to combine int (primitive) with struct (non-primitive) => error + assertThatThrownBy { superTypeFinder.findSuperType(intType, structType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between int and struct<1: field: optional string> is not allowed." + ) + } + + @Test + fun testNonPrimitiveToPrimitiveThrows() { + val structType = + Types.StructType.of( + Types.NestedField.optional(1, "field", Types.StringType.get()), + ) + val intType = Types.IntegerType.get() + + assertThatThrownBy { superTypeFinder.findSuperType(structType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between struct<1: field: optional string> and int is not allowed." + ) + } + + @Test + fun testBinaryIsUnsupported() { + val binaryType = Types.BinaryType.get() + val intType = Types.IntegerType.get() + + // BINARY -> INT promotion is disallowed by TypeUtil.isPromotionAllowed => conversion is rejected + assertThatThrownBy { superTypeFinder.findSuperType(binaryType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between binary and int is not allowed." 
+ ) + } + + @Test + fun testDecimalIsUnsupported() { + val decimalType = Types.DecimalType.of(10, 2) + val intType = Types.IntegerType.get() + + // DECIMAL -> INT promotion is disallowed by TypeUtil.isPromotionAllowed => conversion is rejected + assertThatThrownBy { superTypeFinder.findSuperType(decimalType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between decimal(10, 2) and int is not allowed." + ) + } + + @Test + fun testFixedIsUnsupported() { + val fixedType = Types.FixedType.ofLength(16) + val intType = Types.IntegerType.get() + + // FIXED -> INT promotion is disallowed by TypeUtil.isPromotionAllowed => conversion is rejected + assertThatThrownBy { superTypeFinder.findSuperType(fixedType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between fixed[16] and int is not allowed." + ) + } + + @Test + fun testUUIDIsUnsupported() { + val uuidType = Types.UUIDType.get() + val intType = Types.IntegerType.get() + + // UUID -> INT promotion is disallowed by TypeUtil.isPromotionAllowed => conversion is rejected + assertThatThrownBy { superTypeFinder.findSuperType(uuidType, intType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between uuid and int is not allowed." + ) + } + + @Test + fun testTimestampNanoIsUnsupported() { + // For illustration, let's assume TypeID.TIMESTAMP_NANO is an unsupported extension. + // This is a hypothetical scenario in some Iceberg versions. + // We'll fake it with reflection or a custom type (if needed). + // Alternatively, just a conceptual test that TIMESTAMP_NANO is not allowed. 
+ + // We'll mimic that with a custom type object for demonstration: + val nanoTimestamp = + object : Type.PrimitiveType() { + override fun typeId() = TIMESTAMP_NANO + override fun isPrimitiveType() = true + } + val normalTimestamp = TimestampType.withoutZone() + + assertThatThrownBy { + superTypeFinder.findSuperType(nanoTimestamp, normalTimestamp, "column_name") + } + .isInstanceOf(IllegalArgumentException::class.java) + } + + @Test + fun testPromotionIsNotAllowedByIceberg() { + // Suppose the user tries to do INT -> FLOAT + val intType = Types.IntegerType.get() + val floatType = Types.FloatType.get() + + // By default, TypeUtil.isPromotionAllowed(int, float) is false + assertThatThrownBy { superTypeFinder.findSuperType(intType, floatType, "column_name") } + .isInstanceOf(IllegalArgumentException::class.java) + .hasMessageContaining( + "Conversion for column \"column_name\" between int and float is not allowed." + ) + } +} diff --git a/docs/integrations/destinations/s3-data-lake.md b/docs/integrations/destinations/s3-data-lake.md index fbcb3994eddb..449cf00baf6b 100644 --- a/docs/integrations/destinations/s3-data-lake.md +++ b/docs/integrations/destinations/s3-data-lake.md @@ -15,14 +15,15 @@ for more information.
Expand to review -| Version | Date | Pull Request | Subject | -|:--------|:-----------|:------------------------------------------------------------|:---------------------------------------------| -| 0.2.12 | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth | -| 0.2.11 | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow | -| 0.2.10 | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator | -| 0.2.9 | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 | -| 0.2.8 | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/51012) | Rename/Cleanup package from Iceberg V2 | -| 0.2.7 | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) | -| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. 
| +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------| +| 0.2.13 | 2025-01-14 | [\#50412](https://github.com/airbytehq/airbyte/pull/50412) | Implement logic to determine super types between iceberg types | +| 0.2.12 | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth | +| 0.2.11 | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow | +| 0.2.10 | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator | +| 0.2.9 | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 | +| 0.2.8 | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/51012) | Rename/Cleanup package from Iceberg V2 | +| 0.2.7 | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) | +| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |