Skip to content

Commit

Permalink
refactor: DeleteFileIndex cloneable, with an awaitable method to get …
Browse files Browse the repository at this point in the history
…deletes for data file.
  • Loading branch information
sdd committed Dec 23, 2024
1 parent b41a6b1 commit 0b8ce7c
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 66 deletions.
105 changes: 81 additions & 24 deletions crates/iceberg/src/delete_file_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,81 @@
// under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};

use futures::channel::mpsc;
use futures::{StreamExt, TryStreamExt};
use tokio::sync::watch;
use futures::channel::mpsc::{channel, Sender};
use futures::StreamExt;

use crate::runtime::spawn;
use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
use crate::spec::{DataContentType, DataFile, Struct};
use crate::Result;

type DeleteFileIndexRef = Arc<Result<DeleteFileIndex>>;
pub(crate) type DeleteFileIndexRefReceiver = watch::Receiver<Option<DeleteFileIndexRef>>;
/// Population state of a delete file index.
#[derive(Debug)]
enum DeleteFileIndexState {
/// No populated index has been stored yet; lookups cannot be answered in this state.
Populating,
/// The index has been built and is available for lookups.
Populated(PopulatedDeleteFileIndex),
}

/// Index of delete files
#[derive(Debug)]
pub(crate) struct DeleteFileIndex {
struct PopulatedDeleteFileIndex {
#[allow(dead_code)]
global_deletes: Vec<Arc<DeleteFileContext>>,
eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
}

/// Index of delete files
///
/// The index state lives behind an `Arc<RwLock<..>>`, so cloning a
/// `DeleteFileIndex` produces another handle to the same underlying state
/// rather than an independent copy.
#[derive(Clone, Debug)]
pub(crate) struct DeleteFileIndex {
/// Shared, lock-protected state of the index.
state: Arc<RwLock<DeleteFileIndexState>>,
}

impl DeleteFileIndex {
pub(crate) fn from_del_file_chan(
receiver: mpsc::Receiver<Result<DeleteFileContext>>,
) -> watch::Receiver<Option<DeleteFileIndexRef>> {
let (tx, rx) = watch::channel(None);

let delete_file_stream = receiver.boxed();
spawn(async move {
let delete_files = delete_file_stream.try_collect::<Vec<_>>().await;
let delete_file_index = delete_files.map(DeleteFileIndex::from_delete_files);
let delete_file_index = Arc::new(delete_file_index);
tx.send(Some(delete_file_index))
/// create a new `DeleteFileIndex` along with the sender that populates it with delete files
pub(crate) fn new() -> (DeleteFileIndex, Sender<DeleteFileContext>) {
// TODO: what should the channel limit be?
let (tx, rx) = channel(10);
let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating));
let delete_file_stream = rx.boxed();

spawn({
let state = state.clone();
async move {
let delete_files = delete_file_stream.collect::<Vec<_>>().await;

let populated_delete_file_index = PopulatedDeleteFileIndex::new(delete_files);

let mut guard = state.write().unwrap();
*guard = DeleteFileIndexState::Populated(populated_delete_file_index);
}
});

rx
(DeleteFileIndex { state }, tx)
}

/// Gets all the delete files that apply to the specified data file.
///
/// Returns a future that resolves to a Vec<FileScanTaskDeleteFile>
pub(crate) fn get_deletes_for_data_file<'a>(
&self,
data_file: &'a DataFile,
seq_num: Option<i64>,
) -> DeletesForDataFile<'a> {
DeletesForDataFile {
state: self.state.clone(),
data_file,
seq_num,
}
}
}

fn from_delete_files(files: Vec<DeleteFileContext>) -> Self {
impl PopulatedDeleteFileIndex {
fn new(files: Vec<DeleteFileContext>) -> PopulatedDeleteFileIndex {
let mut eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
HashMap::default();
let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
Expand Down Expand Up @@ -111,7 +144,7 @@ impl DeleteFileIndex {
}
});

DeleteFileIndex {
PopulatedDeleteFileIndex {
global_deletes: vec![],
eq_deletes_by_partition,
pos_deletes_by_partition,
Expand All @@ -120,7 +153,7 @@ impl DeleteFileIndex {
}

/// Determine all the delete files that apply to the provided `DataFile`.
pub(crate) fn get_deletes_for_data_file(
fn get_deletes_for_data_file(
&self,
data_file: &DataFile,
seq_num: Option<i64>,
Expand Down Expand Up @@ -158,3 +191,27 @@ impl DeleteFileIndex {
results
}
}

/// Future for the `DeleteFileIndex::get_deletes_for_data_file` method
///
/// Borrows the data file for `'a`, so the future cannot outlive the
/// `DataFile` it was created for.
pub(crate) struct DeletesForDataFile<'a> {
/// Shared handle to the index state that is inspected on each poll.
state: Arc<RwLock<DeleteFileIndexState>>,
/// The data file whose applicable delete files are being looked up.
data_file: &'a DataFile,
/// Optional sequence number forwarded unchanged to the index lookup.
seq_num: Option<i64>,
}

impl Future for DeletesForDataFile<'_> {
    type Output = Vec<FileScanTaskDeleteFile>;

    /// Resolves to the delete files that apply to the data file once the
    /// shared index state has become `Populated`.
    ///
    /// While the index is still `Populating` (or the lock is momentarily held
    /// by a writer) this returns `Poll::Pending`. The `Future` contract
    /// requires that a wake-up is arranged before returning `Pending`,
    /// otherwise the executor never polls this future again and the await
    /// hangs forever. Nothing in this type can be notified when the state
    /// changes, so the waker is invoked immediately to request a re-poll.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let guard = match self.state.try_read() {
            Ok(guard) => guard,
            // A poisoned lock never becomes readable through `try_read`
            // again; recover the guard and inspect the state so the future
            // can still complete if the index was populated.
            Err(std::sync::TryLockError::Poisoned(poisoned)) => poisoned.into_inner(),
            Err(std::sync::TryLockError::WouldBlock) => {
                // A writer currently holds the lock: ask to be polled again.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
        };

        // Exhaustive match (no catch-all) so adding a new state variant
        // forces this code to be revisited.
        match guard.deref() {
            DeleteFileIndexState::Populated(idx) => {
                Poll::Ready(idx.get_deletes_for_data_file(self.data_file, self.seq_num))
            }
            DeleteFileIndexState::Populating => {
                // Not ready yet: schedule another poll instead of stalling.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }
}
62 changes: 20 additions & 42 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use serde::{Deserialize, Serialize};

use crate::arrow::ArrowReaderBuilder;
use crate::delete_file_index::{DeleteFileIndex, DeleteFileIndexRefReceiver};
use crate::delete_file_index::DeleteFileIndex;
use crate::expr::visitors::expression_evaluator::ExpressionEvaluator;
use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator;
use crate::expr::visitors::inclusive_projection::InclusiveProjection;
Expand Down Expand Up @@ -385,18 +385,12 @@ impl TableScan {
// used to stream the results back to the caller
let (file_scan_task_tx, file_scan_task_rx) = channel(concurrency_limit_manifest_entries);

// DeleteFileIndexRefReceiver is a watch channel receiver that will
// be notified when the DeleteFileIndex is ready.
let delete_file_idx_and_tx: Option<(
DeleteFileIndexRefReceiver,
Sender<Result<DeleteFileContext>>,
)> = if self.delete_file_processing_enabled {
let (delete_file_tx, delete_file_rx) = channel(concurrency_limit_manifest_entries);
let delete_file_index_rx = DeleteFileIndex::from_del_file_chan(delete_file_rx);
Some((delete_file_index_rx, delete_file_tx))
} else {
None
};
let delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<DeleteFileContext>)> =
if self.delete_file_processing_enabled {
Some(DeleteFileIndex::new())
} else {
None
};

let manifest_list = self.plan_context.get_manifest_list().await?;

Expand Down Expand Up @@ -562,7 +556,7 @@ impl TableScan {

async fn process_delete_manifest_entry(
manifest_entry_context: ManifestEntryContext,
mut delete_file_ctx_tx: Sender<Result<DeleteFileContext>>,
mut delete_file_ctx_tx: Sender<DeleteFileContext>,
) -> Result<()> {
// skip processing this manifest entry if it has been marked as deleted
if !manifest_entry_context.manifest_entry.is_alive() {
Expand Down Expand Up @@ -594,10 +588,10 @@ impl TableScan {
}

delete_file_ctx_tx
.send(Ok(DeleteFileContext {
.send(DeleteFileContext {
manifest_entry: manifest_entry_context.manifest_entry.clone(),
partition_spec_id: manifest_entry_context.partition_spec_id,
}))
})
.await?;

Ok(())
Expand All @@ -621,7 +615,7 @@ struct ManifestFileContext {
object_cache: Arc<ObjectCache>,
snapshot_schema: SchemaRef,
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
delete_file_index: Option<DeleteFileIndexRefReceiver>,
delete_file_index: Option<DeleteFileIndex>,
}

/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
Expand All @@ -634,7 +628,7 @@ struct ManifestEntryContext {
bound_predicates: Option<Arc<BoundPredicates>>,
partition_spec_id: i32,
snapshot_schema: SchemaRef,
delete_file_index: Option<DeleteFileIndexRefReceiver>,
delete_file_index: Option<DeleteFileIndex>,
}

impl ManifestFileContext {
Expand Down Expand Up @@ -681,29 +675,13 @@ impl ManifestEntryContext {
/// consume this `ManifestEntryContext`, returning a `FileScanTask`
/// created from it
async fn into_file_scan_task(self) -> Result<FileScanTask> {
// let deletes = self.get_deletes().await?;

let deletes = if let Some(mut delete_file_index_rx) = self.delete_file_index {
let del_file_idx_opt = delete_file_index_rx
.wait_for(Option::is_some)
let deletes = if let Some(delete_file_index) = self.delete_file_index {
delete_file_index
.get_deletes_for_data_file(
self.manifest_entry.data_file(),
self.manifest_entry.sequence_number(),
)
.await
.map_err(|_| Error::new(ErrorKind::Unexpected, "DeleteFileIndex recv error"))?;

match del_file_idx_opt.as_ref() {
Some(del_file_idx) => match del_file_idx.as_ref() {
Ok(delete_file_idx) => delete_file_idx.get_deletes_for_data_file(
self.manifest_entry.data_file(),
self.manifest_entry.sequence_number(),
),
Err(err) => {
return Err(Error::new(ErrorKind::Unexpected, err.message()));
}
},

// the `wait_for(Option::is_some)` above means that we can
// never get a `None` here
None => unreachable!(),
}
} else {
vec![]
};
Expand Down Expand Up @@ -761,7 +739,7 @@ impl PlanContext {
&self,
manifest_list: Arc<ManifestList>,
tx_data: Sender<ManifestEntryContext>,
delete_file_idx_and_tx: Option<(DeleteFileIndexRefReceiver, Sender<ManifestEntryContext>)>,
delete_file_idx_and_tx: Option<(DeleteFileIndex, Sender<ManifestEntryContext>)>,
) -> Result<Box<impl Iterator<Item = Result<ManifestFileContext>>>> {
let manifest_files = manifest_list.entries().iter();

Expand Down Expand Up @@ -820,7 +798,7 @@ impl PlanContext {
manifest_file: &ManifestFile,
partition_filter: Option<Arc<BoundPredicate>>,
sender: Sender<ManifestEntryContext>,
delete_file_index: Option<DeleteFileIndexRefReceiver>,
delete_file_index: Option<DeleteFileIndex>,
) -> ManifestFileContext {
let bound_predicates =
if let (Some(ref partition_bound_predicate), Some(snapshot_bound_predicate)) =
Expand Down

0 comments on commit 0b8ce7c

Please sign in to comment.