From 179dfd2e45025f4146747c67d057d8af8fa1dbdc Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sun, 3 Mar 2024 10:31:32 +1300 Subject: [PATCH 1/3] Document parquet writer memory limiting (#5450) --- parquet/src/arrow/arrow_writer/mod.rs | 26 ++++++++++++++++++++++++++ parquet/src/arrow/async_writer/mod.rs | 23 +++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index b677ef354c7f..4fe55a467362 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -80,6 +80,32 @@ mod levels; /// /// assert_eq!(to_write, read); /// ``` +/// +/// ## Memory Limiting +/// +/// The nature of parquet forces buffering of an entire row group before it can be flushed +/// to the underlying writer. Data is buffered in its encoded form, to reduce memory usage, +/// but if writing rows containing large strings or very nested data, this may still result in +/// non-trivial memory usage. +/// +/// [`ArrowWriter::in_progress_size`] can be used to track the size of the buffered row group, +/// and potentially trigger an early flush of a row group based on a memory threshold and/or +/// global memory pressure. However, users should be aware that smaller row groups will result +/// in higher metadata overheads, and may worsen compression ratios and query performance. +/// +/// ```no_run +/// # use std::io::Write; +/// # use arrow_array::RecordBatch; +/// # use parquet::arrow::ArrowWriter; +/// # let mut writer: ArrowWriter> = todo!(); +/// # let batch: RecordBatch = todo!(); +/// writer.write(&batch).unwrap(); +/// // Trigger an early flush if buffered size exceeds 1_000_000 +/// if writer.in_progress_size() > 1_000_000 { +/// writer.flush().unwrap(); +/// } +/// ``` +/// pub struct ArrowWriter { /// Underlying Parquet writer writer: SerializedFileWriter, diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 60097e88737f..4bb45edc1716 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -69,6 +69,29 @@ use tokio::io::{AsyncWrite, AsyncWriteExt}; /// It is implemented based on the sync writer [`ArrowWriter`] with an inner buffer. /// The buffered data will be flushed to the writer provided by caller when the /// buffer's threshold is exceeded. +/// +/// ## Memory Limiting +/// +/// The nature of parquet forces buffering of an entire row group before it can be flushed +/// to the underlying writer. This buffering may exceed the configured buffer size +/// of [`AsyncArrowWriter`]. Memory usage can be limited by prematurely flushing the row group, +/// although this will have implications for file size and query performance. See [ArrowWriter] +/// for more information. +/// +/// ```no_run +/// # use tokio::fs::File; +/// # use arrow_array::RecordBatch; +/// # use parquet::arrow::AsyncArrowWriter; +/// # async fn test() { +/// let mut writer: AsyncArrowWriter = todo!(); +/// let batch: RecordBatch = todo!(); +/// writer.write(&batch).await.unwrap(); +/// // Trigger an early flush if buffered size exceeds 1_000_000 +/// if writer.in_progress_size() > 1_000_000 { +/// writer.flush().await.unwrap() +/// } +/// # } +/// ``` pub struct AsyncArrowWriter { /// Underlying sync writer sync_writer: ArrowWriter, From 98cca13921e0d0a32510902636dbe48e1d6edfd7 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 8 Mar 2024 14:43:40 +1300 Subject: [PATCH 2/3] Review feedback --- parquet/src/arrow/async_writer/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index 4bb45edc1716..ada74a4a0654 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -116,6 +116,8 @@ impl AsyncArrowWriter { /// /// [`Self::write`] will flush this intermediate buffer if it is at least /// half full + /// + /// Please see the documentation on [`Self`] for details on memory usage. pub fn try_new( writer: W, arrow_schema: SchemaRef, @@ -135,6 +137,8 @@ impl AsyncArrowWriter { /// /// [`Self::write`] will flush this intermediate buffer if it is at least /// half full + /// + /// Please see the documentation on [`Self`] for details on memory usage. pub fn try_new_with_options( writer: W, arrow_schema: SchemaRef, From 1409d4d5d60e62c533d3c31e9171842875fc78bc Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 8 Mar 2024 14:54:20 +1300 Subject: [PATCH 3/3] Review feedback --- parquet/src/arrow/async_writer/mod.rs | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/parquet/src/arrow/async_writer/mod.rs b/parquet/src/arrow/async_writer/mod.rs index ada74a4a0654..264b05dc5cef 100644 --- a/parquet/src/arrow/async_writer/mod.rs +++ b/parquet/src/arrow/async_writer/mod.rs @@ -109,15 +109,10 @@ pub struct AsyncArrowWriter { impl AsyncArrowWriter { /// Try to create a new Async Arrow Writer. /// - /// `buffer_size` determines the number of bytes to buffer before flushing - /// to the underlying [`AsyncWrite`] - /// - /// The intermediate buffer will automatically be resized if necessary - /// - /// [`Self::write`] will flush this intermediate buffer if it is at least - /// half full - /// - /// Please see the documentation on [`Self`] for details on memory usage. + /// `buffer_size` determines the minimum number of bytes to buffer before flushing + /// to the underlying [`AsyncWrite`]. However, the nature of writing parquet may + /// force buffering of data in excess of this within the underlying [`ArrowWriter`]. + /// See the documentation on [`ArrowWriter`] for more details pub fn try_new( writer: W, arrow_schema: SchemaRef, @@ -130,15 +125,10 @@ impl AsyncArrowWriter { /// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`]. /// - /// `buffer_size` determines the number of bytes to buffer before flushing - /// to the underlying [`AsyncWrite`] - /// - /// The intermediate buffer will automatically be resized if necessary - /// - /// [`Self::write`] will flush this intermediate buffer if it is at least - /// half full - /// - /// Please see the documentation on [`Self`] for details on memory usage. + /// `buffer_size` determines the minimum number of bytes to buffer before flushing + /// to the underlying [`AsyncWrite`]. However, the nature of writing parquet may + /// force buffering of data in excess of this within the underlying [`ArrowWriter`]. + /// See the documentation on [`ArrowWriter`] for more details pub fn try_new_with_options( writer: W, arrow_schema: SchemaRef,