Skip to content

Commit

Permalink
refactor: DeleteFileManager -> DeleteFileIndex, moved to own file
Browse files Browse the repository at this point in the history
  • Loading branch information
sdd committed Dec 11, 2024
1 parent 88f2c83 commit 2ff526f
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 94 deletions.
95 changes: 95 additions & 0 deletions crates/iceberg/src/delete_file_index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};

use futures::channel::mpsc::Receiver;
use futures::{StreamExt, TryStreamExt};

use crate::runtime::spawn;
use crate::scan::FileScanTaskDeleteFile;
use crate::spec::DataFile;
use crate::{Error, Result};

type DeleteFileIndexResult = Result<Option<Arc<Vec<FileScanTaskDeleteFile>>>>;

/// Safely shareable on-demand-populated delete file index.
///
/// Constructed during the file plan phase of a table scan from a channel that will
/// receive the details of all delete files that can possibly apply to a scan.
/// Asynchronously retrieves and lazily processes of all the delete files from FileIO
/// concurrently whilst the plan proceeds with collating all of the applicable data files.
/// Awaited on when constructing the resulting FileScanTask for each selected data file
/// in order to populate the list of delete files to include in each `FileScanTask`.
#[derive(Debug, Clone)]
pub(crate) struct DeleteFileIndex {
files: Arc<RwLock<Option<DeleteFileIndexResult>>>,
}

#[derive(Debug, Clone)]
pub(crate) struct DeleteFileIndexFuture {
files: Arc<RwLock<Option<DeleteFileIndexResult>>>,
}

impl Future for DeleteFileIndexFuture {
type Output = DeleteFileIndexResult;

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
let Ok(guard) = self.files.try_read() else {
return Poll::Pending;
};

if let Some(value) = guard.as_ref() {
Poll::Ready(match value.as_ref() {
Ok(deletes) => Ok(deletes.clone()),
Err(err) => Err(Error::new(err.kind(), err.message())),
})
} else {
Poll::Pending
}
}
}

impl DeleteFileIndex {
pub(crate) fn from_receiver(receiver: Receiver<Result<FileScanTaskDeleteFile>>) -> Self {
let delete_file_stream = receiver.boxed();
let files = Arc::new(RwLock::new(None));

// spawn a task to handle accumulating all the DeleteFiles that are streamed into the
// index through the receiver channel. Update the `None` inside the `RwLock` to a `Some`
// once the stream has been exhausted so that any consumers awaiting on the Future returned
// by DeleteFileIndex::get_deletes_for_data_file can proceed
spawn({
let files = files.clone();
async move {
let _ = spawn(async move {
let result = delete_file_stream.try_collect::<Vec<_>>().await;
let result = result.map(|files| {
if files.is_empty() {
None
} else {
Some(Arc::new(files))
}
});

// Unwrap is ok here since this is the only place where a write lock
// can be acquired, so the lock can't already have been poisoned
let mut guard = files.write().unwrap();
*guard = Some(result);
})
.await;
}
});

DeleteFileIndex { files }
}

/// Asynchronously determines all the delete files that apply to the provided `DataFile`.
pub(crate) fn get_deletes_for_data_file(&self, _data_file: &DataFile) -> DeleteFileIndexFuture {
// TODO: filtering

DeleteFileIndexFuture {
files: self.files.clone(),
}
}
}
1 change: 1 addition & 0 deletions crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,6 @@ pub mod transform;
mod runtime;

pub mod arrow;
pub(crate) mod delete_file_index;
mod utils;
pub mod writer;
110 changes: 16 additions & 94 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,16 @@
//! Table scan api.
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};

use arrow_array::RecordBatch;
use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::channel::mpsc::{channel, Sender};
use futures::stream::BoxStream;
use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use serde::{Deserialize, Serialize};

use crate::arrow::ArrowReaderBuilder;
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::expression_evaluator::ExpressionEvaluator;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::visitors::inclusive_projection::InclusiveProjection;
Expand All @@ -39,7 +37,7 @@ use crate::io::object_cache::ObjectCache;
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{
DataContentType, DataFile, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile,
DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile,
ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
Expand Down Expand Up @@ -368,9 +366,9 @@ impl TableScan {
// used to stream the results back to the caller
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

// used to stream delete files into the DeleteFileManager
// used to stream delete files into the DeleteFileIndex
let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries);
let delete_file_manager = DeleteFileManager::from_receiver(delete_file_rx);
let delete_file_index = DeleteFileIndex::from_receiver(delete_file_rx);

let manifest_list = self.plan_context.get_manifest_list().await?;

Expand All @@ -381,7 +379,7 @@ impl TableScan {
manifest_list,
manifest_entry_data_ctx_tx,
manifest_entry_delete_ctx_tx,
delete_file_manager.clone(),
delete_file_index.clone(),
)?;

let mut channel_for_manifest_error = file_scan_task_tx.clone();
Expand Down Expand Up @@ -593,7 +591,7 @@ struct ManifestFileContext {
object_cache: Arc<ObjectCache>,
snapshot_schema: SchemaRef,
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
delete_file_manager: DeleteFileManager,
delete_file_index: DeleteFileIndex,
}

/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
Expand All @@ -606,7 +604,7 @@ struct ManifestEntryContext {
bound_predicates: Option<Arc<BoundPredicates>>,
partition_spec_id: i32,
snapshot_schema: SchemaRef,
delete_file_manager: DeleteFileManager,
delete_file_index: DeleteFileIndex,
}

impl ManifestFileContext {
Expand All @@ -621,7 +619,7 @@ impl ManifestFileContext {
field_ids,
mut sender,
expression_evaluator_cache,
delete_file_manager,
delete_file_index,
..
} = self;

Expand All @@ -636,7 +634,7 @@ impl ManifestFileContext {
partition_spec_id: manifest_file.partition_spec_id,
bound_predicates: bound_predicates.clone(),
snapshot_schema: snapshot_schema.clone(),
delete_file_manager: delete_file_manager.clone(),
delete_file_index: delete_file_index.clone(),
};

sender
Expand All @@ -654,7 +652,7 @@ impl ManifestEntryContext {
/// created from it
async fn into_file_scan_task(self) -> Result<FileScanTask> {
let deletes = self
.delete_file_manager
.delete_file_index
.get_deletes_for_data_file(self.manifest_entry.data_file())
.await?;

Expand Down Expand Up @@ -712,7 +710,7 @@ impl PlanContext {
manifest_list: Arc<ManifestList>,
sender_data: Sender<ManifestEntryContext>,
sender_delete: Sender<ManifestEntryContext>,
delete_file_manager: DeleteFileManager,
delete_file_index: DeleteFileIndex,
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
let manifest_files = manifest_list.entries().iter();

Expand Down Expand Up @@ -741,7 +739,7 @@ impl PlanContext {
} else {
sender_delete.clone()
},
delete_file_manager.clone(),
delete_file_index.clone(),
);

filtered_mfcs.push(Ok(mfc));
Expand All @@ -757,7 +755,7 @@ impl PlanContext {
} else {
sender_delete.clone()
},
delete_file_manager.clone(),
delete_file_index.clone(),
);

filtered_mfcs.push(Ok(mfc));
Expand All @@ -772,7 +770,7 @@ impl PlanContext {
manifest_file: &ManifestFile,
partition_filter: Option<Arc<BoundPredicate>>,
sender: Sender<ManifestEntryContext>,
delete_file_manager: DeleteFileManager,
delete_file_index: DeleteFileIndex,
) -> ManifestFileContext {
let bound_predicates =
if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
Expand All @@ -794,7 +792,7 @@ impl PlanContext {
snapshot_schema: self.snapshot_schema.clone(),
field_ids: self.field_ids.clone(),
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
delete_file_manager,
delete_file_index,
}
}
}
Expand Down Expand Up @@ -1077,82 +1075,6 @@ impl FileScanTask {
}
}

type DeleteFileManagerResult = Result<Option<Arc<Vec<FileScanTaskDeleteFile>>>>;

/// Manages async retrieval of all the delete files from FileIO that are
/// applicable to the scan. Provides references to them for inclusion within FileScanTasks
#[derive(Debug, Clone)]
struct DeleteFileManager {
files: Arc<RwLock<Option<DeleteFileManagerResult>>>,
}

#[derive(Debug, Clone)]
struct DeleteFileManagerFuture {
files: Arc<RwLock<Option<DeleteFileManagerResult>>>,
}

impl Future for DeleteFileManagerFuture {
type Output = DeleteFileManagerResult;

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
let Ok(guard) = self.files.try_read() else {
return Poll::Pending;
};

if let Some(value) = guard.as_ref() {
Poll::Ready(match value.as_ref() {
Ok(deletes) => Ok(deletes.clone()),
Err(err) => Err(Error::new(err.kind(), err.message())),
})
} else {
Poll::Pending
}
}
}

impl DeleteFileManager {
pub(crate) fn from_receiver(receiver: Receiver<Result<FileScanTaskDeleteFile>>) -> Self {
let delete_file_stream = receiver.boxed();
let files = Arc::new(RwLock::new(None));

spawn({
let files = files.clone();
async move {
let _ = spawn(async move {
let result = delete_file_stream.try_collect::<Vec<_>>().await;
let result = result.map(|files| {
if files.is_empty() {
None
} else {
Some(Arc::new(files))
}
});

// Unwrap is ok here since this is the only place where a write lock
// can be acquired, so the lock can't already have been poisoned
let mut guard = files.write().unwrap();
*guard = Some(result);
})
.await;
}
});

DeleteFileManager { files }
}

pub(crate) fn get_deletes_for_data_file(
&self,
_data_file: &DataFile,
) -> DeleteFileManagerFuture {
// TODO: in the future we may want to filter out delete files
// that are not applicable to the DataFile?

DeleteFileManagerFuture {
files: self.files.clone(),
}
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down

0 comments on commit 2ff526f

Please sign in to comment.