Skip to content

Commit

Permalink
feat: implement logic to determine super types between iceberg types (#…
Browse files Browse the repository at this point in the history
…50412)

Co-authored-by: Octavia Squidington III <[email protected]>
Co-authored-by: Francis Genet <[email protected]>
  • Loading branch information
3 people authored Jan 14, 2025
1 parent ac9033c commit d5be4b7
Show file tree
Hide file tree
Showing 4 changed files with 349 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.2.12
dockerImageTag: 0.2.13
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3_data_lake

import jakarta.inject.Singleton
import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Type.PrimitiveType
import org.apache.iceberg.types.Type.TypeID
import org.apache.iceberg.types.Type.TypeID.*
import org.apache.iceberg.types.TypeUtil
import org.apache.iceberg.types.Types.*

/**
 * Resolves the Iceberg [Type] a column should end up with when the type already present in the
 * table and the type arriving with new data are not the same.
 *
 * The result is a "supertype": a type able to hold values of both inputs without data loss. Besides
 * returning the existing type unchanged, the only types this class produces are LONG (for an
 * existing INT and an incoming LONG) and DOUBLE (for an existing FLOAT and an incoming DOUBLE).
 *
 * Note that the promotion check is one-directional: the existing type is always passed as the first
 * argument of [TypeUtil.isPromotionAllowed] and the incoming type as the second.
 *
 * @property s3DataLakeTypesComparator comparator used to decide whether two types are deeply equal.
 */
@Singleton
class S3DataLakeSuperTypeFinder(private val s3DataLakeTypesComparator: S3DataLakeTypesComparator) {
    /** Type IDs that [validateTypeIds] refuses to combine. */
    private val unsupportedTypeIds = setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO)

    /**
     * Computes the supertype of [existingType] and [incomingType].
     *
     * When [S3DataLakeTypesComparator.typesAreEqual] reports the two types as equal, [existingType]
     * is handed back untouched; otherwise the pair is forwarded to [combineTypes].
     *
     * @param existingType type currently associated with the column.
     * @param incomingType type the column would need to accept.
     * @param columnName column name, used only to build error messages.
     * @return [existingType] itself, or a wider type both inputs fit into.
     * @throws IllegalArgumentException if the two types cannot be reconciled.
     */
    fun findSuperType(existingType: Type, incomingType: Type, columnName: String): Type =
        if (s3DataLakeTypesComparator.typesAreEqual(incomingType, existingType)) {
            // Equal types: either one would do, the existing instance is returned.
            existingType
        } else {
            combineTypes(existingType, incomingType, columnName)
        }

    /**
     * Dispatches on whether the inputs are primitive.
     *
     * Only a pair of primitives can be combined. A primitive/non-primitive mix and a pair of
     * non-primitives are both rejected.
     *
     * @throws IllegalArgumentException unless both types are primitive and combinable.
     */
    private fun combineTypes(existingType: Type, incomingType: Type, columnName: String): Type {
        val existingIsPrimitive = existingType.isPrimitiveType
        val incomingIsPrimitive = incomingType.isPrimitiveType

        return when {
            existingIsPrimitive && incomingIsPrimitive ->
                combinePrimitives(
                    existingType.asPrimitiveType(),
                    incomingType.asPrimitiveType(),
                    columnName,
                )
            // Mixed primitive/non-primitive, or two non-primitives: neither is supported.
            else -> throwIllegalTypeCombination(existingType, incomingType, columnName)
        }
    }

    /**
     * Rejects the pair when at least one of the IDs belongs to [unsupportedTypeIds].
     *
     * @throws IllegalArgumentException listing every offending ID (an ID given twice is listed
     * twice).
     */
    private fun validateTypeIds(typeId1: TypeID, typeId2: TypeID) {
        val offendingIds = listOf(typeId1, typeId2).filter { id -> id in unsupportedTypeIds }

        require(offendingIds.isEmpty()) {
            "Unsupported or unmapped Iceberg type(s): ${offendingIds.joinToString()}. Please implement handling if needed."
        }
    }

    /**
     * Combines two [PrimitiveType]s into a supertype.
     *
     * Steps, in order:
     * 1. The pair must pass [TypeUtil.isPromotionAllowed] (existing first, incoming second);
     * otherwise the generic "conversion not allowed" error is thrown.
     * 2. Neither type ID may be in [unsupportedTypeIds] (see [validateTypeIds]).
     * 3. Identical type IDs yield [existingType]; INTEGER with LONG yields LONG; FLOAT with DOUBLE
     * yields DOUBLE; anything else is rejected.
     *
     * Because step 1 runs before step 2, a pair that is not promotable fails with the conversion
     * error even when one of its types is in [unsupportedTypeIds].
     *
     * @throws IllegalArgumentException if any of the steps above rejects the pair.
     */
    private fun combinePrimitives(
        existingType: PrimitiveType,
        incomingType: PrimitiveType,
        columnName: String
    ): Type {
        val existingId = existingType.typeId()
        val incomingId = incomingType.typeId()

        // Fail fast on anything Iceberg itself would not promote.
        if (!TypeUtil.isPromotionAllowed(existingType, incomingType)) {
            throwIllegalTypeCombination(existingType, incomingType, columnName)
        }

        validateTypeIds(existingId, incomingId)

        return when {
            // Same top-level ID although not deeply equal: keep what the table already has.
            existingId == incomingId -> existingType
            existingId == INTEGER && incomingId == LONG -> LongType.get()
            existingId == FLOAT && incomingId == DOUBLE -> DoubleType.get()
            else -> throwIllegalTypeCombination(existingType, incomingType, columnName)
        }
    }

    /**
     * Throws the standard error for a pair of types that cannot be reconciled. Never returns.
     *
     * @throws IllegalArgumentException always; the message names the column and both types.
     */
    private fun throwIllegalTypeCombination(
        existingType: Type,
        incomingType: Type,
        columnName: String
    ): Nothing {
        val message =
            "Conversion for column \"$columnName\" between $existingType and $incomingType is not allowed."
        throw IllegalArgumentException(message)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3_data_lake

import org.apache.iceberg.types.Type
import org.apache.iceberg.types.Type.TypeID.DOUBLE
import org.apache.iceberg.types.Type.TypeID.LONG
import org.apache.iceberg.types.Type.TypeID.TIMESTAMP_NANO
import org.apache.iceberg.types.Types
import org.apache.iceberg.types.Types.TimestampType
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Test

/**
 * Unit tests for [S3DataLakeSuperTypeFinder]: identical inputs, the two supported widenings
 * (INT -> LONG and FLOAT -> DOUBLE) and the combinations that must be rejected.
 */
class S3DataLakeSuperTypeFinderTest {

    private val finder = S3DataLakeSuperTypeFinder(S3DataLakeTypesComparator())

    /** Builds the single-field struct used wherever a non-primitive type is needed. */
    private fun singleFieldStruct(): Type =
        Types.StructType.of(
            Types.NestedField.optional(1, "field", Types.StringType.get()),
        )

    /**
     * Asserts that combining [existing] with [incoming] fails with an [IllegalArgumentException].
     * When [expectedMessagePart] is given, the exception message must also contain it.
     */
    private fun assertRejected(
        existing: Type,
        incoming: Type,
        expectedMessagePart: String? = null
    ) {
        val thrown =
            assertThatThrownBy { finder.findSuperType(existing, incoming, "column_name") }
                .isInstanceOf(IllegalArgumentException::class.java)

        expectedMessagePart?.let { thrown.hasMessageContaining(it) }
    }

    @Test
    fun testIdenticalPrimitiveTypes() {
        val intType = Types.IntegerType.get()

        // Equal inputs: the instance passed as the existing type must come straight back.
        assertThat(finder.findSuperType(intType, intType, "column_name")).isSameAs(intType)
    }

    @Test
    fun testIdenticalTimestampTypesWithZone() {
        val zonedTimestamp = TimestampType.withZone()

        assertThat(finder.findSuperType(zonedTimestamp, zonedTimestamp, "column_name"))
            .isSameAs(zonedTimestamp)
    }

    @Test
    fun testDifferentTimestampZoneThrows() {
        // Zoned and zoneless timestamps share a type ID but must not be merged.
        assertRejected(
            existing = TimestampType.withZone(),
            incoming = TimestampType.withoutZone(),
            expectedMessagePart = "Conversion for column \"column_name\" between",
        )
    }

    @Test
    fun testIntToLongPromotion() {
        val supertype =
            finder.findSuperType(Types.IntegerType.get(), Types.LongType.get(), "column_name")

        // Existing INT widened by an incoming LONG => LONG.
        assertThat(supertype.typeId()).isEqualTo(LONG)
    }

    @Test
    fun testFloatToDoublePromotion() {
        val supertype =
            finder.findSuperType(Types.FloatType.get(), Types.DoubleType.get(), "column_name")

        // Existing FLOAT widened by an incoming DOUBLE => DOUBLE.
        assertThat(supertype.typeId()).isEqualTo(DOUBLE)
    }

    @Test
    fun testIntToDoubleIsNotAllowed() {
        // INT -> DOUBLE is expected to be refused by the promotion check.
        assertRejected(
            existing = Types.IntegerType.get(),
            incoming = Types.DoubleType.get(),
            expectedMessagePart =
                "Conversion for column \"column_name\" between int and double is not allowed.",
        )
    }

    @Test
    fun testPrimitiveToNonPrimitiveThrows() {
        // A primitive (int) can never be combined with a non-primitive (struct).
        assertRejected(
            existing = Types.IntegerType.get(),
            incoming = singleFieldStruct(),
            expectedMessagePart =
                "Conversion for column \"column_name\" between int and struct<1: field: optional string> is not allowed.",
        )
    }

    @Test
    fun testNonPrimitiveToPrimitiveThrows() {
        // Same as above with the roles swapped.
        assertRejected(
            existing = singleFieldStruct(),
            incoming = Types.IntegerType.get(),
            expectedMessagePart =
                "Conversion for column \"column_name\" between struct<1: field: optional string> and int is not allowed.",
        )
    }

    // The four tests below pair an unsupported type with INT. The message they expect is the
    // generic conversion error, not the "Unsupported or unmapped Iceberg type(s)" one: the
    // promotion check in the class under test runs before its unsupported-type validation.

    @Test
    fun testBinaryIsUnsupported() {
        assertRejected(
            existing = Types.BinaryType.get(),
            incoming = Types.IntegerType.get(),
            expectedMessagePart =
                "Conversion for column \"column_name\" between binary and int is not allowed.",
        )
    }

    @Test
    fun testDecimalIsUnsupported() {
        assertRejected(
            existing = Types.DecimalType.of(10, 2),
            incoming = Types.IntegerType.get(),
            expectedMessagePart =
                "Conversion for column \"column_name\" between decimal(10, 2) and int is not allowed.",
        )
    }

    @Test
    fun testFixedIsUnsupported() {
        assertRejected(
            existing = Types.FixedType.ofLength(16),
            incoming = Types.IntegerType.get(),
            expectedMessagePart =
                "Conversion for column \"column_name\" between fixed[16] and int is not allowed.",
        )
    }

    @Test
    fun testUUIDIsUnsupported() {
        assertRejected(
            existing = Types.UUIDType.get(),
            incoming = Types.IntegerType.get(),
            expectedMessagePart =
                "Conversion for column \"column_name\" between uuid and int is not allowed.",
        )
    }

    @Test
    fun testTimestampNanoIsUnsupported() {
        // An anonymous primitive type reporting TIMESTAMP_NANO stands in for a nanosecond
        // timestamp. Only the exception type is checked here, not its message.
        val nanoTimestamp =
            object : Type.PrimitiveType() {
                override fun typeId() = TIMESTAMP_NANO
                override fun isPrimitiveType() = true
            }

        assertRejected(existing = nanoTimestamp, incoming = TimestampType.withoutZone())
    }

    @Test
    fun testPromotionIsNotAllowedByIceberg() {
        // INT -> FLOAT is expected to be refused by the promotion check.
        assertRejected(
            existing = Types.IntegerType.get(),
            incoming = Types.FloatType.get(),
            expectedMessagePart =
                "Conversion for column \"column_name\" between int and float is not allowed.",
        )
    }
}
19 changes: 10 additions & 9 deletions docs/integrations/destinations/s3-data-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ for more information.
<details>
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------|:---------------------------------------------|
| 0.2.12 | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth |
| 0.2.11 | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow |
| 0.2.10 | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator |
| 0.2.9 | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 |
| 0.2.8 | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/51012) | Rename/Cleanup package from Iceberg V2 |
| 0.2.7 | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) |
| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------|
| 0.2.13 | 2025-01-14 | [\#50412](https://github.com/airbytehq/airbyte/pull/50412) | Implement logic to determine super types between iceberg types |
| 0.2.12 | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth |
| 0.2.11 | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow |
| 0.2.10 | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator |
| 0.2.9 | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 |
| 0.2.8 | 2025-01-09 | [\#51012](https://github.com/airbytehq/airbyte/pull/51012) | Rename/Cleanup package from Iceberg V2 |
| 0.2.7 | 2025-01-09 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) |
| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |

</details>

0 comments on commit d5be4b7

Please sign in to comment.