diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala
index e5efc1c128d..d1f13f316b8 100644
--- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala
+++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala
@@ -31,6 +31,8 @@ import org.apache.hadoop.conf.Configuration
 import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
 import shadedForDelta.org.apache.iceberg.hadoop.HadoopTables

+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+
 sealed trait IcebergTableOp
 case object CREATE_TABLE extends IcebergTableOp
 case object WRITE_TABLE extends IcebergTableOp
@@ -46,6 +48,7 @@ case object REPLACE_TABLE extends IcebergTableOp
  * @param tableOp How to instantiate the underlying Iceberg table. Defaults to WRITE_TABLE.
  */
 class IcebergConversionTransaction(
+    protected val catalogTable: CatalogTable,
     protected val conf: Configuration,
     protected val postCommitSnapshot: Snapshot,
     protected val tableOp: IcebergTableOp = WRITE_TABLE,
@@ -316,16 +319,19 @@ class IcebergConversionTransaction(
   ///////////////////////

   protected def createIcebergTxn(): IcebergTransaction = {
-    val hadoopTables = new HadoopTables(conf)
-    val tableExists = hadoopTables.exists(tablePath.toString)
+    val hiveCatalog = IcebergTransactionUtils.createHiveCatalog(conf)
+    val icebergTableId = IcebergTransactionUtils
+      .convertSparkTableIdentifierToIcebergHive(catalogTable.identifier)
+
+    val tableExists = hiveCatalog.tableExists(icebergTableId)

     def tableBuilder = {
       val properties = getIcebergPropertiesFromDeltaProperties(
         postCommitSnapshot.metadata.configuration
       )

-      hadoopTables
-        .buildTable(tablePath.toString, icebergSchema)
+      hiveCatalog
+        .buildTable(icebergTableId, icebergSchema)
         .withPartitionSpec(partitionSpec)
         .withProperties(properties.asJava)
     }
@@ -334,7 +340,7 @@ class IcebergConversionTransaction(
       case WRITE_TABLE =>
         if (tableExists) {
          recordFrameProfile("IcebergConversionTransaction", "loadTable") {
-            hadoopTables.load(tablePath.toString).newTransaction()
+            hiveCatalog.loadTable(icebergTableId).newTransaction()
          }
        } else {
          throw new IllegalStateException(s"Cannot write to table $tablePath. Table doesn't exist.")
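For orientation, the sketch below (plain Scala, not part of the patch) mirrors the flow that the new createIcebergTxn follows for writes: resolve the Iceberg identifier from the Spark catalog table, ask the Hive catalog whether the table already exists, and only then open a transaction on it. The Catalog trait here is a simplified stand-in, not the shaded Iceberg HiveCatalog API.

    // Simplified stand-in types; the real code uses shadedForDelta Iceberg classes.
    trait Catalog {
      def tableExists(id: String): Boolean
      def loadTable(id: String): String // stand-in for an Iceberg Table handle
    }

    // Mirrors the WRITE_TABLE branch above: load the existing table or fail loudly.
    def openForWrite(catalog: Catalog, id: String): String =
      if (catalog.tableExists(id)) catalog.loadTable(id)
      else throw new IllegalStateException(s"Cannot write to table $id. Table doesn't exist.")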
diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala
index 843f584e27f..09253528045 100644
--- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala
+++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala
@@ -23,16 +23,17 @@ import scala.collection.JavaConverters._
 import scala.util.control.Breaks._
 import scala.util.control.NonFatal

-import org.apache.spark.sql.delta.{DeltaFileNotFoundException, DeltaFileProviderUtils, OptimisticTransactionImpl, Snapshot, UniversalFormatConverter}
+import org.apache.spark.sql.delta.{DeltaFileNotFoundException, DeltaFileProviderUtils, OptimisticTransactionImpl, Snapshot, UniversalFormat, UniversalFormatConverter}
 import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, RemoveFile}
 import org.apache.spark.sql.delta.hooks.IcebergConverterHook
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.fs.Path
-import shadedForDelta.org.apache.iceberg.hadoop.HadoopTables
+import shadedForDelta.org.apache.iceberg.hive.{HiveCatalog, HiveTableOperations}

 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable

 object IcebergConverter {
@@ -60,9 +61,9 @@
   // Save an atomic reference of the snapshot being converted, and the txn that triggered
   // resulted in the specified snapshot
   protected val currentConversion =
-    new AtomicReference[(Snapshot, Option[OptimisticTransactionImpl])]()
+    new AtomicReference[(Snapshot, OptimisticTransactionImpl)]()
   protected val standbyConversion =
-    new AtomicReference[(Snapshot, Option[OptimisticTransactionImpl])]()
+    new AtomicReference[(Snapshot, OptimisticTransactionImpl)]()

   // Whether our async converter thread is active. We may already have an alive thread that is
   // about to shutdown, but in such cases this value should return false.
@@ -81,7 +82,10 @@
    */
   override def enqueueSnapshotForConversion(
       snapshotToConvert: Snapshot,
-      txn: Option[OptimisticTransactionImpl]): Unit = {
+      txn: OptimisticTransactionImpl): Unit = {
+    if (!UniversalFormat.icebergEnabled(snapshotToConvert.metadata)) {
+      return
+    }
     val log = snapshotToConvert.deltaLog
     // Replace any previously queued snapshot
     val previouslyQueued = standbyConversion.getAndSet((snapshotToConvert, txn))
@@ -126,7 +130,7 @@
   }

   // Get a snapshot to convert from the icebergQueue. Sets the queue to null after.
-  private def getNextSnapshot: (Snapshot, Option[OptimisticTransactionImpl]) =
+  private def getNextSnapshot: (Snapshot, OptimisticTransactionImpl) =
     asyncThreadLock.synchronized {
       val potentialSnapshotAndTxn = standbyConversion.get()
       currentConversion.set(potentialSnapshotAndTxn)
@@ -155,21 +159,66 @@

       }
     }

+  /**
+   * Convert the specified snapshot into Iceberg for the given catalogTable.
+   * @param snapshotToConvert the snapshot that needs to be converted to Iceberg
+   * @param catalogTable the catalogTable this conversion targets
+   * @return Converted Delta version and commit timestamp
+   */
+  override def convertSnapshot(
+      snapshotToConvert: Snapshot, catalogTable: CatalogTable): Option[(Long, Long)] = {
+    if (!UniversalFormat.icebergEnabled(snapshotToConvert.metadata)) {
+      return None
+    }
+    convertSnapshot(snapshotToConvert, None, catalogTable)
+  }
+
+  /**
+   * Convert the specified snapshot into Iceberg when performing an OptimisticTransaction
+   * on a Delta table.
+   * @param snapshotToConvert the snapshot that needs to be converted to Iceberg
+   * @param txn the transaction that triggers the conversion. It must
+   *            contain the catalogTable this conversion targets.
+   * @return Converted Delta version and commit timestamp
+   */
+  override def convertSnapshot(
+      snapshotToConvert: Snapshot, txn: OptimisticTransactionImpl): Option[(Long, Long)] = {
+    if (!UniversalFormat.icebergEnabled(snapshotToConvert.metadata)) {
+      return None
+    }
+    txn.catalogTable match {
+      case Some(table) => convertSnapshot(snapshotToConvert, Some(txn), table)
+      case _ =>
+        logWarning(s"CatalogTable for table ${snapshotToConvert.deltaLog.tableId} " +
+          s"is empty in txn. Skipping Iceberg conversion.")
+        recordDeltaEvent(
+          snapshotToConvert.deltaLog,
+          "delta.iceberg.conversion.skipped.emptyCatalogTable",
+          data = Map(
+            "version" -> snapshotToConvert.version
+          )
+        )
+        None
+    }
+  }
+
   /**
    * Convert the specified snapshot into Iceberg. NOTE: This operation is blocking. Call
    * enqueueSnapshotForConversion to run the operation asynchronously.
    * @param snapshotToConvert the snapshot that needs to be converted to Iceberg
    * @param txnOpt the OptimisticTransaction that created snapshotToConvert.
    *               Used as a hint to avoid recomputing old metadata.
+   * @param catalogTable the catalogTable this conversion targets
    * @return Converted Delta version and commit timestamp
    */
-  override def convertSnapshot(
+  private def convertSnapshot(
       snapshotToConvert: Snapshot,
-      txnOpt: Option[OptimisticTransactionImpl]): Option[(Long, Long)] =
+      txnOpt: Option[OptimisticTransactionImpl],
+      catalogTable: CatalogTable): Option[(Long, Long)] =
     recordFrameProfile("Delta", "IcebergConverter.convertSnapshot") {
       val log = snapshotToConvert.deltaLog
       val lastDeltaVersionConverted: Option[Long] =
-        loadLastDeltaVersionConverted(snapshotToConvert)
+        loadLastDeltaVersionConverted(snapshotToConvert, catalogTable)
       val maxCommitsToConvert =
         spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_COMMITS_TO_CONVERT)
@@ -202,8 +251,14 @@
         case (Some(_), None) => REPLACE_TABLE
         case (None, None) => CREATE_TABLE
       }
+
+      UniversalFormat.enforceSupportInCatalog(catalogTable, snapshotToConvert.metadata) match {
+        case Some(updatedTable) => spark.sessionState.catalog.alterTable(updatedTable)
+        case _ =>
+      }
+
       val icebergTxn = new IcebergConversionTransaction(
-        log.newDeltaHadoopConf(), snapshotToConvert, tableOp, lastDeltaVersionConverted)
+        catalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp, lastDeltaVersionConverted)

       // Write out the actions taken since the last conversion (or since table creation).
       // This is done in batches, with each batch corresponding either to one delta file,
@@ -268,18 +323,10 @@
       Some(snapshotToConvert.version, snapshotToConvert.timestamp)
     }

-  override def loadLastDeltaVersionConverted(snapshot: Snapshot): Option[Long] =
+  override def loadLastDeltaVersionConverted(
+      snapshot: Snapshot, catalogTable: CatalogTable): Option[Long] =
     recordFrameProfile("Delta", "IcebergConverter.loadLastDeltaVersionConverted") {
-      val deltaLog = snapshot.deltaLog
-      val hadoopTables = new HadoopTables(deltaLog.newDeltaHadoopConf())
-      if (hadoopTables.exists(deltaLog.dataPath.toString)) {
-        hadoopTables
-          .load(deltaLog.dataPath.toString)
-          .properties()
-          .asScala
-          .get(IcebergConverter.DELTA_VERSION_PROPERTY)
-          .map(_.toLong)
-      } else None
+      catalogTable.properties.get(IcebergConverter.DELTA_VERSION_PROPERTY).map(_.toLong)
     }

   /**
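As a reading aid, here is a small self-contained sketch (simplified types, not Delta's real classes) of the guard logic the two new public convertSnapshot entry points share: skip when UniForm's Iceberg support is disabled, and for the transaction-driven overload also skip, with a warning, when the transaction carries no catalog table.

    // Stand-in types; OptimisticTransactionImpl and Snapshot are far richer in Delta.
    final case class Metadata(enabledFormats: Set[String])
    final case class Txn(catalogTable: Option[String])

    def icebergEnabled(m: Metadata): Boolean = m.enabledFormats.contains("iceberg")

    // Mirrors convertSnapshot(snapshot, txn): needs both UniForm-Iceberg and a catalog table.
    def convertFromTxn(meta: Metadata, txn: Txn): Option[String] =
      if (!icebergEnabled(meta)) None
      else txn.catalogTable match {
        case Some(table) => Some(s"convert targeting $table")
        case None =>
          println("CatalogTable is empty in txn; skipping Iceberg conversion")
          None
      }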
diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.scala
index bb6d1f6288c..bb19347e6b4 100644
--- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.scala
+++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.scala
@@ -22,11 +22,16 @@ import scala.util.control.NonFatal
 import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaConfigs, DeltaLog}
 import org.apache.spark.sql.delta.actions.{AddFile, FileAction, RemoveFile}
 import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import shadedForDelta.org.apache.iceberg.{DataFile, DataFiles, FileFormat, PartitionSpec, Schema => IcebergSchema}
+// scalastyle:off import.ordering.noEmptyLine
+import shadedForDelta.org.apache.iceberg.catalog.{Namespace, TableIdentifier => IcebergTableIdentifier}
+// scalastyle:on import.ordering.noEmptyLine
+import shadedForDelta.org.apache.iceberg.hive.HiveCatalog

-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier => SparkTableIdentifier}
+import org.apache.spark.sql.types.StructType

 object IcebergTransactionUtils
   extends DeltaLogging
@@ -172,4 +177,30 @@ object IcebergTransactionUtils

     builder
   }
+
+  /**
+   * Create an Iceberg HiveCatalog.
+   * @param conf Hadoop Configuration
+   * @return the initialized HiveCatalog
+   */
+  def createHiveCatalog(conf: Configuration): HiveCatalog = {
+    val catalog = new HiveCatalog()
+    catalog.setConf(conf)
+    catalog.initialize("spark_catalog", Map.empty[String, String].asJava)
+    catalog
+  }
+
+  /**
+   * Convert a Spark table identifier to an Iceberg table identifier, using
+   * only the "database" as the "namespace" of the Iceberg table identifier.
+   * See [[HiveCatalog.isValidateNamespace]]
+   */
+  def convertSparkTableIdentifierToIcebergHive(
+      identifier: SparkTableIdentifier): IcebergTableIdentifier = {
+    val namespace = identifier.database match {
+      case Some(database) => Namespace.of(database)
+      case _ => Namespace.empty()
+    }
+    IcebergTableIdentifier.of(namespace, identifier.table)
+  }
 }
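The mapping rule behind convertSparkTableIdentifierToIcebergHive is easy to state: only the optional database becomes the Iceberg namespace, and an identifier without a database maps to the empty namespace. Below is a standalone mirror of that rule using plain case classes rather than the real Spark/Iceberg identifier types.

    // Plain stand-ins for SparkTableIdentifier and IcebergTableIdentifier.
    final case class SparkId(table: String, database: Option[String])
    final case class IcebergId(namespace: Seq[String], name: String)

    // Only the database (if any) contributes to the namespace; the catalog part is ignored.
    def toIcebergHiveId(id: SparkId): IcebergId =
      IcebergId(id.database.toSeq, id.table)

    // toIcebergHiveId(SparkId("tbl1", Some("db1"))) == IcebergId(Seq("db1"), "tbl1")
    // toIcebergHiveId(SparkId("tbl1", None))        == IcebergId(Seq.empty, "tbl1")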
diff --git a/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala b/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala
index 94d0b7e590a..152bef7fc53 100644
--- a/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala
+++ b/iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala
@@ -17,11 +17,9 @@ package org.apache.spark.sql.delta

 import java.io.File
-
 import org.scalatest.concurrent.Eventually
 import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.sql.{QueryTest, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
 import org.apache.spark.util.Utils
 import org.apache.spark.SparkContext

@@ -39,95 +37,36 @@ class ConvertToIcebergSuite extends QueryTest with Eventually {

   override def beforeAll(): Unit = {
     super.beforeAll()
-    warehousePath = Utils.createTempDir()
-    _sparkSessionWithDelta = createSparkSessionWithDelta()
-    _sparkSessionWithIceberg = createSparkSessionWithIceberg()
-    require(!_sparkSessionWithDelta.eq(_sparkSessionWithIceberg), "separate sessions expected")
   }

   override def beforeEach(): Unit = {
     super.beforeEach()
-    testTablePath = Utils.createTempDir().getAbsolutePath
   }

   override def afterEach(): Unit = {
     super.afterEach()
-    Utils.deleteRecursively(new File(testTablePath))
-    _sparkSessionWithDelta.sql(s"DROP TABLE IF EXISTS $testTableName")
-    _sparkSessionWithIceberg.sql(s"DROP TABLE IF EXISTS $testTableName")
   }

   override def afterAll(): Unit = {
     super.afterAll()
-    if (warehousePath != null) Utils.deleteRecursively(warehousePath)
-    SparkContext.getActive.foreach(_.stop())
-  }
-
-  test("basic test - path based table created with SQL") {
-    runDeltaSql(s"""CREATE TABLE delta.`$testTablePath` (col1 INT) USING DELTA
-                   |TBLPROPERTIES (
-                   |  'delta.columnMapping.mode' = 'id',
-                   |  'delta.universalFormat.enabledFormats' = 'iceberg'
-                   |)""".stripMargin)
-    verifyReadWithIceberg(testTablePath, Seq())
-    runDeltaSql(s"INSERT INTO delta.`$testTablePath` VALUES (123)")
-    verifyReadWithIceberg(testTablePath, Seq(Row(123)))
-  }
-
-  test("basic test - catalog table created with SQL") {
-    runDeltaSql(s"""CREATE TABLE $testTableName(col1 INT) USING DELTA
-                   |LOCATION '$testTablePath'
-                   |TBLPROPERTIES (
-                   |  'delta.columnMapping.mode' = 'id',
-                   |  'delta.universalFormat.enabledFormats' = 'iceberg'
-                   |)""".stripMargin)
-    verifyReadWithIceberg(testTablePath, Seq())
-    runDeltaSql(s"INSERT INTO $testTableName VALUES (123)")
-    verifyReadWithIceberg(testTablePath, Seq(Row(123)))
-  }
-
-  test("basic test - path based table created with DataFrame") {
-    withDeltaSparkSession { deltaSpark =>
-      withDefaultTablePropsInSQLConf {
-        deltaSpark.range(10).write.format("delta").save(testTablePath)
-      }
-    }
-    verifyReadWithIceberg(testTablePath, 0 to 9 map (Row(_)))
-    withDeltaSparkSession { deltaSpark =>
-      deltaSpark.range(10, 20, 1)
-        .write.format("delta").mode("append").save(testTablePath)
-    }
-    verifyReadWithIceberg(testTablePath, 0 to 19 map (Row(_)))
   }

-  test("basic test - catalog table created with DataFrame") {
-    withDeltaSparkSession { deltaSpark =>
-      withDefaultTablePropsInSQLConf {
-        deltaSpark.range(10).write.format("delta")
-          .option("path", testTablePath)
-          .saveAsTable(testTableName)
-      }
-    }
-    verifyReadWithIceberg(testTablePath, 0 to 9 map (Row(_)))
+  def runDeltaSql(sqlStr: String): Unit = {
     withDeltaSparkSession { deltaSpark =>
-      deltaSpark.range(10, 20, 1)
-        .write.format("delta").mode("append")
-        .option("path", testTablePath)
-        .saveAsTable(testTableName)
+      deltaSpark.sql(sqlStr)
     }
-    verifyReadWithIceberg(testTablePath, 0 to 19 map (Row(_)))
   }

-  def runDeltaSql(sqlStr: String): Unit = {
-    withDeltaSparkSession { deltaSpark =>
-      deltaSpark.sql(sqlStr)
+  def runIcebergSql(sqlStr: String): DataFrame = {
+    withIcebergSparkSession { icebergSpark =>
+      icebergSpark.sql(sqlStr)
     }
   }

-  def verifyReadWithIceberg(tablePath: String, expectedAnswer: Seq[Row]): Unit = {
+  def verifyReadWithIceberg(tableName: String, expectedAnswer: Seq[Row]): Unit = {
     withIcebergSparkSession { icebergSparkSession =>
       eventually(timeout(10.seconds)) {
-        val icebergDf = icebergSparkSession.read.format("iceberg").load(tablePath)
+        val icebergDf = icebergSparkSession.sql(s"SELECT * FROM ${tableName}")
         checkAnswer(icebergDf, expectedAnswer)
       }
     }
@@ -145,15 +84,15 @@ class ConvertToIcebergSuite extends QueryTest with Eventually {
     ) { f }
   }

-  def withDeltaSparkSession(f: SparkSession => Unit): Unit = {
+  def withDeltaSparkSession[T](f: SparkSession => T): T = {
     withSparkSession(_sparkSessionWithDelta, f)
   }

-  def withIcebergSparkSession(f: SparkSession => Unit): Unit = {
+  def withIcebergSparkSession[T](f: SparkSession => T): T = {
     withSparkSession(_sparkSessionWithIceberg, f)
   }

-  def withSparkSession(sessionToUse: SparkSession, f: SparkSession => Unit): Unit = {
+  def withSparkSession[T](sessionToUse: SparkSession, f: SparkSession => T): T = {
     try {
       SparkSession.setDefaultSession(sessionToUse)
       SparkSession.setActiveSession(sessionToUse)
@@ -174,6 +113,7 @@ class ConvertToIcebergSuite extends QueryTest with Eventually {
       .appName("DeltaSession")
       .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+      .config("spark.sql.catalog.spark_catalog.type", "hive")
       .getOrCreate()
     SparkSession.clearActiveSession()
     SparkSession.clearDefaultSession()
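The removed tests above show how the suite exercised conversion against path-based tables; with verifyReadWithIceberg now keyed on a table name, an equivalent catalog-based test would look roughly like the sketch below. The helpers and the testTableName fixture are the suite's own; the replacement tests themselves are not shown in this hunk, so this is only an illustration.

    // Hypothetical catalog-based variant of the removed SQL test, assuming the
    // suite's runDeltaSql/verifyReadWithIceberg helpers are in scope.
    runDeltaSql(
      s"""CREATE TABLE $testTableName (col1 INT) USING DELTA
         |TBLPROPERTIES (
         |  'delta.columnMapping.mode' = 'id',
         |  'delta.universalFormat.enabledFormats' = 'iceberg'
         |)""".stripMargin)
    runDeltaSql(s"INSERT INTO $testTableName VALUES (123)")
    verifyReadWithIceberg(testTableName, Seq(Row(123)))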
diff --git a/icebergShaded/generate_iceberg_jars.py b/icebergShaded/generate_iceberg_jars.py
index 551638aca90..f8a9f39ea72 100644
--- a/icebergShaded/generate_iceberg_jars.py
+++ b/icebergShaded/generate_iceberg_jars.py
@@ -40,6 +40,8 @@
     "api/build/libs/iceberg-api-*.jar",
     "core/build/libs/iceberg-core-*.jar",
     "parquet/build/libs/iceberg-parquet-*.jar",
+    "hive-metastore/build/libs/iceberg-hive-*.jar",
+    "data/build/libs/iceberg-data-*.jar"
 ]

 iceberg_root_dir = path.abspath(path.dirname(__file__))
@@ -112,6 +114,8 @@ def generate_iceberg_jars():
     build_args = "-x spotlessCheck -x checkstyleMain -x test -x integrationTest"
     run_cmd("./gradlew :iceberg-core:build %s" % build_args)
     run_cmd("./gradlew :iceberg-parquet:build %s" % build_args)
+    run_cmd("./gradlew :iceberg-hive-metastore:build %s" % build_args)
+    run_cmd("./gradlew :iceberg-data:build %s" % build_args)

     print(">>> Copying JARs to lib directory")
     shutil.rmtree(iceberg_lib_dir, ignore_errors=True)
diff --git a/icebergShaded/iceberg_src_patches/0003-iceberg-hive-metastore-must-not-remove-unknown-table-data.patch b/icebergShaded/iceberg_src_patches/0003-iceberg-hive-metastore-must-not-remove-unknown-table-data.patch
new file mode 100644
index 00000000000..23386853c2d
--- /dev/null
+++ b/icebergShaded/iceberg_src_patches/0003-iceberg-hive-metastore-must-not-remove-unknown-table-data.patch
@@ -0,0 +1,45 @@
+HiveTableOperations should keep its catalog operations compatible with Delta
+
+This patch prevents Iceberg's HiveTableOperations from overwriting catalog table properties used by Delta. It also writes a dummy schema to the metastore to stay aligned with Delta's behavior.
+---
+Index: hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+===================================================================
+diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java (revision ede085d0f7529f24acd0c81dd0a43f7bb969b763)
++++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java (revision 4470b919dd6a97b0f6d6b7d57d1d57348a40c025)
+@@ -43,6 +43,7 @@
+ import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+ import org.apache.hadoop.hive.metastore.TableType;
+ import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
++import org.apache.hadoop.hive.metastore.api.FieldSchema;
+ import org.apache.hadoop.hive.metastore.api.LockComponent;
+ import org.apache.hadoop.hive.metastore.api.LockLevel;
+ import org.apache.hadoop.hive.metastore.api.LockRequest;
+@@ -286,7 +287,9 @@
+         LOG.debug("Committing new table: {}", fullName);
+       }
+
+-      tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to pickup any schema changes
++      StorageDescriptor newsd = storageDescriptor(metadata, hiveEngineEnabled);
++      newsd.getSerdeInfo().setParameters(tbl.getSd().getSerdeInfo().getParameters());
++      tbl.setSd(newsd); // set to pickup any schema changes
+
+       String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
+       String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
+@@ -393,6 +396,7 @@
+   @VisibleForTesting
+   void persistTable(Table hmsTable, boolean updateHiveTable)
+       throws TException, InterruptedException {
++    hmsTable.getSd().setCols(Collections.singletonList(new FieldSchema("col", "array", "")));
+     if (updateHiveTable) {
+       metaClients.run(
+           client -> {
+@@ -468,7 +472,7 @@
+     }
+
+     // remove any props from HMS that are no longer present in Iceberg table props
+-    obsoleteProps.forEach(parameters::remove);
++    // obsoleteProps.forEach(parameters::remove);
+
+     parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
+     parameters.put(METADATA_LOCATION_PROP, newMetadataLocation);
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala
index 174652f0d34..3f47e03c4ab 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala
@@ -20,6 +20,7 @@ import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
 import org.apache.spark.sql.delta.metering.DeltaLogging

 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable

 /**
  * Utils to validate the Universal Format (UniForm) Delta feature (NOT a table feature).
@@ -144,8 +145,35 @@ object UniversalFormat extends DeltaLogging {

     }
   }
-}
+  val ICEBERG_TABLE_TYPE_KEY = "table_type"
+
+  /**
+   * Update the CatalogTable to mark it readable by other table readers (Iceberg for now).
+   * This method ensures 'table_type' = 'ICEBERG' when UniForm is enabled,
+   * and ensures 'table_type' is not 'ICEBERG' when UniForm is disabled.
+   * If the key holds any value other than 'ICEBERG', it is left untouched for compatibility.
+   *
+   * @param table the catalogTable before the change
+   * @param metadata the snapshot metadata
+   * @return the updated catalogTable, or None if no change is needed
+   */
+  def enforceSupportInCatalog(table: CatalogTable, metadata: Metadata): Option[CatalogTable] = {
+    val icebergInCatalog = table.properties.get(ICEBERG_TABLE_TYPE_KEY) match {
+      case Some(value) => value.equalsIgnoreCase(ICEBERG_FORMAT)
+      case _ => false
+    }
+    (icebergEnabled(metadata), icebergInCatalog) match {
+      case (true, false) =>
+        Some(table.copy(properties = table.properties +
+          (ICEBERG_TABLE_TYPE_KEY -> ICEBERG_FORMAT)))
+      case (false, true) =>
+        Some(table.copy(properties =
+          table.properties - ICEBERG_TABLE_TYPE_KEY))
+      case _ => None
+    }
+  }
+}

 /** Class to facilitate the conversion of Delta into other table formats. */
 abstract class UniversalFormatConverter(spark: SparkSession) {
   /**
@@ -157,12 +185,36 @@ abstract class UniversalFormatConverter(spark: SparkSession) {
    */
   def enqueueSnapshotForConversion(
       snapshotToConvert: Snapshot,
-      txn: Option[OptimisticTransactionImpl]): Unit
+      txn: OptimisticTransactionImpl): Unit
+
+  /**
+   * Perform a blocking conversion when performing an OptimisticTransaction
+   * on a Delta table.
+   *
+   * @param snapshotToConvert the snapshot that needs to be converted to Iceberg
+   * @param txn the transaction that triggers the conversion. Used as a hint to
+   *            avoid recomputing old metadata. It must contain the catalogTable
+   *            this conversion targets.
+   * @return Converted Delta version and commit timestamp
+   */
+  def convertSnapshot(
+      snapshotToConvert: Snapshot, txn: OptimisticTransactionImpl): Option[(Long, Long)]

-  /** Perform a blocking conversion. */
+  /**
+   * Perform a blocking conversion for the given catalogTable.
+   *
+   * @param snapshotToConvert the snapshot that needs to be converted to Iceberg
+   * @param catalogTable the catalogTable this conversion targets
+   * @return Converted Delta version and commit timestamp
+   */
   def convertSnapshot(
-      snapshotToConvert: Snapshot,
-      txnOpt: Option[OptimisticTransactionImpl]): Option[(Long, Long)]
+      snapshotToConvert: Snapshot, catalogTable: CatalogTable): Option[(Long, Long)]

-  def loadLastDeltaVersionConverted(snapshot: Snapshot): Option[Long]
+  /**
+   * Fetch the Delta version corresponding to the latest conversion.
+   * @param snapshot the snapshot to be converted
+   * @param table the catalogTable with info of previous conversions
+   * @return None if no previous conversion is found
+   */
+  def loadLastDeltaVersionConverted(snapshot: Snapshot, table: CatalogTable): Option[Long]
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
index 77f74f79755..8e0722d95ab 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
@@ -193,7 +193,7 @@ case class CreateDeltaTableCommand(

       if (UniversalFormat.icebergEnabled(postCommitSnapshot.metadata)) {
-        deltaLog.icebergConverter.convertSnapshot(postCommitSnapshot, None)
+        deltaLog.icebergConverter.convertSnapshot(postCommitSnapshot, tableWithLocation)
       }
     }

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/IcebergConverterHook.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/IcebergConverterHook.scala
index aca8907b711..b241d1fa28b 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/IcebergConverterHook.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/IcebergConverterHook.scala
@@ -46,6 +46,6 @@
     postCommitSnapshot
       .deltaLog
       .icebergConverter
-      .enqueueSnapshotForConversion(postCommitSnapshot, Some(txn))
+      .enqueueSnapshotForConversion(postCommitSnapshot, txn)
   }
 }
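Taken together, CreateDeltaTableCommand and the post-commit hook now hand the catalog table (or the transaction that carries it) to the converter, which calls UniversalFormat.enforceSupportInCatalog before committing to Iceberg. The standalone sketch below mirrors that property-reconciliation rule using plain maps instead of CatalogTable/Metadata; it is an illustration, not the patched code.

    // Mirrors enforceSupportInCatalog: add table_type=iceberg when UniForm enables
    // Iceberg but the catalog does not advertise it yet, drop the key when UniForm
    // is disabled but the catalog still advertises Iceberg, otherwise change nothing.
    def reconcileTableType(
        props: Map[String, String],
        icebergEnabled: Boolean): Option[Map[String, String]] = {
      val advertisesIceberg =
        props.get("table_type").exists(_.equalsIgnoreCase("iceberg"))
      (icebergEnabled, advertisesIceberg) match {
        case (true, false) => Some(props + ("table_type" -> "iceberg"))
        case (false, true) => Some(props - "table_type")
        case _             => None // nothing to reconcile
      }
    }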