From 670165f884c668abaf19587db8774620e47defa7 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Fri, 26 Jul 2024 15:45:45 +0200
Subject: [PATCH 01/11] WIP on TBLPROPERTIES consistency

---
 .../io/qbeast/core/model/QbeastSnapshot.scala | 14 +++
 .../spark/delta/DeltaQbeastSnapshot.scala     | 16 ++++
 .../sources/catalog/QbeastCatalog.scala       |  4 +-
 .../internal/sources/v2/QbeastTableImpl.scala | 30 ++++++-
 .../QbeastCatalogIntegrationTest.scala        | 87 ++++++++++++++++++-
 .../sources/catalog/QbeastCatalogTest.scala   | 34 ++++++++
 6 files changed, 178 insertions(+), 7 deletions(-)

diff --git a/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala b/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala
index ca3ccbb3d..97b889fa0 100644
--- a/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala
+++ b/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala
@@ -18,6 +18,8 @@ package io.qbeast.core.model
 import io.qbeast.IISeq
 import org.apache.spark.sql.Dataset
 
+import scala.collection.mutable
+
 /**
  * A snapshot of the Qbeast table state.
  */
@@ -29,6 +31,18 @@ trait QbeastSnapshot {
    */
   def isInitial: Boolean
 
+  /**
+   * The current table description.
+   * @return
+   */
+  def loadDescription: String
+
+  /**
+   * The current table properties of the snapshot.
+   * @return
+   */
+  def loadProperties: mutable.Map[String, String]
+
   /**
    * Load methods
    */
diff --git a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
index 5649d38de..0887fb41a 100644
--- a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
+++ b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.AnalysisExceptionFactory
 import org.apache.spark.sql.Dataset
 
+import scala.collection.mutable
+
 /**
  * Qbeast Snapshot that provides information about the current index state.
  *
@@ -44,6 +46,19 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
 
   private val metadataMap: Map[String, String] = snapshot.metadata.configuration
 
+  /**
+   * The current table properties of the snapshot.
+   *
+   * @return
+   */
+  override def loadProperties: mutable.Map[String, String] = snapshot.getProperties
+
+  /**
+   * The current table description.
+   * @return 
+   */
+  override def loadDescription: String = snapshot.metadata.description
+
   /**
    * Constructs revision dictionary
    *
@@ -195,4 +210,5 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
    * Loads Staging AddFiles
    */
   def loadStagingFiles(): Dataset[AddFile] = stagingFiles()
+
 }
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
index b4e9a665c..455263a54 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
@@ -293,9 +293,11 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
           ifExists = true).run(spark)
 
       // Other cases not handled yet
-      case _ => return getSessionCatalog().alterTable(ident, changes: _*)
+      case _ =>
     }
 
+    // Update session catalog with changes
+    getSessionCatalog().alterTable(ident, changes: _*)
     loadTable(ident)
   }
 
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala
index 1113dbc5d..8a748cb9b 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala
@@ -15,16 +15,20 @@
  */
 package io.qbeast.spark.internal.sources.v2
 
+import io.qbeast.context.QbeastContext
 import io.qbeast.core.model.QTableID
 import io.qbeast.spark.internal.sources.QbeastBaseRelation
 import io.qbeast.spark.table.IndexedTableFactory
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.connector.catalog.SupportsWrite
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.connector.catalog.TableCatalog
 import org.apache.spark.sql.connector.write.LogicalWriteInfo
 import org.apache.spark.sql.connector.write.WriteBuilder
 import org.apache.spark.sql.sources.BaseRelation
@@ -65,11 +69,15 @@ case class QbeastTableImpl(
 
   private val indexedTable = tableFactory.getIndexedTable(tableId)
 
+  private lazy val spark = SparkSession.active
+
+  private lazy val initialSnapshot = QbeastContext.metadataManager.loadSnapshot(tableId)
+
   private lazy val table: CatalogTable =
     if (catalogTable.isDefined) catalogTable.get
     else {
       // Get table Metadata if no catalog table is provided
-      SparkSession.active.sessionState.catalog
+      spark.sessionState.catalog
         .getTableMetadata(tableIdentifier)
     }
 
@@ -90,7 +98,25 @@ case class QbeastTableImpl(
     QbeastBaseRelation.forQbeastTableWithOptions(indexedTable, properties().asScala.toMap)
   }
 
-  override def properties(): util.Map[String, String] = options.asJava
+  override def properties(): util.Map[String, String] = {
+    val description = initialSnapshot.loadDescription
+    val base = initialSnapshot.loadProperties
+    base.put(TableCatalog.PROP_PROVIDER, "qbeast")
+    base.put(TableCatalog.PROP_LOCATION, CatalogUtils.URIToString(path.toUri))
+    catalogTable.foreach { table =>
+      if (table.owner != null && table.owner.nonEmpty) {
+        base.put(TableCatalog.PROP_OWNER, table.owner)
+      }
+      v1Table.storage.properties.foreach { case (key, value) =>
+        base.put(TableCatalog.OPTION_PREFIX + key, value)
+      }
+      if (v1Table.tableType == CatalogTableType.EXTERNAL) {
+        base.put(TableCatalog.PROP_EXTERNAL, "true")
+      }
+    }
+    Option(description).foreach(base.put(TableCatalog.PROP_COMMENT, _))
+    base.asJava
+  }
 
   override def v1Table: CatalogTable = table
 
diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
index 7583b9117..187b89924 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
@@ -16,6 +16,7 @@ package io.qbeast.spark.internal.sources.catalog
 
 import io.qbeast.spark.QbeastIntegrationTestSpec
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.SparkConf
@@ -235,16 +236,33 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo
 
     })
 
+  it should "create table with properties" in withQbeastContextSparkAndTmpWarehouse(
+    (spark, tmpDir) => {
+
+      spark.sql(
+        "CREATE TABLE student (id INT, name STRING, age INT)" +
+          " USING qbeast TBLPROPERTIES ('k' = 'v', 'columnsToIndex'='id')")
+
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("student"))
+      val snapshot = deltaLog.update()
+      val properties = snapshot.getProperties
+      val catalog = spark.sessionState.catalog
+      val catalogProperties = catalog.getTableMetadata(TableIdentifier("student")).properties
+      properties should contain key "k"
+      properties("k") shouldBe "v"
+      catalogProperties should contain key "k"
+      catalogProperties("k") shouldBe "v"
+
+    })
+
   it should "persist altered properties on the _delta_log" in withQbeastContextSparkAndTmpWarehouse(
     (spark, tmpDir) => {
 
-      val df = loadTestData(spark)
-      df.write.format("qbeast").option("columnsToIndex", "user_id,price").save(tmpDir)
-      spark.sql(s"CREATE TABLE t1 USING qbeast LOCATION '$tmpDir'")
+      spark.sql(s"CREATE TABLE t1(id INT) USING qbeast TBLPROPERTIES ('columnsToIndex'= 'id')")
 
       spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")
 
       // Check the delta log info
-      val deltaLog = DeltaLog.forTable(spark, tmpDir)
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t1"))
       val snapshot = deltaLog.update()
       val properties = snapshot.getProperties
 
@@ -253,4 +271,65 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo
 
     })
 
+  it should "persist UNSET properties in _delta_log" in withQbeastContextSparkAndTmpWarehouse(
+    (spark, _) => {
+
+      spark.sql(
+        s"CREATE TABLE t1(id INT) " +
+          s"USING qbeast " +
+          s"TBLPROPERTIES ('columnsToIndex'= 'id')")
+
+      spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")
+
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t1"))
+      val snapshot = deltaLog.update()
+      val properties = snapshot.getProperties
+
+      properties should contain key "k"
+      properties("k") shouldBe "v"
+
+      spark.sql("ALTER TABLE t1 UNSET TBLPROPERTIES ('k')")
+
+      // Check the delta log info
+      val updatedProperties = deltaLog.update().getProperties
+      updatedProperties should not contain key("k")
+    })
+
+  it should "ensure consistency with the session catalog" in withQbeastContextSparkAndTmpWarehouse(
+    (spark, tmpDir) => {
+
+      import spark.implicits._
+
+      spark.sql(
+        s"CREATE TABLE t1(id INT) " +
+          s"USING qbeast " +
+          s"TBLPROPERTIES ('columnsToIndex'= 'id')")
+      spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")
+      // Check the delta log info
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t1"))
+      val catalog = spark.sessionState.catalog
+      val showProperties = spark.sql("SHOW TBLPROPERTIES t1").as[(String, String)].collect().toMap
+
+      val snapshot = deltaLog.update()
+      val properties = snapshot.getProperties
+      val catalogProperties =
+        catalog.getTableMetadata(TableIdentifier("t1")).properties
+      properties should contain key "k"
+      catalogProperties should contain key ("k")
+      showProperties should contain key ("k")
+
+      spark.sql("ALTER TABLE t1 UNSET TBLPROPERTIES ('k')")
+      // Check the delta log info
+      val updatedSnapshot = deltaLog.update()
+      val updatedProperties = updatedSnapshot.getProperties
+      val updatedCatalogProperties =
+        catalog.getTableMetadata(TableIdentifier("t1")).properties
+      val updatedShowProperties =
+        spark.sql("SHOW TBLPROPERTIES t1").as[(String, String)].collect().toMap
+
+      updatedProperties should not contain key("k")
+      updatedCatalogProperties should not contain key("k")
+      updatedShowProperties should not contain key("k")
+    })
+
 }
diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
index 6297293bc..5bd03f609 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
@@ -100,6 +100,21 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite
     qbeastCatalog.loadTable(tableIdentifier).columns() shouldBe newColumns
   })
 
+  it should "create table with properties" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {
+    val qbeastCatalog = createQbeastCatalog(spark)
+    val tableIdentifier = Identifier.of(defaultNamespace, "student")
+    qbeastCatalog.createTable(
+      tableIdentifier,
+      columns,
+      Array.empty[Transform],
+      Map("newProperty" -> "newValue").asJava)
+
+    qbeastCatalog
+      .loadTable(Identifier.of(defaultNamespace, "student"))
+      .properties()
+      .asScala should contain("newProperty" -> "newValue")
+  })
+
   it should "list tables" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {
     val qbeastCatalog = createQbeastCatalog(spark)
     qbeastCatalog.listTables(defaultNamespace) shouldBe Array()
@@ -145,6 +160,25 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite
       .asScala should contain("newProperty" -> "newValue")
   })
 
+  it should "unset properties" in withQbeastContextSparkAndTmpDir((spark, _) => {
+    val qbeastCatalog = createQbeastCatalog(spark)
+    val tableIdentifier = Identifier.of(defaultNamespace, "student")
+    qbeastCatalog.createTable(
+      tableIdentifier,
+      columns,
+      Array.empty[Transform],
+      Map("newProperty" -> "newValue").asJava)
+
+    val unsetPropertiesChange = TableChange.removeProperty("newProperty")
+    // Alter table with new information
+    qbeastCatalog.alterTable(tableIdentifier, unsetPropertiesChange)
+
+    qbeastCatalog
+      .loadTable(Identifier.of(defaultNamespace, "student"))
+      .properties()
+      .asScala should not contain ("newProperty" -> "newValue")
+  })
+
   it should "drop table" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {
     val qbeastCatalog = createQbeastCatalog(spark)
     val tableIdentifier = Identifier.of(defaultNamespace, "student")
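Note on the alterTable change above: instead of returning early and delegating everything to the session catalog, every TableChange is now applied to the _delta_log first and then replayed on the session catalog. A minimal sketch of the resulting flow, not part of the patch; `catalog` and `ident` are placeholders for a QbeastCatalog instance and a resolved table identifier, and TableChange.setProperty/removeProperty are Spark's standard DSv2 factory methods:

    import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableChange}

    // Sketch only: both property changes end up in the Delta log and in the
    // session catalog, which is what the new integration tests verify.
    def roundTrip(catalog: TableCatalog, ident: Identifier): Unit = {
      catalog.alterTable(ident, TableChange.setProperty("k", "v")) // SET TBLPROPERTIES
      catalog.alterTable(ident, TableChange.removeProperty("k")) // UNSET TBLPROPERTIES
    }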
From 4d45675c18bfe726abd8ca4339cb7ffec4c5688d Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Fri, 26 Jul 2024 16:17:19 +0200
Subject: [PATCH 02/11] Format

---
 .../spark/delta/DeltaQbeastSnapshot.scala     |  2 +-
 .../QbeastCatalogIntegrationTest.scala        | 19 -------------------
 2 files changed, 1 insertion(+), 20 deletions(-)

diff --git a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
index 0887fb41a..786a3603c 100644
--- a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
+++ b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
@@ -55,7 +55,7 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
 
   /**
    * The current table description.
-   * @return 
+   * @return
    */
   override def loadDescription: String = snapshot.metadata.description
 
diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
index 187b89924..fefc16c98 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
@@ -236,25 +236,6 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo
 
     })
 
-  it should "create table with properties" in withQbeastContextSparkAndTmpWarehouse(
-    (spark, tmpDir) => {
-
-      spark.sql(
-        "CREATE TABLE student (id INT, name STRING, age INT)" +
-          " USING qbeast TBLPROPERTIES ('k' = 'v', 'columnsToIndex'='id')")
-
-      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("student"))
-      val snapshot = deltaLog.update()
-      val properties = snapshot.getProperties
-      val catalog = spark.sessionState.catalog
-      val catalogProperties = catalog.getTableMetadata(TableIdentifier("student")).properties
-      properties should contain key "k"
-      properties("k") shouldBe "v"
-      catalogProperties should contain key "k"
-      catalogProperties("k") shouldBe "v"
-
-    })
-
   it should "persist altered properties on the _delta_log" in withQbeastContextSparkAndTmpWarehouse(
     (spark, tmpDir) => {
 
From e48e33141f05dd765773f2a1a0aa7a365b68adc8 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Fri, 26 Jul 2024 16:26:33 +0200
Subject: [PATCH 03/11] scalafix

---
 .../catalog/QbeastCatalogIntegrationTest.scala | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
index fefc16c98..5538ee90c 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
@@ -239,7 +239,7 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo
   it should "persist altered properties on the _delta_log" in withQbeastContextSparkAndTmpWarehouse(
     (spark, tmpDir) => {
 
-      spark.sql(s"CREATE TABLE t1(id INT) USING qbeast TBLPROPERTIES ('columnsToIndex'= 'id')")
+      spark.sql("CREATE TABLE t1(id INT) USING qbeast TBLPROPERTIES ('columnsToIndex'= 'id')")
 
       spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")
 
@@ -256,9 +256,9 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo
     (spark, _) => {
 
      spark.sql(
-        s"CREATE TABLE t1(id INT) " +
-          s"USING qbeast " +
-          s"TBLPROPERTIES ('columnsToIndex'= 'id')")
+        "CREATE TABLE t1(id INT) " +
+          "USING qbeast " +
+          "TBLPROPERTIES ('columnsToIndex'= 'id')")
 
       spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")
 
@@ -282,9 +282,9 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo
       import spark.implicits._
 
       spark.sql(
-        s"CREATE TABLE t1(id INT) " +
-          s"USING qbeast " +
-          s"TBLPROPERTIES ('columnsToIndex'= 'id')")
+        "CREATE TABLE t1(id INT) " +
+          "USING qbeast " +
+          "TBLPROPERTIES ('columnsToIndex'= 'id')")
       spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")
       // Check the delta log info
       val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t1"))
From 7f2c0a45b39b4175fb63d150049bdc008dcf19d7 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Fri, 26 Jul 2024 16:32:08 +0200
Subject: [PATCH 04/11] Format again

---
 .../sources/catalog/QbeastCatalogTest.scala   | 27 ++++++++++---------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
index 5bd03f609..51015d225 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
@@ -100,20 +100,21 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite
     qbeastCatalog.loadTable(tableIdentifier).columns() shouldBe newColumns
   })
 
-  it should "create table with properties" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {
-    val qbeastCatalog = createQbeastCatalog(spark)
-    val tableIdentifier = Identifier.of(defaultNamespace, "student")
-    qbeastCatalog.createTable(
-      tableIdentifier,
-      columns,
-      Array.empty[Transform],
-      Map("newProperty" -> "newValue").asJava)
+  it should "create table with properties" in withQbeastContextSparkAndTmpWarehouse(
+    (spark, _) => {
+      val qbeastCatalog = createQbeastCatalog(spark)
+      val tableIdentifier = Identifier.of(defaultNamespace, "student")
+      qbeastCatalog.createTable(
+        tableIdentifier,
+        columns,
+        Array.empty[Transform],
+        Map("newProperty" -> "newValue").asJava)
 
-    qbeastCatalog
-      .loadTable(Identifier.of(defaultNamespace, "student"))
-      .properties()
-      .asScala should contain("newProperty" -> "newValue")
-  })
+      qbeastCatalog
+        .loadTable(Identifier.of(defaultNamespace, "student"))
+        .properties()
+        .asScala should contain("newProperty" -> "newValue")
+    })
 
   it should "list tables" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {
     val qbeastCatalog = createQbeastCatalog(spark)
From c752e5f4e4c244a5c7515921026703dad0832284 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Fri, 26 Jul 2024 17:45:41 +0200
Subject: [PATCH 05/11] Fix tests and properties parsing

---
 .../qbeast/spark/delta/DeltaQbeastSnapshot.scala |  4 +++-
 .../internal/sources/v2/QbeastTableImpl.scala    |  4 ++++
 .../internal/sources/QbeastTableImplTest.scala   |  5 ++++-
 .../sources/catalog/QbeastCatalogTest.scala      | 16 ----------------
 .../spark/utils/QbeastSQLIntegrationTest.scala   |  2 +-
 5 files changed, 12 insertions(+), 19 deletions(-)

diff --git a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
index 786a3603c..173bb4382 100644
--- a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
+++ b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
@@ -51,7 +51,9 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
    *
    * @return
    */
-  override def loadProperties: mutable.Map[String, String] = snapshot.getProperties
+  override def loadProperties: mutable.Map[String, String] =
+    snapshot.getProperties.filter(prop =>
+      !(prop._1.startsWith(MetadataConfig.revision) || prop._1.startsWith("delta")))
 
   /**
    * The current table description.
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala
index 8a748cb9b..307fc84cb 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala
@@ -101,6 +101,10 @@ case class QbeastTableImpl(
   override def properties(): util.Map[String, String] = {
     val description = initialSnapshot.loadDescription
     val base = initialSnapshot.loadProperties
+    options.foreach {
+      case (key, value) if !base.contains(key) =>
+        base.put(key, value)
+    }
     base.put(TableCatalog.PROP_PROVIDER, "qbeast")
     base.put(TableCatalog.PROP_LOCATION, CatalogUtils.URIToString(path.toUri))
     catalogTable.foreach { table =>
diff --git a/src/test/scala/io/qbeast/spark/internal/sources/QbeastTableImplTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/QbeastTableImplTest.scala
index 25ed14b9d..a1da08258 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/QbeastTableImplTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/QbeastTableImplTest.scala
@@ -84,7 +84,10 @@ class QbeastTableImplTest extends QbeastIntegrationTestSpec with CatalogTestSuit
       None,
       indexedTableFactory)
 
-    qbeastTableImpl.properties() shouldBe properties.asJava
+    val qbeastTableProperties = qbeastTableImpl.properties()
+
+    qbeastTableProperties.get("provider") shouldBe "qbeast"
+    qbeastTableProperties.get("columnsToIndex") shouldBe "id"
 
   })
 
diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
index 51015d225..12195d5c8 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
@@ -100,22 +100,6 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite
     qbeastCatalog.loadTable(tableIdentifier).columns() shouldBe newColumns
   })
 
-  it should "create table with properties" in withQbeastContextSparkAndTmpWarehouse(
-    (spark, _) => {
-      val qbeastCatalog = createQbeastCatalog(spark)
-      val tableIdentifier = Identifier.of(defaultNamespace, "student")
-      qbeastCatalog.createTable(
-        tableIdentifier,
-        columns,
-        Array.empty[Transform],
-        Map("newProperty" -> "newValue").asJava)
-
-      qbeastCatalog
-        .loadTable(Identifier.of(defaultNamespace, "student"))
-        .properties()
-        .asScala should contain("newProperty" -> "newValue")
-    })
-
   it should "list tables" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {
     val qbeastCatalog = createQbeastCatalog(spark)
     qbeastCatalog.listTables(defaultNamespace) shouldBe Array()
diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
index 832e69201..3f77bfef9 100644
--- a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
@@ -59,7 +59,7 @@ class QbeastSQLIntegrationTest extends QbeastIntegrationTestSpec {
         .where("col_name == 'Table Properties'")
         .select("data_type")
         .first()
-        .getString(0) shouldBe "[columnsToIndex=id,option.columnsToIndex=id]"
+        .getString(0) should include("columnsToIndex=id,option.columnsToIndex=id")
 
   })
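The `option.`-prefixed duplicate that the updated SQL test now expects comes from the properties() override introduced in patch 01: CatalogTable storage properties are re-exposed under Spark's TableCatalog.OPTION_PREFIX namespace, alongside the raw keys merged in above. The mirroring in miniature, with an illustrative value:

    import org.apache.spark.sql.connector.catalog.TableCatalog

    // Stand-in for v1Table.storage.properties; the key/value are examples.
    val storage = Map("columnsToIndex" -> "id")
    val mirrored = storage.map { case (key, value) => (TableCatalog.OPTION_PREFIX + key, value) }
    // mirrored == Map("option.columnsToIndex" -> "id")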
From f2660804f38bde711c2650d4e3e54657828577de Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Mon, 29 Jul 2024 07:19:20 +0200
Subject: [PATCH 06/11] missing matching case

---
 .../io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala
index 307fc84cb..27bcf9dbe 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala
@@ -104,6 +104,7 @@ case class QbeastTableImpl(
     options.foreach {
       case (key, value) if !base.contains(key) =>
         base.put(key, value)
+      case _ => // do nothing
     }
     base.put(TableCatalog.PROP_PROVIDER, "qbeast")
     base.put(TableCatalog.PROP_LOCATION, CatalogUtils.URIToString(path.toUri))
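This one-line patch fixes a real bug rather than style: the function passed to foreach must be total, so the guarded case alone throws scala.MatchError as soon as an option key is already present in `base`. A minimal, self-contained reproduction of the failure mode the added default case prevents:

    // With the default case removed, this throws scala.MatchError on ("a", "1").
    Map("a" -> "1").foreach {
      case (key, value) if key != "a" => println(s"$key=$value")
      case _ => // do nothing, mirroring the fix above
    }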
From 1d01d5f1821f44e073ed205c9a9867757a6b2938 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Mon, 29 Jul 2024 07:21:08 +0200
Subject: [PATCH 07/11] Use tmpWarehouse for test

---
 .../spark/internal/sources/catalog/QbeastCatalogTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
index 12195d5c8..d22770540 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
@@ -145,7 +145,7 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite
       .asScala should contain("newProperty" -> "newValue")
   })
 
-  it should "unset properties" in withQbeastContextSparkAndTmpDir((spark, _) => {
+  it should "unset properties" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {
     val qbeastCatalog = createQbeastCatalog(spark)
     val tableIdentifier = Identifier.of(defaultNamespace, "student")
     qbeastCatalog.createTable(
From 4232f6b0a76db256e5f35af908f1d9ef824f105b Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Mon, 29 Jul 2024 07:30:49 +0200
Subject: [PATCH 08/11] Filter only revision info

---
 .../scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
index 173bb4382..95b4fea92 100644
--- a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
+++ b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
@@ -49,11 +49,12 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
   /**
    * The current table properties of the snapshot.
    *
+   * We filter out the revision, leaving only Revision ID.
+   *
    * @return
    */
-  override def loadProperties: mutable.Map[String, String] =
-    snapshot.getProperties.filter(prop =>
-      !(prop._1.startsWith(MetadataConfig.revision) || prop._1.startsWith("delta")))
+  override def loadProperties: mutable.Map[String, String] =
+    snapshot.getProperties.filter(prop => !prop._1.startsWith(MetadataConfig.revision))
 
   /**
    * The current table description.
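Patch 08 narrows the filter from patch 05: `delta.`-prefixed keys become visible again and only the serialized revision entries are hidden, so the latest revision pointer survives, as the added comment says. Assuming MetadataConfig.revision is the "qbeast.revision" key prefix (an assumption for this sketch, not stated in the patch), the effect is:

    // Illustrative property map; "qbeast.revision" stands in for MetadataConfig.revision.
    val raw = Map(
      "qbeast.revision.1" -> "{...serialized revision...}",
      "qbeast.lastRevisionID" -> "1",
      "columnsToIndex" -> "id")
    val visible = raw.filter { case (key, _) => !key.startsWith("qbeast.revision") }
    // visible == Map("qbeast.lastRevisionID" -> "1", "columnsToIndex" -> "id")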
From 6a5dd42a5075813410e621f773ef9d12ca5e5156 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Mon, 29 Jul 2024 08:03:50 +0200
Subject: [PATCH 09/11] Separated options

---
 .../io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
index 3f77bfef9..5540e8eb4 100644
--- a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
@@ -55,11 +55,13 @@ class QbeastSQLIntegrationTest extends QbeastIntegrationTestSpec {
         .first()
         .getString(0) shouldBe "qbeast"
       // Check Table Properties
-      table
+      val tableProperties = table
         .where("col_name == 'Table Properties'")
         .select("data_type")
         .first()
-        .getString(0) should include("columnsToIndex=id,option.columnsToIndex=id")
+        .getString(0)
+      tableProperties should include("columnsToIndex=id")
+      tableProperties should include("option.columnsToIndex=id")
 
   })

From c987b2f01b37b2e52405456ab418bd92ac68d5c7 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Tue, 30 Jul 2024 11:30:50 +0200
Subject: [PATCH 10/11] Use filterKeys

---
 src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala      | 2 +-
 .../scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala     | 4 ++--
 .../io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala | 3 ++-
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala b/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala
index 97b889fa0..98865c0df 100644
--- a/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala
+++ b/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala
@@ -41,7 +41,7 @@ trait QbeastSnapshot {
    * The current table properties of the snapshot.
    * @return
    */
-  def loadProperties: mutable.Map[String, String]
+  def loadProperties: Map[String, String]
 
   /**
    * Load methods
diff --git a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
index 95b4fea92..16676b57a 100644
--- a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
+++ b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
@@ -53,8 +53,8 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
    *
    * @return
    */
-  override def loadProperties: mutable.Map[String, String] =
-    snapshot.getProperties.filter(prop => !prop._1.startsWith(MetadataConfig.revision))
+  override def loadProperties: Map[String, String] =
+    snapshot.getProperties.filterKeys(k => !k.startsWith(MetadataConfig.revision)).toMap
 
   /**
    * The current table description.
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala index 27bcf9dbe..4647b2723 100644 --- a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala +++ b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.V2toV1Fallback import java.util +import scala.collection.mutable import scala.collection.JavaConverters._ /** @@ -100,7 +101,7 @@ case class QbeastTableImpl( override def properties(): util.Map[String, String] = { val description = initialSnapshot.loadDescription - val base = initialSnapshot.loadProperties + val base = mutable.Map() ++ initialSnapshot.loadProperties options.foreach { case (key, value) if !base.contains(key) => base.put(key, value) From 7720f4f035e091505c84a46f546dd0d9094e30b3 Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Tue, 30 Jul 2024 11:32:32 +0200 Subject: [PATCH 11/11] Unused import --- src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala | 2 -- src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala | 2 -- 2 files changed, 4 deletions(-) diff --git a/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala b/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala index 98865c0df..3cb965928 100644 --- a/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala +++ b/src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala @@ -18,8 +18,6 @@ package io.qbeast.core.model import io.qbeast.IISeq import org.apache.spark.sql.Dataset -import scala.collection.mutable - /** * A snapshot of the Qbeast table state. */ diff --git a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala index 16676b57a..41825d405 100644 --- a/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala +++ b/src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala @@ -25,8 +25,6 @@ import org.apache.spark.sql.functions.lit import org.apache.spark.sql.AnalysisExceptionFactory import org.apache.spark.sql.Dataset -import scala.collection.mutable - /** * Qbeast Snapshot that provides information about the current index state. *
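Taken together, patches 10 and 11 settle the API on an immutable Map: loadProperties returns a filtered immutable copy, QbeastTableImpl layers catalog metadata onto a local mutable.Map, and the now-unused mutable imports are dropped. A minimal end-to-end check of the behavior the series targets, assuming an active SparkSession `spark` configured with the Qbeast data source; the table and property names are illustrative:

    spark.sql("CREATE TABLE demo (id INT) USING qbeast TBLPROPERTIES ('columnsToIndex' = 'id')")
    spark.sql("ALTER TABLE demo SET TBLPROPERTIES ('k' = 'v')")
    // The _delta_log, the session catalog and SHOW TBLPROPERTIES should all agree:
    spark.sql("SHOW TBLPROPERTIES demo").show(truncate = false) // contains k=v
    spark.sql("ALTER TABLE demo UNSET TBLPROPERTIES ('k')")
    spark.sql("SHOW TBLPROPERTIES demo").show(truncate = false) // k is gone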