Refactor Iceberg to Delta partition value conversion
Refactor the Iceberg to Delta partition value conversion to make the code more modular. No functional change.

Closes #2715

GitOrigin-RevId: 385178c997d40aa2fe1be8273dd65d01f2ddccc1
lzlfred authored and allisonport-db committed Mar 7, 2024
1 parent 5cc0184 commit 4e9a15c
Showing 1 changed file with 59 additions and 42 deletions.
@@ -22,7 +22,7 @@ import org.apache.spark.sql.delta.{DeltaColumnMapping, SerializableFileStatus}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.{DateFormatter, TimestampFormatter}
import org.apache.hadoop.fs.Path
import org.apache.iceberg.{PartitionData, RowLevelOperationMode, Table, TableProperties}
import org.apache.iceberg.{PartitionData, RowLevelOperationMode, StructLike, Table, TableProperties}
import org.apache.iceberg.transforms.IcebergPartitionUtil

import org.apache.spark.internal.Logging
@@ -48,6 +48,31 @@ class IcebergFileManifest(

val basePath = table.location()

val icebergSchema = table.schema()

// we must use the field id to look up the partition value; consider this scenario with the
// iceberg behavior change since 1.4.0:
// 1) create table with partition schema (a[col_name]: 1[field_id]), add file1;
// The partition data for file1 is (a:1:some_part_value)
// 2) add new partition col b and the partition schema becomes (a: 1, b: 2), add file2;
// the partition data for file2 is (a:1:some_part_value, b:2:some_part_value)
// 3) remove partition col a, then add file3;
// for iceberg < 1.4.0: the partFields is (a:1(void), b:2); the partition data for
// file3 is (a:1(void):null, b:2:some_part_value);
// for iceberg >= 1.4.0: the partFields is (b:2); when it reads file1 (a:1:some_part_value),
// it must use the field_id instead of the index to look up the partition
// value, as the partFields and the partitionData from file1 have different
// orderings, and thus the same index refers to different columns.
val physicalNameToField = table.spec().fields().asScala.collect {
case field if field.transform().toString != VOID_TRANSFORM =>
DeltaColumnMapping.getPhysicalName(partitionSchema(field.name)) -> field
}.toMap

val dateFormatter = DateFormatter()

val timestampFormatter =
TimestampFormatter(ConvertUtils.timestampPartitionPattern, java.util.TimeZone.getDefault)

override def numFiles: Long = {
if (_numFiles.isEmpty) getFileSparkResults()
_numFiles.get
@@ -77,30 +102,6 @@
val schemaBatchSize =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_IMPORT_BATCH_SIZE_SCHEMA_INFERENCE)

val partFields = table.spec().fields().asScala
val icebergSchema = table.schema()
// we must use the field id to look up the partition value; consider this scenario with the
// iceberg behavior change since 1.4.0:
// 1) create table with partition schema (a[col_name]: 1[field_id]), add file1;
// The partition data for file1 is (a:1:some_part_value)
// 2) add new partition col b and the partition schema becomes (a: 1, b: 2), add file2;
// the partition data for file2 is (a:1:some_part_value, b:2:some_part_value)
// 3) remove partition col a, then add file3;
// for iceberg < 1.4.0: the partFields is (a:1(void), b:2); the partition data for
// file3 is (a:1(void):null, b:2:some_part_value);
// for iceberg >= 1.4.0: the partFields is (b:2); when it reads file1 (a:1:some_part_value),
// it must use the field_id instead of the index to look up the partition
// value, as the partFields and the partitionData from file1 have different
// orderings, and thus the same index refers to different columns.
val physicalNameToField = partFields.collect {
case field if field.transform().toString != VOID_TRANSFORM =>
DeltaColumnMapping.getPhysicalName(partitionSchema(field.name)) -> field
}.toMap

val dateFormatter = DateFormatter()
val timestampFormatter = TimestampFormatter(ConvertUtils.timestampPartitionPattern,
java.util.TimeZone.getDefault)

// Turning this flag on is strongly discouraged; we provide it only for regression-testing
// purposes.
val unsafeConvertMorTable =
Expand Down Expand Up @@ -131,23 +132,9 @@ class IcebergFileManifest(
s"Please trigger an Iceberg compaction and retry the command.")
}
val partitionValues = if (spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_CONVERT_ICEBERG_USE_NATIVE_PARTITION_VALUES)) {

val icebergPartition = fileScanTask.file().partition()
val icebergPartitionData = icebergPartition.asInstanceOf[PartitionData]
val fieldIdToIdx = icebergPartitionData.getPartitionType.fields().asScala.zipWithIndex
.map(kv => kv._1.fieldId() -> kv._2).toMap
val physicalNameToPartValueMap = physicalNameToField
.map { case (physicalName, field) =>
val fieldIndex = fieldIdToIdx.get(field.fieldId())
val partValueAsString = fieldIndex.map {idx =>
val partValue = icebergPartitionData.get(idx)
IcebergPartitionUtil.partitionValueToString(
field, partValue, icebergSchema, dateFormatter, timestampFormatter)
}.getOrElse(null)
physicalName -> partValueAsString
}
Some(physicalNameToPartValueMap)
DeltaSQLConf.DELTA_CONVERT_ICEBERG_USE_NATIVE_PARTITION_VALUES)) {
Some(convertIcebergPartitionToPartitionValues(
fileScanTask.file().partition()))
} else None
(filePath, partitionValues)
}
@@ -171,4 +158,34 @@
}

override def close(): Unit = fileSparkResults.map(_.unpersist())

def convertIcebergPartitionToPartitionValues(partition: StructLike):
Map[String, String] = {
val icebergPartitionData = partition.asInstanceOf[PartitionData]
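// Map each partition field id to its ordinal position in this file's partition tuple,
// so values can be looked up by id rather than by position.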
val fieldIdToIdx = icebergPartitionData.getPartitionType
.fields()
.asScala
.zipWithIndex
.map(kv => kv._1.fieldId() -> kv._2)
.toMap
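// Resolve each active (non-void) partition field's value by field id; a field absent
// from this file's partition tuple (e.g. added after the file was written) maps to null.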
val physicalNameToPartValueMap = physicalNameToField
.map {
case (physicalName, field) =>
val fieldIndex = fieldIdToIdx.get(field.fieldId())
val partValueAsString = fieldIndex
.map { idx =>
val partValue = icebergPartitionData.get(idx)
IcebergPartitionUtil.partitionValueToString(
field,
partValue,
icebergSchema,
dateFormatter,
timestampFormatter
)
}
.getOrElse(null)
physicalName -> partValueAsString
}
physicalNameToPartValueMap
}
}
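
To make the field-id lookup described in the in-code comment concrete, here is a minimal, self-contained Scala sketch. PartField, PartData, and FieldIdLookupDemo are hypothetical, simplified stand-ins invented for illustration; they are not Iceberg's real PartitionField and PartitionData API, but they resolve values the same way the conversion above does, through the field id rather than the positional index.

// Hypothetical, simplified stand-ins for Iceberg's PartitionField and PartitionData.
case class PartField(name: String, fieldId: Int)

case class PartData(fields: Seq[PartField], values: Seq[String]) {
  // fieldId -> ordinal position, mirroring fieldIdToIdx in the diff above.
  private val idToIdx: Map[Int, Int] = fields.map(_.fieldId).zipWithIndex.toMap
  // Look the value up by field id, never by position.
  def valueById(fieldId: Int): Option[String] = idToIdx.get(fieldId).map(values)
}

object FieldIdLookupDemo extends App {
  // file1 was written under the old spec (a:1); after dropping a and adding b,
  // the current spec is (b:2) -- the iceberg >= 1.4.0 shape from the comment.
  val file1Data   = PartData(Seq(PartField("a", 1)), Seq("2020-01-01"))
  val currentSpec = Seq(PartField("b", 2))

  currentSpec.foreach { specField =>
    // An index-based lookup at position 0 would wrongly return file1's value
    // for the dropped column a; field-id lookup correctly yields null for b.
    println(s"${specField.name} -> ${file1Data.valueById(specField.fieldId).orNull}")
  }
  // prints: b -> null
}

Keying by field id is what lets the manifest read files written under any historical partition spec without misattributing values to the wrong column.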
