
Rest Catalog: spark catalog api fails to work with rest based catalog #11741

Open
1 of 3 tasks
sunny1154 opened this issue Dec 10, 2024 · 10 comments
Labels
bug Something isn't working

Comments

@sunny1154

Apache Iceberg version

1.5.0

Query engine

Spark

Please describe the bug 🐞

Hi,

I am observing issues when working with a REST-based catalog. My Spark session's default catalog is a REST-based catalog.

The SparkSession.catalog API fails to work with the REST-based catalog. Tested with Spark 3.4.

${SPARK_HOME}/bin/spark-shell --master local[*] \
  --driver-memory 2g \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider \
  --conf spark.sql.catalog.iceberg.uri=https://xx.xxx.xxxx.domain.com \
  --conf spark.sql.warehouse.dir=$SQL_WAREHOUSE_DIR \
  --conf spark.sql.defaultCatalog=iceberg \
  --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.iceberg.catalog-impl=org.apache.iceberg.rest.RESTCatalog

scala> spark.catalog.currentCatalog
res1: String = iceberg

scala> spark.sql("select * from restDb.restTable").show
+---+----------+
| id| data|
+---+----------+
| 1|some_value|
+---+----------+

scala> spark.catalog.tableExists("restDb.restTable")
res3: Boolean = true

scala> spark.catalog.tableExists("restDb", "restTable")
res4: Boolean = false

Other APIs fail in the same way:

spark.catalog.getTable("restDb", "restTable")
-- fails with database not found
spark.catalog.getTable("restDb.restTable")
-- returns the table object

spark.catalog.tableExists("restDb", "restTable")
-- returns false (even though the table exists)
spark.catalog.tableExists("restDb.restTable")
-- returns true (if the table exists and is registered with the REST catalog)

spark.catalog.listColumns("restDb", "restTable")
-- fails with database not found
spark.catalog.listColumns("restDb.restTable")
-- returns the list of columns

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
@sunny1154 sunny1154 added the "bug" label Dec 10, 2024
@sunny1154 sunny1154 changed the title from "Rest Catalog: spark catalog api fails working rest based catalog is used" to "Rest Catalog: spark catalog api fails to work with rest based catalog" Dec 10, 2024
@dramaticlly
Contributor

dramaticlly commented Dec 11, 2024

I think we can reproduce the problem in a unit test in org.apache.iceberg.spark.sql.TestCreateTable, since it supports all of the hive/spark/hadoop/rest catalogs. The rest and hadoop catalogs fail while hive and spark pass:

  @TestTemplate
  public void testCreateTable() {
    assumeThat(catalogName).isEqualTo(SparkCatalogConfig.REST.catalogName());
    assertThat(validationCatalog.tableExists(tableIdent))
        .as("Table should not already exist")
        .isFalse();

    sql("CREATE TABLE %s (id BIGINT NOT NULL, data STRING) USING iceberg", tableName);

    Table table = validationCatalog.loadTable(tableIdent);
    assertThat(table).as("Should load the new table").isNotNull();

    StructType expectedSchema =
        StructType.of(
            NestedField.required(1, "id", Types.LongType.get()),
            NestedField.optional(2, "data", Types.StringType.get()));
    assertThat(table.schema().asStruct())
        .as("Should have the expected schema")
        .isEqualTo(expectedSchema);
    assertThat(table.spec().fields()).as("Should not be partitioned").hasSize(0);
    assertThat(table.properties().get(TableProperties.DEFAULT_FILE_FORMAT))
        .as("Should not have the default format set")
        .isNull();

    spark.sessionState().catalogManager().setCurrentCatalog(catalogName);
    assertThat(spark.catalog().tableExists(tableIdent.toString())).isTrue();  // success
    assertThat(spark.catalog().tableExists(tableIdent.namespace().toString(), tableIdent.name())).isTrue(); // failure
  }

I am wondering if anyone else has run into this when using a REST-based catalog?

CC @RussellSpitzer @flyrain

@dramaticlly
Contributor

FYI @szehon-ho @huaxingao and @stevenzwu

@huaxingao
Contributor

@dramaticlly Thanks for pinging me. This seems to be a Spark bug. I'll investigate further.

@flyrain
Contributor

flyrain commented Dec 11, 2024

Yeah, it's more like a Spark bug. Probably the Iceberg REST catalog didn't implement this method from class org.apache.spark.sql.catalog.Catalog well:

 public abstract boolean tableExists(final String dbName, final String tableName);

@huaxingao
Copy link
Contributor

After taking a closer look at the Javadoc, I found that this API is intended only for the Hive Metastore. As specified in the Javadoc:

To check existence of table/view in other catalogs, please use `tableExists(tableName)` with qualified table/view name instead.

It's a user error to use tableExists(final String dbName, final String tableName) with the REST catalog.
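
In other words, a minimal sketch using the names from the original report (the session's default catalog is iceberg):

  // Two-argument form: always resolved against the Hive Metastore
  // (spark_catalog), so it cannot see tables in the REST catalog.
  spark.catalog.tableExists("restDb", "restTable")   // false

  // Single-argument form with a qualified name: resolved in the
  // current catalog, so it works with the REST catalog.
  spark.catalog.tableExists("restDb.restTable")      // true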

@dramaticlly
Copy link
Contributor

> After taking a closer look at the Javadoc, I found that this API is intended only for the Hive Metastore. As specified in the Javadoc:
>
> To check existence of table/view in other catalogs, please use `tableExists(tableName)` with qualified table/view name instead.
>
> It's a user error to use tableExists(final String dbName, final String tableName) with the REST catalog.

Thank you @huaxingao, your finding is appreciated.

FYI @sunny1154 @stevenzwu: it looks like Spark also explicitly mentions that these functions are only meant for the Hive Metastore (see the sketch after this list):

  • listColumns(dbName: String, tableName: String)
  • functionExists(dbName: String, functionName: String)
  • getTable(dbName: String, tableName: String)
  • getFunction(dbName: String, functionName: String)
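
For reference, a minimal sketch of the qualified-name alternatives that resolve in the session's current catalog (table name taken from the report above; functionExists/getFunction accept a qualified function name in the same way):

  // Qualified names resolve in the current catalog (iceberg here),
  // so these work with the REST catalog:
  spark.catalog.listColumns("restDb.restTable")   // returns the columns
  spark.catalog.getTable("restDb.restTable")      // returns the Table object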

@sunny1154
Copy link
Author

Thanks @huaxingao for looking into this.

Is spark.sessionState.catalog.getTableMetadata(TableIdentifier(table, Some(database))) also expected to work only with HMS? Currently, with Spark 3.4, spark.sessionState.catalog also only works with HMS and ignores the default catalog if the default catalog is REST-based rather than HMS-based.

I can open another bug for spark.sessionState.catalog.getTableMetadata if you think it should work with both the REST-based catalog and HMS.

@kazuyukitanimura

kazuyukitanimura commented Dec 11, 2024

Just to add to @huaxingao's point:

tableExists(dbName: String, tableName: String): Boolean

is meant only for the hardcoded spark_catalog, but it looks like @sunny1154 is trying to access the iceberg catalog.

For the Spark master branch (future Spark 4.0)
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala#L551

  override def tableExists(dbName: String, tableName: String): Boolean = {
    if (sessionCatalog.isGlobalTempViewDB(dbName)) {
      tableExists(Seq(dbName, tableName))
    } else {
      // For backward compatibility (Spark 3.3 and prior), here we always look up the table from the
      // Hive Metastore.
      tableExists(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, tableName))
    }
  }

This is due to backward compatibility. Is it possible for you to just use the dot notation?
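
For example, a small sketch using the names from the report; the three-part form names the catalog explicitly:

  // Dot notation resolves in the current catalog:
  spark.catalog.tableExists("restDb.restTable")
  // Or fully qualified, naming the catalog explicitly:
  spark.catalog.tableExists("iceberg.restDb.restTable")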

@kazuyukitanimura

@sunny1154 I think you would need to specify the catalog in TableIdentifier() (see the sketch below).

Otherwise, Spark tries to use spark_catalog
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala#L192

  private def getCatalog(ident: CatalystIdentifier): Option[String] = {
    if (conf.getConf(SQLConf.LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME)) {
      ident.catalog
    } else {
      Some(format(ident.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME)))
    }
  }
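
A sketch of that suggestion, assuming the three-argument TableIdentifier(table, database, catalog) constructor available in Spark 3.4 (note the next comment reports this still does not resolve against the REST catalog):

  import org.apache.spark.sql.catalyst.TableIdentifier

  // Name the catalog explicitly in the identifier:
  spark.sessionState.catalog.getTableMetadata(
    TableIdentifier("restTable", Some("restDb"), Some("iceberg")))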

@huaxingao
Contributor

@sunny1154 spark.sessionState.catalog.getTableMetadata(TableIdentifier(table, Some(database))) also doesn't work with the REST catalog.
