perf: spawn sync parquet write on blocking runtime #2806
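This change threads an optional `tokio::runtime::Handle` through `DeltaWriter` and `PartitionWriter` so that the synchronous, CPU-heavy parquet work (partitioning batches, encoding, and closing the `ArrowWriter`) can run on a dedicated blocking thread pool instead of stalling the async executor's workers. A minimal setup sketch, assuming `object_store` and `config` are already built; only `with_blocking_runtime` comes from this diff, the rest is standard tokio:

```rust
use tokio::runtime::Builder;

// Build a runtime whose blocking pool is dedicated to parquet encoding;
// spawn_blocking tasks sent to this handle never compete with the main
// executor's worker threads.
let parquet_rt = Builder::new_multi_thread()
    .thread_name("parquet-blocking")
    .max_blocking_threads(4)
    .build()?;

// Opting in is additive: without this call the writer behaves exactly as before.
let writer = DeltaWriter::new(object_store, config)
    .with_blocking_runtime(parquet_rt.handle().clone());
```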

crates/core/src/operations/writer.rs (141 changes: 125 additions & 16 deletions)
@@ -130,6 +130,8 @@ pub struct DeltaWriter {
     config: WriterConfig,
     /// partition writers for individual partitions
     partition_writers: HashMap<Path, PartitionWriter>,
+    /// Optional runtime for Parquet blocking IO
+    blocking_runtime: Option<tokio::runtime::Handle>,
 }

 impl DeltaWriter {
@@ -139,25 +141,45 @@ impl DeltaWriter {
             object_store,
             config,
             partition_writers: HashMap::new(),
+            blocking_runtime: None,
         }
     }

+    /// Sets runtime used for blocking Parquet IO
+    pub fn with_blocking_runtime(mut self, runtime: tokio::runtime::Handle) -> Self {
+        self.blocking_runtime = Some(runtime);
+        self
+    }
+
     /// Apply custom writer_properties to the underlying parquet writer
     pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
         self.config.writer_properties = writer_properties;
         self
     }

-    fn divide_by_partition_values(
+    async fn divide_by_partition_values(
         &mut self,
         values: &RecordBatch,
     ) -> DeltaResult<Vec<PartitionResult>> {
-        Ok(divide_by_partition_values(
-            self.config.file_schema(),
-            self.config.partition_columns.clone(),
-            values,
-        )
-        .map_err(|err| WriteError::Partitioning(err.to_string()))?)
+        Ok(match &self.blocking_runtime {
+            Some(handle) => {
+                let values = values.clone();
+                let schema = self.config.file_schema();
+                let partition_cols = self.config.partition_columns.clone();
+                handle
+                    .spawn_blocking(move || {
+                        divide_by_partition_values(schema, partition_cols, &values)
+                    })
+                    .await
+                    .map_err(|e| WriteError::Partitioning(e.to_string()))?
+            }
+            None => divide_by_partition_values(
+                self.config.file_schema(),
+                self.config.partition_columns.clone(),
+                values,
+            ),
+        }
+        .map_err(|e| WriteError::Partitioning(e.to_string()))?)
     }
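Two details make the `Some(handle)` arm above cheap and correct: `RecordBatch::clone` is shallow (the columns are `Arc`-backed, so no row data is copied), and `spawn_blocking` requires a `'static` closure, which is why the schema and partition columns are cloned out of `self` first. The same offload shape, reduced to a self-contained sketch (`offload` and its bounds are illustrative, not part of the diff):

```rust
use tokio::runtime::Handle;

// Run synchronous work on the blocking pool and surface a JoinError as a String.
async fn offload<T, F>(handle: &Handle, work: F) -> Result<T, String>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    handle
        .spawn_blocking(work) // executes on the dedicated blocking thread pool
        .await // resolves once the blocking task finishes
        .map_err(|e| e.to_string()) // JoinError: the task panicked or was cancelled
}
```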

     /// Write a batch to the partition induced by the partition_values. The record batch is expected
@@ -185,12 +207,17 @@ impl DeltaWriter {
                 Some(self.config.target_file_size),
                 Some(self.config.write_batch_size),
             )?;
-            let mut writer = PartitionWriter::try_with_config(
+            let writer = PartitionWriter::try_with_config(
                 self.object_store.clone(),
                 config,
                 self.config.num_indexed_cols,
                 self.config.stats_columns.clone(),
             )?;
+            let mut writer = match &self.blocking_runtime {
+                Some(runtime) => writer.with_blocking_runtime(runtime.clone()),
+                None => writer,
+            };
+
             writer.write(&record_batch).await?;
             let _ = self.partition_writers.insert(partition_key, writer);
         }
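Note how the runtime handle fans out here: `DeltaWriter` clones its `Handle` into every `PartitionWriter` it creates, so all partitions share one blocking pool. A tokio `Handle` is a cheap, reference-counted pointer to the runtime, so the per-partition `clone()` costs next to nothing.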
@@ -205,7 +232,7 @@ impl DeltaWriter {
     /// The `close` method has to be invoked to write all data still buffered
     /// and get the list of all written files.
     pub async fn write(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
-        for result in self.divide_by_partition_values(batch)? {
+        for result in self.divide_by_partition_values(batch).await? {
             self.write_partition(result.record_batch, &result.partition_values)
                 .await?;
         }
Expand Down Expand Up @@ -274,6 +301,13 @@ impl PartitionWriterConfig {
}
}

#[derive(Debug, Default)]
enum WriterState {
Ready(ArrowWriter<ShareableBuffer>),
#[default]
Writing,
}
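Why the `Default` derive with `Writing` as the default variant: `spawn_blocking` needs to own the `ArrowWriter`, and a field cannot be moved out of `&mut self` directly. `std::mem::take` solves this by swapping the default variant into the slot while the writer is away. A self-contained sketch of the mechanism (the names here are illustrative, not from the diff):

```rust
#[derive(Debug, Default)]
enum Slot {
    Ready(String),
    #[default]
    Empty,
}

fn main() {
    let mut slot = Slot::Ready("payload".into());
    // take() moves the value out by value and leaves Slot::Empty behind,
    // exactly how PartitionWriter lends its ArrowWriter to a blocking task.
    let lent = std::mem::take(&mut slot);
    assert!(matches!(slot, Slot::Empty));
    assert!(matches!(lent, Slot::Ready(_)));
}
```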

 /// Partition writer implementation
 /// This writer takes in table data as RecordBatches and writes it out to partitioned parquet files.
 /// It buffers data in memory until it reaches a certain size, then writes it out to optimize file sizes.
@@ -284,13 +318,14 @@ pub struct PartitionWriter {
     writer_id: uuid::Uuid,
     config: PartitionWriterConfig,
     buffer: ShareableBuffer,
-    arrow_writer: ArrowWriter<ShareableBuffer>,
+    arrow_writer: WriterState,
     part_counter: usize,
     files_written: Vec<Add>,
     /// Num index cols to collect stats for
     num_indexed_cols: i32,
     /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols
     stats_columns: Option<Vec<String>>,
+    blocking_runtime: Option<tokio::runtime::Handle>,
 }

 impl PartitionWriter {
@@ -308,6 +343,8 @@ impl PartitionWriter {
             Some(config.writer_properties.clone()),
         )?;

+        let arrow_writer = WriterState::Ready(arrow_writer);
+
         Ok(Self {
             object_store,
             writer_id: uuid::Uuid::new_v4(),
@@ -318,9 +355,16 @@ impl PartitionWriter {
             files_written: Vec::new(),
             num_indexed_cols,
             stats_columns,
+            blocking_runtime: None,
         })
     }

+    /// Sets runtime used for blocking Parquet IO
+    pub fn with_blocking_runtime(mut self, runtime: tokio::runtime::Handle) -> Self {
+        self.blocking_runtime = Some(runtime);
+        self
+    }
+
     fn next_data_path(&mut self) -> Path {
         self.part_counter += 1;

@@ -332,27 +376,88 @@ impl PartitionWriter {
         )
     }

-    fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
+    fn reset_writer(&mut self) -> DeltaResult<(WriterState, ShareableBuffer)> {
         let new_buffer = ShareableBuffer::default();
         let arrow_writer = ArrowWriter::try_new(
             new_buffer.clone(),
             self.config.file_schema.clone(),
             Some(self.config.writer_properties.clone()),
         )?;
+        let arrow_writer = WriterState::Ready(arrow_writer);
         Ok((
             std::mem::replace(&mut self.arrow_writer, arrow_writer),
             std::mem::replace(&mut self.buffer, new_buffer),
         ))
     }

-    fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
-        Ok(self.arrow_writer.write(batch)?)
+    async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
+        match &self.blocking_runtime {
+            Some(handle) => {
+                let writer = std::mem::take(&mut self.arrow_writer);
+                match writer {
+                    WriterState::Ready(mut writer) => {
+                        let task_batch = batch.clone();
+                        let writer = handle
+                            .spawn_blocking(move || {
+                                writer.write(&task_batch)?;
+                                Ok::<_, ArrowError>(writer)
+                            })
+                            .await
+                            .map_err(|e| {
+                                DeltaTableError::Generic(format!("Error writing batch: {:?}", e))
+                            })??;
+                        self.arrow_writer = WriterState::Ready(writer);
+                        Ok(())
+                    }
+                    WriterState::Writing => Err(DeltaTableError::Generic(
+                        "Writer should be ready".to_string(),
+                    )),
+                }
+            }
+            None => match &mut self.arrow_writer {
+                WriterState::Ready(writer) => Ok(writer.write(batch)?),
+                WriterState::Writing => Err(DeltaTableError::Generic(
+                    "Writer should be ready".to_string(),
+                )),
+            },
+        }
     }
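The round trip above (take the writer, move it into the closure, return it from the closure, reinstall it after `.await`) is the standard way to run a method that needs ownership on the blocking pool. A runnable sketch of the same pattern with a plain `String` standing in for the `ArrowWriter` (all names illustrative):

```rust
use tokio::runtime::Builder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let rt = Builder::new_multi_thread().build()?;
    rt.block_on(async {
        let handle = tokio::runtime::Handle::current();
        let mut buf = String::from("rows");
        // Lend `buf` to the blocking pool, then take it back, mirroring how
        // write_batch round-trips the ArrowWriter through spawn_blocking.
        buf = handle
            .spawn_blocking(move || {
                buf.push_str(" encoded");
                buf // returned so the caller regains ownership
            })
            .await?;
        assert_eq!(buf, "rows encoded");
        Ok::<_, tokio::task::JoinError>(())
    })?;
    Ok(())
}
```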

     async fn flush_arrow_writer(&mut self) -> DeltaResult<()> {
         // replace counter / buffers and close the current writer
         let (writer, buffer) = self.reset_writer()?;
-        let metadata = writer.close()?;
+
+        let metadata = match &self.blocking_runtime {
+            Some(handle) => {
+                let writer = match writer {
+                    WriterState::Ready(writer) => writer,
+                    WriterState::Writing => {
+                        return Err(DeltaTableError::Generic(
+                            "Writer should be ready".to_string(),
+                        ))
+                    }
+                };
+                let metadata = handle
+                    .spawn_blocking(move || {
+                        let metadata = writer.close()?;
+                        Ok::<_, ArrowError>(metadata)
+                    })
+                    .await
+                    .map_err(|e| {
+                        DeltaTableError::Generic(format!("Error closing writer: {:?}", e))
+                    })??;
+                metadata
+            }
+            None => match writer {
+                WriterState::Ready(writer) => writer.close()?,
+                WriterState::Writing => {
+                    return Err(DeltaTableError::Generic(
+                        "Writer should be ready".to_string(),
+                    ))
+                }
+            },
+        };
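A note on the double `??` after `.await`: the first `?` handles the `JoinError` (the blocking task panicked or was aborted), already mapped into `DeltaTableError::Generic`; the second propagates the `ArrowError` that `close()` (or `write()`) itself may return. Both layers have to be unwrapped because `spawn_blocking` wraps the closure's own `Result` in a `Result<_, JoinError>`.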

         // don't write empty file
         if metadata.num_rows == 0 {
             return Ok(());
@@ -404,9 +509,13 @@ impl PartitionWriter {
         let max_offset = batch.num_rows();
         for offset in (0..max_offset).step_by(self.config.write_batch_size) {
             let length = usize::min(self.config.write_batch_size, max_offset - offset);
-            self.write_batch(&batch.slice(offset, length))?;
+            self.write_batch(&batch.slice(offset, length)).await?;
             // flush currently buffered data to disk once we meet or exceed the target file size.
-            let estimated_size = self.buffer.len() + self.arrow_writer.in_progress_size();
+            let in_progress_size = match &self.arrow_writer {
+                WriterState::Ready(writer) => writer.in_progress_size(),
+                WriterState::Writing => 0,
+            };
+            let estimated_size = self.buffer.len() + in_progress_size;
             if estimated_size >= self.config.target_file_size {
                 debug!(
                     "Writing file with estimated size {:?} to disk.",