-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement logic to determine super types between iceberg types #50412
Open
subodh1810
wants to merge
6
commits into
iceberg-schema-comparator
Choose a base branch
from
iceberg-schema-supertype-finder
base: iceberg-schema-comparator
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+338
−0
Open
Changes from 4 commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
08b3197
feat: implement logic to determine super types between iceberg types
subodh1810 4249fac
format
subodh1810 0ecd07b
Merge branch 'iceberg-schema-comparator' into iceberg-schema-supertype-finder
subodh1810 318ed6c
Merge branch 'iceberg-schema-comparator' into iceberg-schema-supertype-finder
subodh1810 a749668
Merge branch 'iceberg-schema-comparator' into iceberg-schema-supertype-finder
subodh1810 3b39576
address review comments
subodh1810 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
137 changes: 137 additions & 0 deletions
137
.../src/main/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinder.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.iceberg.v2 | ||
|
||
import jakarta.inject.Singleton | ||
import org.apache.iceberg.types.Type | ||
import org.apache.iceberg.types.Type.PrimitiveType | ||
import org.apache.iceberg.types.Type.TypeID | ||
import org.apache.iceberg.types.Type.TypeID.* | ||
import org.apache.iceberg.types.TypeUtil | ||
import org.apache.iceberg.types.Types.* | ||
|
||
/**
 * Determines a "supertype" for a pair of Iceberg [Type]s.
 *
 * The "supertype" is a type to which both input types can safely be promoted without data loss. For
 * instance, INT can be promoted to LONG and FLOAT can be promoted to DOUBLE.
 *
 * Promotion is only evaluated from the existing type towards the incoming type: an existing INTEGER
 * paired with an incoming LONG yields LONG, whereas an existing LONG paired with an incoming
 * INTEGER is rejected.
 *
 * @property icebergTypesComparator comparator used to verify deep type equality.
 */
@Singleton
class IcebergSuperTypeFinder(private val icebergTypesComparator: IcebergTypesComparator) {

    /**
     * Returns a supertype for [existingType] and [incomingType] if one exists.
     * - If they are deeply equal (according to [IcebergTypesComparator.typesAreEqual]), returns the
     * [existingType] as-is.
     * - Otherwise, attempts to combine them into a valid supertype.
     *
     * @param existingType the type currently in use for the column.
     * @param incomingType the type the column is being compared against.
     * @param columnName column name, used only to build error messages.
     * @return [existingType] when both are deeply equal, otherwise the promoted type.
     * @throws IllegalArgumentException if no valid supertype can be found.
     */
    fun findSuperType(existingType: Type, incomingType: Type, columnName: String): Type =
        if (icebergTypesComparator.typesAreEqual(incomingType, existingType)) {
            // Deeply equal: either operand would do, the existing one is returned by convention.
            existingType
        } else {
            combineTypes(existingType, incomingType, columnName)
        }

    /**
     * Combines two top-level [Type]s that are not deeply equal.
     *
     * Only primitive/primitive pairs can be combined. A primitive paired with a non-primitive has no
     * supertype, and combining two non-primitives is not currently supported.
     *
     * @throws IllegalArgumentException if either type is non-primitive, or if the primitives cannot
     * be combined.
     */
    private fun combineTypes(existingType: Type, incomingType: Type, columnName: String): Type {
        // Covers both the mixed primitive/non-primitive case and the both-non-primitive case.
        if (!existingType.isPrimitiveType || !incomingType.isPrimitiveType) {
            throwIllegalTypeCombination(existingType, incomingType, columnName)
        }

        return combinePrimitives(
            existingType.asPrimitiveType(),
            incomingType.asPrimitiveType(),
            columnName
        )
    }

    /**
     * Checks whether either type is unsupported or unmapped (e.g. BINARY, DECIMAL, FIXED, etc.).
     *
     * When both are unsupported, [typeId1] is the one reported in the error message.
     *
     * @throws IllegalArgumentException if either type is unsupported.
     */
    private fun validateTypeIds(typeId1: TypeID, typeId2: TypeID) {
        val badTypeId =
            listOf(typeId1, typeId2).firstOrNull { it in UNSUPPORTED_TYPE_IDS } ?: return
        throw IllegalArgumentException(
            "Unsupported or unmapped Iceberg type: $badTypeId. Please implement handling if needed."
        )
    }

    /**
     * Attempts to combine two [PrimitiveType]s into a valid supertype by using
     * [TypeUtil.isPromotionAllowed].
     *
     * Checks are applied in this order:
     * 1. Both type IDs must be supported (see [validateTypeIds]).
     * 2. Iceberg must allow promoting [existingType] to [incomingType]; otherwise the pair is
     * rejected. This check runs before the type IDs are compared, so types that share a [TypeID]
     * but differ in detail are rejected here when Iceberg does not allow the promotion (the test
     * suite expects this for TIMESTAMP with vs. without zone).
     * 3. If the type IDs are identical, [existingType] is returned.
     * 4. Otherwise only the known widenings are accepted: INTEGER -> LONG and FLOAT -> DOUBLE.
     *
     * @throws IllegalArgumentException if a type is unsupported or the promotion is not allowed.
     */
    private fun combinePrimitives(
        existingType: PrimitiveType,
        incomingType: PrimitiveType,
        columnName: String
    ): Type {
        val existingTypeId = existingType.typeId()
        val incomingTypeId = incomingType.typeId()

        // Validation runs first so that unsupported types surface the dedicated
        // "Unsupported or unmapped" message rather than the generic conversion error.
        validateTypeIds(existingTypeId, incomingTypeId)

        // If promotion is not allowed by Iceberg, fail fast.
        if (!TypeUtil.isPromotionAllowed(existingType, incomingType)) {
            throwIllegalTypeCombination(existingType, incomingType, columnName)
        }

        return when {
            // Same type ID and promotion allowed: keep what is already there.
            existingTypeId == incomingTypeId -> existingType
            existingTypeId == INTEGER && incomingTypeId == LONG -> LongType.get()
            existingTypeId == FLOAT && incomingTypeId == DOUBLE -> DoubleType.get()
            else -> throwIllegalTypeCombination(existingType, incomingType, columnName)
        }
    }

    /**
     * Helper function to throw a standardized [IllegalArgumentException] for invalid type combos.
     *
     * Declared as returning [Nothing] so it can be used as a branch of an expression.
     */
    private fun throwIllegalTypeCombination(
        existingType: Type,
        incomingType: Type,
        columnName: String
    ): Nothing =
        throw IllegalArgumentException(
            "Conversion for column \"$columnName\" between $existingType and $incomingType is not allowed."
        )

    private companion object {
        /** Type IDs for which no supertype handling is implemented. */
        val UNSUPPORTED_TYPE_IDS: Set<TypeID> =
            setOf(BINARY, DECIMAL, FIXED, UUID, MAP, TIMESTAMP_NANO)
    }
}
193 changes: 193 additions & 0 deletions
193
.../test/kotlin/io/airbyte/integrations/destination/iceberg/v2/IcebergSuperTypeFinderTest.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
/* | ||
* Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.iceberg.v2 | ||
|
||
import org.apache.iceberg.types.Type | ||
import org.apache.iceberg.types.Type.TypeID.DOUBLE | ||
import org.apache.iceberg.types.Type.TypeID.LONG | ||
import org.apache.iceberg.types.Type.TypeID.TIMESTAMP_NANO | ||
import org.apache.iceberg.types.Types | ||
import org.apache.iceberg.types.Types.TimestampType | ||
import org.assertj.core.api.Assertions.assertThat | ||
import org.assertj.core.api.Assertions.assertThatThrownBy | ||
import org.junit.jupiter.api.Test | ||
|
||
/**
 * Test suite for [IcebergSuperTypeFinder].
 *
 * Covers: deeply-equal inputs, the supported widenings (INT -> LONG, FLOAT -> DOUBLE), rejected
 * primitive combinations, primitive/non-primitive mixes, non-primitive pairs, and unsupported type
 * IDs.
 */
class IcebergSuperTypeFinderTest {

    private val superTypeFinder = IcebergSuperTypeFinder(IcebergTypesComparator())

    @Test
    fun testIdenticalPrimitiveTypes() {
        val intType = Types.IntegerType.get()
        val result = superTypeFinder.findSuperType(intType, intType, "column_name")

        // They are "equal" => expect the existing type to be returned
        assertThat(result).isSameAs(intType)
    }

    @Test
    fun testIdenticalTimestampTypesWithZone() {
        val tsWithZone = TimestampType.withZone()
        val result = superTypeFinder.findSuperType(tsWithZone, tsWithZone, "column_name")

        assertThat(result).isSameAs(tsWithZone)
    }

    @Test
    fun testDifferentTimestampZoneThrows() {
        val tsWithZone = TimestampType.withZone()
        val tsWithoutZone = TimestampType.withoutZone()

        // Same TypeID, but not deeply equal => rejected with the generic conversion error
        assertThatThrownBy {
                superTypeFinder.findSuperType(tsWithZone, tsWithoutZone, "column_name")
            }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining("Conversion for column \"column_name\" between")
    }

    @Test
    fun testIntToLongPromotion() {
        val intType = Types.IntegerType.get()
        val longType = Types.LongType.get()

        val result = superTypeFinder.findSuperType(intType, longType, "column_name")
        // INT -> LONG => LONG is the supertype
        assertThat(result.typeId()).isEqualTo(LONG)
    }

    @Test
    fun testLongToIntNarrowingThrows() {
        val longType = Types.LongType.get()
        val intType = Types.IntegerType.get()

        // Promotion is only evaluated from the existing type towards the incoming type,
        // so an existing LONG with an incoming INT is rejected.
        assertThatThrownBy { superTypeFinder.findSuperType(longType, intType, "column_name") }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining(
                "Conversion for column \"column_name\" between long and int is not allowed."
            )
    }

    @Test
    fun testFloatToDoublePromotion() {
        val floatType = Types.FloatType.get()
        val doubleType = Types.DoubleType.get()

        val result = superTypeFinder.findSuperType(floatType, doubleType, "column_name")
        // FLOAT -> DOUBLE => DOUBLE is the supertype
        assertThat(result.typeId()).isEqualTo(DOUBLE)
    }

    @Test
    fun testIntToDoubleIsNotAllowed() {
        val intType = Types.IntegerType.get()
        val doubleType = Types.DoubleType.get()

        // By default, TypeUtil.isPromotionAllowed(int, double) returns false
        assertThatThrownBy { superTypeFinder.findSuperType(intType, doubleType, "column_name") }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining(
                "Conversion for column \"column_name\" between int and double is not allowed."
            )
    }

    @Test
    fun testPrimitiveToNonPrimitiveThrows() {
        val intType = Types.IntegerType.get()
        val structType =
            Types.StructType.of(
                Types.NestedField.optional(1, "field", Types.StringType.get()),
            )

        // Attempting to combine int (primitive) with struct (non-primitive) => error
        assertThatThrownBy { superTypeFinder.findSuperType(intType, structType, "column_name") }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining(
                "Conversion for column \"column_name\" between int and struct<1: field: optional string> is not allowed."
            )
    }

    @Test
    fun testNonPrimitiveToPrimitiveThrows() {
        val structType =
            Types.StructType.of(
                Types.NestedField.optional(1, "field", Types.StringType.get()),
            )
        val intType = Types.IntegerType.get()

        // Same as above with the operands swapped => error
        assertThatThrownBy { superTypeFinder.findSuperType(structType, intType, "column_name") }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining(
                "Conversion for column \"column_name\" between struct<1: field: optional string> and int is not allowed."
            )
    }

    @Test
    fun testDifferentNonPrimitiveTypesThrow() {
        val structType =
            Types.StructType.of(
                Types.NestedField.optional(1, "field", Types.StringType.get()),
            )
        val listType = Types.ListType.ofOptional(2, Types.StringType.get())

        // Both are non-primitive and not equal => combining them is not supported
        assertThatThrownBy { superTypeFinder.findSuperType(structType, listType, "column_name") }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining("Conversion for column \"column_name\" between")
    }

    @Test
    fun testBinaryIsUnsupported() {
        val binaryType = Types.BinaryType.get()
        val intType = Types.IntegerType.get()

        // Fails in validateTypeIds => BINARY is not supported
        assertThatThrownBy { superTypeFinder.findSuperType(binaryType, intType, "column_name") }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining("Unsupported or unmapped Iceberg type: BINARY")
    }

    @Test
    fun testDecimalIsUnsupported() {
        val decimalType = Types.DecimalType.of(10, 2)
        val intType = Types.IntegerType.get()

        // Fails in validateTypeIds => DECIMAL is not supported
        assertThatThrownBy { superTypeFinder.findSuperType(decimalType, intType, "column_name") }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining("Unsupported or unmapped Iceberg type: DECIMAL")
    }

    @Test
    fun testFixedIsUnsupported() {
        val fixedType = Types.FixedType.ofLength(16)
        val intType = Types.IntegerType.get()

        // Fails in validateTypeIds => FIXED is not supported
        assertThatThrownBy { superTypeFinder.findSuperType(fixedType, intType, "column_name") }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining("Unsupported or unmapped Iceberg type: FIXED")
    }

    @Test
    fun testUUIDIsUnsupported() {
        val uuidType = Types.UUIDType.get()
        val intType = Types.IntegerType.get()

        // Fails in validateTypeIds => UUID is not supported
        assertThatThrownBy { superTypeFinder.findSuperType(uuidType, intType, "column_name") }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining("Unsupported or unmapped Iceberg type: UUID")
    }

    @Test
    fun testTimestampNanoIsUnsupported() {
        // No concrete nanosecond timestamp type is constructed here; an anonymous PrimitiveType
        // that reports TypeID.TIMESTAMP_NANO stands in for it.
        val nanoTimestamp =
            object : Type.PrimitiveType() {
                override fun typeId() = TIMESTAMP_NANO
                override fun isPrimitiveType() = true
            }
        val normalTimestamp = TimestampType.withoutZone()

        // Fails in validateTypeIds => TIMESTAMP_NANO is not supported
        assertThatThrownBy {
                superTypeFinder.findSuperType(nanoTimestamp, normalTimestamp, "column_name")
            }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining("Unsupported or unmapped Iceberg type: TIMESTAMP_NANO")
    }

    @Test
    fun testPromotionIsNotAllowedByIceberg() {
        // Suppose the user tries to do INT -> FLOAT
        val intType = Types.IntegerType.get()
        val floatType = Types.FloatType.get()

        // By default, TypeUtil.isPromotionAllowed(int, float) is false
        assertThatThrownBy { superTypeFinder.findSuperType(intType, floatType, "column_name") }
            .isInstanceOf(IllegalArgumentException::class.java)
            .hasMessageContaining(
                "Conversion for column \"column_name\" between int and float is not allowed."
            )
    }
}
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find this slightly easier to read:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done