diff --git a/crates/core/src/operations/writer.rs b/crates/core/src/operations/writer.rs
index 3c9d3bda97..6964e46877 100644
--- a/crates/core/src/operations/writer.rs
+++ b/crates/core/src/operations/writer.rs
@@ -130,6 +130,8 @@ pub struct DeltaWriter {
     config: WriterConfig,
     /// partition writers for individual partitions
    partition_writers: HashMap<Path, PartitionWriter>,
+    /// Optional runtime for Parquet blocking IO
+    blocking_runtime: Option<tokio::runtime::Handle>,
 }
 
 impl DeltaWriter {
@@ -139,25 +141,45 @@ impl DeltaWriter {
             object_store,
             config,
             partition_writers: HashMap::new(),
+            blocking_runtime: None,
         }
     }
 
+    /// Sets runtime used for blocking Parquet IO
+    pub fn with_blocking_runtime(mut self, runtime: tokio::runtime::Handle) -> Self {
+        self.blocking_runtime = Some(runtime);
+        self
+    }
+
     /// Apply custom writer_properties to the underlying parquet writer
     pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
         self.config.writer_properties = writer_properties;
         self
     }
 
-    fn divide_by_partition_values(
+    async fn divide_by_partition_values(
         &mut self,
         values: &RecordBatch,
     ) -> DeltaResult<Vec<PartitionResult>> {
-        Ok(divide_by_partition_values(
-            self.config.file_schema(),
-            self.config.partition_columns.clone(),
-            values,
-        )
-        .map_err(|err| WriteError::Partitioning(err.to_string()))?)
+        Ok(match &self.blocking_runtime {
+            Some(handle) => {
+                let values = values.clone();
+                let schema = self.config.file_schema();
+                let partition_cols = self.config.partition_columns.clone();
+                handle
+                    .spawn_blocking(move || {
+                        divide_by_partition_values(schema, partition_cols, &values)
+                    })
+                    .await
+                    .map_err(|e| WriteError::Partitioning(e.to_string()))?
+            }
+            None => divide_by_partition_values(
+                self.config.file_schema(),
+                self.config.partition_columns.clone(),
+                values,
+            ),
+        }
+        .map_err(|e| WriteError::Partitioning(e.to_string()))?)
     }
 
     /// Write a batch to the partition induced by the partition_values. The record batch is expected
@@ -185,12 +207,17 @@ impl DeltaWriter {
                 Some(self.config.target_file_size),
                 Some(self.config.write_batch_size),
             )?;
-            let mut writer = PartitionWriter::try_with_config(
+            let writer = PartitionWriter::try_with_config(
                 self.object_store.clone(),
                 config,
                 self.config.num_indexed_cols,
                 self.config.stats_columns.clone(),
             )?;
+            let mut writer = match &self.blocking_runtime {
+                Some(runtime) => writer.with_blocking_runtime(runtime.clone()),
+                None => writer,
+            };
+
             writer.write(&record_batch).await?;
             let _ = self.partition_writers.insert(partition_key, writer);
         }
@@ -205,7 +232,7 @@ impl DeltaWriter {
     /// The `close` method has to be invoked to write all data still buffered
     /// and get the list of all written files.
     pub async fn write(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
-        for result in self.divide_by_partition_values(batch)? {
+        for result in self.divide_by_partition_values(batch).await? {
             self.write_partition(result.record_batch, &result.partition_values)
                 .await?;
         }
@@ -274,6 +301,13 @@ impl PartitionWriterConfig {
     }
 }
 
+#[derive(Debug, Default)]
+enum WriterState {
+    Ready(ArrowWriter<ShareableBuffer>),
+    #[default]
+    Writing,
+}
+
 /// Partition writer implementation
 /// This writer takes in table data as RecordBatches and writes it out to partitioned parquet files.
 /// It buffers data in memory until it reaches a certain size, then writes it out to optimize file sizes.
@@ -284,13 +318,14 @@ pub struct PartitionWriter {
     writer_id: uuid::Uuid,
     config: PartitionWriterConfig,
     buffer: ShareableBuffer,
-    arrow_writer: ArrowWriter<ShareableBuffer>,
+    arrow_writer: WriterState,
     part_counter: usize,
     files_written: Vec<Add>,
     /// Num index cols to collect stats for
     num_indexed_cols: i32,
     /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols
     stats_columns: Option<Vec<String>>,
+    blocking_runtime: Option<tokio::runtime::Handle>,
 }
 
 impl PartitionWriter {
@@ -308,6 +343,8 @@ impl PartitionWriter {
             Some(config.writer_properties.clone()),
         )?;
 
+        let arrow_writer = WriterState::Ready(arrow_writer);
+
         Ok(Self {
             object_store,
             writer_id: uuid::Uuid::new_v4(),
@@ -318,9 +355,16 @@
             files_written: Vec::new(),
             num_indexed_cols,
             stats_columns,
+            blocking_runtime: None,
         })
     }
 
+    /// Sets runtime used for blocking Parquet IO
+    pub fn with_blocking_runtime(mut self, runtime: tokio::runtime::Handle) -> Self {
+        self.blocking_runtime = Some(runtime);
+        self
+    }
+
     fn next_data_path(&mut self) -> Path {
         self.part_counter += 1;
 
@@ -332,27 +376,88 @@ impl PartitionWriter {
         )
     }
 
-    fn reset_writer(&mut self) -> DeltaResult<(ArrowWriter<ShareableBuffer>, ShareableBuffer)> {
+    fn reset_writer(&mut self) -> DeltaResult<(WriterState, ShareableBuffer)> {
         let new_buffer = ShareableBuffer::default();
         let arrow_writer = ArrowWriter::try_new(
             new_buffer.clone(),
             self.config.file_schema.clone(),
             Some(self.config.writer_properties.clone()),
         )?;
+        let arrow_writer = WriterState::Ready(arrow_writer);
         Ok((
             std::mem::replace(&mut self.arrow_writer, arrow_writer),
             std::mem::replace(&mut self.buffer, new_buffer),
         ))
     }
 
-    fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
-        Ok(self.arrow_writer.write(batch)?)
+    async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
+        match &self.blocking_runtime {
+            Some(handle) => {
+                let writer = std::mem::take(&mut self.arrow_writer);
+                match writer {
+                    WriterState::Ready(mut writer) => {
+                        let task_batch = batch.clone();
+                        let writer = handle
+                            .spawn_blocking(move || {
+                                writer.write(&task_batch)?;
+                                Ok::<_, ArrowError>(writer)
+                            })
+                            .await
+                            .map_err(|e| {
+                                DeltaTableError::Generic(format!("Error writing batch: {:?}", e))
+                            })??;
+                        self.arrow_writer = WriterState::Ready(writer);
+                        Ok(())
+                    }
+                    WriterState::Writing => Err(DeltaTableError::Generic(
+                        "Writer should be ready".to_string(),
+                    )),
+                }
+            }
+            None => match &mut self.arrow_writer {
+                WriterState::Ready(writer) => Ok(writer.write(batch)?),
+                WriterState::Writing => Err(DeltaTableError::Generic(
+                    "Writer should be ready".to_string(),
+                )),
+            },
+        }
     }
 
     async fn flush_arrow_writer(&mut self) -> DeltaResult<()> {
         // replace counter / buffers and close the current writer
         let (writer, buffer) = self.reset_writer()?;
-        let metadata = writer.close()?;
+
+        let metadata = match &self.blocking_runtime {
+            Some(handle) => {
+                let writer = match writer {
+                    WriterState::Ready(writer) => writer,
+                    WriterState::Writing => {
+                        return Err(DeltaTableError::Generic(
+                            "Writer should be ready".to_string(),
+                        ))
+                    }
+                };
+                let metadata = handle
+                    .spawn_blocking(move || {
+                        let metadata = writer.close()?;
+                        Ok::<_, ArrowError>(metadata)
+                    })
+                    .await
+                    .map_err(|e| {
+                        DeltaTableError::Generic(format!("Error closing writer: {:?}", e))
+                    })??;
+                metadata
+            }
+            None => match writer {
+                WriterState::Ready(writer) => writer.close()?,
+                WriterState::Writing => {
+                    return Err(DeltaTableError::Generic(
+                        "Writer should be ready".to_string(),
+                    ))
+                }
+            },
+        };
+
         // don't write empty file
         if metadata.num_rows == 0 {
            return Ok(());
@@ -404,9 +509,13 @@ impl PartitionWriter {
         let max_offset = batch.num_rows();
         for offset in (0..max_offset).step_by(self.config.write_batch_size) {
             let length = usize::min(self.config.write_batch_size, max_offset - offset);
-            self.write_batch(&batch.slice(offset, length))?;
+            self.write_batch(&batch.slice(offset, length)).await?;
             // flush currently buffered data to disk once we meet or exceed the target file size.
-            let estimated_size = self.buffer.len() + self.arrow_writer.in_progress_size();
+            let in_progress_size = match &self.arrow_writer {
+                WriterState::Ready(writer) => writer.in_progress_size(),
+                WriterState::Writing => 0,
+            };
+            let estimated_size = self.buffer.len() + in_progress_size;
             if estimated_size >= self.config.target_file_size {
                 debug!(
                     "Writing file with estimated size {:?} to disk.",
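
Usage note (not part of the patch): a minimal caller-side sketch of the opt-in API added above. `DeltaWriter::new`, `write`, `close`, and `with_blocking_runtime` come from the diff; the `write_with_offload` helper, the import paths, and the runtime wiring are illustrative assumptions and depend on crate version and module visibility.

```rust
// Sketch only: exact import paths vary with the deltalake-core version and
// the writer module's visibility; adjust to your crate layout.
use arrow_array::RecordBatch;
use deltalake_core::operations::writer::{DeltaWriter, WriterConfig};
use deltalake_core::{storage::ObjectStoreRef, DeltaResult};
use tokio::runtime::Handle;

async fn write_with_offload(
    object_store: ObjectStoreRef,
    config: WriterConfig,
    // Handle to a runtime the caller owns and keeps alive; CPU-bound Parquet
    // work is moved onto its blocking thread pool via spawn_blocking.
    parquet_handle: Handle,
    batches: &[RecordBatch],
) -> DeltaResult<()> {
    let mut writer = DeltaWriter::new(object_store, config)
        // Opt in: partitioning, ArrowWriter::write, and the final close now
        // run on the supplied runtime instead of the polling thread.
        .with_blocking_runtime(parquet_handle);

    for batch in batches {
        writer.write(batch).await?;
    }

    // close() flushes remaining buffered data and returns the Add actions
    // for the files written.
    let _adds = writer.close().await?;
    Ok(())
}
```

Passing a `Handle` rather than the `Runtime` itself matters: dropping a tokio `Runtime` from within async code panics, so the caller should own the runtime (e.g. in `main`) and hand the writer only a handle. When no runtime is supplied, both `DeltaWriter` and `PartitionWriter` fall back to the previous inline behavior.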