Skip to content

Commit

Permalink
fix: parallel parquet can underflow when max_record_batch_rows < execution.batch_size (#9737)
Browse files Browse the repository at this point in the history

* loop split rb

* add test

* add new test

* fmt

* lower batch size in test

* make test faster

* use path not into_path
  • Loading branch information
devinjdangelo authored Mar 23, 2024
1 parent 1dbec3e commit 02fd450
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 36 deletions.
56 changes: 54 additions & 2 deletions datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,18 @@ impl DataFrame {

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;

use super::super::Result;
use super::*;
use crate::arrow::util::pretty;
use crate::execution::context::SessionContext;
use crate::execution::options::ParquetReadOptions;
use crate::test_util;
use crate::test_util::{self, register_aggregate_csv};

use datafusion_common::file_options::parquet_writer::parse_compression_string;
use datafusion_execution::config::SessionConfig;
use datafusion_expr::{col, lit};

use object_store::local::LocalFileSystem;
Expand Down Expand Up @@ -150,7 +152,7 @@ mod tests {
.await?;

// Check that file actually used the specified compression
let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
let file = std::fs::File::open(tmp_dir.path().join("test.parquet"))?;

let reader =
parquet::file::serialized_reader::SerializedFileReader::new(file)
Expand All @@ -166,4 +168,54 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn write_parquet_with_small_rg_size() -> Result<()> {
    // Regression test: writing a parquet file whose row group size is small
    // relative to datafusion.execution.batch_size must not panic.
    let settings = HashMap::from([(
        "datafusion.execution.batch_size".to_string(),
        "10".to_string(),
    )]);
    let mut ctx = SessionContext::new_with_config(
        SessionConfig::from_string_hash_map(settings)?,
    );
    register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
    let test_df = ctx.table("aggregate_test_100").await?;

    let output_path = "file://local/test.parquet";
    // The store URL is the same for every iteration, so parse it once.
    let local_url = Url::parse("file://local").unwrap();

    // Every row group size tried here is below the batch size of 10 set above.
    for rg_size in 1..10 {
        // Each iteration writes into its own temp dir, which is registered
        // as the object store behind `local_url`.
        let tmp_dir = TempDir::new()?;
        let store = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
        test_df
            .session_state
            .runtime_env()
            .register_object_store(&local_url, store);

        let mut parquet_options = TableParquetOptions::default();
        parquet_options.global.allow_single_file_parallelism = true;
        parquet_options.global.max_row_group_size = rg_size;

        test_df
            .clone()
            .write_parquet(
                output_path,
                DataFrameWriteOptions::new().with_single_file_output(true),
                Some(parquet_options),
            )
            .await?;

        // Read the file back and confirm the first row group holds exactly
        // `rg_size` rows.
        let file = std::fs::File::open(tmp_dir.path().join("test.parquet"))?;
        let reader =
            parquet::file::serialized_reader::SerializedFileReader::new(file).unwrap();
        let first_rg_rows = reader.metadata().row_group(0).num_rows();

        assert_eq!(first_rg_rows as usize, rg_size);
    }

    Ok(())
}
}
73 changes: 39 additions & 34 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -876,42 +876,47 @@ fn spawn_parquet_parallel_serialization_task(
)?;
let mut current_rg_rows = 0;

while let Some(rb) = data.recv().await {
if current_rg_rows + rb.num_rows() < max_row_group_rows {
send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
.await?;
current_rg_rows += rb.num_rows();
} else {
let rows_left = max_row_group_rows - current_rg_rows;
let a = rb.slice(0, rows_left);
send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
.await?;
while let Some(mut rb) = data.recv().await {
// This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
// when max_row_group_rows < execution.batch_size as an alternative to a recursive async
// function.
loop {
if current_rg_rows + rb.num_rows() < max_row_group_rows {
send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
.await?;
current_rg_rows += rb.num_rows();
break;
} else {
let rows_left = max_row_group_rows - current_rg_rows;
let a = rb.slice(0, rows_left);
send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
.await?;

// Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
// on a separate task, so that we can immediately start on the next RG before waiting
// for the current one to finish.
drop(col_array_channels);
let finalize_rg_task = spawn_rg_join_and_finalize_task(
column_writer_handles,
max_row_group_rows,
);

serialize_tx.send(finalize_rg_task).await.map_err(|_| {
DataFusionError::Internal(
"Unable to send closed RG to concat task!".into(),
)
})?;

// Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
// on a separate task, so that we can immediately start on the next RG before waiting
// for the current one to finish.
drop(col_array_channels);
let finalize_rg_task = spawn_rg_join_and_finalize_task(
column_writer_handles,
max_row_group_rows,
);

serialize_tx.send(finalize_rg_task).await.map_err(|_| {
DataFusionError::Internal(
"Unable to send closed RG to concat task!".into(),
)
})?;
current_rg_rows = 0;
rb = rb.slice(rows_left, rb.num_rows() - rows_left);

let b = rb.slice(rows_left, rb.num_rows() - rows_left);
(column_writer_handles, col_array_channels) =
spawn_column_parallel_row_group_writer(
schema.clone(),
writer_props.clone(),
max_buffer_rb,
)?;
send_arrays_to_col_writers(&col_array_channels, &b, schema.clone())
.await?;
current_rg_rows = b.num_rows();
(column_writer_handles, col_array_channels) =
spawn_column_parallel_row_group_writer(
schema.clone(),
writer_props.clone(),
max_buffer_rb,
)?;
}
}
}

Expand Down

0 comments on commit 02fd450

Please sign in to comment.