Reading a list of S3 parquet files with query planning enabled is ~25x slower #1061

Open
b-phi opened this issue May 10, 2024 · 11 comments

@b-phi commented May 10, 2024

I was struggling to understand why creating a dask dataframe from a large list of parquet files was taking ages. Eventually I tried disabling query planning and saw normal timing again. These are all relatively small S3 files (~1 MB each). There is no metadata file or similar.

[Screenshot (2024-05-10): timing comparison with and without query planning]

Environment:

  • dask==2024.5.0
  • dask-expr==1.1.0
  • python==3.10
@phofl (Collaborator) commented May 10, 2024

Hi, thanks for your report. We will look into this.

@phofl (Collaborator) commented May 10, 2024

cc @fjetter: the checksum calculation causes the slowdown here. It looks like the info isn't cached for lists of paths.

import dask.dataframe as dd

files = [f"s3://coiled-data/uber/part.{i}.parquet" for i in range(300)]
dd.read_parquet(files)

99% of the runtime is the checksum calculation; just passing the path prefix takes 1 second instead of 13.

@fjetter (Member) commented May 17, 2024

Well, this is not strictly about caching. The legacy reader indeed did not calculate a checksum. The checksum is currently used to cache division statistics per dataset. If we dropped that cache, we could skip the checksum, but then we would have to re-calculate the divisions for every distinct read_parquet call; right now, multiple read_parquet calls on the same unaltered dataset re-use the cache.

As I said, the problem is not that we don't cache the checksum calculation, but that calculating the checksum requires N requests to S3. If one simply provides a prefix, we only have to perform a single request.
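
To illustrate the difference, here is a rough sketch of the two access patterns using s3fs directly (bucket and file names are made up); this is not dask's actual checksum code, just the request pattern that makes the list input expensive:

import s3fs

fs = s3fs.S3FileSystem()

# Prefix input: one listing call returns size, ETag and last-modified
# for every object under the prefix.
listing = fs.ls("my-bucket/dataset_1", detail=True)

# Explicit list of files: the same metadata has to be fetched per file,
# so the work scales linearly with the number of files.
files = [f"my-bucket/dataset_1/part.{i}.parquet" for i in range(300)]
per_file_info = [fs.info(path) for path in files]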

I was already considering dropping this API entirely. Accepting a list of files introduces this and other weird artifacts. @b-phi can you tell us a bit more about why you are choosing this approach instead of simply providing a common prefix?

@b-phi (Author) commented May 17, 2024

Hey @fjetter, happy to provide more details. A simplified view of how our data is laid out in S3 looks like this, where multiple files within an S3 "folder" can indicate either multiple versions or a large file that was split into multiple smaller chunks. We have an internal application that translates user queries into a list of files to load; for example, "give me all symbol=FOO files" might return [file_3, file_5] and "give me all files as of this time" might return [file_1, file_7]. These types of load operations cannot be fit into a prefix-only structure.

bucket/dataset_1/date=2020-01-01/symbol=FOO/file_1.parquet
                                        .../file_2.parquet
                                        .../file_3.parquet
bucket/dataset_1/date=2020-01-01/symbol=BAR/file_4.parquet
bucket/dataset_1/date=2020-01-02/symbol=FOO/file_5.parquet
bucket/dataset_1/date=2020-01-02/symbol=BAR/file_6.parquet
                                        .../file_7.parquet
                                        .../file_8.parquet

I'm a big fan of the recent work to add query planning to dask. While I can appreciate that supporting list inputs introduces some complexity here, loading a provided list of files in parallel seems to me to be one of the fundamental use cases of distributed dataframes.

For my own knowledge, could you briefly summarize the difference between dd.read_parquet([...]) and dd.from_delayed([delayed(pd.read_parquet)(file) for file in ...], meta=...)?

@fjetter (Member) commented May 17, 2024

For my own knowledge, could you briefly summarize the difference between dd.read_parquet([...]) and dd.from_delayed([delayed(pd.read_parquet)(file) for file in ...], meta=...)?

Well, I'll try to keep this very short:

  • from_delayed doesn't support any optimizations. If you have to pass an explicit list of files, use from_map instead (see the sketch after this list).
  • Column projections and filter pushdowns only work with the parquet reader directly.
  • We make use of some parquet statistics, e.g. we check how large columns are and may or may not trigger automatic repartitioning based on this.
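
For reference, a minimal sketch of the from_map variant for an explicit list of files (the file names and meta schema below are hypothetical placeholders):

import pandas as pd
import dask.dataframe as dd

files = [f"s3://my-bucket/dataset_1/part.{i}.parquet" for i in range(300)]

# meta describes the output schema so dask doesn't have to read a file to infer it
meta = pd.DataFrame({"symbol": pd.Series(dtype="string"),
                     "price": pd.Series(dtype="float64")})

# one task per file; no column projection or filter pushdown, unlike read_parquet
ddf = dd.from_map(pd.read_parquet, files, meta=meta)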

These types of load operations cannot be fit into a prefix-only structure.

Dask should be able to handle the date and symbol stuff you are posting here. This is called "hive-like" partitioning. Try something like dd.read_parquet("bucket/dataset_1", filters=[[("date", ">", "2020-01-01"), ("symbol", "=", "BAR")]])
If there are no bugs, this should filter the files and only select those which match the provided filters. In fact, with the optimizer, this should just look like

import datetime
import dask.dataframe as dd

ddf = dd.read_parquet("bucket/dataset_1")
ddf = ddf[ddf.date > datetime.date(2020, 1, 1)]  # I'm actually not sure which type the value has to be
ddf = ddf[ddf.symbol == "BAR"]

and the optimizer should rewrite it such that the filters are provided to the parquet layer automatically.

@b-phi (Author) commented May 17, 2024

Dask should be able to handle the date and symbol stuff you are posting here. This is called "hive-like" partitioning. Try something like dd.read_parquet("bucket/dataset_1", filters=[[("date", ">", "2020-01-01"), ("symbol", "=", "BAR")]])

Yes, understood, and if our use case were limited to filtering on the hive partitions, that would cover it. However, there is additional metadata that we often need to filter on that isn't represented in the S3 folder structure. The "give me all files as of this time" use case, for example, refers to file creation time as stored in the internal application, rather than something we could express as a hive partition filter.

Another issue is that, while most Python libraries with similar functionality support accepting a list of parquet files as input (Arrow, Ray), there isn't a standard way of filtering partitions and file paths. Ray, for example, seems to have a callable-based approach (disclaimer: I haven't used this personally). As a result, if we need to support multiple tools, I'd much rather filter for partitions myself and pass the resulting files to different libraries rather than translate a given filter into each library's preferred partition filtering approach.

@b-phi (Author) commented May 17, 2024

I will take a look at from_map; we had been using from_delayed for some use cases, but from_map sounds like it may be faster. Thanks!

@fjetter (Member) commented May 17, 2024

I empathize with your situation. I know that other libraries offer this kind of interface, but the context there is different. Most importantly, we are running an optimizer on this and have stricter requirements for the input than other libraries might have.

A solution to this might be for us to implement a simpler reader for this kind of API request, one that supports a smaller feature set and is essentially a from_map behind the scenes without further optimization. Supporting all those different inputs is what made the initial implementation unmaintainable, and I don't want to go down that path again.

As a result, if we need to support multiple tools, I'd much rather filter for partitions myself and pass the resulting files to different libraries rather than translate a given filter into each library's preferred partition filtering approach.

This is certainly not always an option, but you may want to consider writing this information into the file itself. If the file has a single value in a column, the parquet file compresses this exceptionally well. The parquet metadata is then sufficient to decide whether the file has to be loaded, and you would never have to read the column back in.
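
As a rough sketch of that idea (the column name, paths and values here are purely illustrative), you would stamp each file with the metadata as a constant column at write time and filter on it at read time:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({"symbol": ["FOO"] * 1000, "price": range(1000)})
df["created_at"] = pd.Timestamp("2020-01-01 12:00:00")  # one value per file, compresses to almost nothing

df.to_parquet("s3://bucket/dataset_1/date=2020-01-01/symbol=FOO/file_1.parquet")

# "give me all files as of this time" then becomes a regular filter that can be
# answered from the parquet metadata without reading the column data back in
ddf = dd.read_parquet(
    "s3://bucket/dataset_1",
    filters=[[("created_at", "<=", pd.Timestamp("2020-01-02"))]],
)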

@fjetter fjetter changed the title Reading S3 parquet files with query planning enabled is ~25x slower Reading a list of S3 parquet files with query planning enabled is ~25x slower May 17, 2024
@b-phi (Author) commented May 17, 2024

In our case, we could easily have 5-10 million files under bucket/dataset_1/.... We're able to find the files to load in <1s for most queries. Could we expect similar performance from this approach?

dd.read_parquet("bucket/dataset_1", filters=[[("date", ">", "2020-01-01"), ("symbol", "=", "BAR")]])

@b-phi (Author) commented May 17, 2024

In our case, we could easily have 5-10 million files under bucket/dataset_1/.... We're able to find the files to load in <1s for most queries. Could we expect similar performance from this approach?

dd.read_parquet("bucket/dataset_1", filters=[[("date", ">", "2020-01-01"), ("symbol", "=", "BAR")]])

I'll do some performance testing with the expression filters, but it sounds like the overall takeaway is to use dd.from_map rather than dd.read_parquet if a use case requires providing an explicit list of parquet files.

@fjetter (Member) commented May 21, 2024

We're able to find the files to load in <1s for most queries. Could we expect similar performance from this approach?

Probably not, but I don't know for sure.
