
AsyncArrowWriter doesn't limit underlying ArrowWriter to respect buffer-size #5450

Closed
DDtKey opened this issue Mar 1, 2024 · 18 comments · Fixed by #5457
Labels: documentation, enhancement, good first issue, help wanted, parquet

Comments

DDtKey commented Mar 1, 2024

An AsyncArrowWriter created with the default WriterProperties uses DEFAULT_MAX_ROW_GROUP_SIZE = 1024 * 1024.

This means the underlying ArrowWriter won't flush to disk until that limit is reached, which leads to enormous memory consumption: it buffers up to DEFAULT_MAX_ROW_GROUP_SIZE (1,048,576) rows and ignores the buffer_capacity setting entirely.

This is because the flushing condition of the sync writer is:

if in_progress.buffered_rows >= self.max_row_group_size {
    self.flush()?
}

To Reproduce
Try writing many large rows to Parquet with AsyncArrowWriter; you will see that memory consumption does not respect the buffer size.

Update: an MRE was created; see #5450 (comment).
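
For context, a minimal sketch of this kind of reproduction (not the linked gist; crate APIs and the constructor's buffer-size argument are assumptions based on the parquet version discussed here and may differ in other releases). It keeps writing batches of large string values through AsyncArrowWriter with the default WriterProperties, so no row group is flushed until 1024 * 1024 rows have been buffered:

use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StringArray};
use parquet::arrow::AsyncArrowWriter;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::create("large.parquet").await?;

    // 1,000 rows of ~10 KB strings per batch, so buffered rows are expensive.
    let values: ArrayRef =
        Arc::new(StringArray::from_iter_values((0..1_000).map(|_| "x".repeat(10_000))));
    let batch = RecordBatch::try_from_iter([("payload", values)])?;

    // Default WriterProperties => max_row_group_size = 1024 * 1024 rows.
    // The 10 MiB buffer size below does not bound memory: rows stay buffered
    // in the underlying ArrowWriter until the row-group row limit is reached.
    let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), 10 * 1024 * 1024, None)?;

    for _ in 0..10_000 {
        writer.write(&batch).await?;
    }
    writer.close().await?;
    Ok(())
}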

Expected behavior
Ideally, it should respect the buffer config, i.e. flush when either the buffer size or the max row group size is reached.

But even if the current behavior is intended for some reason, the documentation should clearly highlight it.

Additional context

By the way, why is the default 1024 * 1024? It looks as if the unit were bytes.

tustvold commented Mar 1, 2024

The structure of parquet forces us to buffer an entire row group before we can flush it. The async writer should do a better job of calling this out

It consumed 10gb of memory accordingly.

Something is wrong here; it should only consume up to 10 MB. Perhaps you could use a memory profiler to identify where the usage is coming from.

alamb commented Mar 1, 2024

By the way, why is the default 1024 * 1024? It looks as if the unit were bytes.

According to the docs, DEFAULT_MAX_ROW_GROUP_SIZE is a number of rows, not bytes:
https://docs.rs/parquet/latest/parquet/file/properties/constant.DEFAULT_MAX_ROW_GROUP_SIZE.html
https://docs.rs/parquet/latest/parquet/file/properties/struct.WriterProperties.html#method.max_row_group_size

Returns maximum number of rows in a row group.
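
For illustration, a minimal sketch (values arbitrary) of setting that limit via WriterProperties; note it is a row count, not a byte size:

use parquet::file::properties::WriterProperties;

fn main() {
    // 100,000 rows per row group instead of the default 1024 * 1024 rows.
    let props = WriterProperties::builder()
        .set_max_row_group_size(100_000)
        .build();
    assert_eq!(props.max_row_group_size(), 100_000);
    // Pass `Some(props)` when constructing ArrowWriter / AsyncArrowWriter.
}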

tustvold commented Mar 1, 2024

Aah yes, I thought there was a mechanism to also limit the maximum size of row groups, but perhaps that is only for pages

DDtKey commented Mar 1, 2024

The structure of parquet forces us to buffer an entire row group before we can flush it.

Yeah, that totally makes sense.

Something is wrong here, it should only consume up to 10Mb, perhaps you could use a memory profiler to identify where the usage is coming from

Well, it was definitely AsyncArrowWriter. Once I decreased the max row group size, memory usage became normal, but it still caches rows up to the row-count limit, ignoring any buffer limits.

I also tried not using the Arrow writer at all and instead writing to disk directly in a streaming manner; there were no issues.

I think I can provide an MRE easily; that will make it easier to profile.

DDtKey commented Mar 1, 2024

According to the docs, DEFAULT_MAX_ROW_GROUP_SIZE is a number of rows, not bytes

Yeah, that part is totally fine. I mean the default value looks odd, as if it were in bytes.

DDtKey commented Mar 1, 2024

[screenshot]

Here is a simple MRE:
https://gist.github.com/DDtKey/706930c78dbb296899c2ef1bbf86459a
Memory keeps increasing over time, and after interruption the target file is 0 bytes.

Pay attention to lines 30-33 of the gist; changing them changes everything: the amount of consumed memory becomes stable (a ~40 GB file was generated before I interrupted):

[screenshot]

tustvold commented Mar 1, 2024

Currently this is expected behaviour: row groups are only automatically "closed" based on row count.

I would suggest the following:

  • Document that AsyncArrowWriter's buffer size is not authoritative, and is bounded by the size of the row groups produced
  • Add the ability to limit the maximum size of a row group before ArrowWriter creates a new row group [1]

[1] Unlike the underlying SerializedFileWriter, where the API is expressed in terms of columns and chunking is therefore controlled by the caller, ArrowWriter could take a best-effort approach: check the in-progress size after writing each batch and decide whether to flush.
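
As a rough sketch of that best-effort idea, done from the caller's side with the sync ArrowWriter: after each batch, check the in-progress row group size and flush once it crosses a byte threshold. This assumes an in_progress_size() accessor of the kind referred to around #5251; the threshold and the rest of the code are illustrative, not an existing guarantee of the API:

use std::fs::File;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use parquet::arrow::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::create("bounded.parquet")?;

    let values: ArrayRef = Arc::new(Int64Array::from_iter_values(0..10_000));
    let batch = RecordBatch::try_from_iter([("v", values)])?;

    let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;

    // Illustrative limit: close the current row group once roughly 64 MiB is
    // buffered, even though the default row-count limit is nowhere near.
    const MAX_IN_PROGRESS_BYTES: usize = 64 * 1024 * 1024;

    for _ in 0..1_000 {
        writer.write(&batch)?;
        if writer.in_progress_size() >= MAX_IN_PROGRESS_BYTES {
            // Starts a new row group; smaller groups trade memory for metadata overhead.
            writer.flush()?;
        }
    }
    writer.close()?;
    Ok(())
}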

DDtKey commented Mar 1, 2024

Interestingly, with a decreased max_row_group_size it also keeps increasing over time, but much more slowly.
[screenshot]

And this one is with the default value (with delays enabled in between, to slow the process down):
[screenshot]

tustvold commented Mar 1, 2024

Interestingly, with a decreased max_row_group_size it also keeps increasing over time, but much more slowly.

This is not unexpected: certain information, such as indexes and statistics, needs to be retained until the footer is written. Smaller row groups make this overhead worse (which is why very small row groups are generally not a great idea).

DDtKey commented Mar 1, 2024

Oh, right, yes - that makes sense

tustvold added the documentation, good first issue, enhancement, and help wanted labels and removed the bug label on Mar 1, 2024
DDtKey commented Mar 1, 2024

By the way, can't we just explicitly force the ArrowWriter to flush and start a new row group from AsyncArrowWriter's try_flush? 🤔

Because the main issue (if I'm not wrong) is that we never reach this condition, due to the large max_row_group_size:

if in_progress.buffered_rows >= self.max_row_group_size {
    self.flush()?
}

tustvold commented Mar 1, 2024

By the way, can't we just explicitly force the ArrowWriter to flush and start a new row group from AsyncArrowWriter's try_flush? 🤔

Yes, that is an option that is available to users, and with #5251 the necessary meta information is exposed to the clients to make this judgement for themselves.

However, as this has come up a few times, providing a conservative default limit of, say, 1 GB is probably a sane modification; users can then lower it if they're happy to accept the trade-off of smaller row groups.

We don't want to use the buffer_size setting as this would then present an unfortunate trade-off where the limit would become the pre-allocation for the buffer, which we might never hit.

DDtKey commented Mar 1, 2024

To be honest, my initial expectation was that the buffer size I provide (in AsyncArrowWriter) ensures it flushes once that amount of data has accumulated. In fact it doesn't at all; only the number of rows matters.

Subjectively, it looks like we should flush the underlying ArrowWriter (and start a new row group) each time the buffer reaches its capacity.

That is typical behavior for buffered writers, at least; I don't expect one to keep holding such a large amount of data.

Moreover, we have max_row_group_size, not min_row_group_size, so it's currently confusing.

tustvold commented Mar 1, 2024

I agree it is potentially confusing, but I think the solution is to better document what buffer size is and is not, and potentially add separate functionality to ArrowWriter / AsyncArrowWriter to constrain row group size.

I am reticent to change this definition as the API breakage would be subtle and not immediately obvious, beyond larger files and worse query performance

DDtKey commented Mar 1, 2024

I am reticent to change this definition as the API breakage would be subtle and not immediately obvious, beyond larger files and worse query performance

It's doable by creating a new writer type or methods with the changed behavior, and optionally deprecating the old one.

Or something like AsyncArrowWriter::with_strict_buffer(..) (there could be a better name, this is just an example) with an internal flag to switch the behavior.
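
Purely as a hypothetical sketch of that API shape (none of these names exist in the parquet crate; the internals are invented here for illustration): an opt-in flag that makes the writer close the in-progress row group whenever its buffer reaches capacity, instead of waiting for max_row_group_size:

#![allow(dead_code)]

// Hypothetical, simplified shape; not a real parquet-crate API.
struct StrictBufferAsyncWriter {
    buffer_capacity: usize,
    // When true, flush (close the row group) as soon as the buffer is full.
    strict_buffer: bool,
}

impl StrictBufferAsyncWriter {
    // Analogous to the `with_strict_buffer(..)` idea suggested above.
    fn with_strict_buffer(mut self, strict: bool) -> Self {
        self.strict_buffer = strict;
        self
    }
}

fn main() {}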

tustvold commented Mar 1, 2024

I think we should start by documenting the current state of play and go from there; I'll try to get something up later today. It may be that we can get by with just an example showing how to limit memory usage.

tustvold commented Mar 3, 2024

FWIW #5458 tracks moving ObjectStore away from the somewhat problematic AsyncWrite abstraction

tustvold added a commit that referenced this issue Mar 8, 2024
* Document parquet writer memory limiting (#5450)

* Review feedback

* Review feedback
tustvold added the parquet label on Mar 15, 2024
label_issue.py automatically added labels {'parquet'} from #5471
