[Delta Uniform] Support expireSnapshot in uniform iceberg table automatically when OPTIMIZE #3298

Merged
@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
import shadedForDelta.org.apache.iceberg.ExpireSnapshots
import shadedForDelta.org.apache.iceberg.mapping.MappingUtil
import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser

@@ -184,6 +185,12 @@ class IcebergConversionTransaction(
}
}

class ExpireSnapshotHelper(expireSnapshot: ExpireSnapshots)
extends TransactionHelper(expireSnapshot) {

override def opType: String = "expireSnapshot"
}

//////////////////////
// Member variables //
//////////////////////
@@ -240,6 +247,12 @@ class IcebergConversionTransaction(
ret
}

def getExpireSnapshotHelper(): ExpireSnapshotHelper = {
val ret = new ExpireSnapshotHelper(txn.expireSnapshots())
fileUpdates += ret
ret
}

/**
* Handles the following update scenarios
* - partition update -> throws
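For orientation, here is a minimal sketch of how the new helper is driven from a conversion transaction. It mirrors the converter change further down; `icebergTxn` is assumed to be an already-constructed IcebergConversionTransaction for the target table.

    // Sketch: expire old Iceberg snapshots as part of committing a Uniform conversion.
    // Assumes `icebergTxn: IcebergConversionTransaction` was created elsewhere.
    val expireSnapshotHelper = icebergTxn.getExpireSnapshotHelper()
    // ExpireSnapshots falls back to the table's history.expire.max-snapshot-age-ms setting
    // when no explicit expireOlderThan(...) timestamp is supplied.
    expireSnapshotHelper.commit()
    // The expiration is applied together with the rest of the Iceberg transaction.
    icebergTxn.commit()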
@@ -24,6 +24,7 @@ import scala.util.control.Breaks._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaErrors, DeltaFileNotFoundException, DeltaFileProviderUtils, OptimisticTransactionImpl, Snapshot, UniversalFormat, UniversalFormatConverter}
import org.apache.spark.sql.delta.DeltaOperations.OPTIMIZE_OPERATION_NAME
import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, RemoveFile}
import org.apache.spark.sql.delta.hooks.IcebergConverterHook
import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -291,6 +292,10 @@ class IcebergConverter(spark: SparkSession)
// or to the specified batch size.
val actionBatchSize =
spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_ACTIONS_TO_CONVERT)
// If any OPTIMIZE action exists among the actions to convert,
// trigger snapshot expiration for the Iceberg table.
var needsExpireSnapshot = false

prevConvertedSnapshotOpt match {
case Some(prevSnapshot) =>
// Read the actions directly from the delta json files.
@@ -313,9 +318,12 @@
actionsToConvert.foreach { actionsIter =>
try {
actionsIter.grouped(actionBatchSize).foreach { actionStrs =>
val actions = actionStrs.map(Action.fromJson)
needsExpireSnapshot ||= existsOptimize(actions)

runIcebergConversionForActions(
icebergTxn,
actionStrs.map(Action.fromJson),
actions,
log.dataPath,
prevConvertedSnapshotOpt)
}
@@ -342,6 +350,7 @@

actionsToConvert.grouped(actionBatchSize)
.foreach { actions =>
needsExpireSnapshot ||= existsOptimize(actions)
runIcebergConversionForActions(icebergTxn, actions, log.dataPath, None)
}

@@ -350,6 +359,13 @@
icebergTxn.updateTableMetadata(snapshotToConvert.metadata, snapshotToConvert.metadata)
}
}
if (needsExpireSnapshot) {
logInfo(s"Committing iceberg snapshot expiration for uniform table " +
s"[path = ${log.logPath}] tableId=${log.tableId}]")
val expireSnapshotHelper = icebergTxn.getExpireSnapshotHelper()
expireSnapshotHelper.commit()
}

icebergTxn.commit()
Some(snapshotToConvert.version, snapshotToConvert.timestamp)
}
@@ -458,4 +474,12 @@ class IcebergConverter(spark: SparkSession)
}
}
}

private def existsOptimize(actions: Seq[Action]): Boolean = {
actions.exists { action =>
val sa = action.wrap
sa.commitInfo != null && sa.commitInfo.operation == OPTIMIZE_OPERATION_NAME
}
}

}
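Putting the converter changes together, the control flow is roughly the following. This is a condensed sketch, not the full implementation; `isOptimizeBatch` stands in for the private existsOptimize shown above, and the conversion call itself is elided.

    import org.apache.spark.sql.delta.actions.Action
    import org.apache.spark.sql.delta.icebergShaded.IcebergConversionTransaction

    // Condensed sketch of the new conversion flow:
    // 1. Convert Delta actions to Iceberg operations batch by batch.
    // 2. Remember whether any batch came from an OPTIMIZE commit.
    // 3. If so, expire old Iceberg snapshots before the final transaction commit.
    def convertBatchesSketch(
        batches: Iterator[Seq[Action]],
        isOptimizeBatch: Seq[Action] => Boolean,
        icebergTxn: IcebergConversionTransaction): Unit = {
      var needsExpireSnapshot = false
      batches.foreach { actions =>
        needsExpireSnapshot ||= isOptimizeBatch(actions)
        // runIcebergConversionForActions(icebergTxn, actions, ...) is invoked here in the real code
      }
      if (needsExpireSnapshot) {
        icebergTxn.getExpireSnapshotHelper().commit()
      }
      icebergTxn.commit()
    }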
@@ -19,20 +19,24 @@ package org.apache.spark.sql.delta.icebergShaded
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaConfigs, DeltaLog}
import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaConfig, DeltaConfigs, DeltaErrors, DeltaLog, DeltaRuntimeException}
import org.apache.spark.sql.delta.DeltaConfigs.parseCalendarInterval
import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import shadedForDelta.org.apache.iceberg.{DataFile, DataFiles, FileFormat, PartitionSpec, Schema => IcebergSchema}
import shadedForDelta.org.apache.iceberg.Metrics
import shadedForDelta.org.apache.iceberg.TableProperties

// scalastyle:off import.ordering.noEmptyLine
import shadedForDelta.org.apache.iceberg.catalog.{Namespace, TableIdentifier => IcebergTableIdentifier}
// scalastyle:on import.ordering.noEmptyLine
import shadedForDelta.org.apache.iceberg.hive.HiveCatalog

import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier => SparkTableIdentifier}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.CalendarInterval

object IcebergTransactionUtils
extends DeltaLogging
@@ -150,8 +154,13 @@ object IcebergTransactionUtils
*/
def getIcebergPropertiesFromDeltaProperties(
properties: Map[String, String]): Map[String, String] = {
val additionalPropertyFromDelta = additionalIcebergPropertiesFromDeltaProperties(properties)
val prefix = DeltaConfigs.DELTA_UNIVERSAL_FORMAT_ICEBERG_CONFIG_PREFIX
properties.filterKeys(_.startsWith(prefix)).map(kv => (kv._1.stripPrefix(prefix), kv._2)).toMap
val specifiedProperty =
properties.filterKeys(_.startsWith(prefix)).map(kv => (kv._1.stripPrefix(prefix), kv._2))
.toMap
validateIcebergProperty(additionalPropertyFromDelta, specifiedProperty)
additionalPropertyFromDelta ++ specifiedProperty
}

/** Returns the mapping of logicalPartitionColName -> physicalPartitionColName */
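The merge above first computes the properties inferred from Delta, validates the user-specified UniForm Iceberg properties against them, and then overlays the user-specified values, so an explicitly configured property wins whenever validation passes. A small illustration with hypothetical values:

    // Illustration only; keys are real Iceberg table properties, values are hypothetical.
    val inferredFromDelta = Map("history.expire.max-snapshot-age-ms" -> "432000000")
    val userSpecified = Map(
      "history.expire.max-snapshot-age-ms" -> "259200000",   // 3 days, passes validation
      "write.format.default" -> "parquet")
    // validateIcebergProperty(inferredFromDelta, userSpecified) runs first and throws on conflict;
    // otherwise the user-specified value overrides the inferred one:
    val merged = inferredFromDelta ++ userSpecified
    // merged("history.expire.max-snapshot-age-ms") == "259200000"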
@@ -226,4 +235,78 @@ object IcebergTransactionUtils
}
IcebergTableIdentifier.of(namespace, identifier.table)
}

// Additional Iceberg properties inferred from Delta properties.
// If the user doesn't specify a property on the Iceberg table, we infer it from the Delta
// properties; otherwise, we validate the user-specified value against the inferred one.
// Currently inferred properties:
// 1. Iceberg's history.expire.max-snapshot-age-ms:
//    inferred as the min of delta.logRetentionDuration and delta.deletedFileRetentionDuration,
//    capped at Iceberg's MAX_SNAPSHOT_AGE_MS_DEFAULT
private def additionalIcebergPropertiesFromDeltaProperties(
properties: Map[String, String]): Map[String, String] = {
icebergRetentionPropertyFromDelta(properties)
}

private def icebergRetentionPropertyFromDelta(
deltaProperties: Map[String, String]): Map[String, String] = {
val icebergSnapshotRetentionFromDelta = deltaRetentionMsFrom(deltaProperties)
lazy val icebergDefault = TableProperties.MAX_SNAPSHOT_AGE_MS_DEFAULT
icebergSnapshotRetentionFromDelta.map { retentionMs =>
Map(TableProperties.MAX_SNAPSHOT_AGE_MS -> (retentionMs min icebergDefault).toString)
}.getOrElse(Map.empty)
}

// Given the additional Iceberg properties constrained/inferred by Delta and the
// user-specified Iceberg properties, validate that they don't conflict
private def validateIcebergProperty(
additionalPropertyFromDelta: Map[String, String],
customizedProperty: Map[String, String]): Unit = {
validateIcebergRetentionWithDelta(additionalPropertyFromDelta, customizedProperty)
}

// Validation:
// The customized Iceberg retention must be <= the Delta retention,
// which is the min of logRetentionDuration and deletedFileRetentionDuration
private def validateIcebergRetentionWithDelta(
additionalPropertyFromDelta: Map[String, String],
usrSpecifiedProperty: Map[String, String]): Unit = {
lazy val defaultRetentionDelta =
calendarStrToMs(DeltaConfigs.LOG_RETENTION.defaultValue) min
calendarStrToMs(DeltaConfigs.TOMBSTONE_RETENTION.defaultValue)
lazy val retentionMsFromDelta = additionalPropertyFromDelta
.getOrElse(TableProperties.MAX_SNAPSHOT_AGE_MS, s"$defaultRetentionDelta").toLong

usrSpecifiedProperty.get(TableProperties.MAX_SNAPSHOT_AGE_MS).foreach { proposedMs =>
if (proposedMs.toLong > retentionMsFromDelta) {
throw new IllegalArgumentException(
s"Uniform Iceberg's ${TableProperties.MAX_SNAPSHOT_AGE_MS} must be set <= the" +
s" min of Delta's ${DeltaConfigs.LOG_RETENTION.key} and" +
s" ${DeltaConfigs.TOMBSTONE_RETENTION.key}." +
s" Current Delta retention min in ms: $retentionMsFromDelta," +
s" Proposed Iceberg retention in ms: $proposedMs")
}
}
}

private def deltaRetentionMsFrom(deltaProperties: Map[String, String]): Option[Long] = {
def getCalendarMsFrom(
conf: DeltaConfig[CalendarInterval], properties: Map[String, String]): Option[Long] = {
properties.get(conf.key).map(calendarStrToMs)
}

def minOf(a: Option[Long], b: Option[Long]): Option[Long] = (a, b) match {
case (Some(a), Some(b)) => Some(a min b)
case (a, b) => a orElse b
}

val logRetention = getCalendarMsFrom(DeltaConfigs.LOG_RETENTION, deltaProperties)
val vacuumRetention = getCalendarMsFrom(DeltaConfigs.TOMBSTONE_RETENTION, deltaProperties)
minOf(logRetention, vacuumRetention)
}

// Converts a string in calendar interval format to milliseconds
private def calendarStrToMs(calendarStr: String): Long = {
val interval = parseCalendarInterval(calendarStr)
DeltaConfigs.getMilliSeconds(interval)
}
}
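As a worked example of the inference above, assuming the shaded Iceberg default MAX_SNAPSHOT_AGE_MS_DEFAULT of 5 days; the Delta property values are hypothetical:

    // Hypothetical Delta table properties:
    //   delta.logRetentionDuration         = "interval 30 days" -> 2592000000 ms
    //   delta.deletedFileRetentionDuration = "interval 7 days"  ->  604800000 ms
    val deltaMinMs     = 604800000L   // min of the two Delta retentions
    val icebergDefault = 432000000L   // assumed 5-day Iceberg default
    val inferredMs     = deltaMinMs min icebergDefault
    // history.expire.max-snapshot-age-ms is inferred as "432000000"; a user-specified value
    // larger than the inferred one would be rejected by validateIcebergRetentionWithDelta.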
@@ -176,6 +176,9 @@ trait DeltaConfigsBase extends DeltaLogging {
case lKey if lKey.startsWith("delta.") =>
Option(entries.get(lKey.stripPrefix("delta."))) match {
case Some(deltaConfig) => deltaConfig(value) // validate the value
case None if lKey.startsWith(DELTA_UNIVERSAL_FORMAT_CONFIG_PREFIX) =>
// Always allow any Delta Universal Format config; the key has already been lower-cased
lKey -> value
case None if allowArbitraryProperties =>
logConsole(
s"You are setting a property: $key that is not recognized by this " +
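The effect of the new branch is that any key under DELTA_UNIVERSAL_FORMAT_CONFIG_PREFIX is accepted without a registered DeltaConfig entry. A rough sketch of the routing; the prefix is passed in as a parameter here because the literal value of the constant is not shown in this diff, and `knownConfigs` stands in for the `entries` registry.

    // Illustrative routing of a lower-cased "delta."-prefixed table property key.
    def routeSketch(
        lKey: String,
        value: String,
        knownConfigs: Set[String],
        universalFormatPrefix: String,
        allowArbitraryProperties: Boolean): (String, String) = {
      if (knownConfigs.contains(lKey.stripPrefix("delta."))) {
        (lKey, value)    // validated by its registered DeltaConfig in the real code
      } else if (lKey.startsWith(universalFormatPrefix)) {
        lKey -> value    // Universal Format configs pass through without validation
      } else if (allowArbitraryProperties) {
        lKey -> value    // unknown "delta." keys allowed only via the escape hatch
      } else {
        throw new IllegalArgumentException(s"Unknown Delta table property: $lKey")
      }
    }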