diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index edf1a8596..1e1d61da8 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -28,10 +28,10 @@ use uuid::Uuid; use crate::error::Result; use crate::io::OutputFile; use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile, - ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation, Snapshot, - SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType, - Summary, Transform, MAIN_BRANCH, + DataContentType, DataFile, DataFileFormat, FormatVersion, Manifest, ManifestContentType, + ManifestEntry, ManifestFile, ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, + Operation, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, + Struct, StructType, Summary, Transform, MAIN_BRANCH, }; use crate::table::Table; use crate::TableUpdate::UpgradeFormatVersion; @@ -161,7 +161,7 @@ impl<'a> Transaction<'a> { } /// Commit transaction. - pub async fn commit(self, catalog: &impl Catalog) -> Result { + pub async fn commit(self, catalog: &dyn Catalog) -> Result
{ let table_commit = TableCommit::builder() .ident(self.table.identifier().clone()) .updates(self.updates) @@ -284,6 +284,7 @@ struct SnapshotProduceAction<'a> { commit_uuid: Uuid, snapshot_properties: HashMap, added_data_files: Vec, + added_delete_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). @@ -304,6 +305,7 @@ impl<'a> SnapshotProduceAction<'a> { commit_uuid, snapshot_properties, added_data_files: vec![], + added_delete_files: vec![], manifest_counter: (0..), key_metadata, }) @@ -335,7 +337,12 @@ impl<'a> SnapshotProduceAction<'a> { return Err(Error::new( ErrorKind::DataInvalid, "Partition value is not compatitable partition type", - )); + ) + .with_context( + "partition value", + format!("{:?}", &value.as_primitive_literal().unwrap()), + ) + .with_context("partition type", format!("{:?}", field.field_type))); } } Ok(()) @@ -347,13 +354,7 @@ impl<'a> SnapshotProduceAction<'a> { data_files: impl IntoIterator, ) -> Result<&mut Self> { let data_files: Vec = data_files.into_iter().collect(); - for data_file in &data_files { - if data_file.content_type() != crate::spec::DataContentType::Data { - return Err(Error::new( - ErrorKind::DataInvalid, - "Only data content type is allowed for fast append", - )); - } + for data_file in data_files { Self::validate_partition_value( data_file.partition(), self.tx @@ -362,8 +363,12 @@ impl<'a> SnapshotProduceAction<'a> { .default_partition_spec() .partition_type(), )?; + if data_file.content_type() == DataContentType::Data { + self.added_data_files.push(data_file); + } else { + self.added_delete_files.push(data_file); + } } - self.added_data_files.extend(data_files); Ok(self) } @@ -380,8 +385,31 @@ impl<'a> SnapshotProduceAction<'a> { } // Write manifest file for added data files and return the ManifestFile for ManifestList. - async fn write_added_manifest(&mut self) -> Result { - let added_data_files = std::mem::take(&mut self.added_data_files); + async fn write_added_manifest( + &mut self, + added_data_files: Vec, + ) -> Result { + let content_type = { + let mut data_num = 0; + let mut delete_num = 0; + for f in &added_data_files { + match f.content_type() { + DataContentType::Data => data_num += 1, + DataContentType::PositionDeletes => delete_num += 1, + DataContentType::EqualityDeletes => delete_num += 1, + } + } + if data_num == added_data_files.len() { + ManifestContentType::Data + } else if delete_num == added_data_files.len() { + ManifestContentType::Deletes + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "added DataFile for a ManifestFile should be same type (Data or Delete)", + )); + } + }; let manifest_entries = added_data_files .into_iter() .map(|data_file| { @@ -410,7 +438,7 @@ impl<'a> SnapshotProduceAction<'a> { .as_ref() .clone(), ) - .content(crate::spec::ManifestContentType::Data) + .content(content_type) .build(); let manifest = Manifest::new(manifest_meta, manifest_entries); let writer = ManifestWriter::new( @@ -426,12 +454,15 @@ impl<'a> SnapshotProduceAction<'a> { snapshot_produce_operation: &OP, manifest_process: &MP, ) -> Result> { - let added_manifest = self.write_added_manifest().await?; + let data_files = std::mem::take(&mut self.added_data_files); + let delete_files = std::mem::take(&mut self.added_delete_files); + let added_manifest = self.write_added_manifest(data_files).await?; + let added_delete_manifest = self.write_added_manifest(delete_files).await?; let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; + // # TODO // Support process delete entries. - - let mut manifest_files = vec![added_manifest]; + let mut manifest_files = vec![added_manifest, added_delete_manifest]; manifest_files.extend(existing_manifests); let manifest_files = manifest_process.process_manifeset(manifest_files); Ok(manifest_files)