Skip to content

Commit

Permalink
add delete file support for transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Dec 13, 2024
1 parent fe784ad commit 90115e0
Showing 1 changed file with 60 additions and 20 deletions.
80 changes: 60 additions & 20 deletions crates/iceberg/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ use uuid::Uuid;
use crate::error::Result;
use crate::io::OutputFile;
use crate::spec::{
DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile,
ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType,
Summary, Transform, MAIN_BRANCH,
DataContentType, DataFile, DataFileFormat, FormatVersion, Manifest, ManifestContentType,
ManifestEntry, ManifestFile, ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder,
Operation, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
Struct, StructType, Summary, Transform, MAIN_BRANCH,
};
use crate::table::Table;
use crate::TableUpdate::UpgradeFormatVersion;
Expand Down Expand Up @@ -170,6 +170,17 @@ impl<'a> Transaction<'a> {

catalog.update_table(table_commit).await
}

/// Commit transaction with dynamic catalog.
pub async fn commit_dyn(self, catalog: &dyn Catalog) -> Result<Table> {
let table_commit = TableCommit::builder()
.ident(self.table.identifier().clone())
.updates(self.updates)
.requirements(self.requirements)
.build();

catalog.update_table(table_commit).await
}
}

/// FastAppendAction is a transaction action for fast append data files to the table.
Expand Down Expand Up @@ -284,6 +295,7 @@ struct SnapshotProduceAction<'a> {
commit_uuid: Uuid,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
added_delete_files: Vec<DataFile>,
// A counter used to generate unique manifest file names.
// It starts from 0 and increments for each new manifest file.
// Note: This counter is limited to the range of (0..u64::MAX).
Expand All @@ -304,6 +316,7 @@ impl<'a> SnapshotProduceAction<'a> {
commit_uuid,
snapshot_properties,
added_data_files: vec![],
added_delete_files: vec![],
manifest_counter: (0..),
key_metadata,
})
Expand Down Expand Up @@ -335,7 +348,12 @@ impl<'a> SnapshotProduceAction<'a> {
return Err(Error::new(
ErrorKind::DataInvalid,
"Partition value is not compatitable partition type",
));
)
.with_context(
"partition value",
format!("{:?}", &value.as_primitive_literal().unwrap()),
)
.with_context("partition type", format!("{:?}", field.field_type)));
}
}
Ok(())
Expand All @@ -347,13 +365,7 @@ impl<'a> SnapshotProduceAction<'a> {
data_files: impl IntoIterator<Item = DataFile>,
) -> Result<&mut Self> {
let data_files: Vec<DataFile> = data_files.into_iter().collect();
for data_file in &data_files {
if data_file.content_type() != crate::spec::DataContentType::Data {
return Err(Error::new(
ErrorKind::DataInvalid,
"Only data content type is allowed for fast append",
));
}
for data_file in data_files {
Self::validate_partition_value(
data_file.partition(),
self.tx
Expand All @@ -362,8 +374,12 @@ impl<'a> SnapshotProduceAction<'a> {
.default_partition_spec()
.partition_type(),
)?;
if data_file.content_type() == DataContentType::Data {
self.added_data_files.push(data_file);
} else {
self.added_delete_files.push(data_file);
}
}
self.added_data_files.extend(data_files);
Ok(self)
}

Expand All @@ -380,8 +396,31 @@ impl<'a> SnapshotProduceAction<'a> {
}

// Write manifest file for added data files and return the ManifestFile for ManifestList.
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
let added_data_files = std::mem::take(&mut self.added_data_files);
async fn write_added_manifest(
&mut self,
added_data_files: Vec<DataFile>,
) -> Result<ManifestFile> {
let content_type = {
let mut data_num = 0;
let mut delete_num = 0;
for f in &added_data_files {
match f.content_type() {
DataContentType::Data => data_num = data_num + 1,
DataContentType::PositionDeletes => delete_num = delete_num + 1,
DataContentType::EqualityDeletes => delete_num = delete_num + 1,
}
}
if data_num == added_data_files.len() {
ManifestContentType::Data
} else if delete_num == added_data_files.len() {
ManifestContentType::Deletes
} else {
return Err(Error::new(
ErrorKind::DataInvalid,
"added DataFile for a ManifestFile should be same type (Data or Delete)",
));
}
};
let manifest_entries = added_data_files
.into_iter()
.map(|data_file| {
Expand Down Expand Up @@ -410,7 +449,7 @@ impl<'a> SnapshotProduceAction<'a> {
.as_ref()
.clone(),
)
.content(crate::spec::ManifestContentType::Data)
.content(content_type)
.build();
let manifest = Manifest::new(manifest_meta, manifest_entries);
let writer = ManifestWriter::new(
Expand All @@ -426,12 +465,13 @@ impl<'a> SnapshotProduceAction<'a> {
snapshot_produce_operation: &OP,
manifest_process: &MP,
) -> Result<Vec<ManifestFile>> {
let added_manifest = self.write_added_manifest().await?;
let data_files = std::mem::take(&mut self.added_data_files);
let delete_files = std::mem::take(&mut self.added_delete_files);
let added_manifest = self.write_added_manifest(data_files).await?;
let added_delete_manifest = self.write_added_manifest(delete_files).await?;
let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
// # TODO
// Support process delete entries.

let mut manifest_files = vec![added_manifest];
let mut manifest_files = vec![added_manifest, added_delete_manifest];
manifest_files.extend(existing_manifests);
let manifest_files = manifest_process.process_manifeset(manifest_files);
Ok(manifest_files)
Expand Down

0 comments on commit 90115e0

Please sign in to comment.