Skip to content

Commit

Permalink
refactor: improve design of DeleteFileManager
Browse files Browse the repository at this point in the history
  • Loading branch information
sdd committed Oct 31, 2024
1 parent d657429 commit df4e86a
Showing 1 changed file with 93 additions and 78 deletions.
171 changes: 93 additions & 78 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
//! Table scan api.
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};

use arrow_array::RecordBatch;
use futures::channel::mpsc::{channel, Receiver, Sender};
Expand Down Expand Up @@ -292,7 +295,6 @@ impl<'a> TableScanBuilder<'a> {
partition_filter_cache: Arc::new(PartitionFilterCache::new()),
manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
delete_file_manager: Arc::new(DeleteFileManager::new()),
};

Ok(TableScan {
Expand Down Expand Up @@ -332,68 +334,6 @@ pub struct TableScan {
row_selection_enabled: bool,
}

/// Manages async retrieval of all of a given snapshot's delete files
/// from FileIO and then subsequently serves references to them
/// for inclusion within FileScanTasks.
///
/// The collected list is shared behind an `Arc`, so handing it out is a
/// reference-count bump rather than a copy of the `Vec`.
#[derive(Debug)]
struct DeleteFileManager {
/// `None` until `handle_delete_file_stream` has stored a non-empty list of
/// delete files; afterwards `Some` with the shared list.
file_scan_task_delete_files: RwLock<Option<Arc<Vec<FileScanTaskDeleteFile>>>>,
}

impl DeleteFileManager {
    /// Creates a manager that has not yet loaded any delete files.
    pub(crate) fn new() -> Self {
        DeleteFileManager {
            file_scan_task_delete_files: RwLock::new(None),
        }
    }

    /// Drains `delete_file_rx` to completion and stores the collected delete
    /// files so that they can be handed out by `get_deletes_for_data_file`.
    ///
    /// Nothing is stored when the stream yields no delete files.
    ///
    /// # Errors
    ///
    /// Returns the first error yielded by the stream, or an
    /// `ErrorKind::Unexpected` error if the internal lock was poisoned.
    pub(crate) async fn handle_delete_file_stream(
        &self,
        delete_file_rx: Receiver<Result<FileScanTaskDeleteFile>>,
    ) -> Result<()> {
        // Short-circuits on the first `Err` item, like the manual
        // `try_next` loop it replaces.
        let delete_files = delete_file_rx.try_collect::<Vec<_>>().await?;

        if !delete_files.is_empty() {
            // Propagate a poisoned lock as an error to the caller rather than
            // building the error and then panicking on it.
            let mut guard = self.file_scan_task_delete_files.write().map_err(|_| {
                Error::new(
                    ErrorKind::Unexpected,
                    "DeleteFileManager RwLock was poisoned",
                )
            })?;

            *guard = Some(Arc::new(delete_files));
        }

        Ok(())
    }

    /// Returns the delete files to attach to the scan task for `_data_file`,
    /// or `None` if no delete files have been stored.
    ///
    /// The data file is currently ignored: every caller receives the same
    /// shared list.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock was poisoned. The signature has no error
    /// channel, so this cannot be reported any other way.
    pub(crate) async fn get_deletes_for_data_file(
        &self,
        _data_file: &DataFile,
    ) -> Option<Arc<Vec<FileScanTaskDeleteFile>>> {
        self.file_scan_task_delete_files
            .read()
            .expect("DeleteFileManager RwLock was poisoned")
            .clone()
    }
}

/// PlanContext wraps a [`SnapshotRef`] alongside all the other
/// objects that are required to perform a scan file plan.
#[derive(Debug)]
Expand All @@ -407,7 +347,6 @@ struct PlanContext {
snapshot_bound_predicate: Option<Arc<BoundPredicate>>,
object_cache: Arc<ObjectCache>,
field_ids: Arc<Vec<i32>>,
delete_file_manager: Arc<DeleteFileManager>,

partition_filter_cache: Arc<PartitionFilterCache>,
manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
Expand All @@ -425,10 +364,13 @@ impl TableScan {
channel(concurrency_limit_manifest_files);
let (manifest_entry_delete_ctx_tx, manifest_entry_delete_ctx_rx) =
channel(concurrency_limit_manifest_files);

// used to stream the results back to the caller
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

// used to stream delete files into the DeleteFileManager
let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries);
let delete_file_manager = Arc::new(DeleteFileManager::from_receiver(delete_file_rx));

let manifest_list = self.plan_context.get_manifest_list().await?;

Expand All @@ -439,7 +381,7 @@ impl TableScan {
manifest_list,
manifest_entry_data_ctx_tx,
manifest_entry_delete_ctx_tx,
self.plan_context.delete_file_manager.clone(),
delete_file_manager.clone(),
)?;

let mut channel_for_manifest_error = file_scan_task_tx.clone();
Expand Down Expand Up @@ -483,11 +425,6 @@ impl TableScan {
})
.await;

self.plan_context
.delete_file_manager
.handle_delete_file_stream(delete_file_rx)
.await?;

// Process the data file [`ManifestEntry`] stream in parallel
spawn(async move {
let result = manifest_entry_data_ctx_rx
Expand Down Expand Up @@ -586,7 +523,7 @@ impl TableScan {
// entire plan without getting filtered out. Create a corresponding
// FileScanTask and push it to the result stream
file_scan_task_tx
.send(Ok(manifest_entry_context.into_file_scan_task().await))
.send(Ok(manifest_entry_context.into_file_scan_task().await?))
.await?;

Ok(())
Expand Down Expand Up @@ -715,8 +652,13 @@ impl ManifestFileContext {
impl ManifestEntryContext {
/// consume this `ManifestEntryContext`, returning a `FileScanTask`
/// created from it
async fn into_file_scan_task(self) -> FileScanTask {
FileScanTask {
async fn into_file_scan_task(self) -> Result<FileScanTask> {
let deletes = self
.delete_file_manager
.get_deletes_for_data_file(self.manifest_entry.data_file())
.await?;

Ok(FileScanTask {
start: 0,
length: self.manifest_entry.file_size_in_bytes(),
record_count: Some(self.manifest_entry.record_count()),
Expand All @@ -731,11 +673,8 @@ impl ManifestEntryContext {
.bound_predicates
.map(|x| x.as_ref().snapshot_bound_predicate.clone()),

deletes: self
.delete_file_manager
.get_deletes_for_data_file(self.manifest_entry.data_file())
.await,
}
deletes,
})
}
}

Expand Down Expand Up @@ -1138,6 +1077,82 @@ impl FileScanTask {
}
}

type DeleteFileManagerResult = Result<Option<Arc<Vec<FileScanTaskDeleteFile>>>>;

/// Manages async retrieval of all the delete files from FileIO that are
/// applicable to the scan. Provides references to them for inclusion within FileScanTasks
#[derive(Debug, Clone)]
struct DeleteFileManager {
files: Arc<RwLock<Option<DeleteFileManagerResult>>>,
}

#[derive(Debug, Clone)]
struct DeleteFileManagerFuture {
files: Arc<RwLock<Option<DeleteFileManagerResult>>>,
}

impl Future for DeleteFileManagerFuture {
type Output = DeleteFileManagerResult;

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
let Ok(guard) = self.files.try_read() else {
return Poll::Pending;
};

if let Some(value) = guard.as_ref() {
Poll::Ready(match value.as_ref() {
Ok(deletes) => Ok(deletes.clone()),
Err(err) => Err(Error::new(err.kind(), err.message())),
})
} else {
Poll::Pending
}
}
}

impl DeleteFileManager {
pub(crate) fn from_receiver(receiver: Receiver<Result<FileScanTaskDeleteFile>>) -> Self {
let delete_file_stream = receiver.boxed();
let files = Arc::new(RwLock::new(None));

spawn({
let files = files.clone();
async move {
let _ = spawn(async move {
let result = delete_file_stream.try_collect::<Vec<_>>().await;
let result = result.map(|files| {
if files.is_empty() {
None
} else {
Some(Arc::new(files))
}
});

// Unwrap is ok here since this is the only place where a write lock
// can be acquired, so the lock can't already have been poisoned
let mut guard = files.write().unwrap();
*guard = Some(result);
})
.await;
}
});

DeleteFileManager { files }
}

pub(crate) fn get_deletes_for_data_file(
&self,
_data_file: &DataFile,
) -> DeleteFileManagerFuture {
// TODO: in the future we may want to filter out delete files
// that are not applicable to the DataFile?

DeleteFileManagerFuture {
files: self.files.clone(),
}
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down

0 comments on commit df4e86a

Please sign in to comment.