[BUG][PySpark] Vacuum operation is too slow on very large delta table in S3 #4008

saiharshavellanki opened this issue Dec 31, 2024 · 3 comments

Bug

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • PySpark

Describe the problem

We are using Delta Lake version 2.1.0 (in a Glue 4.0 job) to write data to a Delta table in S3. This has resulted in many small files (~15 KB each) across 27,000 partitions, with each partition containing about 50 files. While compacting some partitions has been straightforward, the vacuum operation is taking over 6 hours, even with a G.2X configuration and 40 workers, since vacuum runs across all the partitions.

I’ve explored the following options but need further guidance due to limited PySpark documentation:

  • Inventory File Feature: I couldn’t find clear PySpark documentation on how to use this.
  • Parallel Delete Configuration: I found the setting spark.databricks.delta.vacuum.parallelDelete.enabled=true, but since we’re not on Databricks, I’m unsure of the equivalent for PySpark (see the sketch after this list).
  • Vacuum Lite Operation: I came across this option but need clarification on how to use it.
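
For reference, a minimal sketch of how that setting could be applied from a plain (non-Databricks) PySpark session. Despite the spark.databricks. prefix, the flag is defined in open-source Delta (see the DeltaSQLConf excerpt quoted later in this thread); the table path below is hypothetical.

    from delta.tables import DeltaTable

    # The "spark.databricks." prefix notwithstanding, this conf is part of
    # open-source Delta Lake and can be set on any Spark session.
    spark.conf.set("spark.databricks.delta.vacuum.parallelDelete.enabled", "true")

    # Hypothetical table path, for illustration only.
    delta_table = DeltaTable.forPath(spark, "s3://my-bucket/my-delta-table")
    delta_table.vacuum(24)  # retain 24 hours, as in the report above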

We’ve recently migrated to Glue 5.0, which supports Delta Lake 3.2.1. As a result, new tables will use version 3.2.1, while older tables remain on 2.1.0. I’d appreciate suggestions for handling this scenario in both versions.

Steps to reproduce

Observed results

delta_table.vacuum(24) takes more than 6 hours on very large Delta tables with many small files.

Expected results

delta_table.vacuum(24) should take less than 15-20 minutes, or there should be an option to run vacuum only on certain partitions.

Further details

Environment information

Both the environments listed below.

Environment 1:

  • Glue version: 4.0
  • Delta Lake version: 2.1.0
  • Spark version: 3.3
  • Scala version:

Environment 2:

  • Glue version: 5.0
  • Delta Lake version: 3.2.1
  • Spark version: 3.5.2
  • Scala version: 2.12.18

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
@saiharshavellanki saiharshavellanki added the bug Something isn't working label Dec 31, 2024
@saiharshavellanki saiharshavellanki changed the title [BUG][Spark] Vacuum operation is too slow on very large delta table in S3 [BUG][PySpark] Vacuum operation is too slow on very large delta table in S3 Dec 31, 2024

saiharshavellanki commented Dec 31, 2024

I realized that we can use Spark SQL to query the inventory table. However, I'm considering an alternative approach: directly reading the JSON file corresponding to the latest compaction in the Delta table (located in the _delta_log directory). This would allow us to identify the files that were removed and pass them to the inventory. While I'm unsure if this is the best approach, it could significantly speed up the vacuum process since we’d only be targeting the files affected in specific versions. Could someone advise if this approach would be better?
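
A rough sketch of that idea, assuming Delta 3.x's VACUUM ... USING INVENTORY support (it is not available on our 2.1.0 tables). The path, version number, and view name are made up for illustration, and both the expected inventory schema (path, length, isDir, modificationTime) and the relative-vs-absolute path handling should be double-checked against the Delta docs for the version in use.

    from pyspark.sql import functions as F

    table_path = "s3://my-bucket/my-delta-table"  # hypothetical
    version = 123                                 # version of the latest compaction commit

    # Each commit is a JSON-lines file in _delta_log, named by zero-padded version.
    log_file = f"{table_path}/_delta_log/{version:020d}.json"

    # Keep only the "remove" actions and shape them into an inventory.
    removes = (
        spark.read.json(log_file)
        .where(F.col("remove").isNotNull())
        .select(
            F.concat(F.lit(table_path + "/"), F.col("remove.path")).alias("path"),
            F.col("remove.size").alias("length"),
            F.lit(False).alias("isDir"),
            F.col("remove.deletionTimestamp").alias("modificationTime"),
        )
    )
    removes.createOrReplaceTempView("vacuum_inventory")

    # Delta 3.x SQL form: vacuum only considers the inventoried files as candidates.
    spark.sql(f"""
        VACUUM delta.`{table_path}`
        USING INVENTORY (SELECT path, length, isDir, modificationTime FROM vacuum_inventory)
        RETAIN 24 HOURS
    """)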


lucabem commented Jan 2, 2025

Hi @saiharshavellanki, we are using Delta on S3 too, and we are removing over 5k files in less than 3 minutes using a cluster such as:

    {
      "dynamicAllocation_initialExecutors": "3",
      "dynamicAllocation_minExecutors": "3",
      "dynamicAllocation_maxExecutors": "8",
      "driver_cores": "1.0",
      "driver_coreRequest": "1000m",
      "driver_coreLimit": "1000m",
      "driver_memory": "2048m",
      "executor_instances": "3",
      "executor_cores": "3.0",
      "executor_coreRequest": "3000m",
      "executor_coreLimit": "3000m",
      "executor_memory": "12288m"
    }

The vacuum output is:

VacuumCommand: Deleted 5143 files (751423721796 bytes) and directories in a total of 24780 directories

We are using Kubernetes instead of Glue, with Delta version 3.2.1. We only add this config to Spark for the vacuum command:

      "spark.databricks.delta.optimize.maxThreads": "500",
      "spark.databricks.delta.dataSkippingNumIndexedCols": "5",
      "spark.databricks.delta.schema.autoMerge.enabled": "true",
      "spark.databricks.delta.optimize.repartition.enabled": "true",
      "spark.databricks.delta.vacuum.parallelDelete.enabled" :"true",
      "spark.databricks.delta.retentionDurationCheck.enabled": "false",
      "spark.databricks.delta.merge.repartitionBeforeWrite.enabled": "false",

      "spark.databricks.delta.autoCompact.enabled": "true",
      "spark.databricks.delta.autoCompact.minNumFiles": "40",
      "spark.databricks.delta.optimizeWrite.enabled": "true",

From DeltaSQLConf.scala:

  val DELTA_VACUUM_PARALLEL_DELETE_ENABLED =
    buildConf("vacuum.parallelDelete.enabled")
      .doc("Enables parallelizing the deletion of files during a vacuum command. Enabling " +
        "may result hitting rate limits on some storage backends. When enabled, parallelization " +
        "is controlled 'spark.databricks.delta.vacuum.parallelDelete.parallelism'.")
      .booleanConf
      .createWithDefault(false)

  val DELTA_VACUUM_PARALLEL_DELETE_PARALLELISM =
    buildConf("vacuum.parallelDelete.parallelism")
      .doc("Sets the number of partitions to use for parallel deletes. If not set, defaults to " +
        "spark.sql.shuffle.partitions.")
      .intConf
      .checkValue(_ > 0, "parallelDelete.parallelism must be positive")
      .createOptional

When you enable parallelism in the vacuum process, it takes the value from spark.sql.shuffle.partitions. Increase this Spark configuration to allow more parallelism (by default G.2X uses low parallelism). By default, spark.sql.shuffle.partitions will be:

spark.sql.shuffle.partitions = number of cores per worker x (number of workers - 1)

Anyway, you must check that you are not hitting the S3 request-rate limit (3,500 PUT/DELETE requests per second per prefix) on the same prefix.
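
To make that concrete, a quick check from PySpark; the value below is purely illustrative (e.g., 40 workers x 8 cores on G.2X), not a recommendation.

    # Parallel delete repartitions the file-deletion work to
    # spark.sql.shuffle.partitions unless parallelDelete.parallelism is set.
    print(spark.conf.get("spark.sql.shuffle.partitions"))

    # Illustrative value; tune against your cluster size and S3 rate limits.
    spark.conf.set("spark.sql.shuffle.partitions", "320")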

saiharshavellanki commented

Got it. Thanks for sharing the config @lucabem. We are currently going with passing the recently removed files as an inventory file list to the VACUUM command. This might reduce costs further, given we are no longer performing S3 list calls.
