Skip to content

Commit

Permalink
feat: global delete support, DeleteFileIndex
Browse files Browse the repository at this point in the history
  • Loading branch information
sdd committed Dec 24, 2024
1 parent a3b34b2 commit 22ef190
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 80 deletions.
156 changes: 77 additions & 79 deletions crates/iceberg/src/delete_file_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use futures::StreamExt;
use crate::runtime::spawn;
use crate::scan::{DeleteFileContext, FileScanTaskDeleteFile};
use crate::spec::{DataContentType, DataFile, Struct};
use crate::{Error, ErrorKind, Result};

/// Index of delete files
#[derive(Clone, Debug)]
Expand All @@ -47,7 +48,10 @@ struct PopulatedDeleteFileIndex {
global_deletes: Vec<Arc<DeleteFileContext>>,
eq_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>>,
pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,
// TODO: do we need this?
// pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>>,

// TODO: Deletion Vector support
}

impl DeleteFileIndex {
Expand Down Expand Up @@ -75,7 +79,7 @@ impl DeleteFileIndex {

/// Gets all the delete files that apply to the specified data file.
///
/// Returns a future that resolves to a Vec<FileScanTaskDeleteFile>
/// Returns a future that resolves to a Result<Vec<FileScanTaskDeleteFile>>
pub(crate) fn get_deletes_for_data_file<'a>(
&self,
data_file: &'a DataFile,
Expand All @@ -95,60 +99,41 @@ impl PopulatedDeleteFileIndex {
HashMap::default();
let mut pos_deletes_by_partition: HashMap<Struct, Vec<Arc<DeleteFileContext>>> =
HashMap::default();
let mut pos_deletes_by_path: HashMap<String, Vec<Arc<DeleteFileContext>>> =
HashMap::default();

files.into_iter().for_each(|del_file_ctx| {
let arc_del_file_ctx = Arc::new(del_file_ctx);
match arc_del_file_ctx.manifest_entry.content_type() {
DataContentType::PositionDeletes => {
// TODO: implement logic from ContentFileUtil.referencedDataFile
// see https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L54
let referenced_data_file_path = "TODO".to_string();

pos_deletes_by_path
.entry(referenced_data_file_path)
.and_modify(|entry| {
entry.push(arc_del_file_ctx.clone());
})
.or_insert(vec![arc_del_file_ctx.clone()]);

pos_deletes_by_partition
.entry(
arc_del_file_ctx
.manifest_entry
.data_file()
.partition()
.clone(),
)
.and_modify(|entry| {
entry.push(arc_del_file_ctx.clone());
})
.or_insert(vec![arc_del_file_ctx.clone()]);
}
DataContentType::EqualityDeletes => {
eq_deletes_by_partition
.entry(
arc_del_file_ctx
.manifest_entry
.data_file()
.partition()
.clone(),
)
.and_modify(|entry| {
entry.push(arc_del_file_ctx.clone());
})
.or_insert(vec![arc_del_file_ctx.clone()]);
let mut global_deletes: Vec<Arc<DeleteFileContext>> = vec![];

files.into_iter().for_each(|ctx| {
let arc_ctx = Arc::new(ctx);

let partition = arc_ctx.manifest_entry.data_file().partition();

// The spec states that "Equality delete files stored with an unpartitioned spec are applied as global deletes".
if partition.fields().is_empty() {
// TODO: confirm we're good to skip here if we encounter a pos del
if arc_ctx.manifest_entry.content_type() != DataContentType::PositionDeletes {
global_deletes.push(arc_ctx);
return;
}
_ => unreachable!(),
}

let destination_map = match arc_ctx.manifest_entry.content_type() {
DataContentType::PositionDeletes => &mut pos_deletes_by_partition,
DataContentType::EqualityDeletes => &mut eq_deletes_by_partition,
_ => unreachable!(),
};

destination_map
.entry(partition.clone())
.and_modify(|entry| {
entry.push(arc_ctx.clone());
})
.or_insert(vec![arc_ctx.clone()]);
});

PopulatedDeleteFileIndex {
global_deletes: vec![],
global_deletes,
eq_deletes_by_partition,
pos_deletes_by_partition,
pos_deletes_by_path,
}
}

Expand All @@ -158,33 +143,47 @@ impl PopulatedDeleteFileIndex {
data_file: &DataFile,
seq_num: Option<i64>,
) -> Vec<FileScanTaskDeleteFile> {
let mut deletes_queue = vec![];

if let Some(deletes) = self.pos_deletes_by_path.get(data_file.file_path()) {
deletes_queue.extend(deletes.iter());
}

if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) {
deletes_queue.extend(deletes.iter());
}
let mut results = vec![];

if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) {
deletes_queue.extend(deletes.iter());
}

deletes_queue
self.global_deletes
.iter()
// filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num`
.filter(|&delete| {
seq_num
.map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
.map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
.unwrap_or_else(|| true)
})
.map(|delete| FileScanTaskDeleteFile {
file_path: delete.manifest_entry.file_path().to_string(),
file_type: delete.manifest_entry.content_type(),
partition_spec_id: delete.partition_spec_id,
})
.collect()
.for_each(|delete| results.push(delete.as_ref().into()));

if let Some(deletes) = self.eq_deletes_by_partition.get(data_file.partition()) {
deletes
.iter()
// filter that returns true if the provided delete file's sequence number is **greater than or equal to** `seq_num`
.filter(|&delete| {
seq_num
.map(|seq_num| delete.manifest_entry.sequence_number() >= Some(seq_num))
.unwrap_or_else(|| true)
})
.for_each(|delete| results.push(delete.as_ref().into()));
}

// TODO: the spec states that:
// "The data file's file_path is equal to the delete file's referenced_data_file if it is non-null".
// we're not yet doing that here. The referenced data file's name will also be present in the positional
// delete file's file path column.
if let Some(deletes) = self.pos_deletes_by_partition.get(data_file.partition()) {
deletes
.iter()
// filter that returns true if the provided delete file's sequence number is **greater than** `seq_num`
.filter(|&delete| {
seq_num
.map(|seq_num| delete.manifest_entry.sequence_number() > Some(seq_num))
.unwrap_or_else(|| true)
})
.for_each(|delete| results.push(delete.as_ref().into()));
}

results
}
}

Expand All @@ -196,18 +195,17 @@ pub(crate) struct DeletesForDataFile<'a> {
}

impl Future for DeletesForDataFile<'_> {
type Output = Vec<FileScanTaskDeleteFile>;
type Output = Result<Vec<FileScanTaskDeleteFile>>;

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
let Ok(guard) = self.state.try_read() else {
return Poll::Pending;
};

match guard.deref() {
DeleteFileIndexState::Populated(idx) => {
Poll::Ready(idx.get_deletes_for_data_file(self.data_file, self.seq_num))
}
_ => Poll::Pending,
match self.state.try_read() {
Ok(guard) => match guard.deref() {
DeleteFileIndexState::Populated(idx) => Poll::Ready(Ok(
idx.get_deletes_for_data_file(self.data_file, self.seq_num)
)),
_ => Poll::Pending,
},
Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Unexpected, err.to_string()))),
}
}
}
12 changes: 11 additions & 1 deletion crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ impl ManifestEntryContext {
self.manifest_entry.data_file(),
self.manifest_entry.sequence_number(),
)
.await
.await?
} else {
vec![]
};
Expand Down Expand Up @@ -1088,6 +1088,16 @@ pub(crate) struct DeleteFileContext {
pub(crate) partition_spec_id: i32,
}

impl From<&DeleteFileContext> for FileScanTaskDeleteFile {
fn from(ctx: &DeleteFileContext) -> Self {
FileScanTaskDeleteFile {
file_path: ctx.manifest_entry.file_path().to_string(),
file_type: ctx.manifest_entry.content_type(),
partition_spec_id: ctx.partition_spec_id,
}
}
}

impl FileScanTask {
/// Returns the data file path of this file scan task.
pub fn data_file_path(&self) -> &str {
Expand Down

0 comments on commit 22ef190

Please sign in to comment.