Issue 366: TBLPROPERTIES consistency between log, catalog and Qbeast internals #367

Merged · 11 commits · Jul 30, 2024
12 changes: 12 additions & 0 deletions src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala
@@ -29,6 +29,18 @@ trait QbeastSnapshot {
*/
def isInitial: Boolean

/**
* The current table description.
* @return
*/
def loadDescription: String

/**
* The current table properties of the snapshot.
* @return
*/
def loadProperties: Map[String, String]

/**
* Load methods
*/
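For context, a minimal sketch of how the two new snapshot methods might be consumed by a caller (the table ID and the final use of the values are illustrative, not part of this diff):

// Hypothetical caller of the extended QbeastSnapshot API
val snapshot: QbeastSnapshot = QbeastContext.metadataManager.loadSnapshot(tableId) // tableId assumed in scope
val description: String = snapshot.loadDescription // the table comment, may be empty
val tblProperties: Map[String, String] = snapshot.loadProperties // TBLPROPERTIES without revision metadata
tblProperties.foreach { case (k, v) => println(s"$k=$v") }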
17 changes: 17 additions & 0 deletions src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
@@ -44,6 +44,22 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)

private val metadataMap: Map[String, String] = snapshot.metadata.configuration

/**
* The current table properties of the snapshot.
*
* Revision metadata (keys prefixed with MetadataConfig.revision) is filtered out, keeping only the revision ID.
*
* @return
*/
override def loadProperties: Map[String, String] =
snapshot.getProperties.filterKeys(k => !k.startsWith(MetadataConfig.revision)).toMap

/**
* The current table description.
* @return
*/
override def loadDescription: String = snapshot.metadata.description

/**
* Constructs revision dictionary
*
@@ -195,4 +211,5 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
* Loads Staging AddFiles
*/
def loadStagingFiles(): Dataset[AddFile] = stagingFiles()

}
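One note on loadProperties above: Map.filterKeys returns a lazy view (and is deprecated in Scala 2.13), so the trailing .toMap materializes a strict, serializable Map. A small sketch of the filtering, with assumed key names standing in for the real MetadataConfig constants:

// Illustrative only: drop keys prefixed with MetadataConfig.revision
val configuration = Map(
  "qbeast.revision.1" -> "{...}", // assumed revision key layout
  "qbeast.lastRevisionID" -> "1", // assumed key, not matched by the prefix
  "columnsToIndex" -> "id")
val revisionPrefix = "qbeast.revision" // stands in for MetadataConfig.revision
val visible = configuration.filterKeys(k => !k.startsWith(revisionPrefix)).toMap
// visible == Map("qbeast.lastRevisionID" -> "1", "columnsToIndex" -> "id")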
@@ -293,9 +293,11 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatalog]
ifExists = true).run(spark)

// Other cases not handled yet
case _ => return getSessionCatalog().alterTable(ident, changes: _*)
case _ =>
}

// Update session catalog with changes
getSessionCatalog().alterTable(ident, changes: _*)
loadTable(ident)

}
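The net effect of the change above is that qbeast-specific cases are handled first and then every TableChange is forwarded to the session catalog before the table is reloaded, so both stay in sync. A hedged sketch of the intended round trip (identifier and property are illustrative):

// Illustrative: the same alterTable call should now update the session catalog too
// qbeastCatalog assumed in scope (e.g. from createQbeastCatalog(spark))
val ident = Identifier.of(Array("default"), "t1") // hypothetical identifier
qbeastCatalog.alterTable(ident, TableChange.setProperty("k", "v"))
qbeastCatalog.alterTable(ident, TableChange.removeProperty("k"))
val reloaded = qbeastCatalog.loadTable(ident)
assert(!reloaded.properties().containsKey("k"))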
@@ -15,16 +15,20 @@
*/
package io.qbeast.spark.internal.sources.v2

import io.qbeast.context.QbeastContext
import io.qbeast.core.model.QTableID
import io.qbeast.spark.internal.sources.QbeastBaseRelation
import io.qbeast.spark.table.IndexedTableFactory
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.connector.write.WriteBuilder
import org.apache.spark.sql.sources.BaseRelation
@@ -33,6 +37,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.V2toV1Fallback

import java.util
import scala.collection.mutable
import scala.collection.JavaConverters._

/**
@@ -65,11 +70,15 @@ case class QbeastTableImpl(

private val indexedTable = tableFactory.getIndexedTable(tableId)

private lazy val spark = SparkSession.active

private lazy val initialSnapshot = QbeastContext.metadataManager.loadSnapshot(tableId)

private lazy val table: CatalogTable =
if (catalogTable.isDefined) catalogTable.get
else {
// Get table Metadata if no catalog table is provided
SparkSession.active.sessionState.catalog
spark.sessionState.catalog
.getTableMetadata(tableIdentifier)
}

@@ -90,7 +99,30 @@
QbeastBaseRelation.forQbeastTableWithOptions(indexedTable, properties().asScala.toMap)
}

override def properties(): util.Map[String, String] = options.asJava
override def properties(): util.Map[String, String] = {
val description = initialSnapshot.loadDescription
val base = mutable.Map() ++ initialSnapshot.loadProperties
options.foreach {
case (key, value) if !base.contains(key) =>
base.put(key, value)
case _ => // do nothing
}
base.put(TableCatalog.PROP_PROVIDER, "qbeast")
base.put(TableCatalog.PROP_LOCATION, CatalogUtils.URIToString(path.toUri))
catalogTable.foreach { table =>
if (table.owner != null && table.owner.nonEmpty) {
base.put(TableCatalog.PROP_OWNER, table.owner)
}
v1Table.storage.properties.foreach { case (key, value) =>
base.put(TableCatalog.OPTION_PREFIX + key, value)
}
if (v1Table.tableType == CatalogTableType.EXTERNAL) {
base.put(TableCatalog.PROP_EXTERNAL, "true")
}
}
Option(description).foreach(base.put(TableCatalog.PROP_COMMENT, _))
base.asJava
}

override def v1Table: CatalogTable = table

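For intuition, with the merge order above (snapshot properties, then non-conflicting write options, then provider, location, owner, option.*-prefixed storage properties, the external flag and the comment), the map returned by properties() for a hypothetical external table could look like this (all values illustrative):

// Assumed example output of properties(), shown as a Scala Map before .asJava
Map(
  "columnsToIndex" -> "id",
  "provider" -> "qbeast",
  "location" -> "file:/tmp/qbeast/t1",
  "owner" -> "spark",
  "option.columnsToIndex" -> "id",
  "external" -> "true",
  "comment" -> "table description")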
@@ -84,7 +84,10 @@ class QbeastTableImplTest extends QbeastIntegrationTestSpec with CatalogTestSuite
None,
indexedTableFactory)

qbeastTableImpl.properties() shouldBe properties.asJava
val qbeastTableProperties = qbeastTableImpl.properties()

qbeastTableProperties.get("provider") shouldBe "qbeast"
qbeastTableProperties.get("columnsToIndex") shouldBe "id"

})

@@ -16,6 +16,7 @@
package io.qbeast.spark.internal.sources.catalog

import io.qbeast.spark.QbeastIntegrationTestSpec
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.AnalysisException
import org.apache.spark.SparkConf
@@ -238,13 +239,11 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with CatalogTestSuite
it should "persist altered properties on the _delta_log" in withQbeastContextSparkAndTmpWarehouse(
(spark, tmpDir) => {

val df = loadTestData(spark)
df.write.format("qbeast").option("columnsToIndex", "user_id,price").save(tmpDir)
spark.sql(s"CREATE TABLE t1 USING qbeast LOCATION '$tmpDir'")
spark.sql("CREATE TABLE t1(id INT) USING qbeast TBLPROPERTIES ('columnsToIndex'= 'id')")
spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")

// Check the delta log info
val deltaLog = DeltaLog.forTable(spark, tmpDir)
val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t1"))
val snapshot = deltaLog.update()
val properties = snapshot.getProperties

@@ -253,4 +252,65 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with CatalogTestSuite

})

it should "persist UNSET properties in _delta_catalog" in withQbeastContextSparkAndTmpWarehouse(
(spark, _) => {

spark.sql(
"CREATE TABLE t1(id INT) " +
"USING qbeast " +
"TBLPROPERTIES ('columnsToIndex'= 'id')")

spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")

val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t1"))
val snapshot = deltaLog.update()
val properties = snapshot.getProperties

properties should contain key "k"
properties("k") shouldBe "v"

spark.sql("ALTER TABLE t1 UNSET TBLPROPERTIES ('k')")

// Check the delta log info
val updatedProperties = deltaLog.update().getProperties
updatedProperties should not contain key("k")
})

it should "ensure consistency with the session catalog" in withQbeastContextSparkAndTmpWarehouse(
(spark, tmpDir) => {

import spark.implicits._

spark.sql(
"CREATE TABLE t1(id INT) " +
"USING qbeast " +
"TBLPROPERTIES ('columnsToIndex'= 'id')")
spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")
// Check the delta log info
val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t1"))
val catalog = spark.sessionState.catalog
val showProperties = spark.sql("SHOW TBLPROPERTIES t1").as[(String, String)].collect().toMap

val snapshot = deltaLog.update()
val properties = snapshot.getProperties
val catalogProperties =
catalog.getTableMetadata(TableIdentifier("t1")).properties
properties should contain key "k"
catalogProperties should contain key ("k")
showProperties should contain key ("k")

spark.sql("ALTER TABLE t1 UNSET TBLPROPERTIES ('k')")
// Check the delta log info
val updatedSnapshot = deltaLog.update()
val updatedProperties = updatedSnapshot.getProperties
val updatedCatalogProperties =
catalog.getTableMetadata(TableIdentifier("t1")).properties
val updatedShowProperties =
spark.sql("SHOW TBLPROPERTIES t1").as[(String, String)].collect().toMap

updatedProperties should not contain key("k")
updatedCatalogProperties should not contain key("k")
updatedShowProperties should not contain key("k")
})

}
@@ -145,6 +145,25 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite
.asScala should contain("newProperty" -> "newValue")
})

it should "unset properties" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {
val qbeastCatalog = createQbeastCatalog(spark)
val tableIdentifier = Identifier.of(defaultNamespace, "student")
qbeastCatalog.createTable(
tableIdentifier,
columns,
Array.empty[Transform],
Map("newProperty" -> "newValue").asJava)

val unsetPropertiesChange = TableChange.removeProperty("newProperty")
// Alter table with new information
qbeastCatalog.alterTable(tableIdentifier, unsetPropertiesChange)

qbeastCatalog
.loadTable(Identifier.of(defaultNamespace, "student"))
.properties()
.asScala should not contain ("newProperty" -> "newValue")
})

it should "drop table" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {
val qbeastCatalog = createQbeastCatalog(spark)
val tableIdentifier = Identifier.of(defaultNamespace, "student")
@@ -55,11 +55,13 @@ class QbeastSQLIntegrationTest extends QbeastIntegrationTestSpec {
.first()
.getString(0) shouldBe "qbeast"
// Check Table Properties
table
val tableProperties = table
.where("col_name == 'Table Properties'")
.select("data_type")
.first()
.getString(0) shouldBe "[columnsToIndex=id,option.columnsToIndex=id]"
.getString(0)
tableProperties should include("columnsToIndex=id")
tableProperties should include("option.columnsToIndex=id")

})
