diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index 086c63080..249163018 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -656,6 +656,38 @@ mod _const_schema {
         })
     };
 
+    fn data_file_fields_v2(partition_type: StructType) -> Vec<NestedFieldRef> {
+        vec![
+            CONTENT.clone(),
+            FILE_PATH.clone(),
+            FILE_FORMAT.clone(),
+            Arc::new(NestedField::required(
+                102,
+                "partition",
+                Type::Struct(partition_type),
+            )),
+            RECORD_COUNT.clone(),
+            FILE_SIZE_IN_BYTES.clone(),
+            COLUMN_SIZES.clone(),
+            VALUE_COUNTS.clone(),
+            NULL_VALUE_COUNTS.clone(),
+            NAN_VALUE_COUNTS.clone(),
+            LOWER_BOUNDS.clone(),
+            UPPER_BOUNDS.clone(),
+            KEY_METADATA.clone(),
+            SPLIT_OFFSETS.clone(),
+            EQUALITY_IDS.clone(),
+            SORT_ORDER_ID.clone(),
+        ]
+    }
+
+    pub(super) fn data_file_schema_v2(partition_type: StructType) -> Result<AvroSchema> {
+        let schema = Schema::builder()
+            .with_fields(data_file_fields_v2(partition_type))
+            .build()?;
+        schema_to_avro_schema("data_file", &schema)
+    }
+
     pub(super) fn manifest_schema_v2(partition_type: StructType) -> Result<AvroSchema> {
         let fields = vec![
             STATUS.clone(),
@@ -665,34 +697,44 @@ mod _const_schema {
             Arc::new(NestedField::required(
                 2,
                 "data_file",
-                Type::Struct(StructType::new(vec![
-                    CONTENT.clone(),
-                    FILE_PATH.clone(),
-                    FILE_FORMAT.clone(),
-                    Arc::new(NestedField::required(
-                        102,
-                        "partition",
-                        Type::Struct(partition_type),
-                    )),
-                    RECORD_COUNT.clone(),
-                    FILE_SIZE_IN_BYTES.clone(),
-                    COLUMN_SIZES.clone(),
-                    VALUE_COUNTS.clone(),
-                    NULL_VALUE_COUNTS.clone(),
-                    NAN_VALUE_COUNTS.clone(),
-                    LOWER_BOUNDS.clone(),
-                    UPPER_BOUNDS.clone(),
-                    KEY_METADATA.clone(),
-                    SPLIT_OFFSETS.clone(),
-                    EQUALITY_IDS.clone(),
-                    SORT_ORDER_ID.clone(),
-                ])),
+                Type::Struct(StructType::new(data_file_fields_v2(partition_type))),
             )),
         ];
         let schema = Schema::builder().with_fields(fields).build()?;
         schema_to_avro_schema("manifest_entry", &schema)
     }
 
+    fn data_file_fields_v1(partition_type: StructType) -> Vec<NestedFieldRef> {
+        vec![
+            FILE_PATH.clone(),
+            FILE_FORMAT.clone(),
+            Arc::new(NestedField::required(
+                102,
+                "partition",
+                Type::Struct(partition_type),
+            )),
+            RECORD_COUNT.clone(),
+            FILE_SIZE_IN_BYTES.clone(),
+            BLOCK_SIZE_IN_BYTES.clone(),
+            COLUMN_SIZES.clone(),
+            VALUE_COUNTS.clone(),
+            NULL_VALUE_COUNTS.clone(),
+            NAN_VALUE_COUNTS.clone(),
+            LOWER_BOUNDS.clone(),
+            UPPER_BOUNDS.clone(),
+            KEY_METADATA.clone(),
+            SPLIT_OFFSETS.clone(),
+            SORT_ORDER_ID.clone(),
+        ]
+    }
+
+    pub(super) fn data_file_schema_v1(partition_type: StructType) -> Result<AvroSchema> {
+        let schema = Schema::builder()
+            .with_fields(data_file_fields_v1(partition_type))
+            .build()?;
+        schema_to_avro_schema("data_file", &schema)
+    }
+
     pub(super) fn manifest_schema_v1(partition_type: StructType) -> Result<AvroSchema> {
         let fields = vec![
             STATUS.clone(),
@@ -700,27 +742,7 @@ mod _const_schema {
             Arc::new(NestedField::required(
                 2,
                 "data_file",
-                Type::Struct(StructType::new(vec![
-                    FILE_PATH.clone(),
-                    FILE_FORMAT.clone(),
-                    Arc::new(NestedField::required(
-                        102,
-                        "partition",
-                        Type::Struct(partition_type),
-                    )),
-                    RECORD_COUNT.clone(),
-                    FILE_SIZE_IN_BYTES.clone(),
-                    BLOCK_SIZE_IN_BYTES.clone(),
-                    COLUMN_SIZES.clone(),
-                    VALUE_COUNTS.clone(),
-                    NULL_VALUE_COUNTS.clone(),
-                    NAN_VALUE_COUNTS.clone(),
-                    LOWER_BOUNDS.clone(),
-                    UPPER_BOUNDS.clone(),
-                    KEY_METADATA.clone(),
-                    SPLIT_OFFSETS.clone(),
-                    SORT_ORDER_ID.clone(),
-                ])),
+                Type::Struct(StructType::new(data_file_fields_v1(partition_type))),
             )),
         ];
         let schema = Schema::builder().with_fields(fields).build()?;
@@ -1189,6 +1211,47 @@ impl DataFile {
         self.sort_order_id
     }
 }
+
+/// Convert data files to avro bytes.
+pub fn data_files_to_avro(
+    data_files: impl IntoIterator<Item = DataFile>,
+    partition_type: &StructType,
+    version: FormatVersion,
+) -> Result<Vec<u8>> {
+    let avro_schema = match version {
+        FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type.clone()).unwrap(),
+        FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type.clone()).unwrap(),
+    };
+    let mut writer = AvroWriter::new(&avro_schema, Vec::new());
+
+    for data_file in data_files {
+        let value = to_value(_serde::DataFile::try_from(data_file, partition_type, true)?)?
+            .resolve(&avro_schema)?;
+        writer.append(value)?;
+    }
+
+    Ok(writer.into_inner()?)
+}
+
+/// Parse data files from avro bytes.
+pub fn parse_from_avro(
+    bytes: &[u8],
+    schema: &Schema,
+    partition_type: &StructType,
+    version: FormatVersion,
+) -> Result<Vec<DataFile>> {
+    let avro_schema = match version {
+        FormatVersion::V1 => _const_schema::data_file_schema_v1(partition_type.clone()).unwrap(),
+        FormatVersion::V2 => _const_schema::data_file_schema_v2(partition_type.clone()).unwrap(),
+    };
+
+    let reader = AvroReader::with_schema(&avro_schema, bytes)?;
+    reader
+        .into_iter()
+        .map(|value| from_value::<_serde::DataFile>(&value?)?.try_into(partition_type, schema))
+        .collect::<Result<Vec<_>>>()
+}
+
 /// Type of content stored by the data file: data, equality deletes, or
 /// position deletes (all v1 files are data files)
 #[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
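
For reviewers, a minimal sketch of how the two new public functions compose, assuming they are re-exported from `iceberg::spec` like the rest of `manifest.rs`. The `roundtrip` helper and its `table_schema`/`partition_type` parameters are hypothetical stand-ins for values a caller would already hold from table metadata; this is not code from the patch itself.

```rust
use iceberg::spec::{DataFile, FormatVersion, Schema, StructType};
// New in this diff; assumed re-exported alongside the other manifest items.
use iceberg::spec::{data_files_to_avro, parse_from_avro};
use iceberg::Result;

/// Round-trip a batch of data files through Avro bytes using the v2 schema.
/// `table_schema` and `partition_type` come from the table metadata.
fn roundtrip(
    data_files: Vec<DataFile>,
    table_schema: &Schema,
    partition_type: &StructType,
) -> Result<Vec<DataFile>> {
    // Encode with the standalone `data_file` Avro schema for the chosen version.
    let bytes = data_files_to_avro(data_files, partition_type, FormatVersion::V2)?;
    // Decode back, resolving partition values and bounds against the table schema.
    parse_from_avro(&bytes, table_schema, partition_type, FormatVersion::V2)
}
```

Note that both functions pick the v1 or v2 `data_file` schema from the same `_const_schema` helpers the manifest writer uses, so the bytes produced here match the `data_file` struct layout embedded in manifest entries.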