
[R] filtering a dataset down to a single partition (dplyr interface) prior to collection still accesses other partitions, leading to slow reads #44725

Open
debrouwere opened this issue Nov 14, 2024 · 3 comments

Comments

debrouwere commented Nov 14, 2024

Describe the bug, including details regarding any error messages, version, and platform.

I have a hive-style partitioned Parquet dataset in which each partition consists of a single file, part-0.parquet. When I filter down to a single partition using the dplyr interface, open_dataset still ends up unnecessarily accessing 10 to 15 files, which leads to unexpectedly slow loads. The filtering itself is correct; it's the performance I'm concerned about.
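For reference, a minimal sketch of how such a dataset could be written (the `pisa` object is a placeholder; the partition columns and zstd compression are taken from the paths and description below):

library(arrow)
library(dplyr)   # loaded for the filter()/select()/starts_with() pipelines used below

# Hypothetical write call producing the layout
# build/pisa.rx/cycle=.../country=.../part-0.parquet
# (compression is passed through to the Parquet writer)
write_dataset(pisa, 'build/pisa.rx',
              format = 'parquet',
              partitioning = c('cycle', 'country'),
              compression = 'zstd')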

I am using R 4.4.1 on macOS (Intel) with a precompiled version of Arrow, 17.0.0.1. I generated the partitioned dataset with the same version of Arrow and the arrow R package that is used to read it in. It was written using the default Parquet format version (2.4, as far as I know) and zstd compression, though I have just now rewritten the entire dataset with version 2.6 and am still getting the same behavior.

This takes about 20 seconds for me:

since <- Sys.time()
assessments <- read_parquet('build/pisa.rx/cycle=2022/country=Belgium/part-0.parquet', col_select = starts_with('w_'))
until <- Sys.time()
until - since

whereas this takes about 50 seconds:

since <- Sys.time()
assessments <- open_dataset('build/pisa.rx') |>
  filter(country == 'Belgium', cycle == 2022) |>
  select(starts_with('w_')) |>
  collect()
until <- Sys.time()
until - since

and even this takes 40 seconds:

rx_schema <- assessments |> schema()

since <- Sys.time()
assessments <- open_dataset('build/pisa.rx',
                            hive_style = TRUE,
                            partitioning = partitioning,
                            unify_schemas = FALSE,
                            format = 'parquet',
                            schema = rx_schema) |>
  filter(country == 'Belgium', cycle == 2022) |>
  select(starts_with('w_')) |>
  collect()
until <- Sys.time()
until - since

and in fact not filtering down to a single partition seems to be faster (35 seconds), even though it reads about 8 times as much data:

since <- Sys.time()
assessments <- open_dataset('build/pisa.rx') |>
  filter(country == 'Belgium') |>
  select(starts_with('w_')) |>
  collect()
until <- Sys.time()
until - since

I know open_dataset is reading that many files because a single call to open_dataset produces 10 or more (innocuous) "invalid metadata$r" warnings (see #40423). Possibly it is trying to unify the schemas by peeking into these other files, even though unify_schemas = FALSE, or perhaps it is ignoring (part of) the hive-style partitioning and falling back to scanning rows in order to filter? There are 8 cycles and over 30 countries in the dataset, though, so it doesn't look like it is going through the entire dataset either.
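One way to check what the dataset enumerates versus what the filter should touch (a sketch; ds$files lists every file a FileSystemDataset discovered while walking the directory):

ds <- open_dataset('build/pisa.rx')
length(ds$files)                                             # total number of part files discovered
grep('cycle=2022/country=Belgium', ds$files, value = TRUE)   # the single file the filter should need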

I realize that open_dataset has some overhead relative to read_parquet because it has to walk the directory structure, etc., but if I filter down to a single partition, surely open_dataset should only access that particular partition?
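As a point of comparison (a sketch of a possible workaround, not a fix), opening only the partition directory restricts the scan to that one file; the pruned cycle and country columns then no longer appear in the result:

assessments <- open_dataset('build/pisa.rx/cycle=2022/country=Belgium') |>
  select(starts_with('w_')) |>
  collect()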

I'm not sure, but I don't think "invalid metadata$r" itself is the problem: although that metadata contains the schema, as you can see above I have also tried to load the data with a valid schema pre-specified.
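For what it's worth, the R-level metadata behind that warning can be inspected per file (a sketch; reading as a Table keeps the key/value metadata visible, and arrow stores serialized R attributes under the "r" key):

tbl <- read_parquet('build/pisa.rx/cycle=2022/country=Belgium/part-0.parquet',
                    as_data_frame = FALSE)
names(tbl$metadata)   # expected to include "r" when tibble attributes were stored at write time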

Component(s)

R

@raulcd raulcd changed the title filtering a dataset down to a single partition (dplyr interface) prior to collection still accesses other partitions, leading to slow reads [R] filtering a dataset down to a single partition (dplyr interface) prior to collection still accesses other partitions, leading to slow reads Nov 14, 2024
debrouwere (Author) commented

Okay, so it turns out that performance is much, much better (and there are no more "invalid metadata$r" warnings) across the board when the dataset is written from a data.frame as opposed to a tibble, and there is then no longer much of a difference in performance between directly reading a single partition and filtering on it before collecting. (This concerns the writing side; it doesn't matter whether the data is read back into a tibble.) That solves my immediate problem, but I'll leave this issue open because I imagine Arrow/Parquet is supposed to work with tibbles too?

Also, for completeness, I did upgrade to Arrow 18 and that didn't help either.

amoeba (Member) commented Dec 11, 2024

Hi @debrouwere, thanks for the issue.

To my knowledge, open_dataset is eagerly evaluated, so whether or not it's involved in a dplyr pipeline won't change how long it takes. I should double-check this, but if that's true, it might be an area for improvement. What do you get if you run your code snippets against the same Dataset instance? e.g.,

ds <- open_dataset('build/pisa.rx')

ds |>
  filter(country == 'Belgium', cycle == 2022) |>
  select(starts_with('w_')) |>
  collect()

# And run other tests here to calculate timings
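# For instance (a sketch), wrap each variant in system.time against the same ds:
system.time(
  ds |>
    filter(country == 'Belgium', cycle == 2022) |>
    select(starts_with('w_')) |>
    collect()
)
system.time(
  ds |>
    filter(country == 'Belgium') |>   # broader filter, for comparison
    select(starts_with('w_')) |>
    collect()
)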

Also, I didn't really understand your last comment about data.frames vs. tibbles and performance. Could you explain that a bit more?

debrouwere (Author) commented Dec 12, 2024

I wonder if you meant lazily evaluated, in the sense that no data is read until collect is called?

Regarding tibbles and performance: my bad, I can see now that my comment was unclear. In R, I can take any tibble and write it to disk as a hive-style Parquet dataset with write_dataset. Although writing to Parquet from a tibble doesn't produce any errors, I have noticed that reading the resulting dataset back is very slow and produces invalid metadata$r warnings, which is what my original bug report was about. Ultimately the data does get read and there is no corruption; it's just very slow. However, if I instead persist the dataset to disk using my_tibble |> as.data.frame() |> write_dataset(...), subsequent reads are much faster and don't produce invalid metadata$r warnings. My guess is therefore that there is something suboptimal or odd about how data from tibbles is converted to Parquet. To be completely clear, I am talking about read performance, not about whether analysis with a data.frame or a tibble is faster in R, which wouldn't have anything to do with Parquet.
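Concretely, the two write paths being compared look like this (a sketch; my_tibble and the partition columns are placeholders):

# Writing straight from a tibble: reads back slowly, with "invalid metadata$r" warnings
my_tibble |>
  write_dataset('build/pisa.rx', partitioning = c('cycle', 'country'))

# Writing after dropping to a plain data.frame: reads back quickly, no warnings
my_tibble |>
  as.data.frame() |>
  write_dataset('build/pisa.rx', partitioning = c('cycle', 'country'))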
