diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index 39a061ef9..a562fef9d 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
 
 use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter};
 use bytes::Bytes;
+use itertools::Itertools;
 use serde_derive::{Deserialize, Serialize};
 use serde_json::to_vec;
 use serde_with::{DeserializeFromStr, SerializeDisplay};
@@ -31,7 +32,8 @@ use typed_builder::TypedBuilder;
 use self::_const_schema::{manifest_schema_v1, manifest_schema_v2};
 use super::{
     BoundPartitionSpec, Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile,
-    Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER,
+    PrimitiveLiteral, PrimitiveType, Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER,
+    UNASSIGNED_SEQUENCE_NUMBER,
 };
 use crate::error::Result;
 use crate::io::OutputFile;
@@ -128,7 +130,71 @@ pub struct ManifestWriter {
 
     key_metadata: Vec<u8>,
 
-    field_summary: HashMap<i32, FieldSummary>,
+    partitions: Vec<Struct>,
+}
+
+/// Accumulates the [`FieldSummary`] of a single partition field while a manifest is written.
+struct PartitionFieldStats {
+    partition_type: PrimitiveType,
+    summary: FieldSummary,
+}
+
+impl PartitionFieldStats {
+    /// Creates an empty accumulator for a partition field of the given primitive type.
+    pub(crate) fn new(partition_type: PrimitiveType) -> Self {
+        Self {
+            partition_type,
+            summary: Default::default(),
+        }
+    }
+
+    /// Folds one partition value into the summary.
+    ///
+    /// `None` marks the field as containing nulls and NaN values only set `contains_nan`;
+    /// every other value widens the lower/upper bounds.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::DataInvalid` when `value` is not compatible with the field type.
+    pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
+        if let Some(value) = value {
+            if !self.partition_type.compatible(&value) {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "value is not compatible with type",
+                ));
+            }
+            let value = Datum::new(self.partition_type.clone(), value);
+            // NaN is recorded separately and deliberately kept out of the bounds.
+            if value.is_nan() {
+                self.summary.contains_nan = Some(true);
+            } else {
+                if let Some(lower) = self.summary.lower_bound.as_mut() {
+                    if value < *lower {
+                        *lower = value.clone();
+                    }
+                } else {
+                    self.summary.lower_bound = Some(value.clone());
+                }
+                if let Some(upper) = self.summary.upper_bound.as_mut() {
+                    if value > *upper {
+                        *upper = value;
+                    }
+                } else {
+                    self.summary.upper_bound = Some(value);
+                }
+            }
+        } else {
+            self.summary.contains_null = true;
+        }
+
+        Ok(())
+    }
+
+    /// Consumes the accumulator and returns the collected summary.
+    pub(crate) fn finish(self) -> FieldSummary {
+        self.summary
+    }
 }
 
 impl ManifestWriter {
@@ -145,62 +211,30 @@ impl ManifestWriter {
             deleted_rows: 0,
             min_seq_num: None,
             key_metadata,
-            field_summary: HashMap::new(),
+            partitions: vec![],
         }
     }
 
-    fn update_field_summary(&mut self, entry: &ManifestEntry) {
-        // Update field summary
-        for (&k, &v) in &entry.data_file.null_value_counts {
-            let field_summary = self.field_summary.entry(k).or_default();
-            if v > 0 {
-                field_summary.contains_null = true;
-            }
-        }
-
-        for (&k, &v) in &entry.data_file.nan_value_counts {
-            let field_summary = self.field_summary.entry(k).or_default();
-            if v > 0 {
-                field_summary.contains_nan = Some(true);
-            }
-            if v == 0 {
-                field_summary.contains_nan = Some(false);
-            }
-        }
-
-        for (&k, v) in &entry.data_file.lower_bounds {
-            let field_summary = self.field_summary.entry(k).or_default();
-            if let Some(cur) = &field_summary.lower_bound {
-                if v < cur {
-                    field_summary.lower_bound = Some(v.clone());
-                }
-            } else {
-                field_summary.lower_bound = Some(v.clone());
-            }
-        }
-
-        for (&k, v) in &entry.data_file.upper_bounds {
-            let field_summary = self.field_summary.entry(k).or_default();
-            if let Some(cur) = &field_summary.upper_bound {
-                if v > cur {
-                    field_summary.upper_bound = Some(v.clone());
-                }
-            } else {
-                field_summary.upper_bound = Some(v.clone());
+    /// Builds one [`FieldSummary`] per field of `partition_spec` from the partition values
+    /// collected while writing, draining `self.partitions` in the process.
+    fn construct_partition_summaries(
+        &mut self,
+        partition_spec: &BoundPartitionSpec,
+    ) -> Result<Vec<FieldSummary>> {
+        let partitions = std::mem::take(&mut self.partitions);
+        let mut field_stats: Vec<_> = partition_spec
+            .partition_type()
+            .fields()
+            .iter()
+            .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
+            .collect();
+        for partition in partitions {
+            for (literal, stat) in partition.into_iter().zip_eq(field_stats.iter_mut()) {
+                let primitive_literal = literal.map(|v| v.as_primitive_literal().unwrap());
+                stat.update(primitive_literal)?;
             }
         }
-    }
-
-    fn get_field_summary_vec(&mut self, spec_fields: &[PartitionField]) -> Vec<FieldSummary> {
-        let mut partition_summary = Vec::with_capacity(self.field_summary.len());
-        for field in spec_fields {
-            let entry = self
-                .field_summary
-                .remove(&field.source_id)
-                .unwrap_or_default();
-            partition_summary.push(entry);
-        }
-        partition_summary
+        Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
     }
 
     /// Write a manifest.
@@ -276,7 +310,7 @@ impl ManifestWriter {
                 }
             }
 
-            self.update_field_summary(&entry);
+            self.partitions.push(entry.data_file.partition.clone());
 
             let value = match manifest.metadata.format_version {
                 FormatVersion::V1 => to_value(_serde::ManifestEntryV1::try_from(
@@ -299,7 +333,7 @@ impl ManifestWriter {
         self.output.write(Bytes::from(content)).await?;
 
         let partition_summary =
-            self.get_field_summary_vec(manifest.metadata.partition_spec.fields());
+            self.construct_partition_summaries(&manifest.metadata.partition_spec)?;
 
         Ok(ManifestFile {
             manifest_path: self.output.location().to_string(),
diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs
index 5768b79d5..31ff9302e 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -857,12 +857,22 @@ pub(super) mod _serde {
                 contains_nan: self.contains_nan,
                 lower_bound: self
                     .lower_bound
-                    .map(|v| Datum::try_from_bytes(&v, r#type.clone()))
-                    .transpose()?,
+                    .as_ref()
+                    .map(|v| Datum::try_from_bytes(v, r#type.clone()))
+                    .transpose()
+                    .map_err(|err| {
+                        err.with_context("type", format!("{:?}", r#type))
+                            .with_context("bytes", format!("{:?}", self.lower_bound))
+                    })?,
                 upper_bound: self
                     .upper_bound
-                    .map(|v| Datum::try_from_bytes(&v, r#type.clone()))
-                    .transpose()?,
+                    .as_ref()
+                    .map(|v| Datum::try_from_bytes(v, r#type.clone()))
+                    .transpose()
+                    .map_err(|err| {
+                        err.with_context("type", format!("{:?}", r#type))
+                            .with_context("bytes", format!("{:?}", self.upper_bound))
+                    })?,
             })
         }
     }