Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement logic to determine super types between iceberg types #50412

Merged
merged 14 commits into from
Jan 14, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.2.12
dockerImageTag: 0.2.13
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3_data_lake

import jakarta.inject.Singleton
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Type.PrimitiveType
import org.apache.iceberg.types.Type.TypeID
import org.apache.iceberg.types.Type.TypeID.*
import org.apache.iceberg.types.TypeUtil
import org.apache.iceberg.types.Types.*

/**
 * Resolves the "supertype" of two Iceberg [Type]s.
 *
 * A supertype is a type that values of the existing column type can be promoted to without data
 * loss, e.g. INT widened to LONG or FLOAT widened to DOUBLE.
 *
 * Resolution is directional: only widening *from* the existing type *to* the incoming type is
 * handled. A pair such as (existing LONG, incoming INT) is rejected rather than resolved to LONG.
 * NOTE(review): confirm this asymmetry is intended by the callers.
 *
 * @property s3DataLakeTypesComparator comparator used to decide whether two types are deeply equal.
 */
@Singleton
class S3DataLakeSuperTypeFinder(private val s3DataLakeTypesComparator: S3DataLakeTypesComparator) {
    /** Type IDs for which supertype resolution is explicitly refused. */
    private val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO)

    /**
     * Finds a supertype for [existingType] and [incomingType].
     *
     * When [S3DataLakeTypesComparator.typesAreEqual] reports the two types as equal,
     * [existingType] itself is returned. Otherwise the types are combined into a supertype.
     *
     * @param existingType the type currently used by the column.
     * @param incomingType the type the column is about to receive.
     * @param columnName column name, used only in error messages.
     * @return [existingType] when both are equal, otherwise the promoted type.
     * @throws IllegalArgumentException if the two types cannot be combined.
     */
    fun findSuperType(existingType: Type, incomingType: Type, columnName: String): Type =
        if (s3DataLakeTypesComparator.typesAreEqual(incomingType, existingType)) {
            existingType
        } else {
            combineTypes(existingType, incomingType, columnName)
        }

    /**
     * Combines two types that are not deeply equal.
     *
     * Only primitive/primitive pairs can be combined. A primitive paired with a non-primitive, or
     * two non-primitives, is rejected with an [IllegalArgumentException].
     */
    private fun combineTypes(existingType: Type, incomingType: Type, columnName: String): Type {
        val bothPrimitive = existingType.isPrimitiveType && incomingType.isPrimitiveType
        if (!bothPrimitive) {
            // Covers the mixed case as well as the (not yet supported) nested/nested case.
            throwIllegalTypeCombination(existingType, incomingType, columnName)
        }
        return combinePrimitives(
            existingType.asPrimitiveType(),
            incomingType.asPrimitiveType(),
            columnName,
        )
    }

    /**
     * Rejects type IDs listed in [unsupportedTypeIds].
     *
     * @throws IllegalArgumentException naming every offending ID, in argument order.
     */
    private fun validateTypeIds(typeId1: TypeID, typeId2: TypeID) {
        val rejected = listOf(typeId1, typeId2).filter { id -> id in unsupportedTypeIds }
        require(rejected.isEmpty()) {
            "Unsupported or unmapped Iceberg type(s): ${rejected.joinToString()}. Please implement handling if needed."
        }
    }

    /**
     * Combines two [PrimitiveType]s into a supertype.
     *
     * Steps, in order:
     * 1. [TypeUtil.isPromotionAllowed] must accept promoting [existingType] to [incomingType];
     * otherwise the "conversion not allowed" error is thrown. Because this runs first, a pair
     * Iceberg refuses to promote never reaches the unsupported-type validation below.
     * 2. Neither type ID may be in [unsupportedTypeIds].
     * 3. Identical type IDs resolve to [existingType] (no further reconciliation, e.g. of
     * timestamp zones, is attempted here).
     * 4. INTEGER -> LONG and FLOAT -> DOUBLE resolve to the wider type; any other pair is rejected.
     *
     * @throws IllegalArgumentException if the pair is not promotable or uses an unsupported type.
     */
    private fun combinePrimitives(
        existingType: PrimitiveType,
        incomingType: PrimitiveType,
        columnName: String
    ): Type {
        val fromId = existingType.typeId()
        val toId = incomingType.typeId()

        val promotable = TypeUtil.isPromotionAllowed(existingType, incomingType)
        if (!promotable) {
            throwIllegalTypeCombination(existingType, incomingType, columnName)
        }

        validateTypeIds(fromId, toId)

        return when {
            fromId == toId -> existingType
            fromId == INTEGER && toId == LONG -> LongType.get()
            fromId == FLOAT && toId == DOUBLE -> DoubleType.get()
            else -> throwIllegalTypeCombination(existingType, incomingType, columnName)
        }
    }

    /**
     * Throws the standard [IllegalArgumentException] describing a rejected type pair.
     *
     * Declared as returning [Nothing] so it can be used in expression position.
     */
    private fun throwIllegalTypeCombination(
        existingType: Type,
        incomingType: Type,
        columnName: String
    ): Nothing {
        throw IllegalArgumentException(
            "Conversion for column \"$columnName\" between $existingType and $incomingType is not allowed."
        )
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3_data_lake

import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Type.TypeID.DOUBLE
import org.apache.iceberg.types.Type.TypeID.LONG
import org.apache.iceberg.types.Type.TypeID.TIMESTAMP_NANO
import org.apache.iceberg.types.Types
import org.apache.iceberg.types.Types.TimestampType
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Test

/**
 * Unit tests for [S3DataLakeSuperTypeFinder]: equal types, supported promotions, and the rejected
 * combinations together with the error message each one produces.
 */
class S3DataLakeSuperTypeFinderTest {

    private val superTypeFinder = S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator())

    /**
     * Asserts that resolving [existing] against [incoming] fails with an
     * [IllegalArgumentException] whose message contains [expectedMessage].
     */
    private fun expectRejected(existing: Type, incoming: Type, expectedMessage: String) {
        assertThatThrownBy { superTypeFinder.findSuperType(existing, incoming, "column_name") }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining(expectedMessage)
    }

    @Test
    fun testIdenticalPrimitiveTypes() {
        val integer = Types.IntegerType.get()

        // Equal types => the very same existing instance comes back.
        assertThat(superTypeFinder.findSuperType(integer, integer, "column_name"))
            .isSameAs(integer)
    }

    @Test
    fun testIdenticalTimestampTypesWithZone() {
        val zoned = TimestampType.withZone()

        assertThat(superTypeFinder.findSuperType(zoned, zoned, "column_name")).isSameAs(zoned)
    }

    @Test
    fun testDifferentTimestampZoneThrows() {
        expectRejected(
            existing = TimestampType.withZone(),
            incoming = TimestampType.withoutZone(),
            expectedMessage = "Conversion for column \"column_name\" between",
        )
    }

    @Test
    fun testIntToLongPromotion() {
        val resolved =
            superTypeFinder.findSuperType(
                Types.IntegerType.get(),
                Types.LongType.get(),
                "column_name",
            )

        // INT widened to LONG => LONG is the supertype.
        assertThat(resolved.typeId()).isEqualTo(LONG)
    }

    @Test
    fun testFloatToDoublePromotion() {
        val resolved =
            superTypeFinder.findSuperType(
                Types.FloatType.get(),
                Types.DoubleType.get(),
                "column_name",
            )

        assertThat(resolved.typeId()).isEqualTo(DOUBLE)
    }

    @Test
    fun testIntToDoubleIsNotAllowed() {
        // INT -> DOUBLE is not a promotion the finder accepts.
        expectRejected(
            existing = Types.IntegerType.get(),
            incoming = Types.DoubleType.get(),
            expectedMessage =
                "Conversion for column \"column_name\" between int and double is not allowed.",
        )
    }

    @Test
    fun testPrimitiveToNonPrimitiveThrows() {
        val struct =
            Types.StructType.of(
                Types.NestedField.optional(1, "field", Types.StringType.get()),
            )

        // A primitive cannot be combined with a nested type.
        expectRejected(
            existing = Types.IntegerType.get(),
            incoming = struct,
            expectedMessage =
                "Conversion for column \"column_name\" between int and struct<1: field: optional string> is not allowed.",
        )
    }

    @Test
    fun testNonPrimitiveToPrimitiveThrows() {
        val struct =
            Types.StructType.of(
                Types.NestedField.optional(1, "field", Types.StringType.get()),
            )

        expectRejected(
            existing = struct,
            incoming = Types.IntegerType.get(),
            expectedMessage =
                "Conversion for column \"column_name\" between struct<1: field: optional string> and int is not allowed.",
        )
    }

    // The four "unsupported" cases below assert the generic "Conversion ... is not allowed"
    // message (the one built by throwIllegalTypeCombination), not the "Unsupported or unmapped"
    // message produced by validateTypeIds.

    @Test
    fun testBinaryIsUnsupported() {
        expectRejected(
            existing = Types.BinaryType.get(),
            incoming = Types.IntegerType.get(),
            expectedMessage =
                "Conversion for column \"column_name\" between binary and int is not allowed.",
        )
    }

    @Test
    fun testDecimalIsUnsupported() {
        expectRejected(
            existing = Types.DecimalType.of(10, 2),
            incoming = Types.IntegerType.get(),
            expectedMessage =
                "Conversion for column \"column_name\" between decimal(10, 2) and int is not allowed.",
        )
    }

    @Test
    fun testFixedIsUnsupported() {
        expectRejected(
            existing = Types.FixedType.ofLength(16),
            incoming = Types.IntegerType.get(),
            expectedMessage =
                "Conversion for column \"column_name\" between fixed[16] and int is not allowed.",
        )
    }

    @Test
    fun testUUIDIsUnsupported() {
        expectRejected(
            existing = Types.UUIDType.get(),
            incoming = Types.IntegerType.get(),
            expectedMessage =
                "Conversion for column \"column_name\" between uuid and int is not allowed.",
        )
    }

    @Test
    fun testTimestampNanoIsUnsupported() {
        // Stand-in primitive type that only reports the TIMESTAMP_NANO type ID; used to check
        // that combining it with a regular timestamp is rejected.
        val nanoTimestamp =
            object : Type.PrimitiveType() {
                override fun typeId() = TIMESTAMP_NANO
                override fun isPrimitiveType() = true
            }
        val plainTimestamp = TimestampType.withoutZone()

        // Only the exception type is checked here, not the message.
        assertThatThrownBy {
                superTypeFinder.findSuperType(nanoTimestamp, plainTimestamp, "column_name")
            }
            .isInstanceOf(IllegalArgumentException::class.java)
    }

    @Test
    fun testPromotionIsNotAllowedByIceberg() {
        // INT -> FLOAT is not a promotion the finder accepts.
        expectRejected(
            existing = Types.IntegerType.get(),
            incoming = Types.FloatType.get(),
            expectedMessage =
                "Conversion for column \"column_name\" between int and float is not allowed.",
        )
    }
}
19 changes: 10 additions & 9 deletions docs/integrations/destinations/s3-data-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ for more information.
<details>
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------|:---------------------------------------------|
| 0.2.12 | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth |
| 0.2.11 | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow |
| 0.2.10 | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator |
| 0.2.9 | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 |
| 0.2.8 | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/51012) | Rename/Cleanup package from Iceberg V2 |
| 0.2.7 | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) |
| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------|
| 0.2.13 | 2025-01-14 | [\#50412](https://github.com/airbytehq/airbyte/pull/50412) | Implement logic to determine super types between iceberg types |
| 0.2.12 | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth |
| 0.2.11 | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow |
| 0.2.10 | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator |
| 0.2.9 | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 |
| 0.2.8 | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/51012) | Rename/Cleanup package from Iceberg V2 |
| 0.2.7 | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) |
| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |

</details>
Loading