Refactor iceberg to delta partition value conversion #2715

Closed
wants to merge 2 commits into from
@@ -22,7 +22,7 @@ import org.apache.spark.sql.delta.{DeltaColumnMapping, SerializableFileStatus}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.{DateFormatter, TimestampFormatter}
import org.apache.hadoop.fs.Path
import org.apache.iceberg.{PartitionData, RowLevelOperationMode, Table, TableProperties}
import org.apache.iceberg.{PartitionData, RowLevelOperationMode, StructLike, Table, TableProperties}
import org.apache.iceberg.transforms.IcebergPartitionUtil

import org.apache.spark.internal.Logging
@@ -48,6 +48,31 @@ class IcebergFileManifest(

val basePath = table.location()

val icebergSchema = table.schema()

// we must use the field id to look up the partition value; consider this scenario with the
// iceberg behavior change since 1.4.0:
// 1) create table with partition schema (a[col_name]: 1[field_id]), add file1;
// the partition data for file1 is (a:1:some_part_value)
// 2) add new partition col b and the partition schema becomes (a: 1, b: 2), add file2;
// the partition data for file2 is (a:1:some_part_value, b:2:some_part_value)
// 3) remove partition col a, then add file3;
// for iceberg < 1.4.0: the partFields are (a:1(void), b:2); the partition data for
// file3 is (a:1(void):null, b:2:some_part_value);
// for iceberg 1.4.0+: the partFields are (b:2); when it reads file1 (a:1:some_part_value),
// it must use the field_id instead of the index to look up the partition
// value, as the partFields and the partition data from file1 have different
// orderings, so the same index can refer to different columns.
val physicalNameToField = table.spec().fields().asScala.collect {
case field if field.transform().toString != VOID_TRANSFORM =>
DeltaColumnMapping.getPhysicalName(partitionSchema(field.name)) -> field
}.toMap

val dateFormatter = DateFormatter()

val timestampFormatter =
TimestampFormatter(ConvertUtils.timestampPartitionPattern, java.util.TimeZone.getDefault)

override def numFiles: Long = {
if (_numFiles.isEmpty) getFileSparkResults()
_numFiles.get
@@ -77,30 +102,6 @@
val schemaBatchSize =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_IMPORT_BATCH_SIZE_SCHEMA_INFERENCE)

val partFields = table.spec().fields().asScala
val icebergSchema = table.schema()
// we must use the field id to look up the partition value; consider this scenario with the
// iceberg behavior change since 1.4.0:
// 1) create table with partition schema (a[col_name]: 1[field_id]), add file1;
// the partition data for file1 is (a:1:some_part_value)
// 2) add new partition col b and the partition schema becomes (a: 1, b: 2), add file2;
// the partition data for file2 is (a:1:some_part_value, b:2:some_part_value)
// 3) remove partition col a, then add file3;
// for iceberg < 1.4.0: the partFields are (a:1(void), b:2); the partition data for
// file3 is (a:1(void):null, b:2:some_part_value);
// for iceberg 1.4.0+: the partFields are (b:2); when it reads file1 (a:1:some_part_value),
// it must use the field_id instead of the index to look up the partition
// value, as the partFields and the partition data from file1 have different
// orderings, so the same index can refer to different columns.
val physicalNameToField = partFields.collect {
case field if field.transform().toString != VOID_TRANSFORM =>
DeltaColumnMapping.getPhysicalName(partitionSchema(field.name)) -> field
}.toMap

val dateFormatter = DateFormatter()
val timestampFormatter = TimestampFormatter(ConvertUtils.timestampPartitionPattern,
java.util.TimeZone.getDefault)

// Turning this flag on is strongly discouraged; it is provided only for regression
// testing purposes.
val unsafeConvertMorTable =
@@ -131,23 +132,9 @@
s"Please trigger an Iceberg compaction and retry the command.")
}
val partitionValues = if (spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_CONVERT_ICEBERG_USE_NATIVE_PARTITION_VALUES)) {

val icebergPartition = fileScanTask.file().partition()
val icebergPartitionData = icebergPartition.asInstanceOf[PartitionData]
val fieldIdToIdx = icebergPartitionData.getPartitionType.fields().asScala.zipWithIndex
.map(kv => kv._1.fieldId() -> kv._2).toMap
val physicalNameToPartValueMap = physicalNameToField
.map { case (physicalName, field) =>
val fieldIndex = fieldIdToIdx.get(field.fieldId())
val partValueAsString = fieldIndex.map {idx =>
val partValue = icebergPartitionData.get(idx)
IcebergPartitionUtil.partitionValueToString(
field, partValue, icebergSchema, dateFormatter, timestampFormatter)
}.getOrElse(null)
physicalName -> partValueAsString
}
Some(physicalNameToPartValueMap)
DeltaSQLConf.DELTA_CONVERT_ICEBERG_USE_NATIVE_PARTITION_VALUES)) {
Some(convertIcebergPartitionToPartitionValues(
fileScanTask.file().partition()))
} else None
(filePath, partitionValues)
}
@@ -171,4 +158,34 @@
}

override def close(): Unit = fileSparkResults.map(_.unpersist())

def convertIcebergPartitionToPartitionValues(partition: StructLike):
Map[String, String] = {
val icebergPartitionData = partition.asInstanceOf[PartitionData]
val fieldIdToIdx = icebergPartitionData.getPartitionType
.fields()
.asScala
.zipWithIndex
.map(kv => kv._1.fieldId() -> kv._2)
.toMap
val physicalNameToPartValueMap = physicalNameToField
.map {
case (physicalName, field) =>
val fieldIndex = fieldIdToIdx.get(field.fieldId())
val partValueAsString = fieldIndex
.map { idx =>
val partValue = icebergPartitionData.get(idx)
IcebergPartitionUtil.partitionValueToString(
field,
partValue,
icebergSchema,
dateFormatter,
timestampFormatter
)
}
.getOrElse(null)
physicalName -> partValueAsString
}
physicalNameToPartValueMap
}
}
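
The comment added to IcebergFileManifest explains why partition values must be resolved by Iceberg field id rather than by position once the partition spec has evolved. Below is a minimal, self-contained Scala sketch of that failure mode and the field-id-based fix; it uses plain case classes and maps rather than the real Iceberg or Delta APIs, and every name in it is illustrative only.

// Sketch (plain Scala, not the Iceberg/Delta APIs): file1 was written under the old
// spec (a -> field id 1); the current spec (Iceberg 1.4.0+) only contains
// (b -> field id 2) after column a was dropped.
object FieldIdLookupSketch extends App {
  case class SpecField(name: String, fieldId: Int)
  case class PartitionEntry(fieldId: Int, value: String)

  // Partition data stored with file1, ordered by the spec it was written with.
  val file1Partition = Seq(PartitionEntry(fieldId = 1, value = "2021-01-01"))
  // Current table spec after dropping column a: only b remains, at index 0.
  val currentSpec = Seq(SpecField("b", fieldId = 2))

  // Positional lookup is wrong: index 0 of file1's partition data holds column a's
  // value, but the current spec says index 0 is column b.
  val byIndex = currentSpec.zipWithIndex.map { case (f, idx) =>
    f.name -> file1Partition.lift(idx).map(_.value).orNull
  }.toMap
  // byIndex == Map("b" -> "2021-01-01")  -- a's value wrongly attributed to b

  // Field-id lookup is correct: field id 2 is absent from file1's partition data,
  // so b's partition value resolves to null, which is what the converter expects.
  val idToValue = file1Partition.map(e => e.fieldId -> e.value).toMap
  val byFieldId = currentSpec.map(f => f.name -> idToValue.get(f.fieldId).orNull).toMap
  // byFieldId == Map("b" -> null)

  println(s"by index: $byIndex, by field id: $byFieldId")
}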