Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support to append delete type data file #798

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 51 additions & 20 deletions crates/iceberg/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ use uuid::Uuid;
use crate::error::Result;
use crate::io::OutputFile;
use crate::spec::{
DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile,
ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType,
Summary, Transform, MAIN_BRANCH,
DataContentType, DataFile, DataFileFormat, FormatVersion, Manifest, ManifestContentType,
ManifestEntry, ManifestFile, ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder,
Operation, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
Struct, StructType, Summary, Transform, MAIN_BRANCH,
};
use crate::table::Table;
use crate::TableUpdate::UpgradeFormatVersion;
Expand Down Expand Up @@ -161,7 +161,7 @@ impl<'a> Transaction<'a> {
}

/// Commit transaction.
pub async fn commit(self, catalog: &impl Catalog) -> Result<Table> {
pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
let table_commit = TableCommit::builder()
.ident(self.table.identifier().clone())
.updates(self.updates)
Expand Down Expand Up @@ -284,6 +284,7 @@ struct SnapshotProduceAction<'a> {
commit_uuid: Uuid,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
added_delete_files: Vec<DataFile>,
// A counter used to generate unique manifest file names.
// It starts from 0 and increments for each new manifest file.
// Note: This counter is limited to the range of (0..u64::MAX).
Expand All @@ -304,6 +305,7 @@ impl<'a> SnapshotProduceAction<'a> {
commit_uuid,
snapshot_properties,
added_data_files: vec![],
added_delete_files: vec![],
manifest_counter: (0..),
key_metadata,
})
Expand Down Expand Up @@ -335,7 +337,12 @@ impl<'a> SnapshotProduceAction<'a> {
return Err(Error::new(
ErrorKind::DataInvalid,
"Partition value is not compatitable partition type",
));
)
.with_context(
"partition value",
format!("{:?}", &value.as_primitive_literal().unwrap()),
)
.with_context("partition type", format!("{:?}", field.field_type)));
}
}
Ok(())
Expand All @@ -347,13 +354,7 @@ impl<'a> SnapshotProduceAction<'a> {
data_files: impl IntoIterator<Item = DataFile>,
) -> Result<&mut Self> {
let data_files: Vec<DataFile> = data_files.into_iter().collect();
for data_file in &data_files {
if data_file.content_type() != crate::spec::DataContentType::Data {
return Err(Error::new(
ErrorKind::DataInvalid,
"Only data content type is allowed for fast append",
));
}
for data_file in data_files {
Self::validate_partition_value(
data_file.partition(),
self.tx
Expand All @@ -362,8 +363,12 @@ impl<'a> SnapshotProduceAction<'a> {
.default_partition_spec()
.partition_type(),
)?;
if data_file.content_type() == DataContentType::Data {
self.added_data_files.push(data_file);
} else {
self.added_delete_files.push(data_file);
}
}
self.added_data_files.extend(data_files);
Ok(self)
}

Expand All @@ -380,8 +385,31 @@ impl<'a> SnapshotProduceAction<'a> {
}

// Write manifest file for added data files and return the ManifestFile for ManifestList.
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
let added_data_files = std::mem::take(&mut self.added_data_files);
async fn write_added_manifest(
&mut self,
added_data_files: Vec<DataFile>,
) -> Result<ManifestFile> {
let content_type = {
let mut data_num = 0;
let mut delete_num = 0;
for f in &added_data_files {
match f.content_type() {
DataContentType::Data => data_num += 1,
DataContentType::PositionDeletes => delete_num += 1,
DataContentType::EqualityDeletes => delete_num += 1,
}
}
if data_num == added_data_files.len() {
ManifestContentType::Data
} else if delete_num == added_data_files.len() {
ManifestContentType::Deletes
} else {
return Err(Error::new(
ErrorKind::DataInvalid,
"added DataFile for a ManifestFile should be same type (Data or Delete)",
));
}
};
let manifest_entries = added_data_files
.into_iter()
.map(|data_file| {
Expand Down Expand Up @@ -410,7 +438,7 @@ impl<'a> SnapshotProduceAction<'a> {
.as_ref()
.clone(),
)
.content(crate::spec::ManifestContentType::Data)
.content(content_type)
.build();
let manifest = Manifest::new(manifest_meta, manifest_entries);
let writer = ManifestWriter::new(
Expand All @@ -426,12 +454,15 @@ impl<'a> SnapshotProduceAction<'a> {
snapshot_produce_operation: &OP,
manifest_process: &MP,
) -> Result<Vec<ManifestFile>> {
let added_manifest = self.write_added_manifest().await?;
let data_files = std::mem::take(&mut self.added_data_files);
let delete_files = std::mem::take(&mut self.added_delete_files);
let added_manifest = self.write_added_manifest(data_files).await?;
let added_delete_manifest = self.write_added_manifest(delete_files).await?;
let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;

// # TODO
// Support process delete entries.

let mut manifest_files = vec![added_manifest];
let mut manifest_files = vec![added_manifest, added_delete_manifest];
manifest_files.extend(existing_manifests);
let manifest_files = manifest_process.process_manifeset(manifest_files);
Ok(manifest_files)
Expand Down
Loading