From 418105cc1f0b24cc4494aafbe27ae860c5028b05 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Thu, 22 May 2025 15:05:15 -0700 Subject: [PATCH 1/4] [SPARK-52272][SQL] V2SessionCatalog does not alter schema on Hive Catalog --- .../datasources/v2/V2SessionCatalog.scala | 22 ++- .../sql/hive/execution/HiveDDLSuite.scala | 183 +++++++++++++++++- 2 files changed, 199 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index e067730cfdf50..9ade00d89a631 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.internal.connector.V1Function import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -302,10 +302,22 @@ class V2SessionCatalog(catalog: SessionCatalog) val finalProperties = CatalogV2Util.applyClusterByChanges(properties, schema, changes) try { - catalog.alterTable( - catalogTable.copy( - properties = finalProperties, schema = schema, owner = owner, comment = comment, - collation = collation, storage = storage)) + if (SQLConf.get.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION).equals("hive")) { + if (changes.exists(!_.isInstanceOf[TableChange.ColumnChange])) { + catalog.alterTable( + catalogTable.copy( + properties = finalProperties, schema = schema, owner = owner, comment = comment, + collation = collation, storage = storage)) + } + if (changes.exists(_.isInstanceOf[TableChange.ColumnChange])) { + catalog.alterTableDataSchema(ident.asTableIdentifier, schema) + } + } else { + catalog.alterTable( + catalogTable.copy( + properties = finalProperties, schema = schema, owner = owner, comment = comment, + collation = collation, storage = storage)) + } } catch { case _: NoSuchTableException => throw QueryCompilationErrors.noSuchTableError(ident) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index f33bca78eb934..b016d551409bb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -22,6 +22,8 @@ import java.net.URI import java.time.LocalDateTime import java.util.Locale +import scala.jdk.CollectionConverters._ + import org.apache.hadoop.fs.Path import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER import org.scalatest.BeforeAndAfterEach @@ -31,13 +33,16 @@ import org.apache.spark.sql.{AnalysisException, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.Expression 
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} -import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableChange, TableInfo} import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader} +import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils} import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET} import org.apache.spark.sql.hive.orc.OrcFileOperator @@ -3392,4 +3397,180 @@ class HiveDDLSuite ) } } + + test("SPARK-52272: V2SessionCatalog does not alter schema on Hive Catalog") { + val externalCatalog = new CustomHiveCatalog(spark.sessionState.catalog.externalCatalog) + val v1SessionCatalog = new SessionCatalog(externalCatalog) + val v2SessionCatalog = new V2SessionCatalog(v1SessionCatalog) + withTable("t1") { + val identifier = Identifier.of(Array("default"), "t1") + val outputSchema = new StructType().add("a", IntegerType, true, "comment1") + v2SessionCatalog.createTable( + identifier, + new TableInfo.Builder() + .withProperties(Map.empty.asJava) + .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema)) + .withPartitions(Array.empty) + .build() + ) + v2SessionCatalog.alterTable(identifier, TableChange.setProperty("foo", "bar")) + val loaded = v2SessionCatalog.loadTable(identifier) + assert(loaded.properties().get("foo") == "bar") + + assert(externalCatalog.getAlterTableCalledTimes == 1) + assert(externalCatalog.getAlterTableDataSchemaCalledTimes == 0) + + v2SessionCatalog.alterTable(identifier, + TableChange.updateColumnComment(Array("a"), "comment2")) + val loaded2 = v2SessionCatalog.loadTable(identifier) + assert(loaded2.columns().length == 1) + assert(loaded2.columns.head.comment() == "comment2") + + assert(externalCatalog.getAlterTableCalledTimes == 1) + assert(externalCatalog.getAlterTableDataSchemaCalledTimes == 1) + } + } +} + +class CustomHiveCatalog(catalog: ExternalCatalog) extends ExternalCatalog { + + private var alterTableCalledTimes: Int = 0 + private var alterTableDataSchemaCalledTimes: Int = 0 + + override def alterTable(tableDefinition: CatalogTable): Unit = { + alterTableCalledTimes += 1 + catalog.alterTable(tableDefinition) + } + + override def alterTableDataSchema( + db: String, + table: String, + newDataSchema: StructType): Unit = { + alterTableDataSchemaCalledTimes += 1 + catalog.alterTableDataSchema(db, table, newDataSchema) + } + + def getAlterTableCalledTimes: Int = alterTableCalledTimes + def getAlterTableDataSchemaCalledTimes: Int = alterTableDataSchemaCalledTimes + + override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = + catalog.createDatabase(dbDefinition, ignoreIfExists) + + override def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = + catalog.dropDatabase(db, ignoreIfNotExists, cascade) + + override def alterDatabase(dbDefinition: CatalogDatabase): Unit = + catalog.alterDatabase(dbDefinition) + + override def getDatabase(db: String): CatalogDatabase = catalog.getDatabase(db) + + override def 
databaseExists(db: String): Boolean = catalog.databaseExists(db) + + override def listDatabases(): Seq[String] = catalog.listDatabases() + + override def listDatabases(pattern: String): Seq[String] = catalog.listDatabases(pattern) + + override def setCurrentDatabase(db: String): Unit = catalog.setCurrentDatabase(db) + + override def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = + catalog.createTable(tableDefinition, ignoreIfExists) + + override def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, + purge: Boolean): Unit = + catalog.dropTable(db, table, ignoreIfNotExists, purge) + + override def renameTable(db: String, oldName: String, newName: String): Unit = + catalog.renameTable(db, oldName, newName) + + override def alterTableStats(db: String, table: String, + stats: Option[CatalogStatistics]): Unit = catalog.alterTableStats(db, table, stats) + + override def getTable(db: String, table: String): CatalogTable = + catalog.getTable(db, table) + + override def getTablesByName(db: String, tables: Seq[String]): Seq[CatalogTable] = + catalog.getTablesByName(db, tables) + + override def tableExists(db: String, table: String): Boolean = + catalog.tableExists(db, table) + + override def listTables(db: String): Seq[String] = catalog.listTables(db) + + override def listTables(db: String, pattern: String): Seq[String] = + catalog.listTables(db, pattern) + + override def listViews(db: String, pattern: String): Seq[String] = + catalog.listViews(db, pattern) + + override def loadTable(db: String, table: String, loadPath: String, isOverwrite: Boolean, + isSrcLocal: Boolean): Unit = catalog.loadTable(db, table, loadPath, isOverwrite, isSrcLocal) + + override def loadPartition(db: String, table: String, loadPath: String, + partition: TablePartitionSpec, + isOverwrite: Boolean, + inheritTableSpecs: Boolean, + isSrcLocal: Boolean): Unit = + catalog.loadPartition(db, table, loadPath, partition, isOverwrite, inheritTableSpecs, + isSrcLocal) + + override def loadDynamicPartitions(db: String, table: String, loadPath: String, + partition: TablePartitionSpec, replace: Boolean, numDP: Int): Unit = + catalog.loadDynamicPartitions(db, table, loadPath, partition, replace, numDP) + + override def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition], + ignoreIfExists: Boolean): Unit = + catalog.createPartitions(db, table, parts, ignoreIfExists) + + override def dropPartitions(db: String, table: String, parts: Seq[TablePartitionSpec], + ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit = + catalog.dropPartitions(db, table, parts, ignoreIfNotExists, purge, retainData) + + override def renamePartitions(db: String, table: String, specs: Seq[TablePartitionSpec], + newSpecs: Seq[TablePartitionSpec]): Unit = + catalog.renamePartitions(db, table, specs, newSpecs) + + override def alterPartitions(db: String, table: String, + parts: Seq[CatalogTablePartition]): Unit = + catalog.alterPartitions(db, table, parts) + + override def getPartition(db: String, table: String, spec: TablePartitionSpec): + CatalogTablePartition = + catalog.getPartition(db, table, spec) + + override def getPartitionOption(db: String, table: String, spec: TablePartitionSpec): + Option[CatalogTablePartition] = + catalog.getPartitionOption(db, table, spec) + + override def listPartitionNames(db: String, table: String, + partialSpec: Option[TablePartitionSpec]): Seq[String] = + catalog.listPartitionNames(db, table, partialSpec) + + override def listPartitions(db: String, 
table: String, partialSpec: Option[TablePartitionSpec]): + Seq[CatalogTablePartition] = catalog.listPartitions(db, table, partialSpec) + + override def listPartitionsByFilter(db: String, + table: String, + predicates: Seq[Expression], + defaultTimeZoneId: String): Seq[CatalogTablePartition] = + catalog.listPartitionsByFilter(db, table, predicates, defaultTimeZoneId) + + override def createFunction(db: String, funcDefinition: CatalogFunction): Unit = + catalog.createFunction(db, funcDefinition) + + override def dropFunction(db: String, funcName: String): Unit = catalog.dropFunction(db, funcName) + + override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = + catalog.alterFunction(db, funcDefinition) + + override def renameFunction(db: String, oldName: String, newName: String): Unit = + catalog.renameFunction(db, oldName, newName) + + override def getFunction(db: String, funcName: String): CatalogFunction = + catalog.getFunction(db, funcName) + + override def functionExists(db: String, funcName: String): Boolean = + catalog.functionExists(db, funcName) + + override def listFunctions(db: String, pattern: String): Seq[String] = + catalog.listFunctions(db, pattern) } From 74686c40034971aa533f5440081a441599cd92e5 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Tue, 27 May 2025 14:47:15 -0700 Subject: [PATCH 2/4] Use mockito --- .../sql/hive/execution/HiveDDLSuite.scala | 161 ++---------------- 1 file changed, 10 insertions(+), 151 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index b016d551409bb..13e8d3721d81e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -26,6 +26,8 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.Path import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.{spy, times, verify} import org.scalatest.BeforeAndAfterEach import org.apache.spark.{SparkException, SparkUnsupportedOperationException} @@ -33,8 +35,6 @@ import org.apache.spark.sql.{AnalysisException, Row, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableChange, TableInfo} import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME @@ -3399,8 +3399,8 @@ class HiveDDLSuite } test("SPARK-52272: V2SessionCatalog does not alter schema on Hive Catalog") { - val externalCatalog = new CustomHiveCatalog(spark.sessionState.catalog.externalCatalog) - val v1SessionCatalog = new SessionCatalog(externalCatalog) + val spyCatalog = spy(spark.sessionState.catalog.externalCatalog) + val v1SessionCatalog = new SessionCatalog(spyCatalog) val v2SessionCatalog = new V2SessionCatalog(v1SessionCatalog) withTable("t1") { val identifier = Identifier.of(Array("default"), "t1") @@ -3417,8 +3417,9 @@ class HiveDDLSuite val loaded = v2SessionCatalog.loadTable(identifier) 
assert(loaded.properties().get("foo") == "bar") - assert(externalCatalog.getAlterTableCalledTimes == 1) - assert(externalCatalog.getAlterTableDataSchemaCalledTimes == 0) + verify(spyCatalog, times(1)).alterTable(any[CatalogTable]) + verify(spyCatalog, times(0)).alterTableDataSchema( + any[String], any[String], any[StructType]) v2SessionCatalog.alterTable(identifier, TableChange.updateColumnComment(Array("a"), "comment2")) @@ -3426,151 +3427,9 @@ class HiveDDLSuite assert(loaded2.columns().length == 1) assert(loaded2.columns.head.comment() == "comment2") - assert(externalCatalog.getAlterTableCalledTimes == 1) - assert(externalCatalog.getAlterTableDataSchemaCalledTimes == 1) + verify(spyCatalog, times(1)).alterTable(any[CatalogTable]) + verify(spyCatalog, times(1)).alterTableDataSchema( + any[String], any[String], any[StructType]) } } } - -class CustomHiveCatalog(catalog: ExternalCatalog) extends ExternalCatalog { - - private var alterTableCalledTimes: Int = 0 - private var alterTableDataSchemaCalledTimes: Int = 0 - - override def alterTable(tableDefinition: CatalogTable): Unit = { - alterTableCalledTimes += 1 - catalog.alterTable(tableDefinition) - } - - override def alterTableDataSchema( - db: String, - table: String, - newDataSchema: StructType): Unit = { - alterTableDataSchemaCalledTimes += 1 - catalog.alterTableDataSchema(db, table, newDataSchema) - } - - def getAlterTableCalledTimes: Int = alterTableCalledTimes - def getAlterTableDataSchemaCalledTimes: Int = alterTableDataSchemaCalledTimes - - override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = - catalog.createDatabase(dbDefinition, ignoreIfExists) - - override def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = - catalog.dropDatabase(db, ignoreIfNotExists, cascade) - - override def alterDatabase(dbDefinition: CatalogDatabase): Unit = - catalog.alterDatabase(dbDefinition) - - override def getDatabase(db: String): CatalogDatabase = catalog.getDatabase(db) - - override def databaseExists(db: String): Boolean = catalog.databaseExists(db) - - override def listDatabases(): Seq[String] = catalog.listDatabases() - - override def listDatabases(pattern: String): Seq[String] = catalog.listDatabases(pattern) - - override def setCurrentDatabase(db: String): Unit = catalog.setCurrentDatabase(db) - - override def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = - catalog.createTable(tableDefinition, ignoreIfExists) - - override def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, - purge: Boolean): Unit = - catalog.dropTable(db, table, ignoreIfNotExists, purge) - - override def renameTable(db: String, oldName: String, newName: String): Unit = - catalog.renameTable(db, oldName, newName) - - override def alterTableStats(db: String, table: String, - stats: Option[CatalogStatistics]): Unit = catalog.alterTableStats(db, table, stats) - - override def getTable(db: String, table: String): CatalogTable = - catalog.getTable(db, table) - - override def getTablesByName(db: String, tables: Seq[String]): Seq[CatalogTable] = - catalog.getTablesByName(db, tables) - - override def tableExists(db: String, table: String): Boolean = - catalog.tableExists(db, table) - - override def listTables(db: String): Seq[String] = catalog.listTables(db) - - override def listTables(db: String, pattern: String): Seq[String] = - catalog.listTables(db, pattern) - - override def listViews(db: String, pattern: String): Seq[String] = - catalog.listViews(db, pattern) - - 
override def loadTable(db: String, table: String, loadPath: String, isOverwrite: Boolean, - isSrcLocal: Boolean): Unit = catalog.loadTable(db, table, loadPath, isOverwrite, isSrcLocal) - - override def loadPartition(db: String, table: String, loadPath: String, - partition: TablePartitionSpec, - isOverwrite: Boolean, - inheritTableSpecs: Boolean, - isSrcLocal: Boolean): Unit = - catalog.loadPartition(db, table, loadPath, partition, isOverwrite, inheritTableSpecs, - isSrcLocal) - - override def loadDynamicPartitions(db: String, table: String, loadPath: String, - partition: TablePartitionSpec, replace: Boolean, numDP: Int): Unit = - catalog.loadDynamicPartitions(db, table, loadPath, partition, replace, numDP) - - override def createPartitions(db: String, table: String, parts: Seq[CatalogTablePartition], - ignoreIfExists: Boolean): Unit = - catalog.createPartitions(db, table, parts, ignoreIfExists) - - override def dropPartitions(db: String, table: String, parts: Seq[TablePartitionSpec], - ignoreIfNotExists: Boolean, purge: Boolean, retainData: Boolean): Unit = - catalog.dropPartitions(db, table, parts, ignoreIfNotExists, purge, retainData) - - override def renamePartitions(db: String, table: String, specs: Seq[TablePartitionSpec], - newSpecs: Seq[TablePartitionSpec]): Unit = - catalog.renamePartitions(db, table, specs, newSpecs) - - override def alterPartitions(db: String, table: String, - parts: Seq[CatalogTablePartition]): Unit = - catalog.alterPartitions(db, table, parts) - - override def getPartition(db: String, table: String, spec: TablePartitionSpec): - CatalogTablePartition = - catalog.getPartition(db, table, spec) - - override def getPartitionOption(db: String, table: String, spec: TablePartitionSpec): - Option[CatalogTablePartition] = - catalog.getPartitionOption(db, table, spec) - - override def listPartitionNames(db: String, table: String, - partialSpec: Option[TablePartitionSpec]): Seq[String] = - catalog.listPartitionNames(db, table, partialSpec) - - override def listPartitions(db: String, table: String, partialSpec: Option[TablePartitionSpec]): - Seq[CatalogTablePartition] = catalog.listPartitions(db, table, partialSpec) - - override def listPartitionsByFilter(db: String, - table: String, - predicates: Seq[Expression], - defaultTimeZoneId: String): Seq[CatalogTablePartition] = - catalog.listPartitionsByFilter(db, table, predicates, defaultTimeZoneId) - - override def createFunction(db: String, funcDefinition: CatalogFunction): Unit = - catalog.createFunction(db, funcDefinition) - - override def dropFunction(db: String, funcName: String): Unit = catalog.dropFunction(db, funcName) - - override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = - catalog.alterFunction(db, funcDefinition) - - override def renameFunction(db: String, oldName: String, newName: String): Unit = - catalog.renameFunction(db, oldName, newName) - - override def getFunction(db: String, funcName: String): CatalogFunction = - catalog.getFunction(db, funcName) - - override def functionExists(db: String, funcName: String): Boolean = - catalog.functionExists(db, funcName) - - override def listFunctions(db: String, pattern: String): Seq[String] = - catalog.listFunctions(db, pattern) -} From 01f15f08666622f3631046959e0f6504d5a68f82 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Tue, 27 May 2025 15:05:12 -0700 Subject: [PATCH 3/4] Make this change for hive and in-memory mode --- .../datasources/v2/V2SessionCatalog.scala | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff 
--git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 9ade00d89a631..1088e3f7a7206 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -302,22 +302,15 @@ class V2SessionCatalog(catalog: SessionCatalog)
     val finalProperties = CatalogV2Util.applyClusterByChanges(properties, schema, changes)

     try {
-      if (SQLConf.get.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION).equals("hive")) {
-        if (changes.exists(!_.isInstanceOf[TableChange.ColumnChange])) {
-          catalog.alterTable(
-            catalogTable.copy(
-              properties = finalProperties, schema = schema, owner = owner, comment = comment,
-              collation = collation, storage = storage))
-        }
-        if (changes.exists(_.isInstanceOf[TableChange.ColumnChange])) {
-          catalog.alterTableDataSchema(ident.asTableIdentifier, schema)
-        }
-      } else {
+      if (changes.exists(!_.isInstanceOf[TableChange.ColumnChange])) {
         catalog.alterTable(
           catalogTable.copy(
             properties = finalProperties, schema = schema, owner = owner, comment = comment,
             collation = collation, storage = storage))
       }
+      if (changes.exists(_.isInstanceOf[TableChange.ColumnChange])) {
+        catalog.alterTableDataSchema(ident.asTableIdentifier, schema)
+      }
     } catch {
       case _: NoSuchTableException =>
         throw QueryCompilationErrors.noSuchTableError(ident)

From d52b0db74ed8bb964c364e1199aa6403ce631c10 Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Tue, 27 May 2025 22:29:36 -0700
Subject: [PATCH 4/4] Remove some tests: ALTER TABLE DROP/RENAME COLUMN on top-level columns is now unsupported for V2SessionCatalog due to its limitations

---
 .../v2/V2SessionCatalogSuite.scala            | 51 ------------------
 1 file changed, 51 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index 19d8cba253081..63e54812922ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -590,22 +590,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
       parameters = Map("fieldName" -> "`missing_col`", "fields" -> "`id`, `data`"))
   }

-  test("alterTable: rename top-level column") {
-    val catalog = newCatalog()
-
-    catalog.createTable(testIdent, columns, emptyTrans, emptyProps)
-    val table = catalog.loadTable(testIdent)
-
-    assert(table.columns === columns)
-
-    catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))
-    val updated = catalog.loadTable(testIdent)
-
-    val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType)
-
-    assert(updated.schema == expectedSchema)
-  }
-
   test("alterTable: rename nested column") {
     val catalog = newCatalog()

@@ -627,26 +611,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
       assert(updated.columns === expectedColumns)
   }

-  test("alterTable: rename struct column") {
-    val catalog = newCatalog()
-
-    val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
-    val tableColumns = columns :+ Column.create("point", pointStruct)
-
-    catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps)
-    val table = catalog.loadTable(testIdent)
-
-    assert(table.columns === tableColumns)
-
-    catalog.alterTable(testIdent, TableChange.renameColumn(Array("point"), "p"))
-    val updated = catalog.loadTable(testIdent)
-
-    val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
-    val expectedColumns = columns :+ Column.create("p", newPointStruct)
-
-    assert(updated.columns === expectedColumns)
-  }
-
   test("alterTable: rename missing column fails") {
     val catalog = newCatalog()

@@ -686,21 +650,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
       assert(updated.columns === expectedColumns)
   }

-  test("alterTable: delete top-level column") {
-    val catalog = newCatalog()
-
-    catalog.createTable(testIdent, columns, emptyTrans, emptyProps)
-    val table = catalog.loadTable(testIdent)
-
-    assert(table.columns === columns)
-
-    catalog.alterTable(testIdent, TableChange.deleteColumn(Array("id"), false))
-    val updated = catalog.loadTable(testIdent)
-
-    val expectedSchema = new StructType().add("data", StringType)
-    assert(updated.schema == expectedSchema)
-  }
-
   test("alterTable: delete nested column") {
     val catalog = newCatalog()
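
The net effect of the series: V2SessionCatalog.alterTable now splits one batch of TableChange objects across two SessionCatalog entry points, so column changes go through alterTableDataSchema (which touches only the data schema) instead of rewriting the whole CatalogTable through the Hive client. Below is a minimal standalone Scala sketch of that dispatch rule; the partitionChanges helper is hypothetical and only mirrors the two changes.exists guards introduced in PATCH 3/4:

import org.apache.spark.sql.connector.catalog.TableChange

// Returns (callAlterTable, callAlterTableDataSchema) for one batch of changes,
// mirroring the guards in the patched V2SessionCatalog.alterTable.
def partitionChanges(changes: Seq[TableChange]): (Boolean, Boolean) = {
  // Non-column changes (e.g. setProperty) still rewrite the full CatalogTable
  // via SessionCatalog.alterTable.
  val callAlterTable = changes.exists(!_.isInstanceOf[TableChange.ColumnChange])
  // Column changes (e.g. updateColumnComment) go through
  // SessionCatalog.alterTableDataSchema, which leaves the rest of the
  // Hive-managed table metadata untouched.
  val callAlterTableDataSchema = changes.exists(_.isInstanceOf[TableChange.ColumnChange])
  (callAlterTable, callAlterTableDataSchema)
}

// A mixed batch takes each path exactly once, matching the Mockito
// verify(..., times(1)) expectations in the HiveDDLSuite test:
// partitionChanges(Seq(
//   TableChange.setProperty("foo", "bar"),                    // non-column change
//   TableChange.updateColumnComment(Array("a"), "comment2"))) // column change
// == (true, true)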