diff --git a/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala b/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala
index ca3ccbb3d..3cb965928 100644
--- a/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala
+++ b/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala
@@ -29,6 +29,18 @@ trait QbeastSnapshot {
    */
   def isInitial: Boolean
 
+  /**
+   * The current table description.
+   * @return
+   */
+  def loadDescription: String
+
+  /**
+   * The current table properties of the snapshot.
+   * @return
+   */
+  def loadProperties: Map[String, String]
+
   /**
    * Load methods
    */
diff --git a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
index 5649d38de..41825d405 100644
--- a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
+++ b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
@@ -44,6 +44,22 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
 
   private val metadataMap: Map[String, String] = snapshot.metadata.configuration
 
+  /**
+   * The current table properties of the snapshot.
+   *
+   * We filter out the revision, leaving only Revision ID.
+   *
+   * @return
+   */
+  override def loadProperties: Map[String, String] =
+    snapshot.getProperties.filterKeys(k => !k.startsWith(MetadataConfig.revision)).toMap
+
+  /**
+   * The current table description.
+   * @return
+   */
+  override def loadDescription: String = snapshot.metadata.description
+
   /**
    * Constructs revision dictionary
    *
@@ -195,4 +211,5 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
    * Loads Staging AddFiles
   */
   def loadStagingFiles(): Dataset[AddFile] = stagingFiles()
+
 }
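A minimal usage sketch of the new snapshot accessors (not part of the patch). QbeastContext.metadataManager.loadSnapshot is used exactly as in the QbeastTableImpl change below; the table path and variable names are illustrative only:

import io.qbeast.context.QbeastContext
import io.qbeast.core.model.QTableID

// Hypothetical table location; any existing Qbeast table would do.
val tableId = QTableID("/tmp/qbeast/example_table")
val snapshot = QbeastContext.metadataManager.loadSnapshot(tableId)

// The description comes from the Delta metadata; the properties are the Delta
// table configuration with the revision payloads filtered out.
val description: String = snapshot.loadDescription
val properties: Map[String, String] = snapshot.loadProperties
properties.foreach { case (key, value) => println(s"$key -> $value") }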
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
index b4e9a665c..455263a54 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
@@ -293,9 +293,11 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
           ifExists = true).run(spark)
 
       // Other cases not handled yet
-      case _ => return getSessionCatalog().alterTable(ident, changes: _*)
+      case _ =>
     }
 
+    // Update session catalog with changes
+    getSessionCatalog().alterTable(ident, changes: _*)
     loadTable(ident)
   }
 
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala
index 1113dbc5d..4647b2723 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala
@@ -15,16 +15,20 @@
  */
 package io.qbeast.spark.internal.sources.v2
 
+import io.qbeast.context.QbeastContext
 import io.qbeast.core.model.QTableID
 import io.qbeast.spark.internal.sources.QbeastBaseRelation
 import io.qbeast.spark.table.IndexedTableFactory
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.connector.catalog.TableCatalog
 import org.apache.spark.sql.connector.write.LogicalWriteInfo
 import org.apache.spark.sql.connector.write.WriteBuilder
 import org.apache.spark.sql.sources.BaseRelation
@@ -33,6 +37,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.V2toV1Fallback
 
 import java.util
+import scala.collection.mutable
 import scala.collection.JavaConverters._
 
 /**
@@ -65,11 +70,15 @@ case class QbeastTableImpl(
 
   private val indexedTable = tableFactory.getIndexedTable(tableId)
 
+  private lazy val spark = SparkSession.active
+
+  private lazy val initialSnapshot = QbeastContext.metadataManager.loadSnapshot(tableId)
+
   private lazy val table: CatalogTable =
     if (catalogTable.isDefined) catalogTable.get
     else {
       // Get table Metadata if no catalog table is provided
-      SparkSession.active.sessionState.catalog
+      spark.sessionState.catalog
         .getTableMetadata(tableIdentifier)
     }
 
@@ -90,7 +99,30 @@ case class QbeastTableImpl(
     QbeastBaseRelation.forQbeastTableWithOptions(indexedTable, properties().asScala.toMap)
   }
 
-  override def properties(): util.Map[String, String] = options.asJava
+  override def properties(): util.Map[String, String] = {
+    val description = initialSnapshot.loadDescription
+    val base = mutable.Map() ++ initialSnapshot.loadProperties
+    options.foreach {
+      case (key, value) if !base.contains(key) =>
+        base.put(key, value)
+      case _ => // do nothing
+    }
+    base.put(TableCatalog.PROP_PROVIDER, "qbeast")
+    base.put(TableCatalog.PROP_LOCATION, CatalogUtils.URIToString(path.toUri))
+    catalogTable.foreach { table =>
+      if (table.owner != null && table.owner.nonEmpty) {
+        base.put(TableCatalog.PROP_OWNER, table.owner)
+      }
+      v1Table.storage.properties.foreach { case (key, value) =>
+        base.put(TableCatalog.OPTION_PREFIX + key, value)
+      }
+      if (v1Table.tableType == CatalogTableType.EXTERNAL) {
+        base.put(TableCatalog.PROP_EXTERNAL, "true")
+      }
+    }
+    Option(description).foreach(base.put(TableCatalog.PROP_COMMENT, _))
+    base.asJava
+  }
 
   override def v1Table: CatalogTable = table
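To make the intent of the new properties() concrete, here is a rough sketch of the map it is expected to produce for a hypothetical external table created with 'columnsToIndex'='id' at location /tmp/t1. The keys are Spark's TableCatalog constants; the concrete values are illustrative only:

// Snapshot/table properties first, then non-colliding write options, then the
// reserved catalog keys derived from the CatalogTable entry.
val expectedProperties: Map[String, String] = Map(
  "columnsToIndex" -> "id", // from the snapshot configuration / write options
  "provider" -> "qbeast", // TableCatalog.PROP_PROVIDER
  "location" -> "file:/tmp/t1", // TableCatalog.PROP_LOCATION
  "option.columnsToIndex" -> "id", // storage properties, prefixed with TableCatalog.OPTION_PREFIX
  "external" -> "true" // TableCatalog.PROP_EXTERNAL, only for EXTERNAL tables
)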
diff --git a/src/test/scala/io/qbeast/spark/internal/sources/QbeastTableImplTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/QbeastTableImplTest.scala
index 25ed14b9d..a1da08258 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/QbeastTableImplTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/QbeastTableImplTest.scala
@@ -84,7 +84,10 @@ class QbeastTableImplTest extends QbeastIntegrationTestSpec with CatalogTestSuit
         None,
         indexedTableFactory)
 
-      qbeastTableImpl.properties() shouldBe properties.asJava
+      val qbeastTableProperties = qbeastTableImpl.properties()
+
+      qbeastTableProperties.get("provider") shouldBe "qbeast"
+      qbeastTableProperties.get("columnsToIndex") shouldBe "id"
 
     })
 
diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
index 7583b9117..5538ee90c 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
@@ -16,6 +16,7 @@
 package io.qbeast.spark.internal.sources.catalog
 
 import io.qbeast.spark.QbeastIntegrationTestSpec
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.SparkConf
@@ -238,13 +239,11 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo
 
   it should "persist altered properties on the _delta_log" in withQbeastContextSparkAndTmpWarehouse(
     (spark, tmpDir) => {
-      val df = loadTestData(spark)
-      df.write.format("qbeast").option("columnsToIndex", "user_id,price").save(tmpDir)
-      spark.sql(s"CREATE TABLE t1 USING qbeast LOCATION '$tmpDir'")
+      spark.sql("CREATE TABLE t1(id INT) USING qbeast TBLPROPERTIES ('columnsToIndex'= 'id')")
       spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")
 
       // Check the delta log info
-      val deltaLog = DeltaLog.forTable(spark, tmpDir)
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t1"))
       val snapshot = deltaLog.update()
       val properties = snapshot.getProperties
 
@@ -253,4 +252,65 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo
 
     })
 
+  it should "persist UNSET properties in _delta_catalog" in withQbeastContextSparkAndTmpWarehouse(
+    (spark, _) => {
+
+      spark.sql(
+        "CREATE TABLE t1(id INT) " +
+          "USING qbeast " +
+          "TBLPROPERTIES ('columnsToIndex'= 'id')")
+
+      spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")
+
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t1"))
+      val snapshot = deltaLog.update()
+      val properties = snapshot.getProperties
+
+      properties should contain key "k"
+      properties("k") shouldBe "v"
+
+      spark.sql("ALTER TABLE t1 UNSET TBLPROPERTIES ('k')")
+
+      // Check the delta log info
+      val updatedProperties = deltaLog.update().getProperties
+      updatedProperties should not contain key("k")
+    })
+
+  it should "ensure consistency with the session catalog" in withQbeastContextSparkAndTmpWarehouse(
+    (spark, tmpDir) => {
+
+      import spark.implicits._
+
+      spark.sql(
+        "CREATE TABLE t1(id INT) " +
+          "USING qbeast " +
+          "TBLPROPERTIES ('columnsToIndex'= 'id')")
+      spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")
+      // Check the delta log info
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t1"))
+      val catalog = spark.sessionState.catalog
+      val showProperties = spark.sql("SHOW TBLPROPERTIES t1").as[(String, String)].collect().toMap
+
+      val snapshot = deltaLog.update()
+      val properties = snapshot.getProperties
+      val catalogProperties =
+        catalog.getTableMetadata(TableIdentifier("t1")).properties
+      properties should contain key "k"
+      catalogProperties should contain key ("k")
+      showProperties should contain key ("k")
+
+      spark.sql("ALTER TABLE t1 UNSET TBLPROPERTIES ('k')")
+      // Check the delta log info
+      val updatedSnapshot = deltaLog.update()
+      val updatedProperties = updatedSnapshot.getProperties
+      val updatedCatalogProperties =
+        catalog.getTableMetadata(TableIdentifier("t1")).properties
+      val updatedShowProperties =
+        spark.sql("SHOW TBLPROPERTIES t1").as[(String, String)].collect().toMap
+
+      updatedProperties should not contain key("k")
+      updatedCatalogProperties should not contain key("k")
+      updatedShowProperties should not contain key("k")
+    })
+
 }
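The same behaviour, seen from a plain Spark session rather than the test harness (a sketch, not part of the patch; it assumes the Qbeast format and QbeastCatalog are configured, and uses an arbitrary table name and property key):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

spark.sql("CREATE TABLE t1(id INT) USING qbeast TBLPROPERTIES ('columnsToIndex' = 'id')")
spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")
// Both the _delta_log and the session catalog should now report k = v.
spark.sql("SHOW TBLPROPERTIES t1").show(truncate = false)

spark.sql("ALTER TABLE t1 UNSET TBLPROPERTIES ('k')")
// With the QbeastCatalog.alterTable fix, k disappears from both places as well.
spark.sql("SHOW TBLPROPERTIES t1").show(truncate = false)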
diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
index 6297293bc..d22770540 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
@@ -145,6 +145,25 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite
       .asScala should contain("newProperty" -> "newValue")
   })
 
+  it should "unset properties" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {
+    val qbeastCatalog = createQbeastCatalog(spark)
+    val tableIdentifier = Identifier.of(defaultNamespace, "student")
+    qbeastCatalog.createTable(
+      tableIdentifier,
+      columns,
+      Array.empty[Transform],
+      Map("newProperty" -> "newValue").asJava)
+
+    val unsetPropertiesChange = TableChange.removeProperty("newProperty")
+    // Alter table with new information
+    qbeastCatalog.alterTable(tableIdentifier, unsetPropertiesChange)
+
+    qbeastCatalog
+      .loadTable(Identifier.of(defaultNamespace, "student"))
+      .properties()
+      .asScala should not contain ("newProperty" -> "newValue")
+  })
+
   it should "drop table" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {
     val qbeastCatalog = createQbeastCatalog(spark)
     val tableIdentifier = Identifier.of(defaultNamespace, "student")
diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
index 832e69201..5540e8eb4 100644
--- a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
@@ -55,11 +55,13 @@ class QbeastSQLIntegrationTest extends QbeastIntegrationTestSpec {
         .first()
         .getString(0) shouldBe "qbeast"
       // Check Table Properties
-      table
+      val tableProperties = table
        .where("col_name == 'Table Properties'")
         .select("data_type")
         .first()
-        .getString(0) shouldBe "[columnsToIndex=id,option.columnsToIndex=id]"
+        .getString(0)
+      tableProperties should include("columnsToIndex=id")
+      tableProperties should include("option.columnsToIndex=id")
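As a closing illustration (a sketch, not part of the patch), the relaxed assertion above can be reproduced interactively against any Qbeast table indexed on id; the table name qbeast_table and the active spark session are assumptions:

// The 'Table Properties' row of DESCRIBE TABLE EXTENDED now carries extra
// reserved keys, so the test checks for substrings instead of the full string.
val describe = spark.sql("DESCRIBE TABLE EXTENDED qbeast_table")
val tableProperties = describe
  .where("col_name == 'Table Properties'")
  .select("data_type")
  .first()
  .getString(0)
assert(tableProperties.contains("columnsToIndex=id"))
assert(tableProperties.contains("option.columnsToIndex=id"))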