[UNIFORM] Fix converting non-ISO timestamp partition values to Iceberg
amogh-jahagirdar committed Jan 6, 2025
1 parent 7769d31 commit 4f77c13
Showing 6 changed files with 68 additions and 16 deletions.
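Background for the fix: `java.time.Instant.parse` accepts only ISO-8601 strings such as `2021-06-30T08:00:00.123456Z`, while a Delta timestamp partition value can be written in the non-ISO `yyyy-MM-dd HH:mm:ss[.SSSSSS]` form (as in the test added below when `UTC_TIMESTAMP_PARTITION_VALUES` is disabled), so the old conversion path threw `DateTimeParseException`. A standalone sketch of that failure, using plain java.time and a hypothetical object name, not code from this commit:

import java.time.Instant
import java.time.format.DateTimeParseException

object NonIsoPartitionValueDemo {
  def main(args: Array[String]): Unit = {
    // ISO-8601 input parses fine.
    println(Instant.parse("2021-06-30T08:00:00.123456Z"))

    // A non-ISO partition value like the one written in the new test below
    // is rejected by Instant.parse; this is the failure the commit handles.
    try Instant.parse("2021-06-30 00:00:00.123456")
    catch {
      case e: DateTimeParseException => println(s"rejected: ${e.getMessage}")
    }
  }
}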
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.icebergShaded

import java.nio.ByteBuffer
import java.time.Instant
import java.time.format.DateTimeParseException

import scala.collection.JavaConverters._
import scala.util.control.NonFatal
@@ -26,6 +27,8 @@ import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaConfig, DeltaConfigs
import org.apache.spark.sql.delta.DeltaConfigs.parseCalendarInterval
import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.util.PartitionUtils.{timestampPartitionPattern, utcFormatter}
import org.apache.spark.sql.delta.util.TimestampFormatter
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import shadedForDelta.org.apache.iceberg.{DataFile, DataFiles, FileFormat, PartitionSpec, Schema => IcebergSchema}
@@ -207,6 +210,9 @@ object IcebergTransactionUtils
builder
}

private lazy val timestampFormatter =
TimestampFormatter(timestampPartitionPattern, java.util.TimeZone.getDefault)

/**
* Follows deserialization as specified here
* https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Partition-Value-Serialization
@@ -235,13 +241,25 @@ object IcebergTransactionUtils
case _: TimestampNTZType =>
java.sql.Timestamp.valueOf(partitionVal).getNanos/1000.asInstanceOf[Long]
case _: TimestampType =>
Instant.parse(partitionVal).getNano/1000.asInstanceOf[Long]
try {
getMicrosSinceEpoch(partitionVal)
} catch {
case _: DateTimeParseException =>
// In case of non-ISO timestamps, parse and interpret the timestamp as system time
// and then convert to UTC
val utcInstant = utcFormatter.format(timestampFormatter.parse(partitionVal))
getMicrosSinceEpoch(utcInstant)
}
case _ =>
throw DeltaErrors.universalFormatConversionFailedException(
version, "iceberg", "Unexpected partition data type " + elemType)
}
}

private def getMicrosSinceEpoch(instant: String): Long = {
Instant.parse(instant).getNano/1000.asInstanceOf[Long]
}

private def getMetricsForIcebergDataFile(
statsParser: String => InternalRow,
stats: String,
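The fallback added above re-parses a non-ISO partition value with the default-zone TimestampFormatter, re-renders it in UTC via utcFormatter, and passes that UTC string back through getMicrosSinceEpoch. A rough java.time sketch of the same interpret-as-local-then-convert-to-UTC idea, assuming the `yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]` shape; this is a standalone illustration, not the Spark/Delta TimestampFormatter path the patch uses:

import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.{DateTimeFormatter, DateTimeParseException}
import java.time.temporal.ChronoUnit

object TimestampFallbackSketch {
  // Optional-fraction pattern, roughly mirroring timestampPartitionPattern.
  private val localPattern =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]")

  /** ISO values are parsed directly; non-ISO values are treated as local wall-clock time. */
  def toUtcInstant(partitionVal: String): Instant =
    try Instant.parse(partitionVal)
    catch {
      case _: DateTimeParseException =>
        // Interpret the value in the JVM default zone, then convert to an instant (UTC).
        LocalDateTime.parse(partitionVal, localPattern)
          .atZone(ZoneId.systemDefault())
          .toInstant
    }

  def main(args: Array[String]): Unit = {
    val instant = toUtcInstant("2021-06-30 00:00:00.123456")
    println(instant)                                           // zone-dependent UTC instant
    println(ChronoUnit.MICROS.between(Instant.EPOCH, instant)) // microseconds since epoch
  }
}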
@@ -71,8 +71,6 @@ class DelayedCommitProtocol(
// since there's no guarantee the stats will exist.
@transient val addedStatuses = new ArrayBuffer[AddFile]

val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]"

// Constants for CDC partition manipulation. Used only in newTaskTempFile(), but we define them
// here to avoid building a new redundant regex for every file.
protected val cdcPartitionFalse = s"${CDC_PARTITION_COL}=false"
@@ -145,7 +143,7 @@ class DelayedCommitProtocol(

val dateFormatter = DateFormatter()
val timestampFormatter =
TimestampFormatter(timestampPartitionPattern, java.util.TimeZone.getDefault)
TimestampFormatter(PartitionUtils.timestampPartitionPattern, java.util.TimeZone.getDefault)

/**
* ToDo: Remove the use of this PartitionUtils API with type inference logic
@@ -119,7 +119,7 @@ object PartitionSpec {

private[delta] object PartitionUtils {

lazy val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
lazy val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]"
lazy val utcFormatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSSz", ZoneId.of("Z"))

case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal])
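Widening PartitionUtils.timestampPartitionPattern from `[.S]` to `[.SSSSSS][.S]` (the wider pattern previously lived only in DelayedCommitProtocol) lets microsecond-precision partition values parse as well as single-digit fractions. A quick illustration of the difference, assuming java.time optional-section semantics; Spark's TimestampFormatter applies its own parsing policy, and the object name here is hypothetical:

import java.time.LocalDateTime
import java.time.format.{DateTimeFormatter, DateTimeParseException}

object PartitionPatternDemo {
  private def parses(pattern: String, value: String): Boolean =
    try { LocalDateTime.parse(value, DateTimeFormatter.ofPattern(pattern)); true }
    catch { case _: DateTimeParseException => false }

  def main(args: Array[String]): Unit = {
    val micros = "2021-06-30 00:00:00.123456"
    println(parses("yyyy-MM-dd HH:mm:ss[.S]", micros))          // false: only one fractional digit allowed
    println(parses("yyyy-MM-dd HH:mm:ss[.SSSSSS][.S]", micros)) // true: six-digit fraction accepted
  }
}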
@@ -34,6 +34,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkThrowable
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.delta.DeltaTestUtils.withTimeZone
import org.apache.spark.sql.functions.{lit, struct}
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.internal.SQLConf.{LEAF_NODE_DEFAULT_PARALLELISM, PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
@@ -1467,16 +1468,6 @@ abstract class DeltaInsertIntoTests(
}
}

private def withTimeZone(zone: String)(f: => Unit): Unit = {
val currentDefault = TimeZone.getDefault
try {
TimeZone.setDefault(TimeZone.getTimeZone(zone))
f
} finally {
TimeZone.setDefault(currentDefault)
}
}

// This behavior is specific to Delta
testQuietly("insertInto: schema enforcement") {
val t1 = "tbl"
@@ -18,7 +18,7 @@ package org.apache.spark.sql.delta

import java.io.{BufferedReader, File, InputStreamReader}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale
import java.util.{Locale, TimeZone}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
@@ -451,6 +451,16 @@ object DeltaTestUtils extends DeltaTestUtilsBase {
def fulfillsVersionRequirements(actual: Protocol, requirement: Protocol): Boolean =
lteq(requirement, actual)
}

def withTimeZone(zone: String)(f: => Unit): Unit = {
val currentDefault = TimeZone.getDefault
try {
TimeZone.setDefault(TimeZone.getTimeZone(zone))
f
} finally {
TimeZone.setDefault(currentDefault)
}
}
}

trait DeltaTestUtilsForTempViews
@@ -16,7 +16,12 @@

package org.apache.spark.sql.delta.uniform

import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.DeltaTestUtils.withTimeZone
import org.apache.spark.sql.delta.sources.DeltaSQLConf.UTC_TIMESTAMP_PARTITION_VALUES

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types._

abstract class UniFormE2EIcebergSuiteBase extends UniFormE2ETest {
@@ -108,6 +113,36 @@ abstract class UniFormE2EIcebergSuiteBase extends UniFormE2ETest {
}
}

test("Insert Partitioned Table - UTC Adjustment for Non-ISO Timestamp Partition values") {
withTable(testTableName) {
withTimeZone("GMT-8") {
withSQLConf(UTC_TIMESTAMP_PARTITION_VALUES.key -> "false") {
write(
s"""CREATE TABLE $testTableName (id int, ts timestamp)
| USING DELTA
| PARTITIONED BY (ts)
| TBLPROPERTIES (
| 'delta.columnMapping.mode' = 'name',
| 'delta.enableIcebergCompatV2' = 'true',
| 'delta.universalFormat.enabledFormats' = 'iceberg'
|)""".stripMargin)
write(s"INSERT INTO $testTableName" +
s" VALUES (1, timestamp'2021-06-30 00:00:00.123456')")
val verificationQuery = s"SELECT id FROM $testTableName " +
s"where ts=TIMESTAMP'2021-06-30 08:00:00.123456UTC'"
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(testTableName))
val partitionColName = deltaLog.unsafeVolatileMetadata.physicalPartitionColumns.head
val partitionValues = deltaLog.update().allFiles.head.partitionValues
assert(partitionValues === Map(partitionColName -> "2021-06-30 00:00:00.123456"))

// Verify against Delta read and Iceberg read
checkAnswer(spark.sql(verificationQuery), Seq(Row(1)))
checkAnswer(createReaderSparkSession.sql(verificationQuery), Seq(Row(1)))
}
}
}
}
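As a sanity check on the values used in this test: with the JVM zone set to GMT-8 the partition value is recorded as the local time `2021-06-30 00:00:00.123456`, which corresponds to `2021-06-30 08:00:00.123456` UTC, the instant the verification query filters on. A small standalone sketch of that adjustment (not part of the commit):

import java.time.{LocalDateTime, ZoneId}

object UtcAdjustmentCheck {
  def main(args: Array[String]): Unit = {
    val local = LocalDateTime.parse("2021-06-30T00:00:00.123456")
    // Interpreting the local value in GMT-8 and converting to UTC adds 8 hours.
    println(local.atZone(ZoneId.of("GMT-8")).toInstant) // 2021-06-30T08:00:00.123456Z
  }
}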

test("CIUD") {
withTable(testTableName) {
write(