From 491d60fcc9f32d2157ba88775baa70eae56eec9f Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 19 Dec 2024 23:21:52 +0800 Subject: [PATCH] refine interface of manifest writer: 1. adopt factory method to build different type manifest writer 2. provide add, exist, delete method --- crates/iceberg/src/io/object_cache.rs | 55 +- crates/iceberg/src/scan.rs | 37 +- crates/iceberg/src/spec/manifest.rs | 757 +++++++++++++++++++------- crates/iceberg/src/transaction.rs | 68 ++- 4 files changed, 643 insertions(+), 274 deletions(-) diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 6ea7594ba..0dd1ac493 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -191,9 +191,8 @@ mod tests { use super::*; use crate::io::{FileIO, OutputFile}; use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest, - ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus, - ManifestWriter, Struct, TableMetadata, + DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry, + ManifestListWriter, ManifestStatus, ManifestWriterBuilder, Struct, TableMetadata, }; use crate::table::Table; use crate::TableIdent; @@ -263,37 +262,33 @@ mod tests { let current_partition_spec = self.table.metadata().default_partition_spec(); // Write data files - let data_file_manifest = ManifestWriter::new( + let mut writer = ManifestWriterBuilder::new( self.next_manifest_file(), current_snapshot.snapshot_id(), vec![], + current_schema.clone(), + current_partition_spec.as_ref().clone(), ) - .write(Manifest::new( - ManifestMetadata::builder() - .schema(current_schema.clone()) - .content(ManifestContentType::Data) - .format_version(FormatVersion::V2) - .partition_spec((**current_partition_spec).clone()) - .schema_id(current_schema.schema_id()) - .build(), - vec![ManifestEntry::builder() - .status(ManifestStatus::Added) - .data_file( - 
DataFileBuilder::default() - .content(DataContentType::Data) - .file_path(format!("{}/1.parquet", &self.table_location)) - .file_format(DataFileFormat::Parquet) - .file_size_in_bytes(100) - .record_count(1) - .partition(Struct::from_iter([Some(Literal::long(100))])) - .key_metadata(None) - .build() - .unwrap(), - ) - .build()], - )) - .await - .unwrap(); + .build_v2_data(); + writer + .add( + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!("{}/1.parquet", &self.table_location)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build() + .unwrap(), + ) + .build(), + ) + .unwrap(); + let data_file_manifest = writer.to_manifest_file().await.unwrap(); // Write to manifest list let mut manifest_list_write = ManifestListWriter::v2( diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 89cc21bbf..8e1ce3f20 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -974,9 +974,9 @@ mod tests { use crate::io::{FileIO, OutputFile}; use crate::scan::FileScanTask; use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, Datum, FormatVersion, Literal, Manifest, - ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus, - ManifestWriter, NestedField, PrimitiveType, Schema, Struct, TableMetadata, Type, + DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry, + ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PrimitiveType, + Schema, Struct, TableMetadata, Type, }; use crate::table::Table; use crate::TableIdent; @@ -1049,20 +1049,16 @@ mod tests { let current_partition_spec = self.table.metadata().default_partition_spec(); // Write data files - let data_file_manifest = ManifestWriter::new( + let mut writer = ManifestWriterBuilder::new( 
self.next_manifest_file(), current_snapshot.snapshot_id(), vec![], + current_schema.clone(), + current_partition_spec.as_ref().clone(), ) - .write(Manifest::new( - ManifestMetadata::builder() - .schema(current_schema.clone()) - .content(ManifestContentType::Data) - .format_version(FormatVersion::V2) - .partition_spec((**current_partition_spec).clone()) - .schema_id(current_schema.schema_id()) - .build(), - vec![ + .build_v2_data(); + writer + .add( ManifestEntry::builder() .status(ManifestStatus::Added) .data_file( @@ -1078,6 +1074,10 @@ mod tests { .unwrap(), ) .build(), + ) + .unwrap(); + writer + .delete( ManifestEntry::builder() .status(ManifestStatus::Deleted) .snapshot_id(parent_snapshot.snapshot_id()) @@ -1095,6 +1095,10 @@ mod tests { .unwrap(), ) .build(), + ) + .unwrap(); + writer + .existing( ManifestEntry::builder() .status(ManifestStatus::Existing) .snapshot_id(parent_snapshot.snapshot_id()) @@ -1112,10 +1116,9 @@ mod tests { .unwrap(), ) .build(), - ], - )) - .await - .unwrap(); + ) + .unwrap(); + let data_file_manifest = writer.to_manifest_file().await.unwrap(); // Write to manifest list let mut manifest_list_write = ManifestListWriter::v2( diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 086c63080..0798c6c53 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -113,6 +113,70 @@ impl Manifest { } } +/// The builder used to create a [`ManifestWriter`]. +pub struct ManifestWriterBuilder { + output: OutputFile, + snapshot_id: i64, + key_metadata: Vec, + schema: SchemaRef, + partition_spec: PartitionSpec, +} + +impl ManifestWriterBuilder { + /// Create a new builder. + pub fn new( + output: OutputFile, + snapshot_id: i64, + key_metadata: Vec, + schema: SchemaRef, + partition_spec: PartitionSpec, + ) -> Self { + Self { + output, + snapshot_id, + key_metadata, + schema, + partition_spec, + } + } + + /// Build a [`ManifestWriter`] for format version 1. 
+ pub fn build_v1(self) -> ManifestWriter { + let metadata = ManifestMetadata::builder() + .schema_id(self.schema.schema_id()) + .schema(self.schema) + .partition_spec(self.partition_spec) + .format_version(FormatVersion::V1) + .content(ManifestContentType::Data) + .build(); + ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata) + } + + /// Build a [`ManifestWriter`] for format version 2, data content. + pub fn build_v2_data(self) -> ManifestWriter { + let metadata = ManifestMetadata::builder() + .schema_id(self.schema.schema_id()) + .schema(self.schema) + .partition_spec(self.partition_spec) + .format_version(FormatVersion::V2) + .content(ManifestContentType::Data) + .build(); + ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata) + } + + /// Build a [`ManifestWriter`] for format version 2, deletes content. + pub fn build_v2_deletes(self) -> ManifestWriter { + let metadata = ManifestMetadata::builder() + .schema_id(self.schema.schema_id()) + .schema(self.schema) + .partition_spec(self.partition_spec) + .format_version(FormatVersion::V2) + .content(ManifestContentType::Deletes) + .build(); + ManifestWriter::new(self.output, self.snapshot_id, self.key_metadata, metadata) + } +} + /// A manifest writer. pub struct ManifestWriter { output: OutputFile, @@ -130,7 +194,9 @@ pub struct ManifestWriter { key_metadata: Vec, - partitions: Vec, + manifset_entries: Vec, + + metadata: ManifestMetadata, } struct PartitionFieldStats { @@ -197,7 +263,12 @@ impl PartitionFieldStats { impl ManifestWriter { /// Create a new manifest writer. 
- pub fn new(output: OutputFile, snapshot_id: i64, key_metadata: Vec) -> Self { + pub(crate) fn new( + output: OutputFile, + snapshot_id: i64, + key_metadata: Vec, + metadata: ManifestMetadata, + ) -> Self { Self { output, snapshot_id, @@ -209,7 +280,8 @@ impl ManifestWriter { deleted_rows: 0, min_seq_num: None, key_metadata, - partitions: vec![], + manifset_entries: Vec::new(), + metadata, } } @@ -217,14 +289,13 @@ impl ManifestWriter { &mut self, partition_type: &StructType, ) -> Result> { - let partitions = std::mem::take(&mut self.partitions); let mut field_stats: Vec<_> = partition_type .fields() .iter() .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone())) .collect(); - for partition in partitions { - for (literal, stat) in partition.into_iter().zip_eq(field_stats.iter_mut()) { + for partition in self.manifset_entries.iter().map(|e| &e.data_file.partition) { + for (literal, stat) in partition.iter().zip_eq(field_stats.iter_mut()) { let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap()); stat.update(primitive_literal)?; } @@ -232,15 +303,118 @@ impl ManifestWriter { Ok(field_stats.into_iter().map(|stat| stat.finish()).collect()) } - /// Write a manifest. 
- pub async fn write(mut self, manifest: Manifest) -> Result { + fn check_entry(&self, entry: &ManifestEntry) -> Result<()> { + match self.metadata.content { + ManifestContentType::Data => { + if entry.data_file.content != DataContentType::Data { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Content type of entry {:?} should have DataContentType::Data", + entry.data_file.content + ), + )); + } + } + ManifestContentType::Deletes => { + if entry.data_file.content != DataContentType::EqualityDeletes + && entry.data_file.content != DataContentType::PositionDeletes + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Content type of entry {:?} should have DataContentType::EqualityDeletes or DataContentType::PositionDeletes", entry.data_file.content), + )); + } + } + } + Ok(()) + } + + /// Add a new manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Added` + /// - Set the snapshot id to the current snapshot id + /// - Set the sequence number to `None` if it is invalid(smaller than 0) + /// - Set the file sequence number to `None` + pub fn add(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_entry(&entry)?; + if entry.sequence_number().is_some_and(|n| n >= 0) { + entry.status = ManifestStatus::Added; + entry.snapshot_id = Some(self.snapshot_id); + entry.file_sequence_number = None; + } else { + entry.status = ManifestStatus::Added; + entry.snapshot_id = Some(self.snapshot_id); + entry.sequence_number = None; + entry.file_sequence_number = None; + }; + self.add_entry(entry)?; + Ok(()) + } + + /// Add a delete manifest entry. 
This method will update following status of the entry: + /// - Update the entry status to `Deleted` + /// - Set the snapshot id to the current snapshot id + pub fn delete(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_entry(&entry)?; + entry.status = ManifestStatus::Deleted; + entry.snapshot_id = Some(self.snapshot_id); + self.add_entry(entry)?; + Ok(()) + } + + /// Add an existing manifest entry. This method will update following status of the entry: + /// - Update the entry status to `Existing` + pub fn existing(&mut self, mut entry: ManifestEntry) -> Result<()> { + self.check_entry(&entry)?; + entry.status = ManifestStatus::Existing; + self.add_entry(entry)?; + Ok(()) + } + + fn add_entry(&mut self, entry: ManifestEntry) -> Result<()> { + // Check if the entry has sequence number + if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing) + && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none()) + { + return Err(Error::new( + ErrorKind::DataInvalid, + "Manifest entry with status Existing or Deleted should have sequence number", + )); + } + + // Update the statistics + match entry.status { + ManifestStatus::Added => { + self.added_files += 1; + self.added_rows += entry.data_file.record_count; + } + ManifestStatus::Deleted => { + self.deleted_files += 1; + self.deleted_rows += entry.data_file.record_count; + } + ManifestStatus::Existing => { + self.existing_files += 1; + self.existing_rows += entry.data_file.record_count; + } + } + if entry.is_alive() { + if let Some(seq_num) = entry.sequence_number { + self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num))); + } + } + self.manifset_entries.push(entry); + Ok(()) + } + + /// Write manifest file and return it. 
+ pub async fn to_manifest_file(mut self) -> Result { // Create the avro writer - let partition_type = manifest + let partition_type = self .metadata .partition_spec - .partition_type(&manifest.metadata.schema)?; - let table_schema = &manifest.metadata.schema; - let avro_schema = match manifest.metadata.format_version { + .partition_type(&self.metadata.schema)?; + let table_schema = &self.metadata.schema; + let avro_schema = match self.metadata.format_version { FormatVersion::V1 => manifest_schema_v1(partition_type.clone())?, FormatVersion::V2 => manifest_schema_v2(partition_type.clone())?, }; @@ -258,69 +432,36 @@ impl ManifestWriter { )?; avro_writer.add_user_metadata( "partition-spec".to_string(), - to_vec(&manifest.metadata.partition_spec.fields()).map_err(|err| { + to_vec(&self.metadata.partition_spec.fields()).map_err(|err| { Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec") .with_source(err) })?, )?; avro_writer.add_user_metadata( "partition-spec-id".to_string(), - manifest.metadata.partition_spec.spec_id().to_string(), + self.metadata.partition_spec.spec_id().to_string(), )?; avro_writer.add_user_metadata( "format-version".to_string(), - (manifest.metadata.format_version as u8).to_string(), + (self.metadata.format_version as u8).to_string(), )?; - if manifest.metadata.format_version == FormatVersion::V2 { + if self.metadata.format_version == FormatVersion::V2 { avro_writer - .add_user_metadata("content".to_string(), manifest.metadata.content.to_string())?; + .add_user_metadata("content".to_string(), self.metadata.content.to_string())?; } + let partition_summary = self.construct_partition_summaries(&partition_type)?; // Write manifest entries - for entry in manifest.entries { - if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing) - && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none()) - { - return Err(Error::new( - ErrorKind::DataInvalid, - "Manifest entry with status Existing or 
Deleted should have sequence number", - )); - } - - match entry.status { - ManifestStatus::Added => { - self.added_files += 1; - self.added_rows += entry.data_file.record_count; - } - ManifestStatus::Deleted => { - self.deleted_files += 1; - self.deleted_rows += entry.data_file.record_count; + for entry in std::mem::take(&mut self.manifset_entries) { + let value = match self.metadata.format_version { + FormatVersion::V1 => { + to_value(_serde::ManifestEntryV1::try_from(entry, &partition_type)?)? + .resolve(&avro_schema)? } - ManifestStatus::Existing => { - self.existing_files += 1; - self.existing_rows += entry.data_file.record_count; - } - } - - if entry.is_alive() { - if let Some(seq_num) = entry.sequence_number { - self.min_seq_num = Some(self.min_seq_num.map_or(seq_num, |v| min(v, seq_num))); + FormatVersion::V2 => { + to_value(_serde::ManifestEntryV2::try_from(entry, &partition_type)?)? + .resolve(&avro_schema)? } - } - - self.partitions.push(entry.data_file.partition.clone()); - - let value = match manifest.metadata.format_version { - FormatVersion::V1 => to_value(_serde::ManifestEntryV1::try_from( - (*entry).clone(), - &partition_type, - )?)? - .resolve(&avro_schema)?, - FormatVersion::V2 => to_value(_serde::ManifestEntryV2::try_from( - (*entry).clone(), - &partition_type, - )?)? 
- .resolve(&avro_schema)?, }; avro_writer.append(value)?; @@ -330,13 +471,11 @@ impl ManifestWriter { let length = content.len(); self.output.write(Bytes::from(content)).await?; - let partition_summary = self.construct_partition_summaries(&partition_type)?; - Ok(ManifestFile { manifest_path: self.output.location().to_string(), manifest_length: length as i64, - partition_spec_id: manifest.metadata.partition_spec.spec_id(), - content: manifest.metadata.content, + partition_spec_id: self.metadata.partition_spec.spec_id(), + content: self.metadata.content, // sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replace with // real sequence number in `ManifestListWriter`. sequence_number: UNASSIGNED_SEQUENCE_NUMBER, @@ -1632,16 +1771,18 @@ mod tests { .build() .unwrap(), ); - let manifest = Manifest { - metadata: ManifestMetadata { - schema_id: 0, - schema: schema.clone(), - partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), - content: ManifestContentType::Data, - format_version: FormatVersion::V2, - }, - entries: vec![ - Arc::new(ManifestEntry { + let metadata = ManifestMetadata { + schema_id: 0, + schema: schema.clone(), + partition_spec: PartitionSpec::builder(schema) + .with_spec_id(0) + .build() + .unwrap(), + content: ManifestContentType::Data, + format_version: FormatVersion::V2, + }; + let mut entries = vec![ + ManifestEntry { status: ManifestStatus::Added, snapshot_id: None, sequence_number: None, @@ -1664,13 +1805,34 @@ mod tests { equality_ids: Vec::new(), sort_order_id: None, } - }) - ] - }; + } + ]; - let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); + // write manifest to file + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("test_manifest.avro"); + let io = FileIOBuilder::new_fs_io().build().unwrap(); + let output_file = io.new_output(path.to_str().unwrap()).unwrap(); + let mut writer = ManifestWriterBuilder::new( + output_file, + 1, + 
vec![], + metadata.schema.clone(), + metadata.partition_spec.clone(), + ) + .build_v2_data(); + for entry in &entries { + writer.add(entry.clone()).unwrap(); + } + writer.to_manifest_file().await.unwrap(); - test_manifest_read_write(manifest, writer).await; + // read back the manifest file and check the content + let actual_manifest = + Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice()) + .unwrap(); + // The snapshot id is assigned when the entry is added to the manifest. + entries[0].snapshot_id = Some(1); + assert_eq!(actual_manifest, Manifest::new(metadata, entries)); } #[tokio::test] @@ -1745,17 +1907,21 @@ mod tests { .build() .unwrap(), ); - let manifest = Manifest { - metadata: ManifestMetadata { - schema_id: 0, - schema: schema.clone(), - partition_spec: PartitionSpec::builder(schema) - .with_spec_id(0).add_partition_field("v_int", "v_int", Transform::Identity).unwrap() - .add_partition_field("v_long", "v_long", Transform::Identity).unwrap().build().unwrap(), - content: ManifestContentType::Data, - format_version: FormatVersion::V2, - }, - entries: vec![Arc::new(ManifestEntry { + let metadata = ManifestMetadata { + schema_id: 0, + schema: schema.clone(), + partition_spec: PartitionSpec::builder(schema) + .with_spec_id(0) + .add_partition_field("v_int", "v_int", Transform::Identity) + .unwrap() + .add_partition_field("v_long", "v_long", Transform::Identity) + .unwrap() + .build() + .unwrap(), + content: ManifestContentType::Data, + format_version: FormatVersion::V2, + }; + let mut entries = vec![ManifestEntry { status: ManifestStatus::Added, snapshot_id: None, sequence_number: None, @@ -1820,15 +1986,38 @@ mod tests { equality_ids: vec![], sort_order_id: None, }, - })], - }; - - let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); + }]; - let res = test_manifest_read_write(manifest, writer).await; + // write manifest to file and check the return manifest file. 
+ let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("test_manifest.avro"); + let io = FileIOBuilder::new_fs_io().build().unwrap(); + let output_file = io.new_output(path.to_str().unwrap()).unwrap(); + let mut writer = ManifestWriterBuilder::new( + output_file, + 2, + vec![], + metadata.schema.clone(), + metadata.partition_spec.clone(), + ) + .build_v2_data(); + for entry in &entries { + writer.add(entry.clone()).unwrap(); + } + let manifest_file = writer.to_manifest_file().await.unwrap(); + assert_eq!(manifest_file.sequence_number, UNASSIGNED_SEQUENCE_NUMBER); + assert_eq!( + manifest_file.min_sequence_number, + UNASSIGNED_SEQUENCE_NUMBER + ); - assert_eq!(res.sequence_number, UNASSIGNED_SEQUENCE_NUMBER); - assert_eq!(res.min_sequence_number, UNASSIGNED_SEQUENCE_NUMBER); + // read back the manifest file and check the content + let actual_manifest = + Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice()) + .unwrap(); + // The snapshot id is assigned when the entry is added to the manifest. 
+ entries[0].snapshot_id = Some(2); + assert_eq!(actual_manifest, Manifest::new(metadata, entries)); } #[tokio::test] @@ -1856,15 +2045,17 @@ mod tests { .build() .unwrap(), ); - let manifest = Manifest { - metadata: ManifestMetadata { - schema_id: 1, - schema: schema.clone(), - partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), - content: ManifestContentType::Data, - format_version: FormatVersion::V1, - }, - entries: vec![Arc::new(ManifestEntry { + let metadata = ManifestMetadata { + schema_id: 1, + schema: schema.clone(), + partition_spec: PartitionSpec::builder(schema) + .with_spec_id(0) + .build() + .unwrap(), + content: ManifestContentType::Data, + format_version: FormatVersion::V1, + }; + let mut entries = vec![ManifestEntry { status: ManifestStatus::Added, snapshot_id: Some(0), sequence_number: Some(0), @@ -1887,13 +2078,33 @@ mod tests { equality_ids: vec![], sort_order_id: Some(0), } - })], - }; + }]; - let writer = - |output_file: OutputFile| ManifestWriter::new(output_file, 2966623707104393227, vec![]); + // write manifest to file + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("test_manifest.avro"); + let io = FileIOBuilder::new_fs_io().build().unwrap(); + let output_file = io.new_output(path.to_str().unwrap()).unwrap(); + let mut writer = ManifestWriterBuilder::new( + output_file, + 3, + vec![], + metadata.schema.clone(), + metadata.partition_spec.clone(), + ) + .build_v1(); + for entry in &entries { + writer.add(entry.clone()).unwrap(); + } + writer.to_manifest_file().await.unwrap(); - test_manifest_read_write(manifest, writer).await; + // read back the manifest file and check the content + let actual_manifest = + Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice()) + .unwrap(); + // The snapshot id is assigned when the entry is added to the manifest. 
+ entries[0].snapshot_id = Some(3); + assert_eq!(actual_manifest, Manifest::new(metadata, entries)); } #[tokio::test] @@ -1920,16 +2131,19 @@ mod tests { .build() .unwrap(), ); - let manifest = Manifest { - metadata: ManifestMetadata { - schema_id: 0, - schema: schema.clone(), - partition_spec: PartitionSpec::builder(schema).add_partition_field("category", "category", Transform::Identity).unwrap().build().unwrap(), - content: ManifestContentType::Data, - format_version: FormatVersion::V1, - }, - entries: vec![ - Arc::new(ManifestEntry { + let metadata = ManifestMetadata { + schema_id: 0, + schema: schema.clone(), + partition_spec: PartitionSpec::builder(schema) + .add_partition_field("category", "category", Transform::Identity) + .unwrap() + .build() + .unwrap(), + content: ManifestContentType::Data, + format_version: FormatVersion::V1, + }; + let mut entries = vec![ + ManifestEntry { status: ManifestStatus::Added, snapshot_id: Some(0), sequence_number: Some(0), @@ -1967,17 +2181,43 @@ mod tests { equality_ids: vec![], sort_order_id: Some(0), }, - }) - ] - }; - - let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); + } + ]; - let entry = test_manifest_read_write(manifest, writer).await; + // write manifest to file + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("test_manifest.avro"); + let io = FileIOBuilder::new_fs_io().build().unwrap(); + let output_file = io.new_output(path.to_str().unwrap()).unwrap(); + let mut writer = ManifestWriterBuilder::new( + output_file, + 2, + vec![], + metadata.schema.clone(), + metadata.partition_spec.clone(), + ) + .build_v1(); + for entry in &entries { + writer.add(entry.clone()).unwrap(); + } + let manifest_file = writer.to_manifest_file().await.unwrap(); + assert_eq!(manifest_file.partitions.len(), 1); + assert_eq!( + manifest_file.partitions[0].lower_bound, + Some(Datum::string("x")) + ); + assert_eq!( + manifest_file.partitions[0].upper_bound, + Some(Datum::string("x")) + 
); - assert_eq!(entry.partitions.len(), 1); - assert_eq!(entry.partitions[0].lower_bound, Some(Datum::string("x"))); - assert_eq!(entry.partitions[0].upper_bound, Some(Datum::string("x"))); + // read back the manifest file and check the content + let actual_manifest = + Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice()) + .unwrap(); + // The snapshot id is assigned when the entry is added to the manifest. + entries[0].snapshot_id = Some(2); + assert_eq!(actual_manifest, Manifest::new(metadata, entries)); } #[tokio::test] @@ -1999,15 +2239,17 @@ mod tests { .build() .unwrap(), ); - let manifest = Manifest { - metadata: ManifestMetadata { - schema_id: 0, - schema: schema.clone(), - partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), - content: ManifestContentType::Data, - format_version: FormatVersion::V2, - }, - entries: vec![Arc::new(ManifestEntry { + let metadata = ManifestMetadata { + schema_id: 0, + schema: schema.clone(), + partition_spec: PartitionSpec::builder(schema) + .with_spec_id(0) + .build() + .unwrap(), + content: ManifestContentType::Data, + format_version: FormatVersion::V2, + }; + let entries = vec![ManifestEntry { status: ManifestStatus::Added, snapshot_id: None, sequence_number: None, @@ -2042,18 +2284,34 @@ mod tests { equality_ids: vec![], sort_order_id: None, }, - })], - }; - - let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); + }]; - let (avro_bytes, _) = write_manifest(&manifest, writer).await; + // write manifest to file + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("test_manifest.avro"); + let io = FileIOBuilder::new_fs_io().build().unwrap(); + let output_file = io.new_output(path.to_str().unwrap()).unwrap(); + let mut writer = ManifestWriterBuilder::new( + output_file, + 2, + vec![], + metadata.schema.clone(), + metadata.partition_spec.clone(), + ) + .build_v2_data(); + for entry in &entries { + 
writer.add(entry.clone()).unwrap(); + } + writer.to_manifest_file().await.unwrap(); - // The parse should succeed. - let actual_manifest = Manifest::parse_avro(avro_bytes.as_slice()).unwrap(); + // read back the manifest file and check the content + let actual_manifest = + Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice()) + .unwrap(); // Compared with original manifest, the lower_bounds and upper_bounds no longer has data for field 3, and // other parts should be same. + // The snapshot id is assigned when the entry is added to the manifest. let schema = Arc::new( Schema::builder() .with_fields(vec![ @@ -2081,7 +2339,7 @@ mod tests { }, entries: vec![Arc::new(ManifestEntry { status: ManifestStatus::Added, - snapshot_id: None, + snapshot_id: Some(2), sequence_number: None, file_sequence_number: None, data_file: DataFile { @@ -2152,16 +2410,15 @@ mod tests { .unwrap() .build() .unwrap(); - let manifest = Manifest { - metadata: ManifestMetadata { - schema_id: 0, - schema, - partition_spec, - content: ManifestContentType::Data, - format_version: FormatVersion::V2, - }, - entries: vec![ - Arc::new(ManifestEntry { + let metadata = ManifestMetadata { + schema_id: 0, + schema, + partition_spec, + content: ManifestContentType::Data, + format_version: FormatVersion::V2, + }; + let entries = vec![ + ManifestEntry { status: ManifestStatus::Added, snapshot_id: None, sequence_number: None, @@ -2190,8 +2447,7 @@ mod tests { equality_ids: Vec::new(), sort_order_id: None, } - }), - Arc::new( + }, ManifestEntry { status: ManifestStatus::Added, snapshot_id: None, @@ -2221,9 +2477,7 @@ mod tests { equality_ids: Vec::new(), sort_order_id: None, } - } - ), - Arc::new( + }, ManifestEntry { status: ManifestStatus::Added, snapshot_id: None, @@ -2253,9 +2507,7 @@ mod tests { equality_ids: Vec::new(), sort_order_id: None, } - } - ), - Arc::new( + }, ManifestEntry { status: ManifestStatus::Added, snapshot_id: None, @@ -2285,14 +2537,27 @@ mod tests { 
equality_ids: Vec::new(), sort_order_id: None, } - } - ), - ] - }; + }, + ]; - let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); + // write manifest to file + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("test_manifest.avro"); + let io = FileIOBuilder::new_fs_io().build().unwrap(); + let output_file = io.new_output(path.to_str().unwrap()).unwrap(); + let mut writer = ManifestWriterBuilder::new( + output_file, + 1, + vec![], + metadata.schema.clone(), + metadata.partition_spec.clone(), + ) + .build_v2_data(); + for entry in &entries { + writer.add(entry.clone()).unwrap(); + } + let res = writer.to_manifest_file().await.unwrap(); - let res = test_manifest_read_write(manifest, writer).await; assert!(res.partitions.len() == 3); assert!(res.partitions[0].lower_bound == Some(Datum::int(1111))); assert!(res.partitions[0].upper_bound == Some(Datum::int(2021))); @@ -2310,30 +2575,138 @@ mod tests { assert!(res.partitions[2].contains_nan == Some(false)); } - async fn test_manifest_read_write( - manifest: Manifest, - writer_builder: impl FnOnce(OutputFile) -> ManifestWriter, - ) -> ManifestFile { - let (bs, res) = write_manifest(&manifest, writer_builder).await; - let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap(); - - assert_eq!(actual_manifest, manifest); - res - } + #[tokio::test] + async fn test_add_delete_existing() { + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "name", + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); + let metadata = ManifestMetadata { + schema_id: 0, + schema: schema.clone(), + partition_spec: PartitionSpec::builder(schema) + .with_spec_id(0) + .build() + .unwrap(), + content: ManifestContentType::Data, + format_version: FormatVersion::V2, + }; + let mut entries = vec![ + ManifestEntry { + 
status: ManifestStatus::Added, + snapshot_id: None, + sequence_number: Some(1), + file_sequence_number: Some(1), + data_file: DataFile { + content: DataContentType::Data, + file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count: 1, + file_size_in_bytes: 5442, + column_sizes: HashMap::from([(1, 61), (2, 73)]), + value_counts: HashMap::from([(1, 1), (2, 1)]), + null_value_counts: HashMap::from([(1, 0), (2, 0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: Some(Vec::new()), + split_offsets: vec![4], + equality_ids: Vec::new(), + sort_order_id: None, + }, + }, + ManifestEntry { + status: ManifestStatus::Deleted, + snapshot_id: Some(1), + sequence_number: Some(1), + file_sequence_number: Some(1), + data_file: DataFile { + content: DataContentType::Data, + file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count: 1, + file_size_in_bytes: 5442, + column_sizes: HashMap::from([(1, 61), (2, 73)]), + value_counts: HashMap::from([(1, 1), (2, 1)]), + null_value_counts: HashMap::from([(1, 0), (2, 0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: Some(Vec::new()), + split_offsets: vec![4], + equality_ids: Vec::new(), + sort_order_id: None, + }, + }, + ManifestEntry { + status: ManifestStatus::Existing, + snapshot_id: Some(1), + sequence_number: Some(1), + file_sequence_number: Some(1), + data_file: DataFile { + content: DataContentType::Data, + file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-ba56fbfa-f2ff-40c9-bb27-565ad6dc2be8-00000.parquet".to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count: 1, 
+ file_size_in_bytes: 5442, + column_sizes: HashMap::from([(1, 61), (2, 73)]), + value_counts: HashMap::from([(1, 1), (2, 1)]), + null_value_counts: HashMap::from([(1, 0), (2, 0)]), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: Some(Vec::new()), + split_offsets: vec![4], + equality_ids: Vec::new(), + sort_order_id: None, + }, + }, + ]; - /// Utility method which writes out a manifest and returns the bytes. - async fn write_manifest( - manifest: &Manifest, - writer_builder: impl FnOnce(OutputFile) -> ManifestWriter, - ) -> (Vec<u8>, ManifestFile) { - let temp_dir = TempDir::new().unwrap(); - let path = temp_dir.path().join("test_manifest.avro"); + // write manifest to file + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("test_manifest.avro"); let io = FileIOBuilder::new_fs_io().build().unwrap(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let writer = writer_builder(output_file); - let res = writer.write(manifest.clone()).await.unwrap(); - - // Verify manifest - (fs::read(path).expect("read_file must succeed"), res) + let mut writer = ManifestWriterBuilder::new( + output_file, + 3, + vec![], + metadata.schema.clone(), + metadata.partition_spec.clone(), + ) + .build_v2_data(); + writer.add(entries[0].clone()).unwrap(); + writer.delete(entries[1].clone()).unwrap(); + writer.existing(entries[2].clone()).unwrap(); + writer.to_manifest_file().await.unwrap(); + + // read back the manifest file and check the content + let actual_manifest = + Manifest::parse_avro(fs::read(path).expect("read_file must succeed").as_slice()) + .unwrap(); + + // The snapshot id is assigned when an entry is added to or deleted from the manifest. Existing entries keep their original snapshot id. + entries[0].snapshot_id = Some(3); + entries[1].snapshot_id = Some(3); + // The file sequence number of the added entry is reset to None when it is written to the manifest.
+ entries[0].file_sequence_number = None; + assert_eq!(actual_manifest, Manifest::new(metadata, entries)); } } diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index cfd6a8381..f93986ce9 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -28,10 +28,9 @@ use uuid::Uuid; use crate::error::Result; use crate::io::OutputFile; use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile, - ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation, Snapshot, - SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType, - Summary, Transform, MAIN_BRANCH, + DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter, + ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention, + SortDirection, SortField, SortOrder, Struct, StructType, Summary, Transform, MAIN_BRANCH, }; use crate::table::Table; use crate::TableUpdate::UpgradeFormatVersion; @@ -378,43 +377,42 @@ impl<'a> SnapshotProduceAction<'a> { // Write manifest file for added data files and return the ManifestFile for ManifestList. async fn write_added_manifest(&mut self) -> Result<ManifestFile> { let added_data_files = std::mem::take(&mut self.added_data_files); - let manifest_entries = added_data_files - .into_iter() - .map(|data_file| { - let builder = ManifestEntry::builder() - .status(crate::spec::ManifestStatus::Added) - .data_file(data_file); - if self.tx.table.metadata().format_version() == FormatVersion::V1 { - builder.snapshot_id(self.snapshot_id).build() - } else { - // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when - // commit failed.
- builder.build() - } - }) - .collect(); - let schema = self.tx.table.metadata().current_schema(); - let manifest_meta = ManifestMetadata::builder() - .schema(schema.clone()) - .schema_id(schema.schema_id()) - .format_version(self.tx.table.metadata().format_version()) - .partition_spec( + let snapshot_id = self.snapshot_id; + let manifest_entries = added_data_files.into_iter().map(|data_file| { + let builder = ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .data_file(data_file); + if self.tx.table.metadata().format_version() == FormatVersion::V1 { + builder.snapshot_id(snapshot_id).build() + } else { + // For format version > 1, the snapshot id is set at inheritance time to avoid rewriting the manifest file when + // the commit fails. + builder.build() + } + }); + let mut writer = { + let builder = ManifestWriterBuilder::new( + self.new_manifest_output()?, + self.snapshot_id, + self.key_metadata.clone(), + self.tx.table.metadata().current_schema().clone(), self.tx .table .metadata() .default_partition_spec() .as_ref() .clone(), - ) - .content(crate::spec::ManifestContentType::Data) - .build(); - let manifest = Manifest::new(manifest_meta, manifest_entries); - let writer = ManifestWriter::new( - self.new_manifest_output()?, - self.snapshot_id, - self.key_metadata.clone(), - ); - writer.write(manifest).await + ); + if self.tx.table.metadata().format_version() == FormatVersion::V1 { + builder.build_v1() + } else { + builder.build_v2_data() + } + }; + for entry in manifest_entries { + writer.add(entry)?; + } + writer.to_manifest_file().await } async fn manifest_file(