
[FEATURE REQUEST] Add Support for S3A prefix #214

Open · TheerachotTle opened this issue Aug 27, 2024 · 15 comments
Labels: enhancement (New feature or request)

Comments

@TheerachotTle

Is your feature request related to a problem? Please describe.

I have set the allowed location of the created catalog to the S3 storage type using the s3:// prefix. When I run the remove_orphan_files procedure in Spark, it fails with the error: No FileSystem for scheme "s3". To work around this, I tried creating the catalog with the s3a:// prefix instead, but I received a 400 Bad Request error with the message: Location prefix not allowed.
Here's my Spark configuration:

spark = SparkSession.builder \
            .config("spark.jars.packages","org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2,org.apache.hadoop:hadoop-aws:3.4.0,org.apache.hadoop:hadoop-common:3.4.0") \
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
            .config('spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation', 'true') \
            .config("spark.sql.catalog.polaris.uri", POLARIS_URI) \
            .config("spark.sql.catalog.polaris.type", "rest") \
            .config("spark.sql.catalog.polaris", "org.apache.iceberg.spark.SparkCatalog") \
            .config("spark.sql.catalog.polaris.warehouse", POLARIS_CATALOG_NAME) \
            .config("spark.sql.catalog.polaris.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config('spark.sql.catalog.polaris.credential', POLARIS_CREDENTIALS) \
            .config('spark.sql.catalog.polaris.scope', POLARIS_SCOPE) \
            .config('spark.sql.catalog.polaris.token-refresh-enabled', 'true') \
            .getOrCreate()

Describe the solution you'd like

Probably add the s3a:// prefix as an alternative for the S3 storage type.
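For illustration, a catalog-creation request like the sketch below is what currently fails; the management endpoint path, token, and bucket names are placeholders rather than values from a real setup. The ask is for the s3a:// locations to be accepted (today they are rejected with 400 Location prefix not allowed).

import requests

# Hypothetical sketch: the endpoint path and token are placeholders.
POLARIS_URI = "http://localhost:8181"
TOKEN = "<management-access-token>"

payload = {
    "name": "testcatalog",
    "type": "INTERNAL",
    "properties": {"default-base-location": "s3a://bucket/folder/"},
    "storageConfigInfo": {
        "roleArn": "arn:aws:iam::xxxxxxxxx:role/demo-polaris",
        "storageType": "S3",
        # Using the s3a:// scheme here is what currently returns
        # 400 Bad Request: "Location prefix not allowed".
        "allowedLocations": ["s3a://bucket/folder"],
    },
}

resp = requests.post(
    f"{POLARIS_URI}/api/management/v1/catalogs",  # assumed management path
    json=payload,
    headers={"Authorization": f"Bearer {TOKEN}"},
)
print(resp.status_code, resp.text)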

Describe alternatives you've considered

No response

Additional context

No response

TheerachotTle added the enhancement (New feature or request) label on Aug 27, 2024
@flyrain (Contributor) commented Aug 27, 2024

Do other DMLs (e.g., INSERT, DELETE) work? Can you share the stack trace of the error?

@flyrain (Contributor) commented Aug 27, 2024

Can you remove this config and try again?

            .config("spark.sql.catalog.polaris.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") 

@TheerachotTle (Author)

Yes, the other DML commands work as expected. I also removed the config above, but it still results in an error. This is the code I ran:
spark.sql("""CALL polaris.system.remove_orphan_files(table => 'polaris.namespace.table')""").show()
Here's the error.

Py4JJavaError: An error occurred while calling o48.sql.
: java.io.UncheckedIOException: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listDirRecursively(DeleteOrphanFilesSparkAction.java:386)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listedFileDS(DeleteOrphanFilesSparkAction.java:311)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.actualFileIdentDS(DeleteOrphanFilesSparkAction.java:296)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.doExecute(DeleteOrphanFilesSparkAction.java:247)
at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:59)
at org.apache.iceberg.spark.JobGroupUtils.withJobGroupInfo(JobGroupUtils.java:51)
at org.apache.iceberg.spark.actions.BaseSparkAction.withJobGroupInfo(BaseSparkAction.java:130)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.execute(DeleteOrphanFilesSparkAction.java:223)
at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.lambda$call$3(RemoveOrphanFilesProcedure.java:185)
at org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:107)
at org.apache.iceberg.spark.procedures.BaseProcedure.withIcebergTable(BaseProcedure.java:96)
at org.apache.iceberg.spark.procedures.RemoveOrphanFilesProcedure.call(RemoveOrphanFilesProcedure.java:139)
at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction.listDirRecursively(DeleteOrphanFilesSparkAction.java:356)
... 55 more

@eric-maynard (Contributor) commented Aug 27, 2024

@TheerachotTle I think the issue is this config:

            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \

If you refer to the quickstart guide, it gives an example of Spark configs that can be used to connect to an Iceberg REST catalog.

Having said that, I think s3a support is a reasonable feature request.

@mayankvadariya

the other DML commands work as expected

If this is specific to remove_orphan_files, let's change the title to reflect it.

@TheerachotTle (Author)

If you refer to the quickstart guide, it gives an example of Spark configs that can be used to connect to an Iceberg REST catalog.

I removed the config, and it still doesn't work. From my understanding, the remove_orphan_files operation involves listing files to determine which ones should be removed, and the Spark procedure uses the Hadoop FileSystem to perform that listing.

If this is specific to remove_orphan_files, let's change the title to reflect it.

I have tried this procedure with other Iceberg catalogs, and it has the same problem when using the s3:// prefix, so I'm not sure whether the title should be changed to be about this procedure.

@flyrain (Contributor) commented Aug 28, 2024

the Spark procedure uses Hadoop FS to perform listing operations.

Yup, I'm guessing the failure is triggered because the procedure uses the Spark Hadoop FileSystem while the other DML commands use the FileIO from the Iceberg catalog. It's more likely a config issue than a bug, but I need to take a closer look. Would you share a way to reproduce it? For example, the Spark version and config, and the command used to call the procedure.

@TheerachotTle (Author)

I'm using Spark 3.5.0.
Create the catalog with a POST request:

{"name": "testcatalog", "type": "INTERNAL", "properties": {
        "default-base-location": "s3://bucket/folder/"
    },"storageConfigInfo": {
        "roleArn": "arn:aws:iam::xxxxxxxxx:role/demo-polaris",
        "storageType": "S3",
        "allowedLocations": [
            "s3://bucket/folder"
        ]
    } }

Spark config:

spark = SparkSession.builder \
            .config("spark.jars.packages","org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2,org.apache.hadoop:hadoop-aws:3.4.0,org.apache.hadoop:hadoop-common:3.4.0") \
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
            .config('spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation', 'true') \
            .config("spark.sql.catalog.polaris.uri", POLARIS_URI) \
            .config("spark.sql.catalog.polaris.type", "rest") \
            .config("spark.sql.catalog.polaris", "org.apache.iceberg.spark.SparkCatalog") \
            .config("spark.sql.catalog.polaris.warehouse", POLARIS_CATALOG_NAME) \
            .config("spark.sql.catalog.polaris.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config('spark.sql.catalog.polaris.credential', POLARIS_CREDENTIALS) \
            .config('spark.sql.catalog.polaris.scope', POLARIS_SCOPE) \
            .config('spark.sql.catalog.polaris.token-refresh-enabled', 'true') \
            .getOrCreate()

Code to reproduce:

spark.sql("USE polaris")
spark.sql("USE NAMESPACE namespace1")
spark.sql("""CREATE TABLE IF NOT EXISTS table1 (
    id bigint NOT NULL COMMENT 'unique id',
    data string)
USING iceberg
LOCATION "s3://bucket/folder/namespace1/table1"
""")
spark.sql("INSERT INTO table1 VALUES (1,'test')")
spark.sql("""CALL polaris.system.remove_orphan_files(
  table => 'polaris.namespace1.table1'
  )
""").show()

@flyrain (Contributor) commented Aug 28, 2024

This is an Iceberg issue rather than a Polaris one. To summarize: DML commands and procedures usually use the FileIO object provided by the catalog to read and write files. However, the RemoveOrphanFiles procedure uses the Spark configuration to resolve a Hadoop FileSystem for listing, and in this setup only the s3a scheme maps to the Hadoop S3A file system, so the s3:// scheme isn't recognized (the failing lookup is shown in the sketch after this list). Possible solutions would be:

  1. Use the catalog FileIO instead of the FileSystem from the Spark config. ResolvingFileIO is the default used by the REST catalog, and it delegates to S3FileIO in this case, which supports listPrefix. This requires a code change in the procedure.
  2. Use the AWS S3 client instead of the Hadoop s3a client in Spark. I guess this only needs a config change, but I'm not familiar with that; I recommend checking with the Iceberg community.
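The failing lookup can be reproduced directly from PySpark, independent of the procedure; a diagnostic sketch, assuming an existing `spark` session like the one above (the path is a placeholder):

# Mimic the lookup DeleteOrphanFilesSparkAction performs before listing:
# Hadoop resolves a FileSystem implementation from the URI scheme, and with
# no mapping for "s3" (e.g. via fs.s3.impl) the resolution fails.
jvm = spark._jvm
hadoop_conf = spark._jsc.hadoopConfiguration()

path = jvm.org.apache.hadoop.fs.Path("s3://bucket/folder/namespace1/table1")
try:
    fs = path.getFileSystem(hadoop_conf)
    print("Resolved file system:", fs.getClass().getName())
except Exception as err:  # py4j surfaces UnsupportedFileSystemException here
    print("Lookup failed:", err)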

@anuragmantri

Here is an older thread on the Iceberg Slack about this issue:

https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1636652647457600?thread_ts=1636639133.442800&cid=C025PH0G1D4

RemoveOrphanFiles is probably the only procedure that requires HadoopFileSystem in Iceberg, because it has to scan the entire storage and Iceberg’s FileIO interface as of today does not have a list-flavor API

Since listPrefix is now available, maybe we can update the procedure to use FileIO. I will create an issue in Iceberg.

@anuragmantri

Oh great! There is already a PR for this.
apache/iceberg#7914

@flyrain (Contributor) commented Aug 29, 2024

Thanks @anuragmantri for chiming in. It'd be ideal to use the Iceberg FileIO in removeOrphanFiles, so that we don't have to configure the Spark file system separately, which is duplication worth avoiding. I will take a look at the Iceberg PR.

We will still need a workaround for the moment, though, as the Iceberg change and release will take a while. You can customize your Iceberg lib, of course, but not every user is able to do that. @dennishuo mentioned a workaround here. It doesn't work for me locally, but it's worth trying. cc @TheerachotTle

spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.AbstractFileSystem.s3.impl=org.apache.hadoop.fs.s3a.S3A
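For clarity, here's how those two properties would slot into the session builder from the reproduction above; this is only a sketch with the rest of the config unchanged, and (as noted) results may vary by environment.

from pyspark.sql import SparkSession

# POLARIS_URI, POLARIS_CATALOG_NAME, POLARIS_CREDENTIALS, POLARIS_SCOPE
# are the same values used in the reproduction config above.
spark = SparkSession.builder \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.apache.iceberg:iceberg-aws-bundle:1.5.2,org.apache.hadoop:hadoop-aws:3.4.0,org.apache.hadoop:hadoop-common:3.4.0") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.polaris", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.polaris.type", "rest") \
    .config("spark.sql.catalog.polaris.uri", POLARIS_URI) \
    .config("spark.sql.catalog.polaris.warehouse", POLARIS_CATALOG_NAME) \
    .config("spark.sql.catalog.polaris.credential", POLARIS_CREDENTIALS) \
    .config("spark.sql.catalog.polaris.scope", POLARIS_SCOPE) \
    .config("spark.sql.catalog.polaris.token-refresh-enabled", "true") \
    .config("spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.AbstractFileSystem.s3.impl", "org.apache.hadoop.fs.s3a.S3A") \
    .getOrCreate()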

@yassan commented Aug 29, 2024

How about replacing s3:// with s3a:// and configuring spark.sql.catalog.polaris.io-impl to use org.apache.iceberg.io.ResolvingFileIO?
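Concretely, that suggestion would amount to something like the lines below added to the session builder; a sketch only, and note that the catalog and table locations would also need the s3a:// prefix, which (as the next reply points out) Polaris does not currently accept.

from pyspark.sql import SparkSession

# Sketch of the suggestion: let Iceberg resolve the FileIO per scheme and map
# the s3a:// scheme to the Hadoop S3A file system.
builder = (
    SparkSession.builder
    .config("spark.sql.catalog.polaris.io-impl", "org.apache.iceberg.io.ResolvingFileIO")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
)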

@TheerachotTle (Author)

How about replacing s3:// with s3a://

Polaris doesn't allow me to create a catalog with this prefix.

spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.AbstractFileSystem.s3.impl=org.apache.hadoop.fs.s3a.S3A

With this config, I can use remove_orphan_files without any error.

@flyrain (Contributor) commented Sep 5, 2024

Let's document it until it is fixed on the Iceberg side; actually, it should be documented on the Iceberg side.
