diff --git a/crates/deltalake-core/src/operations/optimize.rs b/crates/deltalake-core/src/operations/optimize.rs
index 0467d43a8b..d86ef6f3e5 100644
--- a/crates/deltalake-core/src/operations/optimize.rs
+++ b/crates/deltalake-core/src/operations/optimize.rs
@@ -432,7 +432,7 @@ impl MergePlan {
             Some(task_parameters.input_parameters.target_size as usize),
             None,
         )?;
-        let mut writer = PartitionWriter::try_with_config(object_store.clone(), writer_config)?;
+        let mut writer = PartitionWriter::try_with_config(object_store, writer_config)?;
 
         let mut read_stream = read_stream.await?;
 
@@ -478,19 +478,7 @@ impl MergePlan {
             let object_store_ref = context.object_store.clone();
             // Read all batches into a vec
-            let batches: Vec<RecordBatch> = futures::stream::iter(files.clone())
-                .then(|file| {
-                    let object_store_ref = object_store_ref.clone();
-                    async move {
-                        let file_reader = ParquetObjectReader::new(object_store_ref.clone(), file);
-                        ParquetRecordBatchStreamBuilder::new(file_reader)
-                            .await?
-                            .build()
-                    }
-                })
-                .try_flatten()
-                .try_collect::<Vec<_>>()
-                .await?;
+            let batches = zorder::collect_batches(object_store_ref, files).await?;
 
             // For each batch, compute the zorder key
             let zorder_keys: Vec<ArrayRef> =
@@ -608,7 +596,7 @@ impl MergePlan {
         for file in files.iter() {
             debug!("  file {}", file.location);
         }
-        let object_store_ref = log_store.object_store().clone();
+        let object_store_ref = log_store.object_store();
         let batch_stream = futures::stream::iter(files.clone())
             .then(move |file| {
                 let object_store_ref = object_store_ref.clone();
@@ -636,33 +624,30 @@ impl MergePlan {
         #[cfg(not(feature = "datafusion"))]
         let exec_context = Arc::new(zorder::ZOrderExecContext::new(
             zorder_columns,
-            log_store.object_store().clone(),
+            log_store.object_store(),
             // If there aren't enough bins to use all threads, then instead
             // use threads within the bins. This is important for the case where
             // the table is un-partitioned, in which case the entire table is just
             // one big bin.
             bins.len() <= num_cpus::get(),
         ));
-        let object_store = log_store.object_store().clone();
 
         #[cfg(feature = "datafusion")]
         let exec_context = Arc::new(zorder::ZOrderExecContext::new(
             zorder_columns,
-            object_store.clone(),
+            log_store.object_store(),
            max_spill_size,
         )?);
         let task_parameters = self.task_parameters.clone();
+        let log_store = log_store.clone();
         futures::stream::iter(bins)
             .map(move |(partition, files)| {
                 let batch_stream = Self::read_zorder(files.clone(), exec_context.clone());
-
-                let object_store = object_store.clone();
-
                 let rewrite_result = tokio::task::spawn(Self::rewrite_files(
                     task_parameters.clone(),
                     partition,
                     files,
-                    object_store,
+                    log_store.object_store(),
                     batch_stream,
                 ));
                 util::flatten_join_error(rewrite_result)
@@ -1107,6 +1092,26 @@ pub(super) mod zorder {
         }
     }
 
+    /// Read all batches into a vec - is an async function in disguise
+    #[cfg(not(feature = "datafusion"))]
+    pub(super) fn collect_batches(
+        object_store: ObjectStoreRef,
+        files: MergeBin,
+    ) -> impl Future<Output = Result<Vec<RecordBatch>, ParquetError>> {
+        futures::stream::iter(files.clone())
+            .then(move |file| {
+                let object_store = object_store.clone();
+                async move {
+                    let file_reader = ParquetObjectReader::new(object_store.clone(), file);
+                    ParquetRecordBatchStreamBuilder::new(file_reader)
+                        .await?
+                        .build()
+                }
+            })
+            .try_flatten()
+            .try_collect::<Vec<RecordBatch>>()
+    }
+
     #[cfg(feature = "datafusion")]
     pub use self::datafusion::ZOrderExecContext;
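
A note on the extracted `zorder::collect_batches` helper: as its doc comment says, it is "an async function in disguise" - a plain `fn` that assembles a stream pipeline (open a Parquet reader per file, flatten the per-file record batch streams, collect into a `Vec`) and returns the resulting `impl Future` without awaiting anything itself; the caller `.await`s it inside `rewrite_files`. The remaining hunks drop redundant `.clone()` calls, since `log_store.object_store()` already returns an owned `ObjectStoreRef` (an `Arc`) on each call.

Below is a minimal, self-contained sketch of the same "plain `fn` returning `impl Future`" pattern, not code from this patch: `fetch_rows`, its `u64` "rows", and the `tokio` runtime are illustrative stand-ins for `collect_batches`, record batches, and the delta-rs runtime.

```rust
use futures::{stream, Future, StreamExt, TryStreamExt};

/// A plain `fn` that builds and returns a future without awaiting anything
/// itself - the "async function in disguise" shape used by `collect_batches`.
/// (`fetch_rows` and its `u64` items are hypothetical.)
fn fetch_rows(ids: Vec<u64>) -> impl Future<Output = Result<Vec<u64>, std::io::Error>> {
    stream::iter(ids)
        // For each id, asynchronously "open" a per-id stream of rows,
        // analogous to building a ParquetRecordBatchStream per file.
        .then(|id| async move {
            let rows: Vec<Result<u64, std::io::Error>> = vec![Ok(id), Ok(id * 10)];
            Ok::<_, std::io::Error>(stream::iter(rows))
        })
        // Flatten the stream of streams, then collect every row into one Vec.
        .try_flatten()
        .try_collect::<Vec<u64>>()
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // `.then` drives the fetches sequentially, so rows arrive in id order.
    let rows = fetch_rows(vec![1, 2, 3]).await?;
    assert_eq!(rows, vec![1, 10, 2, 20, 3, 30]);
    Ok(())
}
```

When all arguments are owned, as they are in `collect_batches`, choosing `fn -> impl Future` over `async fn` is largely a style decision; returning the combinator chain directly just avoids wrapping the body in an extra `async move` block.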