feat: support serialize/deserialize DataFile into avro bytes (#797)
Co-authored-by: ZENOTME <[email protected]>
ZENOTME authored Jan 2, 2025
1 parent 2fb9808 commit 09fa1fa
Showing 1 changed file with 179 additions and 49 deletions.
crates/iceberg/src/spec/manifest.rs
@@ -18,6 +18,7 @@
//! Manifest for Iceberg.
use std::cmp::min;
use std::collections::HashMap;
use std::io::{Read, Write};
use std::str::FromStr;
use std::sync::Arc;

@@ -61,7 +62,7 @@ impl Manifest {

let entries = match metadata.format_version {
FormatVersion::V1 => {
let schema = manifest_schema_v1(partition_type.clone())?;
let schema = manifest_schema_v1(&partition_type)?;
let reader = AvroReader::with_schema(&schema, bs)?;
reader
.into_iter()
@@ -72,7 +73,7 @@
.collect::<Result<Vec<_>>>()?
}
FormatVersion::V2 => {
let schema = manifest_schema_v2(partition_type.clone())?;
let schema = manifest_schema_v2(&partition_type)?;
let reader = AvroReader::with_schema(&schema, bs)?;
reader
.into_iter()
@@ -241,8 +242,8 @@ impl ManifestWriter {
.partition_type(&manifest.metadata.schema)?;
let table_schema = &manifest.metadata.schema;
let avro_schema = match manifest.metadata.format_version {
FormatVersion::V1 => manifest_schema_v1(partition_type.clone())?,
FormatVersion::V2 => manifest_schema_v2(partition_type.clone())?,
FormatVersion::V1 => manifest_schema_v1(&partition_type)?,
FormatVersion::V2 => manifest_schema_v2(&partition_type)?,
};
let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new());
avro_writer.add_user_metadata(
@@ -656,7 +657,39 @@ mod _const_schema {
})
};

pub(super) fn manifest_schema_v2(partition_type: StructType) -> Result<AvroSchema, Error> {
fn data_file_fields_v2(partition_type: &StructType) -> Vec<NestedFieldRef> {
vec![
CONTENT.clone(),
FILE_PATH.clone(),
FILE_FORMAT.clone(),
Arc::new(NestedField::required(
102,
"partition",
Type::Struct(partition_type.clone()),
)),
RECORD_COUNT.clone(),
FILE_SIZE_IN_BYTES.clone(),
COLUMN_SIZES.clone(),
VALUE_COUNTS.clone(),
NULL_VALUE_COUNTS.clone(),
NAN_VALUE_COUNTS.clone(),
LOWER_BOUNDS.clone(),
UPPER_BOUNDS.clone(),
KEY_METADATA.clone(),
SPLIT_OFFSETS.clone(),
EQUALITY_IDS.clone(),
SORT_ORDER_ID.clone(),
]
}

pub(super) fn data_file_schema_v2(partition_type: &StructType) -> Result<AvroSchema, Error> {
let schema = Schema::builder()
.with_fields(data_file_fields_v2(partition_type))
.build()?;
schema_to_avro_schema("data_file", &schema)
}

pub(super) fn manifest_schema_v2(partition_type: &StructType) -> Result<AvroSchema, Error> {
let fields = vec![
STATUS.clone(),
SNAPSHOT_ID_V2.clone(),
@@ -665,62 +698,52 @@
Arc::new(NestedField::required(
2,
"data_file",
Type::Struct(StructType::new(vec![
CONTENT.clone(),
FILE_PATH.clone(),
FILE_FORMAT.clone(),
Arc::new(NestedField::required(
102,
"partition",
Type::Struct(partition_type),
)),
RECORD_COUNT.clone(),
FILE_SIZE_IN_BYTES.clone(),
COLUMN_SIZES.clone(),
VALUE_COUNTS.clone(),
NULL_VALUE_COUNTS.clone(),
NAN_VALUE_COUNTS.clone(),
LOWER_BOUNDS.clone(),
UPPER_BOUNDS.clone(),
KEY_METADATA.clone(),
SPLIT_OFFSETS.clone(),
EQUALITY_IDS.clone(),
SORT_ORDER_ID.clone(),
])),
Type::Struct(StructType::new(data_file_fields_v2(partition_type))),
)),
];
let schema = Schema::builder().with_fields(fields).build()?;
schema_to_avro_schema("manifest_entry", &schema)
}

pub(super) fn manifest_schema_v1(partition_type: StructType) -> Result<AvroSchema, Error> {
fn data_file_fields_v1(partition_type: &StructType) -> Vec<NestedFieldRef> {
vec![
FILE_PATH.clone(),
FILE_FORMAT.clone(),
Arc::new(NestedField::required(
102,
"partition",
Type::Struct(partition_type.clone()),
)),
RECORD_COUNT.clone(),
FILE_SIZE_IN_BYTES.clone(),
BLOCK_SIZE_IN_BYTES.clone(),
COLUMN_SIZES.clone(),
VALUE_COUNTS.clone(),
NULL_VALUE_COUNTS.clone(),
NAN_VALUE_COUNTS.clone(),
LOWER_BOUNDS.clone(),
UPPER_BOUNDS.clone(),
KEY_METADATA.clone(),
SPLIT_OFFSETS.clone(),
SORT_ORDER_ID.clone(),
]
}

pub(super) fn data_file_schema_v1(partition_type: &StructType) -> Result<AvroSchema, Error> {
let schema = Schema::builder()
.with_fields(data_file_fields_v1(partition_type))
.build()?;
schema_to_avro_schema("data_file", &schema)
}

pub(super) fn manifest_schema_v1(partition_type: &StructType) -> Result<AvroSchema, Error> {
let fields = vec![
STATUS.clone(),
SNAPSHOT_ID_V1.clone(),
Arc::new(NestedField::required(
2,
"data_file",
Type::Struct(StructType::new(vec![
FILE_PATH.clone(),
FILE_FORMAT.clone(),
Arc::new(NestedField::required(
102,
"partition",
Type::Struct(partition_type),
)),
RECORD_COUNT.clone(),
FILE_SIZE_IN_BYTES.clone(),
BLOCK_SIZE_IN_BYTES.clone(),
COLUMN_SIZES.clone(),
VALUE_COUNTS.clone(),
NULL_VALUE_COUNTS.clone(),
NAN_VALUE_COUNTS.clone(),
LOWER_BOUNDS.clone(),
UPPER_BOUNDS.clone(),
KEY_METADATA.clone(),
SPLIT_OFFSETS.clone(),
SORT_ORDER_ID.clone(),
])),
Type::Struct(StructType::new(data_file_fields_v1(partition_type))),
)),
];
let schema = Schema::builder().with_fields(fields).build()?;
@@ -1189,6 +1212,49 @@ impl DataFile {
self.sort_order_id
}
}

/// Serialize data files to Avro and write the bytes to the given writer.
/// Returns the number of bytes written.
pub fn write_data_files_to_avro<W: Write>(
writer: &mut W,
data_files: impl IntoIterator<Item = DataFile>,
partition_type: &StructType,
version: FormatVersion,
) -> Result<usize> {
let avro_schema = match version {
FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type).unwrap(),
FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type).unwrap(),
};
let mut writer = AvroWriter::new(&avro_schema, writer);

for data_file in data_files {
let value = to_value(_serde::DataFile::try_from(data_file, partition_type, true)?)?
.resolve(&avro_schema)?;
writer.append(value)?;
}

Ok(writer.flush()?)
}

/// Parse data files from Avro bytes.
pub fn read_data_files_from_avro<R: Read>(
reader: &mut R,
schema: &Schema,
partition_type: &StructType,
version: FormatVersion,
) -> Result<Vec<DataFile>> {
let avro_schema = match version {
FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type).unwrap(),
FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type).unwrap(),
};

let reader = AvroReader::with_schema(&avro_schema, reader)?;
reader
.into_iter()
.map(|value| from_value::<_serde::DataFile>(&value?)?.try_into(partition_type, schema))
.collect::<Result<Vec<_>>>()
}

/// Type of content stored by the data file: data, equality deletes, or
/// position deletes (all v1 files are data files)
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
@@ -1551,6 +1617,7 @@ mod _serde {
#[cfg(test)]
mod tests {
use std::fs;
use std::io::Cursor;
use std::sync::Arc;

use tempfile::TempDir;
@@ -2336,4 +2403,67 @@ mod tests {
// Verify manifest
(fs::read(path).expect("read_file must succeed"), res)
}

#[tokio::test]
async fn test_data_file_serialize_deserialize() {
let schema = Arc::new(
Schema::builder()
.with_fields(vec![
Arc::new(NestedField::optional(
1,
"v1",
Type::Primitive(PrimitiveType::Int),
)),
Arc::new(NestedField::optional(
2,
"v2",
Type::Primitive(PrimitiveType::String),
)),
Arc::new(NestedField::optional(
3,
"v3",
Type::Primitive(PrimitiveType::String),
)),
])
.build()
.unwrap(),
);
let data_files = vec![DataFile {
content: DataContentType::Data,
file_path: "s3://testbucket/iceberg_data/iceberg_ctl/iceberg_db/iceberg_tbl/data/00000-7-45268d71-54eb-476c-b42c-942d880c04a1-00001.parquet".to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::empty(),
record_count: 1,
file_size_in_bytes: 875,
column_sizes: HashMap::from([(1,47),(2,48),(3,52)]),
value_counts: HashMap::from([(1,1),(2,1),(3,1)]),
null_value_counts: HashMap::from([(1,0),(2,0),(3,0)]),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
upper_bounds: HashMap::from([(1,Datum::int(1)),(2,Datum::string("a")),(3,Datum::string("AC/DC"))]),
key_metadata: None,
split_offsets: vec![4],
equality_ids: vec![],
sort_order_id: Some(0),
}];

let mut buffer = Vec::new();
let _ = write_data_files_to_avro(
&mut buffer,
data_files.clone().into_iter(),
&StructType::new(vec![]),
FormatVersion::V2,
)
.unwrap();

let actual_data_file = read_data_files_from_avro(
&mut Cursor::new(buffer),
&schema,
&StructType::new(vec![]),
FormatVersion::V2,
)
.unwrap();

assert_eq!(data_files, actual_data_file);
}
}
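For reference, here is a minimal sketch of how downstream code might call the two new public functions added by this commit. It assumes they are re-exported through the crate's spec module alongside DataFile and FormatVersion; the import paths and the roundtrip_data_files helper are illustrative, not part of the commit itself.

use std::io::Cursor;

use iceberg::spec::{
    read_data_files_from_avro, write_data_files_to_avro, DataFile, FormatVersion, Schema,
    StructType,
};
use iceberg::Result;

/// Round-trip a batch of data files through an in-memory Avro buffer.
fn roundtrip_data_files(
    files: Vec<DataFile>,
    table_schema: &Schema,
    partition_type: &StructType,
) -> Result<Vec<DataFile>> {
    // Serialize; the returned usize is the number of bytes flushed to the buffer.
    let mut buffer = Vec::new();
    let _bytes_written =
        write_data_files_to_avro(&mut buffer, files, partition_type, FormatVersion::V2)?;

    // Deserialize; the table schema is needed to rebuild the typed lower/upper bounds.
    read_data_files_from_avro(
        &mut Cursor::new(buffer),
        table_schema,
        partition_type,
        FormatVersion::V2,
    )
}

As the new unit test shows, the same partition StructType and format version must be supplied on both sides of the round trip, since they determine the Avro schema used for encoding and decoding.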
