From ca417c417143d90afe67d22123febe10bed6bc83 Mon Sep 17 00:00:00 2001 From: Richard Chen Date: Sun, 22 Dec 2024 13:38:05 -0500 Subject: [PATCH] init --- .../sharing/spark/DeltaSharingUtils.scala | 2 + .../DeltaSharingDataSourceDeltaSuite.scala | 60 ++++++++++--------- .../TestClientForDeltaFormatSharing.scala | 1 + .../apache/spark/sql/delta/TableFeature.scala | 20 ++++++- .../spark/sql/delta/DeltaVariantSuite.scala | 51 ++++++++++++++-- 5 files changed, 98 insertions(+), 36 deletions(-) diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala index c03924f13b1..94512fda994 100644 --- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala +++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala @@ -45,6 +45,7 @@ object DeltaSharingUtils extends Logging { TimestampNTZTableFeature.name, TypeWideningPreviewTableFeature.name, TypeWideningTableFeature.name, + VariantTypePreviewTableFeature.name, VariantTypeTableFeature.name ) @@ -55,6 +56,7 @@ object DeltaSharingUtils extends Logging { TimestampNTZTableFeature.name, TypeWideningPreviewTableFeature.name, TypeWideningTableFeature.name, + VariantTypePreviewTableFeature.name, VariantTypeTableFeature.name ) diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala index e55d1caa8b0..b05a173afae 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala @@ -1469,41 +1469,45 @@ trait DeltaSharingDataSourceDeltaSuiteBase } } - testSparkMasterOnly("basic variant test") { - withTempDir { tempDir => - val deltaTableName = "variant_table" - withTable(deltaTableName) { - spark.range(0, 10) - .selectExpr("parse_json(cast(id as string)) v") - .write 
- .format("delta") - .mode("overwrite") - .saveAsTable(deltaTableName) + Seq(true, false).foreach { addGATableFeature => + testSparkMasterOnly(s"basic variant test - GA feature enabled: $addGATableFeature") { + withTempDir { tempDir => + val deltaTableName = "variant_table" + withTable(deltaTableName) { + spark.range(0, 10) + .selectExpr("parse_json(cast(id as string)) v") + .write + .format("delta") + .mode("overwrite") + .saveAsTable(deltaTableName) - val sharedTableName = "shared_table_variant" - prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName) - prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + if (addGATableFeature) { + spark.sql( + s"ALTER TABLE $deltaTableName " + + s"SET TBLPROPERTIES('delta.feature.variantType' = 'supported')" + ) + } - val expectedSchemaString = "StructType(StructField(v,VariantType,true))" - val expected = spark.read.format("delta").table(deltaTableName) + val sharedTableName = "shared_table_variant" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) - def test(tablePath: String): Unit = { - assert( - expectedSchemaString == spark.read + val expectedSchemaString = "StructType(StructField(v,VariantType,true))" + val expected = spark.read.format("delta").table(deltaTableName) + + def test(tablePath: String): Unit = { + val sharedDf = spark.read .format("deltaSharing") .option("responseFormat", "delta") .load(tablePath) - .schema - .toString - ) - val df = - spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath) - checkAnswer(df, expected) - } + assert(expectedSchemaString == sharedDf.schema.toString) + checkAnswer(sharedDf, expected) + } - withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { - val profileFile = prepareProfileFile(tempDir) - test(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName") + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val 
profileFile = prepareProfileFile(tempDir) + test(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName") + } } } } diff --git a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala index bf5167366a8..15e73eb15de 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala @@ -67,6 +67,7 @@ private[spark] class TestClientForDeltaFormatSharing( TimestampNTZTableFeature, TypeWideningPreviewTableFeature, TypeWideningTableFeature, + VariantTypePreviewTableFeature, VariantTypeTableFeature ).map(_.name) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index d972dd1e031..a9fdc3158d0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -365,6 +365,7 @@ object TableFeature { V2CheckpointTableFeature, RowTrackingFeature, InCommitTimestampTableFeature, + VariantTypePreviewTableFeature, VariantTypeTableFeature, CoordinatedCommitsTableFeature, CheckpointProtectionTableFeature) @@ -615,14 +616,29 @@ object RedirectWriterOnlyFeature extends WriterFeature(name = "redirectWriterOnl override def automaticallyUpdateProtocolOfExistingTables: Boolean = true } -object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType-preview") +object VariantTypePreviewTableFeature extends ReaderWriterFeature(name = "variantType-preview") with FeatureAutomaticallyEnabledByMetadata { override def metadataRequiresFeatureToBeEnabled( protocol: Protocol, metadata: Metadata, spark: SparkSession): Boolean = { - SchemaUtils.checkForVariantTypeColumnsRecursively(metadata.schema) + 
SchemaUtils.checkForVariantTypeColumnsRecursively(metadata.schema) && + // Do not require the 'variantType-preview' table feature to be enabled if the 'variantType' + // table feature is enabled so tables with 'variantType' can be read. + !protocol.isFeatureSupported(VariantTypeTableFeature) } } +/** + * Stable feature for variant. The stable feature isn't enabled automatically yet when variants + * are present in the table schema. The feature spec is finalized though and by supporting the + * stable feature here we guarantee that this version can already read any table created in the + * future. + * + * Note: Users can manually add both the preview and stable features to a table using ADD FEATURE, + * although that's undocumented. This is allowed: the two feature specifications are compatible and + * supported. + */ +object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType") + object DeletionVectorsTableFeature extends ReaderWriterFeature(name = "deletionVectors") with FeatureAutomaticallyEnabledByMetadata { diff --git a/spark/src/test/scala-spark-master/org/apache/spark/sql/delta/DeltaVariantSuite.scala b/spark/src/test/scala-spark-master/org/apache/spark/sql/delta/DeltaVariantSuite.scala index daac96a28e4..e1f403cd566 100644 --- a/spark/src/test/scala-spark-master/org/apache/spark/sql/delta/DeltaVariantSuite.scala +++ b/spark/src/test/scala-spark-master/org/apache/spark/sql/delta/DeltaVariantSuite.scala @@ -47,12 +47,30 @@ class DeltaVariantSuite deltaLog.unsafeVolatileSnapshot.protocol } + private def assertVariantTypeTableFeatures( + tableName: String, + expectPreviewFeature: Boolean, + expectStableFeature: Boolean): Unit = { + val features = getProtocolForTable(tableName).readerAndWriterFeatures + if (expectPreviewFeature) { + assert(features.contains(VariantTypePreviewTableFeature)) + } else { + assert(!features.contains(VariantTypePreviewTableFeature)) + } + if (expectStableFeature) { + assert(features.contains(VariantTypeTableFeature)) + 
} else { + assert(!features.contains(VariantTypeTableFeature)) + } + } + test("create a new table with Variant, higher protocol and feature should be picked.") { withTable("tbl") { sql("CREATE TABLE tbl(s STRING, v VARIANT) USING DELTA") sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))") assert(spark.table("tbl").selectExpr("v::int").head == Row(99)) - assert(getProtocolForTable("tbl").readerAndWriterFeatures.contains(VariantTypeTableFeature)) + assertVariantTypeTableFeatures( + "tbl", expectPreviewFeature = true, expectStableFeature = false) } } @@ -63,7 +81,10 @@ class DeltaVariantSuite val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl")) assert( - !deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(VariantTypeTableFeature), + !deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported( + VariantTypePreviewTableFeature) && + !deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported( + VariantTypeTableFeature), s"Table tbl contains VariantTypeFeature descriptor when its not supposed to" ) } @@ -102,7 +123,7 @@ class DeltaVariantSuite e, "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT", parameters = Map( - "unsupportedFeatures" -> VariantTypeTableFeature.name, + "unsupportedFeatures" -> VariantTypePreviewTableFeature.name, "supportedFeatures" -> currentFeatures ) ) @@ -112,14 +133,31 @@ class DeltaVariantSuite assert( getProtocolForTable("tbl") == - VariantTypeTableFeature.minProtocolVersion - .withFeature(VariantTypeTableFeature) + VariantTypePreviewTableFeature.minProtocolVersion + .withFeature(VariantTypePreviewTableFeature) .withFeature(InvariantsTableFeature) .withFeature(AppendOnlyTableFeature) ) } } + test("variant stable and preview features can be supported simultaneously and read") { + withTable("tbl") { + sql("CREATE TABLE tbl(v VARIANT) USING delta") + sql("INSERT INTO tbl (SELECT parse_json(cast(id + 99 as string)) FROM range(1))") + assert(spark.table("tbl").selectExpr("v::int").head == Row(99)) 
+ assertVariantTypeTableFeatures( + "tbl", expectPreviewFeature = true, expectStableFeature = false) + sql( + s"ALTER TABLE tbl " + + s"SET TBLPROPERTIES('delta.feature.variantType' = 'supported')" + ) + assertVariantTypeTableFeatures( + "tbl", expectPreviewFeature = true, expectStableFeature = true) + assert(spark.table("tbl").selectExpr("v::int").head == Row(99)) + } + } + test("VariantType may not be used as a partition column") { withTable("delta_test") { checkError( @@ -267,7 +305,8 @@ class DeltaVariantSuite .selectExpr("tableFeatures") .collect()(0) .getAs[MutableSeq[String]](0) - assert(tableFeatures.find(f => f == VariantTypeTableFeature.name).nonEmpty) + assert(tableFeatures.find(f => f == VariantTypePreviewTableFeature.name).nonEmpty) + assert(tableFeatures.find(f => f == VariantTypeTableFeature.name).isEmpty) } }