From 623a202cb57bc0bae5f43c8abb25a5e897927926 Mon Sep 17 00:00:00 2001
From: harveyyue
Date: Mon, 23 Sep 2024 14:18:48 +0800
Subject: [PATCH] [spark] Fix loadTable of SparkSource not recognizing the
 correct catalog

---
 .../spark_connector_configuration.html        | 12 ++++++++++++
 .../paimon/spark/SparkConnectorOptions.java   | 12 ++++++++++++
 .../org/apache/paimon/spark/SparkSource.scala | 25 ++++++++++++++++-----
 3 files changed, 44 insertions(+), 5 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/spark_connector_configuration.html b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 00ca2ba17d39..c315d5d3d3f2 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -74,5 +74,17 @@
             <td>Boolean</td>
             <td>If true, allow to merge data types if the two types meet the rules for explicit casting.</td>
         </tr>
+        <tr>
+            <td><h5>database</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The read or write database name.</td>
+        </tr>
+        <tr>
+            <td><h5>table</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The read or write table name.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index 4ddbc6490f42..44988da6ebe8 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -25,6 +25,18 @@
 /** Options for spark connector. */
 public class SparkConnectorOptions {
 
+    public static final ConfigOption<String> DATABASE =
+            key("database")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The read or write database name.");
+
+    public static final ConfigOption<String> TABLE =
+            key("table")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The read or write table name.");
+
     public static final ConfigOption<Boolean> MERGE_SCHEMA =
             key("write.merge-schema")
                     .booleanType()
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index 8ea2c31bc8f6..d688da75b8ed 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -18,13 +18,14 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.catalog.CatalogContext
+import org.apache.paimon.catalog.{CatalogContext, CatalogFactory, Identifier}
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.commands.WriteIntoPaimonTable
 import org.apache.paimon.spark.sources.PaimonSink
 import org.apache.paimon.spark.util.OptionUtils.mergeSQLConf
 import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory}
 import org.apache.paimon.table.system.AuditLogTable
+import org.apache.paimon.utils.StringUtils
 
 import org.apache.spark.sql.{DataFrame, SaveMode => SparkSaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table}
@@ -79,12 +80,26 @@ class SparkSource
     SparkSource.toBaseRelation(table, sqlContext)
   }
 
-  private def loadTable(options: JMap[String, String]): DataTable = {
+  private def loadTable(parameters: JMap[String, String]): DataTable = {
     val catalogContext = CatalogContext.create(
-      Options.fromMap(mergeSQLConf(options)),
+      Options.fromMap(mergeSQLConf(parameters)),
       SparkSession.active.sessionState.newHadoopConf())
-    val table = FileStoreTableFactory.create(catalogContext)
-    if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) {
+    val options = Options.fromMap(parameters)
+    val databaseName = options.get(SparkConnectorOptions.DATABASE)
+    val tableName = options.get(SparkConnectorOptions.TABLE)
+    val table =
+      if (
+        !StringUtils.isNullOrWhitespaceOnly(databaseName) && !StringUtils.isNullOrWhitespaceOnly(
+          tableName)
+      ) {
+        CatalogFactory
+          .createCatalog(catalogContext)
+          .getTable(new Identifier(databaseName, tableName))
+          .asInstanceOf[FileStoreTable]
+      } else {
+        FileStoreTableFactory.create(catalogContext)
+      }
+    if (options.get(SparkConnectorOptions.READ_CHANGELOG)) {
       new AuditLogTable(table)
     } else {
       table
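
Usage sketch (editor's note, not part of the patch): with this fix, the
"paimon" data source can resolve a table through the configured catalog
when "database" and "table" are supplied, instead of always falling back
to a raw table path. A minimal sketch, assuming an active SparkSession
`spark` and a filesystem catalog; the warehouse path and the
"default"/"orders" names below are placeholders:

    // Catalog-aware load: CatalogFactory.createCatalog builds the catalog
    // from the merged options, and getTable resolves the Identifier
    // (database, table) against it.
    val df = spark.read
      .format("paimon")
      .option("warehouse", "file:/tmp/paimon/warehouse")
      .option("database", "default")
      .option("table", "orders")
      .load()

    // Fallback when "database" or "table" is absent: the previous behavior,
    // FileStoreTableFactory.create, which needs an explicit table path.
    val byPath = spark.read
      .format("paimon")
      .option("path", "file:/tmp/paimon/warehouse/default.db/orders")
      .load()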