diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs new file mode 100644 index 000000000..f1a07d20d --- /dev/null +++ b/crates/iceberg/src/delete_file_index.rs @@ -0,0 +1,95 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::{Arc, RwLock}; +use std::task::{Context, Poll}; + +use futures::channel::mpsc::Receiver; +use futures::{StreamExt, TryStreamExt}; + +use crate::runtime::spawn; +use crate::scan::FileScanTaskDeleteFile; +use crate::spec::DataFile; +use crate::{Error, Result}; + +type DeleteFileIndexResult = Result>>>; + +/// Safely shareable on-demand-populated delete file index. +/// +/// Constructed during the file plan phase of a table scan from a channel that will +/// receive the details of all delete files that can possibly apply to a scan. +/// Asynchronously retrieves and lazily processes of all the delete files from FileIO +/// concurrently whilst the plan proceeds with collating all of the applicable data files. +/// Awaited on when constructing the resulting FileScanTask for each selected data file +/// in order to populate the list of delete files to include in each `FileScanTask`. +#[derive(Debug, Clone)] +pub(crate) struct DeleteFileIndex { + files: Arc>>, +} + +#[derive(Debug, Clone)] +pub(crate) struct DeleteFileIndexFuture { + files: Arc>>, +} + +impl Future for DeleteFileIndexFuture { + type Output = DeleteFileIndexResult; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + let Ok(guard) = self.files.try_read() else { + return Poll::Pending; + }; + + if let Some(value) = guard.as_ref() { + Poll::Ready(match value.as_ref() { + Ok(deletes) => Ok(deletes.clone()), + Err(err) => Err(Error::new(err.kind(), err.message())), + }) + } else { + Poll::Pending + } + } +} + +impl DeleteFileIndex { + pub(crate) fn from_receiver(receiver: Receiver>) -> Self { + let delete_file_stream = receiver.boxed(); + let files = Arc::new(RwLock::new(None)); + + // spawn a task to handle accumulating all the DeleteFiles that are streamed into the + // index through the receiver channel. Update the `None` inside the `RwLock` to a `Some` + // once the stream has been exhausted so that any consumers awaiting on the Future returned + // by DeleteFileIndex::get_deletes_for_data_file can proceed + spawn({ + let files = files.clone(); + async move { + let _ = spawn(async move { + let result = delete_file_stream.try_collect::>().await; + let result = result.map(|files| { + if files.is_empty() { + None + } else { + Some(Arc::new(files)) + } + }); + + // Unwrap is ok here since this is the only place where a write lock + // can be acquired, so the lock can't already have been poisoned + let mut guard = files.write().unwrap(); + *guard = Some(result); + }) + .await; + } + }); + + DeleteFileIndex { files } + } + + /// Asynchronously determines all the delete files that apply to the provided `DataFile`. + pub(crate) fn get_deletes_for_data_file(&self, _data_file: &DataFile) -> DeleteFileIndexFuture { + // TODO: filtering + + DeleteFileIndexFuture { + files: self.files.clone(), + } + } +} diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 72cf18d4b..0dea11399 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -82,5 +82,6 @@ pub mod transform; mod runtime; pub mod arrow; +pub(crate) mod delete_file_index; mod utils; pub mod writer; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 4112342ec..9f74e6c61 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -18,18 +18,16 @@ //! Table scan api. use std::collections::HashMap; -use std::future::Future; -use std::pin::Pin; use std::sync::{Arc, RwLock}; -use std::task::{Context, Poll}; use arrow_array::RecordBatch; -use futures::channel::mpsc::{channel, Receiver, Sender}; +use futures::channel::mpsc::{channel, Sender}; use futures::stream::BoxStream; use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use crate::arrow::ArrowReaderBuilder; +use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::expression_evaluator::ExpressionEvaluator; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::visitors::inclusive_projection::InclusiveProjection; @@ -39,7 +37,7 @@ use crate::io::object_cache::ObjectCache; use crate::io::FileIO; use crate::runtime::spawn; use crate::spec::{ - DataContentType, DataFile, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile, + DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef, }; use crate::table::Table; @@ -368,9 +366,9 @@ impl TableScan { // used to stream the results back to the caller let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries); - // used to stream delete files into the DeleteFileManager + // used to stream delete files into the DeleteFileIndex let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries); - let delete_file_manager = DeleteFileManager::from_receiver(delete_file_rx); + let delete_file_index = DeleteFileIndex::from_receiver(delete_file_rx); let manifest_list = self.plan_context.get_manifest_list().await?; @@ -381,7 +379,7 @@ impl TableScan { manifest_list, manifest_entry_data_ctx_tx, manifest_entry_delete_ctx_tx, - delete_file_manager.clone(), + delete_file_index.clone(), )?; let mut channel_for_manifest_error = file_scan_task_tx.clone(); @@ -593,7 +591,7 @@ struct ManifestFileContext { object_cache: Arc, snapshot_schema: SchemaRef, expression_evaluator_cache: Arc, - delete_file_manager: DeleteFileManager, + delete_file_index: DeleteFileIndex, } /// Wraps a [`ManifestEntryRef`] alongside the objects that are needed @@ -606,7 +604,7 @@ struct ManifestEntryContext { bound_predicates: Option>, partition_spec_id: i32, snapshot_schema: SchemaRef, - delete_file_manager: DeleteFileManager, + delete_file_index: DeleteFileIndex, } impl ManifestFileContext { @@ -621,7 +619,7 @@ impl ManifestFileContext { field_ids, mut sender, expression_evaluator_cache, - delete_file_manager, + delete_file_index, .. } = self; @@ -636,7 +634,7 @@ impl ManifestFileContext { partition_spec_id: manifest_file.partition_spec_id, bound_predicates: bound_predicates.clone(), snapshot_schema: snapshot_schema.clone(), - delete_file_manager: delete_file_manager.clone(), + delete_file_index: delete_file_index.clone(), }; sender @@ -654,7 +652,7 @@ impl ManifestEntryContext { /// created from it async fn into_file_scan_task(self) -> Result { let deletes = self - .delete_file_manager + .delete_file_index .get_deletes_for_data_file(self.manifest_entry.data_file()) .await?; @@ -712,7 +710,7 @@ impl PlanContext { manifest_list: Arc, sender_data: Sender, sender_delete: Sender, - delete_file_manager: DeleteFileManager, + delete_file_index: DeleteFileIndex, ) -> Result>>> { let manifest_files = manifest_list.entries().iter(); @@ -741,7 +739,7 @@ impl PlanContext { } else { sender_delete.clone() }, - delete_file_manager.clone(), + delete_file_index.clone(), ); filtered_mfcs.push(Ok(mfc)); @@ -757,7 +755,7 @@ impl PlanContext { } else { sender_delete.clone() }, - delete_file_manager.clone(), + delete_file_index.clone(), ); filtered_mfcs.push(Ok(mfc)); @@ -772,7 +770,7 @@ impl PlanContext { manifest_file: &ManifestFile, partition_filter: Option>, sender: Sender, - delete_file_manager: DeleteFileManager, + delete_file_index: DeleteFileIndex, ) -> ManifestFileContext { let bound_predicates = if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) = @@ -794,7 +792,7 @@ impl PlanContext { snapshot_schema: self.snapshot_schema.clone(), field_ids: self.field_ids.clone(), expression_evaluator_cache: self.expression_evaluator_cache.clone(), - delete_file_manager, + delete_file_index, } } } @@ -1077,82 +1075,6 @@ impl FileScanTask { } } -type DeleteFileManagerResult = Result>>>; - -/// Manages async retrieval of all the delete files from FileIO that are -/// applicable to the scan. Provides references to them for inclusion within FileScanTasks -#[derive(Debug, Clone)] -struct DeleteFileManager { - files: Arc>>, -} - -#[derive(Debug, Clone)] -struct DeleteFileManagerFuture { - files: Arc>>, -} - -impl Future for DeleteFileManagerFuture { - type Output = DeleteFileManagerResult; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - let Ok(guard) = self.files.try_read() else { - return Poll::Pending; - }; - - if let Some(value) = guard.as_ref() { - Poll::Ready(match value.as_ref() { - Ok(deletes) => Ok(deletes.clone()), - Err(err) => Err(Error::new(err.kind(), err.message())), - }) - } else { - Poll::Pending - } - } -} - -impl DeleteFileManager { - pub(crate) fn from_receiver(receiver: Receiver>) -> Self { - let delete_file_stream = receiver.boxed(); - let files = Arc::new(RwLock::new(None)); - - spawn({ - let files = files.clone(); - async move { - let _ = spawn(async move { - let result = delete_file_stream.try_collect::>().await; - let result = result.map(|files| { - if files.is_empty() { - None - } else { - Some(Arc::new(files)) - } - }); - - // Unwrap is ok here since this is the only place where a write lock - // can be acquired, so the lock can't already have been poisoned - let mut guard = files.write().unwrap(); - *guard = Some(result); - }) - .await; - } - }); - - DeleteFileManager { files } - } - - pub(crate) fn get_deletes_for_data_file( - &self, - _data_file: &DataFile, - ) -> DeleteFileManagerFuture { - // TODO: in the future we may want to filter out delete files - // that are not applicable to the DataFile? - - DeleteFileManagerFuture { - files: self.files.clone(), - } - } -} - #[cfg(test)] mod tests { use std::collections::HashMap;