diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 60036e7f9..baf49a51e 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -18,7 +18,10 @@ //! Table scan api. use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; use std::sync::{Arc, RwLock}; +use std::task::{Context, Poll}; use arrow_array::RecordBatch; use futures::channel::mpsc::{channel, Receiver, Sender}; @@ -292,7 +295,6 @@ impl<'a> TableScanBuilder<'a> { partition_filter_cache: Arc::new(PartitionFilterCache::new()), manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()), expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()), - delete_file_manager: Arc::new(DeleteFileManager::new()), }; Ok(TableScan { @@ -332,68 +334,6 @@ pub struct TableScan { row_selection_enabled: bool, } -/// Manages async retrieval of all of a given snapshot's delete files -/// from FileIO and then subsequently serving filtered references to them -/// up for inclusion within FileScanTasks -#[derive(Debug)] -struct DeleteFileManager { - file_scan_task_delete_files: RwLock>>>, -} - -impl DeleteFileManager { - pub(crate) fn new() -> Self { - DeleteFileManager { - file_scan_task_delete_files: RwLock::new(None), - } - } - - pub(crate) async fn handle_delete_file_stream( - &self, - delete_file_rx: Receiver>, - ) -> Result<()> { - let mut delete_file_stream = delete_file_rx.boxed(); - - let mut delete_files = vec![]; - - while let Some(delete_file) = delete_file_stream.try_next().await? { - delete_files.push(delete_file); - } - - if !delete_files.is_empty() { - let mut guard = self - .file_scan_task_delete_files - .write() - .map_err(|_| { - Error::new( - ErrorKind::Unexpected, - "DeleteFileManager RwLock was poisoned", - ) - }) - .unwrap(); - - *guard = Some(Arc::new(delete_files)); - } - - Ok(()) - } - - pub(crate) async fn get_deletes_for_data_file( - &self, - _data_file: &DataFile, - ) -> Option>> { - self.file_scan_task_delete_files - .read() - .map_err(|_| { - Error::new( - ErrorKind::Unexpected, - "DeleteFileManager RwLock was poisoned", - ) - }) - .unwrap() - .clone() - } -} - /// PlanContext wraps a [`SnapshotRef`] alongside all the other /// objects that are required to perform a scan file plan. #[derive(Debug)] @@ -407,7 +347,6 @@ struct PlanContext { snapshot_bound_predicate: Option>, object_cache: Arc, field_ids: Arc>, - delete_file_manager: Arc, partition_filter_cache: Arc, manifest_evaluator_cache: Arc, @@ -425,10 +364,13 @@ impl TableScan { channel(concurrency_limit_manifest_files); let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) = channel(concurrency_limit_manifest_files); + // used to stream the results back to the caller let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); + // used to stream delete files into the DeleteFileManager let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries); + let delete_file_manager = Arc::new(DeleteFileManager::from_receiver(delete_file_rx)); let manifest_list = self.plan_context.get_manifest_list().await?; @@ -439,7 +381,7 @@ impl TableScan { manifest_list, manifest_entry_data_ctx_tx, manifest_entry_delete_ctx_tx, - self.plan_context.delete_file_manager.clone(), + delete_file_manager.clone(), )?; let mut channel_for_manifest_error = file_scan_task_tx.clone(); @@ -483,11 +425,6 @@ impl TableScan { }) .await; - self.plan_context - .delete_file_manager - .handle_delete_file_stream(delete_file_rx) - .await?; - // Process the data file [`ManifestEntry`] stream in parallel spawn(async move { let result = manifest_entry_data_ctx_rx @@ -586,7 +523,7 @@ impl TableScan { // entire plan without getting filtered out. Create a corresponding // FileScanTask and push it to the result stream file_scan_task_tx - .send(Ok(manifest_entry_context.into_file_scan_task().await)) + .send(Ok(manifest_entry_context.into_file_scan_task().await?)) .await?; Ok(()) @@ -715,8 +652,13 @@ impl ManifestFileContext { impl ManifestEntryContext { /// consume this `ManifestEntryContext`, returning a `FileScanTask` /// created from it - async fn into_file_scan_task(self) -> FileScanTask { - FileScanTask { + async fn into_file_scan_task(self) -> Result { + let deletes = self + .delete_file_manager + .get_deletes_for_data_file(self.manifest_entry.data_file()) + .await?; + + Ok(FileScanTask { start: 0, length: self.manifest_entry.file_size_in_bytes(), record_count: Some(self.manifest_entry.record_count()), @@ -731,11 +673,8 @@ impl ManifestEntryContext { .bound_predicates .map(|x| x.as_ref().snapshot_bound_predicate.clone()), - deletes: self - .delete_file_manager - .get_deletes_for_data_file(self.manifest_entry.data_file()) - .await, - } + deletes, + }) } } @@ -1138,6 +1077,82 @@ impl FileScanTask { } } +type DeleteFileManagerResult = Result>>>; + +/// Manages async retrieval of all the delete files from FileIO that are +/// applicable to the scan. Provides references to them for inclusion within FileScanTasks +#[derive(Debug, Clone)] +struct DeleteFileManager { + files: Arc>>, +} + +#[derive(Debug, Clone)] +struct DeleteFileManagerFuture { + files: Arc>>, +} + +impl Future for DeleteFileManagerFuture { + type Output = DeleteFileManagerResult; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + let Ok(guard) = self.files.try_read() else { + return Poll::Pending; + }; + + if let Some(value) = guard.as_ref() { + Poll::Ready(match value.as_ref() { + Ok(deletes) => Ok(deletes.clone()), + Err(err) => Err(Error::new(err.kind(), err.message())), + }) + } else { + Poll::Pending + } + } +} + +impl DeleteFileManager { + pub(crate) fn from_receiver(receiver: Receiver>) -> Self { + let delete_file_stream = receiver.boxed(); + let files = Arc::new(RwLock::new(None)); + + spawn({ + let files = files.clone(); + async move { + let _ = spawn(async move { + let result = delete_file_stream.try_collect::>().await; + let result = result.map(|files| { + if files.is_empty() { + None + } else { + Some(Arc::new(files)) + } + }); + + // Unwrap is ok here since this is the only place where a write lock + // can be acquired, so the lock can't already have been poisoned + let mut guard = files.write().unwrap(); + *guard = Some(result); + }) + .await; + } + }); + + DeleteFileManager { files } + } + + pub(crate) fn get_deletes_for_data_file( + &self, + _data_file: &DataFile, + ) -> DeleteFileManagerFuture { + // TODO: in the future we may want to filter out delete files + // that are not applicable to the DataFile? + + DeleteFileManagerFuture { + files: self.files.clone(), + } + } +} + #[cfg(test)] mod tests { use std::collections::HashMap;