Skip to content

Commit

Permalink
init
Browse files — browse the repository at this point in the history
  • Loading branch information
richardc-db committed Dec 22, 2024
1 parent 82e940f commit ca417c4
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ object DeltaSharingUtils extends Logging {
TimestampNTZTableFeature.name,
TypeWideningPreviewTableFeature.name,
TypeWideningTableFeature.name,
VariantTypePreviewTableFeature.name,
VariantTypeTableFeature.name
)

Expand All @@ -55,6 +56,7 @@ object DeltaSharingUtils extends Logging {
TimestampNTZTableFeature.name,
TypeWideningPreviewTableFeature.name,
TypeWideningTableFeature.name,
VariantTypePreviewTableFeature.name,
VariantTypeTableFeature.name
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1469,41 +1469,45 @@ trait DeltaSharingDataSourceDeltaSuiteBase
}
}

testSparkMasterOnly("basic variant test") {
withTempDir { tempDir =>
val deltaTableName = "variant_table"
withTable(deltaTableName) {
spark.range(0, 10)
.selectExpr("parse_json(cast(id as string)) v")
.write
.format("delta")
.mode("overwrite")
.saveAsTable(deltaTableName)
Seq(true, false).foreach { addGATableFeature =>
testSparkMasterOnly(s"basic variant test - GA feature enabled: $addGATableFeature") {
withTempDir { tempDir =>
val deltaTableName = "variant_table"
withTable(deltaTableName) {
spark.range(0, 10)
.selectExpr("parse_json(cast(id as string)) v")
.write
.format("delta")
.mode("overwrite")
.saveAsTable(deltaTableName)

val sharedTableName = "shared_table_variant"
prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName)
prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
if (addGATableFeature) {
spark.sql(
s"ALTER TABLE $deltaTableName " +
s"SET TBLPROPERTIES('delta.feature.variantType' = 'supported')"
)
}

val expectedSchemaString = "StructType(StructField(v,VariantType,true))"
val expected = spark.read.format("delta").table(deltaTableName)
val sharedTableName = "shared_table_variant"
prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName)
prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)

def test(tablePath: String): Unit = {
assert(
expectedSchemaString == spark.read
val expectedSchemaString = "StructType(StructField(v,VariantType,true))"
val expected = spark.read.format("delta").table(deltaTableName)

def test(tablePath: String): Unit = {
val sharedDf = spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.load(tablePath)
.schema
.toString
)
val df =
spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath)
checkAnswer(df, expected)
}
assert(expectedSchemaString == sharedDf.schema.toString)
checkAnswer(sharedDf, expected)
}

withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
val profileFile = prepareProfileFile(tempDir)
test(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName")
withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
val profileFile = prepareProfileFile(tempDir)
test(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName")
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ private[spark] class TestClientForDeltaFormatSharing(
TimestampNTZTableFeature,
TypeWideningPreviewTableFeature,
TypeWideningTableFeature,
VariantTypePreviewTableFeature,
VariantTypeTableFeature
).map(_.name)

Expand Down
20 changes: 18 additions & 2 deletions spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ object TableFeature {
V2CheckpointTableFeature,
RowTrackingFeature,
InCommitTimestampTableFeature,
VariantTypePreviewTableFeature,
VariantTypeTableFeature,
CoordinatedCommitsTableFeature,
CheckpointProtectionTableFeature)
Expand Down Expand Up @@ -615,14 +616,29 @@ object RedirectWriterOnlyFeature extends WriterFeature(name = "redirectWriterOnl
override def automaticallyUpdateProtocolOfExistingTables: Boolean = true
}

object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType-preview")
/**
 * Preview feature for the variant data type, carrying the pre-GA feature name
 * `variantType-preview`. It is automatically required when the table schema
 * contains a variant column, unless the stable `variantType` feature is already
 * supported on the table's protocol.
 */
object VariantTypePreviewTableFeature extends ReaderWriterFeature(name = "variantType-preview")
  with FeatureAutomaticallyEnabledByMetadata {
  // True when the metadata forces this feature on: the schema contains a variant
  // column AND the stable 'variantType' feature is not already supported.
  override def metadataRequiresFeatureToBeEnabled(
      protocol: Protocol, metadata: Metadata, spark: SparkSession): Boolean = {
    SchemaUtils.checkForVariantTypeColumnsRecursively(metadata.schema) &&
      // Do not require the 'variantType-preview' table feature to be enabled if the 'variantType'
      // table feature is enabled so tables with 'variantType' can be read.
      !protocol.isFeatureSupported(VariantTypeTableFeature)
  }
}

/**
 * Stable (GA) feature for the variant data type. The stable feature is not yet
 * enabled automatically when variants are present in the table schema (the
 * preview feature handles that). The feature spec is finalized, though, and by
 * supporting the stable feature here we guarantee that this version can already
 * read any table created in the future.
 *
 * Note: Users can manually add both the preview and stable features to a table
 * using ADD FEATURE, although that's undocumented. This is allowed: the two
 * feature specifications are compatible and supported.
 */
object VariantTypeTableFeature extends ReaderWriterFeature(name = "variantType")

object DeletionVectorsTableFeature
extends ReaderWriterFeature(name = "deletionVectors")
with FeatureAutomaticallyEnabledByMetadata {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,30 @@ class DeltaVariantSuite
deltaLog.unsafeVolatileSnapshot.protocol
}

/**
 * Asserts the presence or absence of the preview and stable variant table
 * features in the given table's protocol.
 *
 * @param tableName            name of the Delta table whose protocol is checked
 * @param expectPreviewFeature whether `variantType-preview` must be supported
 * @param expectStableFeature  whether the stable `variantType` must be supported
 */
private def assertVariantTypeTableFeatures(
    tableName: String,
    expectPreviewFeature: Boolean,
    expectStableFeature: Boolean): Unit = {
  // Bug fix: use the `tableName` parameter rather than the hard-coded "tbl",
  // so the helper works for any table, not only one named "tbl".
  val features = getProtocolForTable(tableName).readerAndWriterFeatures
  // Boolean-equality asserts replace the duplicated if/else pairs; the message
  // makes a failure self-explanatory.
  assert(
    features.contains(VariantTypePreviewTableFeature) == expectPreviewFeature,
    s"preview feature presence mismatch for table $tableName: $features")
  assert(
    features.contains(VariantTypeTableFeature) == expectStableFeature,
    s"stable feature presence mismatch for table $tableName: $features")
}

test("create a new table with Variant, higher protocol and feature should be picked.") {
withTable("tbl") {
sql("CREATE TABLE tbl(s STRING, v VARIANT) USING DELTA")
sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))")
assert(spark.table("tbl").selectExpr("v::int").head == Row(99))
assert(getProtocolForTable("tbl").readerAndWriterFeatures.contains(VariantTypeTableFeature))
assertVariantTypeTableFeatures(
"tbl", expectPreviewFeature = true, expectStableFeature = false)
}
}

Expand All @@ -63,7 +81,10 @@ class DeltaVariantSuite

val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl"))
assert(
!deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(VariantTypeTableFeature),
!deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(
VariantTypePreviewTableFeature) &&
!deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(
VariantTypeTableFeature),
s"Table tbl contains VariantTypeFeature descriptor when its not supposed to"
)
}
Expand Down Expand Up @@ -102,7 +123,7 @@ class DeltaVariantSuite
e,
"DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT",
parameters = Map(
"unsupportedFeatures" -> VariantTypeTableFeature.name,
"unsupportedFeatures" -> VariantTypePreviewTableFeature.name,
"supportedFeatures" -> currentFeatures
)
)
Expand All @@ -112,14 +133,31 @@ class DeltaVariantSuite

assert(
getProtocolForTable("tbl") ==
VariantTypeTableFeature.minProtocolVersion
.withFeature(VariantTypeTableFeature)
VariantTypePreviewTableFeature.minProtocolVersion
.withFeature(VariantTypePreviewTableFeature)
.withFeature(InvariantsTableFeature)
.withFeature(AppendOnlyTableFeature)
)
}
}

test("variant stable and preview features can be supported simultaneously and read") {
  withTable("tbl") {
    // Reads the single variant row back, cast to int, for before/after checks.
    def readVariantAsInt(): Row = spark.table("tbl").selectExpr("v::int").head

    sql("CREATE TABLE tbl(v VARIANT) USING delta")
    sql("INSERT INTO tbl (SELECT parse_json(cast(id + 99 as string)) FROM range(1))")

    // A freshly created variant table carries only the preview feature.
    assert(readVariantAsInt() == Row(99))
    assertVariantTypeTableFeatures(
      "tbl", expectPreviewFeature = true, expectStableFeature = false)

    // Manually add the stable GA feature on top of the preview one.
    sql("ALTER TABLE tbl SET TBLPROPERTIES('delta.feature.variantType' = 'supported')")

    // Both features coexist and the table remains readable.
    assertVariantTypeTableFeatures(
      "tbl", expectPreviewFeature = true, expectStableFeature = true)
    assert(readVariantAsInt() == Row(99))
  }
}

test("VariantType may not be used as a partition column") {
withTable("delta_test") {
checkError(
Expand Down Expand Up @@ -267,7 +305,8 @@ class DeltaVariantSuite
.selectExpr("tableFeatures")
.collect()(0)
.getAs[MutableSeq[String]](0)
assert(tableFeatures.find(f => f == VariantTypeTableFeature.name).nonEmpty)
assert(tableFeatures.find(f => f == VariantTypePreviewTableFeature.name).nonEmpty)
assert(tableFeatures.find(f => f == VariantTypeTableFeature.name).isEmpty)
}
}

Expand Down

0 comments on commit ca417c4

Please sign in to comment.