Skip to content

Commit

Permalink
fix FieldSummary generated from Manifest
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Dec 13, 2024
1 parent d16e81e commit fe784ad
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 58 deletions.
130 changes: 75 additions & 55 deletions crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::sync::Arc;

use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter};
use bytes::Bytes;
use itertools::Itertools;
use serde_derive::{Deserialize, Serialize};
use serde_json::to_vec;
use serde_with::{DeserializeFromStr, SerializeDisplay};
Expand All @@ -31,7 +32,8 @@ use typed_builder::TypedBuilder;
use self::_const_schema::{manifest_schema_v1, manifest_schema_v2};
use super::{
BoundPartitionSpec, Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile,
Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER,
PrimitiveLiteral, PrimitiveType, Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER,
UNASSIGNED_SEQUENCE_NUMBER,
};
use crate::error::Result;
use crate::io::OutputFile;
Expand Down Expand Up @@ -128,7 +130,59 @@ pub struct ManifestWriter {

key_metadata: Vec<u8>,

field_summary: HashMap<i32, FieldSummary>,
partitions: Vec<Struct>,
}

struct PartitionFieldStats {
partition_type: PrimitiveType,
summary: FieldSummary,
}

impl PartitionFieldStats {
pub(crate) fn new(partition_type: PrimitiveType) -> Self {
Self {
partition_type,
summary: Default::default(),
}
}

pub(crate) fn update(&mut self, value: Option<PrimitiveLiteral>) -> Result<()> {
if let Some(value) = value {
if !self.partition_type.compatible(&value) {
return Err(Error::new(
ErrorKind::DataInvalid,
"value is not compatitable with type",
));
}
let value = Datum::new(self.partition_type.clone(), value);
if value.is_nan() {
self.summary.contains_nan = Some(true);
} else {
if let Some(lower) = self.summary.lower_bound.as_mut() {
if value < *lower {
*lower = value.clone();
}
} else {
self.summary.lower_bound = Some(value.clone());
}
if let Some(upper) = self.summary.upper_bound.as_mut() {
if value < *upper {
*upper = value;
}
} else {
self.summary.lower_bound = Some(value);
}
}
} else {
self.summary.contains_null = true;
}

Ok(())
}

pub(crate) fn finish(self) -> FieldSummary {
self.summary
}
}

impl ManifestWriter {
Expand All @@ -145,62 +199,28 @@ impl ManifestWriter {
deleted_rows: 0,
min_seq_num: None,
key_metadata,
field_summary: HashMap::new(),
partitions: vec![],
}
}

fn update_field_summary(&mut self, entry: &ManifestEntry) {
// Update field summary
for (&k, &v) in &entry.data_file.null_value_counts {
let field_summary = self.field_summary.entry(k).or_default();
if v > 0 {
field_summary.contains_null = true;
}
}

for (&k, &v) in &entry.data_file.nan_value_counts {
let field_summary = self.field_summary.entry(k).or_default();
if v > 0 {
field_summary.contains_nan = Some(true);
}
if v == 0 {
field_summary.contains_nan = Some(false);
}
}

for (&k, v) in &entry.data_file.lower_bounds {
let field_summary = self.field_summary.entry(k).or_default();
if let Some(cur) = &field_summary.lower_bound {
if v < cur {
field_summary.lower_bound = Some(v.clone());
}
} else {
field_summary.lower_bound = Some(v.clone());
}
}

for (&k, v) in &entry.data_file.upper_bounds {
let field_summary = self.field_summary.entry(k).or_default();
if let Some(cur) = &field_summary.upper_bound {
if v > cur {
field_summary.upper_bound = Some(v.clone());
}
} else {
field_summary.upper_bound = Some(v.clone());
fn construct_partition_summaries(
&mut self,
partition_spec: &BoundPartitionSpec,
) -> Result<Vec<FieldSummary>> {
let partitions = std::mem::take(&mut self.partitions);
let mut field_stats: Vec<_> = partition_spec
.partition_type()
.fields()
.iter()
.map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone()))
.collect();
for partition in partitions {
for (literal, stat) in partition.into_iter().zip_eq(field_stats.iter_mut()) {
let primitve_literal = literal.map(|v| v.as_primitive_literal().unwrap());
stat.update(primitve_literal)?;
}
}
}

fn get_field_summary_vec(&mut self, spec_fields: &[PartitionField]) -> Vec<FieldSummary> {
let mut partition_summary = Vec::with_capacity(self.field_summary.len());
for field in spec_fields {
let entry = self
.field_summary
.remove(&field.source_id)
.unwrap_or_default();
partition_summary.push(entry);
}
partition_summary
Ok(field_stats.into_iter().map(|stat| stat.finish()).collect())
}

/// Write a manifest.
Expand Down Expand Up @@ -276,7 +296,7 @@ impl ManifestWriter {
}
}

self.update_field_summary(&entry);
self.partitions.push(entry.data_file.partition.clone());

let value = match manifest.metadata.format_version {
FormatVersion::V1 => to_value(_serde::ManifestEntryV1::try_from(
Expand All @@ -299,7 +319,7 @@ impl ManifestWriter {
self.output.write(Bytes::from(content)).await?;

let partition_summary =
self.get_field_summary_vec(manifest.metadata.partition_spec.fields());
self.construct_partition_summaries(&manifest.metadata.partition_spec)?;

Ok(ManifestFile {
manifest_path: self.output.location().to_string(),
Expand Down
16 changes: 13 additions & 3 deletions crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -857,12 +857,22 @@ pub(super) mod _serde {
contains_nan: self.contains_nan,
lower_bound: self
.lower_bound
.map(|v| Datum::try_from_bytes(&v, r#type.clone()))
.transpose()?,
.as_ref()
.map(|v| Datum::try_from_bytes(v, r#type.clone()))
.transpose()
.map_err(|err| {
err.with_context("type", format!("{:?}", r#type))
.with_context("bytes", format!("{:?}", self.lower_bound))
})?,
upper_bound: self
.upper_bound
.as_ref()
.map(|v| Datum::try_from_bytes(&v, r#type.clone()))
.transpose()?,
.transpose()
.map_err(|err| {
err.with_context("type", format!("{:?}", r#type))
.with_context("bytes", format!("{:?}", self.upper_bound))
})?,
})
}
}
Expand Down

0 comments on commit fe784ad

Please sign in to comment.