diff --git a/Cargo.toml b/Cargo.toml index 95057c59a..8e95dbefb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,11 +42,13 @@ apache-avro = "0.17" array-init = "2" arrow-arith = { version = "53" } arrow-array = { version = "53" } +arrow-buffer = { version = "53" } arrow-cast = { version = "53" } arrow-ord = { version = "53" } arrow-schema = { version = "53" } arrow-select = { version = "53" } arrow-string = { version = "53" } +arrow-row = { version = "53" } async-stream = "0.3.5" async-trait = "0.1" async-std = "1.12" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 68a8658b0..a34aa36d3 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -46,8 +46,10 @@ apache-avro = { workspace = true } array-init = { workspace = true } arrow-arith = { workspace = true } arrow-array = { workspace = true } +arrow-buffer = { workspace = true } arrow-cast = { workspace = true } arrow-ord = { workspace = true } +arrow-row = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index 0f01324cb..c4d96d59d 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -22,5 +22,8 @@ pub use schema::*; mod reader; pub(crate) mod record_batch_projector; pub(crate) mod record_batch_transformer; - +mod value; pub use reader::*; +pub use value::*; +mod record_batch_partition_spliter; +pub(crate) use record_batch_partition_spliter::*; diff --git a/crates/iceberg/src/arrow/record_batch_partition_spliter.rs b/crates/iceberg/src/arrow/record_batch_partition_spliter.rs new file mode 100644 index 000000000..18ee7fe7f --- /dev/null +++ b/crates/iceberg/src/arrow/record_batch_partition_spliter.rs @@ -0,0 +1,464 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StructArray}; +use arrow_row::{OwnedRow, RowConverter, SortField}; +use arrow_schema::{DataType, Schema}; +use arrow_select::filter::filter_record_batch; +use itertools::Itertools; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use super::record_batch_projector::RecordBatchProjector; +use crate::arrow::{arrow_struct_to_literal, type_to_arrow_type}; +use crate::spec::{BoundPartitionSpecRef, Literal, Struct, StructType, Type}; +use crate::transform::{create_transform_function, BoxedTransformFunction}; +use crate::{Error, ErrorKind, Result}; + +/// A helper function to split the record batch into multiple record batches using computed partition columns. +pub(crate) fn split_with_partition( + row_converter: &RowConverter, + partition_columns: &[ArrayRef], + batch: &RecordBatch, +) -> Result> { + let rows = row_converter + .convert_columns(partition_columns) + .map_err(|e| Error::new(ErrorKind::DataInvalid, format!("{}", e)))?; + + // Group the batch by row value. + let mut group_ids = HashMap::new(); + rows.into_iter().enumerate().for_each(|(row_id, row)| { + group_ids.entry(row.owned()).or_insert(vec![]).push(row_id); + }); + + // Partition the batch with same partition partition_values + let mut partition_batches = Vec::with_capacity(group_ids.len()); + for (row, row_ids) in group_ids.into_iter() { + // generate the bool filter array from column_ids + let filter_array: BooleanArray = { + let mut filter = vec![false; batch.num_rows()]; + row_ids.into_iter().for_each(|row_id| { + filter[row_id] = true; + }); + filter.into() + }; + + // filter the RecordBatch + partition_batches.push(( + row, + filter_record_batch(batch, &filter_array) + .expect("We should guarantee the filter array is valid"), + )); + } + + Ok(partition_batches) +} + +pub(crate) fn convert_row_to_struct( + row_converter: &RowConverter, + struct_type: &StructType, + rows: Vec, +) -> Result> { + let arrow_struct_array = { + let partition_columns = row_converter + .convert_rows(rows.iter().map(|row| row.row())) + .map_err(|e| Error::new(ErrorKind::DataInvalid, format!("{e}")))?; + let partition_arrow_fields = { + let partition_arrow_type = type_to_arrow_type(&Type::Struct(struct_type.clone()))?; + let DataType::Struct(fields) = partition_arrow_type else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The partition arrow type is not a struct type", + )); + }; + fields + }; + StructArray::try_new(partition_arrow_fields, partition_columns, None)? + }; + let struct_array = { + let struct_array = arrow_struct_to_literal(&arrow_struct_array, struct_type)?; + struct_array + .into_iter() + .map(|s| { + if let Some(s) = s { + if let Literal::Struct(s) = s { + Ok(s) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + "The struct is not a struct literal", + )) + } + } else { + Err(Error::new(ErrorKind::DataInvalid, "The struct is null")) + } + }) + .collect::>>()? + }; + + Ok(struct_array) +} + +/// The spliter used to split the record batch into multiple record batches by the partition spec. +pub(crate) struct RecordBatchPartitionSpliter { + partition_spec: BoundPartitionSpecRef, + projector: RecordBatchProjector, + transform_functions: Vec, + row_converter: RowConverter, +} + +impl RecordBatchPartitionSpliter { + pub(crate) fn new( + original_schema: &Schema, + partition_spec: BoundPartitionSpecRef, + ) -> Result { + if partition_spec.fields().is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Fail to create partition spliter using empty partition spec", + )); + } + let projector = RecordBatchProjector::new( + original_schema, + &partition_spec + .fields() + .iter() + .map(|field| field.source_id) + .collect::>(), + // The source columns, selected by ids, must be a primitive type and cannot be contained in a map or list, but may be nested in a struct. + // ref: https://iceberg.apache.org/spec/#partitioning + |field| { + if field.data_type().is_nested() { + return Ok(None); + } + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .map(|s| { + s.parse::() + .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string())) + }) + .transpose() + }, + |_| true, + )?; + let transform_functions = partition_spec + .fields() + .iter() + .map(|field| create_transform_function(&field.transform)) + .collect::>>()?; + let row_converter = RowConverter::new( + partition_spec + .partition_type() + .fields() + .iter() + .map(|f| Ok(SortField::new(type_to_arrow_type(&f.field_type)?))) + .collect::>>()?, + )?; + Ok(Self { + partition_spec, + projector, + transform_functions, + row_converter, + }) + } + + /// Split the record batch into multiple record batches by the partition spec. + pub(crate) fn split(&self, batch: &RecordBatch) -> Result> { + // get array using partition spec + let source_columns = self.projector.project_column(batch.columns())?; + let partition_columns = source_columns + .into_iter() + .zip_eq(self.transform_functions.iter()) + .map(|(source_column, transform_function)| transform_function.transform(source_column)) + .collect::>>()?; + + split_with_partition(&self.row_converter, &partition_columns, batch) + } + + pub(crate) fn convert_row(&self, rows: Vec) -> Result> { + convert_row_to_struct( + &self.row_converter, + self.partition_spec.partition_type(), + rows, + ) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::{Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + + use super::*; + use crate::arrow::schema_to_arrow_schema; + use crate::spec::{BoundPartitionSpec, NestedField, Schema, Transform, UnboundPartitionField}; + + #[test] + fn test_record_batch_partition_spliter() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = BoundPartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + let partition_spliter = RecordBatchPartitionSpliter::new( + &schema_to_arrow_schema(&schema).unwrap(), + Arc::new(partition_spec), + ) + .expect("Failed to create spliter"); + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("data", DataType::Utf8, false), + ])); + let id_array = Int32Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(data_array), + ]) + .expect("Failed to create RecordBatch"); + + let mut partitioned_batches = partition_spliter + .split(&batch) + .expect("Failed to split RecordBatch"); + assert_eq!(partitioned_batches.len(), 3); + partitioned_batches.sort_by_key(|(row, _)| row.clone()); + { + // check the first partition + let expected_id_array = Int32Array::from(vec![1, 1, 1]); + let expected_data_array = StringArray::from(vec!["a", "c", "g"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_id_array), + Arc::new(expected_data_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[0].1, expected_batch); + } + { + // check the second partition + let expected_id_array = Int32Array::from(vec![2, 2]); + let expected_data_array = StringArray::from(vec!["b", "e"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_id_array), + Arc::new(expected_data_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[1].1, expected_batch); + } + { + // check the third partition + let expected_id_array = Int32Array::from(vec![3, 3]); + let expected_data_array = StringArray::from(vec!["d", "f"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_id_array), + Arc::new(expected_data_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[2].1, expected_batch); + } + + let partition_values = partition_spliter + .convert_row( + partitioned_batches + .iter() + .map(|(row, _)| row.clone()) + .collect(), + ) + .unwrap(); + // check partition value is struct(1), struct(2), struct(3) + assert_eq!(partition_values, vec![ + Struct::from_iter(vec![Some(Literal::int(1))]), + Struct::from_iter(vec![Some(Literal::int(2))]), + Struct::from_iter(vec![Some(Literal::int(3))]), + ]); + } + + #[test] + fn test_record_batch_partition_spliter_with_extra_columns() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = BoundPartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("extra_column1", DataType::Utf8, true), + Field::new("id", DataType::Int32, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("data", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + Field::new("extra_column2", DataType::Utf8, true), + ])); + let extra_column1_array = StringArray::from(vec![ + "extra1", "extra2", "extra1", "extra3", "extra2", "extra3", "extra1", + ]); + let id_array = Int32Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let extra_column2_array = StringArray::from(vec![ + "extra1", "extra2", "extra1", "extra3", "extra2", "extra3", "extra1", + ]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(extra_column1_array), + Arc::new(id_array), + Arc::new(data_array), + Arc::new(extra_column2_array), + ]) + .expect("Failed to create RecordBatch"); + let partition_spliter = RecordBatchPartitionSpliter::new(&schema, Arc::new(partition_spec)) + .expect("Failed to create spliter"); + + let mut partitioned_batches = partition_spliter + .split(&batch) + .expect("Failed to split RecordBatch"); + assert_eq!(partitioned_batches.len(), 3); + partitioned_batches.sort_by_key(|(row, _)| row.clone()); + { + // check the first partition + let expected_extra_column1_array = + StringArray::from(vec!["extra1", "extra1", "extra1"]); + let expected_id_array = Int32Array::from(vec![1, 1, 1]); + let expected_data_array = StringArray::from(vec!["a", "c", "g"]); + let expected_extra_column2_array = + StringArray::from(vec!["extra1", "extra1", "extra1"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_extra_column1_array), + Arc::new(expected_id_array), + Arc::new(expected_data_array), + Arc::new(expected_extra_column2_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[0].1, expected_batch); + } + { + // check the second partition + let expected_extra_column1_array = StringArray::from(vec!["extra2", "extra2"]); + let expected_id_array = Int32Array::from(vec![2, 2]); + let expected_data_array = StringArray::from(vec!["b", "e"]); + let expected_extra_column2_array = StringArray::from(vec!["extra2", "extra2"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_extra_column1_array), + Arc::new(expected_id_array), + Arc::new(expected_data_array), + Arc::new(expected_extra_column2_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[1].1, expected_batch); + } + { + // check the third partition + let expected_id_array = Int32Array::from(vec![3, 3]); + let expected_data_array = StringArray::from(vec!["d", "f"]); + let expected_extra_column1_array = StringArray::from(vec!["extra3", "extra3"]); + let expected_extra_column2_array = StringArray::from(vec!["extra3", "extra3"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_extra_column1_array), + Arc::new(expected_id_array), + Arc::new(expected_data_array), + Arc::new(expected_extra_column2_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[2].1, expected_batch); + } + + let partition_values = partition_spliter + .convert_row( + partitioned_batches + .iter() + .map(|(row, _)| row.clone()) + .collect(), + ) + .unwrap(); + // check partition value is struct(1), struct(2), struct(3) + assert_eq!(partition_values, vec![ + Struct::from_iter(vec![Some(Literal::int(1))]), + Struct::from_iter(vec![Some(Literal::int(2))]), + Struct::from_iter(vec![Some(Literal::int(3))]), + ]); + } + + #[test] + fn test_empty_partition() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = BoundPartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .build() + .unwrap(); + assert!(RecordBatchPartitionSpliter::new( + &schema_to_arrow_schema(&schema).unwrap(), + Arc::new(partition_spec), + ) + .is_err()) + } +} diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs index f218983aa..7da5840ce 100644 --- a/crates/iceberg/src/arrow/record_batch_projector.rs +++ b/crates/iceberg/src/arrow/record_batch_projector.rs @@ -17,7 +17,8 @@ use std::sync::Arc; -use arrow_array::{ArrayRef, RecordBatch, StructArray}; +use arrow_array::{make_array, ArrayRef, RecordBatch, StructArray}; +use arrow_buffer::NullBuffer; use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; use crate::error::Result; @@ -42,7 +43,7 @@ impl RecordBatchProjector { /// This function will iterate through the nested fields if the field is a struct, `searchable_field_func` can be used to control whether /// iterate into the nested fields. pub(crate) fn new( - original_schema: SchemaRef, + original_schema: &Schema, field_ids: &[i32], field_id_fetch_func: F1, searchable_field_func: F2, @@ -138,6 +139,7 @@ impl RecordBatchProjector { fn get_column_by_field_index(batch: &[ArrayRef], field_index: &[usize]) -> Result { let mut rev_iterator = field_index.iter().rev(); let mut array = batch[*rev_iterator.next().unwrap()].clone(); + let mut null_buffer = array.logical_nulls(); for idx in rev_iterator { array = array .as_any() @@ -148,8 +150,11 @@ impl RecordBatchProjector { ))? .column(*idx) .clone(); + null_buffer = NullBuffer::union(null_buffer.as_ref(), array.logical_nulls().as_ref()); } - Ok(array) + Ok(make_array( + array.to_data().into_builder().nulls(null_buffer).build()?, + )) } } @@ -187,8 +192,7 @@ mod test { _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; let projector = - RecordBatchProjector::new(schema.clone(), &[1, 3], field_id_fetch_func, |_| true) - .unwrap(); + RecordBatchProjector::new(&schema, &[1, 3], field_id_fetch_func, |_| true).unwrap(); assert!(projector.field_indices.len() == 2); assert_eq!(projector.field_indices[0], vec![0]); @@ -250,8 +254,7 @@ mod test { "inner_field2" => Ok(Some(4)), _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; - let projector = - RecordBatchProjector::new(schema.clone(), &[1, 5], field_id_fetch_func, |_| true); + let projector = RecordBatchProjector::new(&schema, &[1, 5], field_id_fetch_func, |_| true); assert!(projector.is_err()); } @@ -280,12 +283,10 @@ mod test { "inner_field2" => Ok(Some(4)), _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; - let projector = - RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| false); + let projector = RecordBatchProjector::new(&schema, &[3], field_id_fetch_func, |_| false); assert!(projector.is_err()); - let projector = - RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| true); + let projector = RecordBatchProjector::new(&schema, &[3], field_id_fetch_func, |_| true); assert!(projector.is_ok()); } } diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index c64dd5de1..5824a366c 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -30,6 +30,7 @@ use arrow_array::{ use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; use bitvec::macros::internal::funty::Fundamental; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use parquet::data_type::ByteArray; use parquet::file::statistics::Statistics; use rust_decimal::prelude::ToPrimitive; use uuid::Uuid; @@ -676,6 +677,31 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result Result<[u8; 16]> { + if array.len() > 16 { + return Err(Error::new( + ErrorKind::DataInvalid, + "fail to extend array with len > 16 to array with 16", + )); + } + + // Check the sign bit: if the first byte's MSB is 1, it's negative + let is_negative = array.data().get(0).map_or(false, |&b| b & 0x80 != 0); + + // Create a buffer of 16 bytes filled with the sign extension value + let mut extended = if is_negative { + [0xFF; 16] // Fill with 0xFF for negative numbers + } else { + [0x00; 16] // Fill with 0x00 for positive numbers + }; + + // Copy the Vec into the rightmost part of the buffer + let start = 16 - array.len(); + extended[start..].copy_from_slice(&array.data()); + + Ok(extended) +} + macro_rules! get_parquet_stat_as_datum { ($limit_type:tt) => { paste::paste! { @@ -723,10 +749,21 @@ macro_rules! get_parquet_stat_as_datum { let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else { return Ok(None); }; - Some(Datum::new( primitive_type.clone(), - PrimitiveLiteral::Int128(i128::from_le_bytes(bytes.try_into()?)), + PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)), + )) + } + (PrimitiveType::Decimal { + precision: _, + scale: _, + }, Statistics::FixedLenByteArray(stats)) => { + let Some(bytes) = stats.[<$limit_type _bytes_opt>]() else { + return Ok(None); + }; + Some(Datum::new( + primitive_type.clone(), + PrimitiveLiteral::Int128(i128::from_be_bytes(extend_to_i128_big_endian(bytes.into())?)), )) } ( diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs new file mode 100644 index 000000000..702beff4d --- /dev/null +++ b/crates/iceberg/src/arrow/value.rs @@ -0,0 +1,1015 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{ + Array, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, + Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, NullArray, StringArray, + StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, +}; +use arrow_schema::{DataType, TimeUnit}; +use itertools::Itertools; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::spec::{Literal, PrimitiveType, Struct, StructType, Type}; +use crate::{Error, ErrorKind, Result}; + +/// A post order arrow array visitor. +/// # TODO +/// - Add support for ListArray, MapArray +trait ArrowArrayVistor { + type T; + fn null(&self, array: &NullArray, iceberg_type: &PrimitiveType) -> Result>; + fn boolean(&self, array: &BooleanArray, iceberg_type: &PrimitiveType) -> Result>; + fn int16(&self, array: &Int16Array, iceberg_type: &PrimitiveType) -> Result>; + fn int32(&self, array: &Int32Array, iceberg_type: &PrimitiveType) -> Result>; + fn int64(&self, array: &Int64Array, iceberg_type: &PrimitiveType) -> Result>; + fn float(&self, array: &Float32Array, iceberg_type: &PrimitiveType) -> Result>; + fn double(&self, array: &Float64Array, iceberg_type: &PrimitiveType) -> Result>; + fn decimal( + &self, + array: &Decimal128Array, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn date(&self, array: &Date32Array, iceberg_type: &PrimitiveType) -> Result>; + fn time( + &self, + array: &Time64MicrosecondArray, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn timestamp( + &self, + array: &TimestampMicrosecondArray, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn timestamp_nano( + &self, + array: &TimestampNanosecondArray, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn string(&self, array: &StringArray, iceberg_type: &PrimitiveType) -> Result>; + fn large_string( + &self, + array: &LargeStringArray, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn binary(&self, array: &BinaryArray, iceberg_type: &PrimitiveType) -> Result>; + fn large_binary( + &self, + array: &LargeBinaryArray, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn r#struct( + &self, + array: &StructArray, + iceberg_type: &StructType, + columns: Vec>, + ) -> Result>; +} + +struct ArrowArrayConvert; + +impl ArrowArrayVistor for ArrowArrayConvert { + type T = Option; + + fn null(&self, array: &NullArray, _iceberg_type: &PrimitiveType) -> Result> { + Ok(vec![None; array.len()]) + } + + fn boolean(&self, array: &BooleanArray, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Boolean => Ok(array.iter().map(|v| v.map(Literal::bool)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow boolean array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn int16(&self, array: &Int16Array, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Int => Ok(array.iter().map(|v| v.map(Literal::int)).collect()), + PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow int16 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn int32(&self, array: &Int32Array, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Int => Ok(array.iter().map(|v| v.map(Literal::int)).collect()), + PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow int32 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn int64(&self, array: &Int64Array, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow int64 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn float(&self, array: &Float32Array, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Float => Ok(array.iter().map(|v| v.map(Literal::float)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow float16 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn double(&self, array: &Float64Array, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Double => Ok(array.iter().map(|v| v.map(Literal::double)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow float64 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn decimal( + &self, + array: &Decimal128Array, + iceberg_type: &PrimitiveType, + ) -> Result> { + let DataType::Decimal128(arrow_precision, arrow_scale) = array.data_type() else { + unreachable!() + }; + match iceberg_type { + PrimitiveType::Decimal { precision, scale } => { + if *arrow_precision as u32 != *precision || *arrow_scale as u32 != *scale { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The precision or scale ({},{}) of arrow decimal128 array is not compatitable with iceberg decimal type ({},{})", + arrow_precision, arrow_scale, precision, scale + ), + )); + } + Ok(array.iter().map(|v| v.map(Literal::decimal)).collect()) + } + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow decimal128 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn date(&self, array: &Date32Array, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Date => Ok(array.iter().map(|v| v.map(Literal::date)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow date32 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn time( + &self, + array: &Time64MicrosecondArray, + iceberg_type: &PrimitiveType, + ) -> Result> { + match iceberg_type { + PrimitiveType::Time => Ok(array + .iter() + .map(|v| v.map(Literal::time)) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow time64 microsecond array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn timestamp( + &self, + array: &TimestampMicrosecondArray, + iceberg_type: &PrimitiveType, + ) -> Result> { + match iceberg_type { + PrimitiveType::Timestamp => Ok(array + .iter() + .map(|v| v.map(Literal::timestamp)) + .collect()), + PrimitiveType::Timestamptz => Ok(array + .iter() + .map(|v| v.map(Literal::timestamptz)) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow timestamp microsecond array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn timestamp_nano( + &self, + array: &TimestampNanosecondArray, + iceberg_type: &PrimitiveType, + ) -> Result> { + match iceberg_type { + PrimitiveType::TimestampNs => Ok(array + .iter() + .map(|v| v.map(Literal::timestamp_nano)) + .collect()), + PrimitiveType::TimestamptzNs => Ok(array + .iter() + .map(|v| v.map(Literal::timestamptz_nano)) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow timestamp nanosecond array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn string(&self, array: &StringArray, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::String => Ok(array.iter().map(|v| v.map(Literal::string)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow string array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn large_string( + &self, + array: &LargeStringArray, + iceberg_type: &PrimitiveType, + ) -> Result> { + match iceberg_type { + PrimitiveType::String => Ok(array.iter().map(|v| v.map(Literal::string)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow large string array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn binary(&self, array: &BinaryArray, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Binary => Ok(array + .iter() + .map(|v| v.map(|v| Literal::binary(v.to_vec()))) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow binary array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn large_binary( + &self, + array: &LargeBinaryArray, + iceberg_type: &PrimitiveType, + ) -> Result> { + match iceberg_type { + PrimitiveType::Binary => Ok(array + .iter() + .map(|v| v.map(|v| Literal::binary(v.to_vec()))) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow binary array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn r#struct( + &self, + array: &StructArray, + _iceberg_type: &StructType, + columns: Vec>, + ) -> Result> { + let struct_literal_len = columns.first().map(|column| column.len()).unwrap_or(0); + let mut struct_literals = Vec::with_capacity(struct_literal_len); + let mut columns_iters = columns + .into_iter() + .map(|column| column.into_iter()) + .collect::>(); + + for row_idx in 0..struct_literal_len { + if array.is_null(row_idx) { + struct_literals.push(None); + continue; + } + let mut literals = Vec::with_capacity(columns_iters.len()); + for column_iter in columns_iters.iter_mut() { + literals.push(column_iter.next().unwrap()); + } + struct_literals.push(Some(Literal::Struct(Struct::from_iter(literals)))); + } + + Ok(struct_literals) + } +} + +fn visit_arrow_struct_array( + array: &StructArray, + iceberg_type: &StructType, + visitor: &V, +) -> Result> { + let DataType::Struct(arrow_struct_fields) = array.data_type() else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not a struct type", + )); + }; + + if array.columns().len() != iceberg_type.fields().len() + || arrow_struct_fields.len() != iceberg_type.fields().len() + { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not compatitable with iceberg struct type", + )); + } + + let mut columns = Vec::with_capacity(array.columns().len()); + + for ((array, arrow_field), iceberg_field) in array + .columns() + .iter() + .zip_eq(arrow_struct_fields.iter()) + .zip_eq(iceberg_type.fields().iter()) + { + let arrow_type = arrow_field.data_type(); + if arrow_field.is_nullable() == iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "The nullable field of arrow struct array is not compatitable with iceberg type", + )); + } + match (arrow_type, iceberg_field.field_type.as_ref()) { + (DataType::Null, Type::Primitive(primitive_type)) => { + if iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "column in arrow array should not be optional", + )); + } + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.null(array, primitive_type)?); + } + (DataType::Boolean, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.boolean(array, primitive_type)?); + } + (DataType::Int16, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int16(array, primitive_type)?); + } + (DataType::Int32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int32(array, primitive_type)?); + } + (DataType::Int64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int64(array, primitive_type)?); + } + (DataType::Float32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.float(array, primitive_type)?); + } + (DataType::Float64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.double(array, primitive_type)?); + } + (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.decimal(array, primitive_type)?); + } + (DataType::Date32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.date(array, primitive_type)?); + } + (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.time(array, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Microsecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.timestamp(array, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.timestamp_nano(array, primitive_type)?); + } + (DataType::Utf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.string(array, primitive_type)?); + } + (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.large_string(array, primitive_type)?); + } + (DataType::Binary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.binary(array, primitive_type)?); + } + (DataType::LargeBinary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.large_binary(array, primitive_type)?); + } + (DataType::Struct(_), Type::Struct(struct_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visit_arrow_struct_array(array, struct_type, visitor)?); + } + (arrow_type, iceberg_field_type) => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Unsupported convert arrow type {} to iceberg type: {}", + arrow_type, iceberg_field_type + ), + )) + } + } + } + + visitor.r#struct(array, iceberg_type, columns) +} + +// # TODO +// Add support for fulfill the missing field in arrow struct array +fn visit_arrow_struct_array_from_field_id( + array: &StructArray, + iceberg_type: &StructType, + visitor: &V, +) -> Result> { + let DataType::Struct(arrow_struct_fields) = array.data_type() else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not a struct type", + )); + }; + + if array.columns().len() < iceberg_type.fields().len() + || arrow_struct_fields.len() < iceberg_type.fields().len() + { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not compatitable with iceberg struct type", + )); + } + + let mut columns = Vec::with_capacity(array.columns().len()); + + for iceberg_field in iceberg_type.fields() { + let Some((idx, field)) = arrow_struct_fields.iter().enumerate().find(|(_idx, f)| { + f.metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|id| id.parse::().ok().map(|id: i32| id == iceberg_field.id)) + .unwrap_or(false) + }) else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The field {} in iceberg struct type is not found in arrow struct type", + iceberg_field.name + ), + )); + }; + let array = array.column(idx); + let arrow_type = field.data_type(); + if array.is_nullable() == iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "The nullable field of arrow struct array is not compatitable with iceberg type", + )); + } + match (arrow_type, iceberg_field.field_type.as_ref()) { + (DataType::Null, Type::Primitive(primitive_type)) => { + if iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "column in arrow array should not be optional", + )); + } + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.null(array, primitive_type)?); + } + (DataType::Boolean, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.boolean(array, primitive_type)?); + } + (DataType::Int16, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int16(array, primitive_type)?); + } + (DataType::Int32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int32(array, primitive_type)?); + } + (DataType::Int64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int64(array, primitive_type)?); + } + (DataType::Float32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.float(array, primitive_type)?); + } + (DataType::Float64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.double(array, primitive_type)?); + } + (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.decimal(array, primitive_type)?); + } + (DataType::Date32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.date(array, primitive_type)?); + } + (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.time(array, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Microsecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.timestamp(array, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.timestamp_nano(array, primitive_type)?); + } + (DataType::Utf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.string(array, primitive_type)?); + } + (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.large_string(array, primitive_type)?); + } + (DataType::Binary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.binary(array, primitive_type)?); + } + (DataType::LargeBinary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.large_binary(array, primitive_type)?); + } + (DataType::Struct(_), Type::Struct(struct_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visit_arrow_struct_array(array, struct_type, visitor)?); + } + (arrow_type, iceberg_field_type) => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Unsupported convert arrow type {} to iceberg type: {}", + arrow_type, iceberg_field_type + ), + )) + } + } + } + + visitor.r#struct(array, iceberg_type, columns) +} + +/// Convert arrow struct array to iceberg struct value array. +/// This function will assume the schema of arrow struct array is the same as iceberg struct type. +pub fn arrow_struct_to_literal( + struct_array: &StructArray, + ty: &StructType, +) -> Result>> { + visit_arrow_struct_array(struct_array, ty, &ArrowArrayConvert) +} + +/// Convert arrow struct array to iceberg struct value array. +/// This function will use field id to find the corresponding field in arrow struct array. +pub fn arrow_struct_to_literal_from_field_id( + struct_array: &StructArray, + ty: &StructType, +) -> Result>> { + visit_arrow_struct_array_from_field_id(struct_array, ty, &ArrowArrayConvert) +} + +#[cfg(test)] +mod test { + use std::collections::HashMap; + use std::sync::Arc; + + use arrow_array::{ + ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, + Float64Array, Int16Array, Int32Array, Int64Array, StringArray, StructArray, + Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, + }; + use arrow_schema::{DataType, Field, Fields, TimeUnit}; + + use super::*; + use crate::spec::{Literal, NestedField, PrimitiveType, StructType, Type}; + + #[test] + fn test_arrow_struct_to_iceberg_struct() { + let bool_array = BooleanArray::from(vec![Some(true), Some(false), None]); + let int16_array = Int16Array::from(vec![Some(1), Some(2), None]); + let int32_array = Int32Array::from(vec![Some(3), Some(4), None]); + let int64_array = Int64Array::from(vec![Some(5), Some(6), None]); + let float32_array = Float32Array::from(vec![Some(1.1), Some(2.2), None]); + let float64_array = Float64Array::from(vec![Some(3.3), Some(4.4), None]); + let decimal_array = Decimal128Array::from(vec![Some(1000), Some(2000), None]) + .with_precision_and_scale(10, 2) + .unwrap(); + let date_array = Date32Array::from(vec![Some(18628), Some(18629), None]); + let time_array = Time64MicrosecondArray::from(vec![Some(123456789), Some(987654321), None]); + let timestamp_micro_array = TimestampMicrosecondArray::from(vec![ + Some(1622548800000000), + Some(1622635200000000), + None, + ]); + let timestamp_nano_array = TimestampNanosecondArray::from(vec![ + Some(1622548800000000000), + Some(1622635200000000000), + None, + ]); + let string_array = StringArray::from(vec![Some("a"), Some("b"), None]); + let binary_array = + BinaryArray::from(vec![Some(b"abc".as_ref()), Some(b"def".as_ref()), None]); + + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("bool_field", DataType::Boolean, true)), + Arc::new(bool_array) as ArrayRef, + ), + ( + Arc::new(Field::new("int16_field", DataType::Int16, true)), + Arc::new(int16_array) as ArrayRef, + ), + ( + Arc::new(Field::new("int32_field", DataType::Int32, true)), + Arc::new(int32_array) as ArrayRef, + ), + ( + Arc::new(Field::new("int64_field", DataType::Int64, true)), + Arc::new(int64_array) as ArrayRef, + ), + ( + Arc::new(Field::new("float32_field", DataType::Float32, true)), + Arc::new(float32_array) as ArrayRef, + ), + ( + Arc::new(Field::new("float64_field", DataType::Float64, true)), + Arc::new(float64_array) as ArrayRef, + ), + ( + Arc::new(Field::new( + "decimal_field", + DataType::Decimal128(10, 2), + true, + )), + Arc::new(decimal_array) as ArrayRef, + ), + ( + Arc::new(Field::new("date_field", DataType::Date32, true)), + Arc::new(date_array) as ArrayRef, + ), + ( + Arc::new(Field::new( + "time_field", + DataType::Time64(TimeUnit::Microsecond), + true, + )), + Arc::new(time_array) as ArrayRef, + ), + ( + Arc::new(Field::new( + "timestamp_micro_field", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + )), + Arc::new(timestamp_micro_array) as ArrayRef, + ), + ( + Arc::new(Field::new( + "timestamp_nano_field", + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + )), + Arc::new(timestamp_nano_array) as ArrayRef, + ), + ( + Arc::new(Field::new("string_field", DataType::Utf8, true)), + Arc::new(string_array) as ArrayRef, + ), + ( + Arc::new(Field::new("binary_field", DataType::Binary, true)), + Arc::new(binary_array) as ArrayRef, + ), + ]); + + let iceberg_struct_type = StructType::new(vec![ + Arc::new(NestedField::optional( + 0, + "bool_field", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 1, + "int16_field", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "int32_field", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 3, + "int64_field", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 4, + "float32_field", + Type::Primitive(PrimitiveType::Float), + )), + Arc::new(NestedField::optional( + 5, + "float64_field", + Type::Primitive(PrimitiveType::Double), + )), + Arc::new(NestedField::optional( + 6, + "decimal_field", + Type::Primitive(PrimitiveType::Decimal { + precision: 10, + scale: 2, + }), + )), + Arc::new(NestedField::optional( + 7, + "date_field", + Type::Primitive(PrimitiveType::Date), + )), + Arc::new(NestedField::optional( + 8, + "time_field", + Type::Primitive(PrimitiveType::Time), + )), + Arc::new(NestedField::optional( + 9, + "timestamp_micro_field", + Type::Primitive(PrimitiveType::Timestamp), + )), + Arc::new(NestedField::optional( + 10, + "timestamp_nao_field", + Type::Primitive(PrimitiveType::TimestampNs), + )), + Arc::new(NestedField::optional( + 11, + "string_field", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 12, + "binary_field", + Type::Primitive(PrimitiveType::Binary), + )), + ]); + + let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap(); + + assert_eq!(result, vec![ + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::bool(true)), + Some(Literal::int(1)), + Some(Literal::int(3)), + Some(Literal::long(5)), + Some(Literal::float(1.1)), + Some(Literal::double(3.3)), + Some(Literal::decimal(1000)), + Some(Literal::date(18628)), + Some(Literal::time(123456789)), + Some(Literal::timestamp(1622548800000000)), + Some(Literal::timestamp_nano(1622548800000000000)), + Some(Literal::string("a".to_string())), + Some(Literal::binary(b"abc".to_vec())), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::bool(false)), + Some(Literal::int(2)), + Some(Literal::int(4)), + Some(Literal::long(6)), + Some(Literal::float(2.2)), + Some(Literal::double(4.4)), + Some(Literal::decimal(2000)), + Some(Literal::date(18629)), + Some(Literal::time(987654321)), + Some(Literal::timestamp(1622635200000000)), + Some(Literal::timestamp_nano(1622635200000000000)), + Some(Literal::string("b".to_string())), + Some(Literal::binary(b"def".to_vec())), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![ + None, None, None, None, None, None, None, None, None, None, None, None, None, + ]))), + ]); + } + + #[test] + fn test_single_column_nullable_struct() { + let struct_array = StructArray::new_null( + Fields::from(vec![Field::new("bool_field", DataType::Boolean, true)]), + 3, + ); + let iceberg_struct_type = StructType::new(vec![Arc::new(NestedField::optional( + 0, + "bool_field", + Type::Primitive(PrimitiveType::Boolean), + ))]); + let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap(); + assert_eq!(result, vec![None; 3]); + } + + #[test] + fn test_empty_struct() { + let struct_array = StructArray::new_null(Fields::empty(), 3); + let iceberg_struct_type = StructType::new(vec![]); + let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap(); + assert_eq!(result, vec![None; 0]); + } + + #[test] + fn test_arrow_struct_to_iceberg_struct_from_field_id() { + let bool_array = BooleanArray::from(vec![Some(true), Some(false), None]); + let int16_array = Int16Array::from(vec![Some(1), Some(2), None]); + let int32_array = Int32Array::from(vec![Some(3), Some(4), None]); + let int64_array = Int64Array::from(vec![Some(5), Some(6), None]); + let float32_array = Float32Array::from(vec![Some(1.1), Some(2.2), None]); + let struct_array = StructArray::from(vec![ + ( + Arc::new( + Field::new("bool_field", DataType::Boolean, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())], + )), + ), + Arc::new(bool_array) as ArrayRef, + ), + ( + Arc::new( + Field::new("int16_field", DataType::Int16, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())], + )), + ), + Arc::new(int16_array) as ArrayRef, + ), + ( + Arc::new( + Field::new("int32_field", DataType::Int32, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())], + )), + ), + Arc::new(int32_array) as ArrayRef, + ), + ( + Arc::new( + Field::new("int64_field", DataType::Int64, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())], + )), + ), + Arc::new(int64_array) as ArrayRef, + ), + ( + Arc::new( + Field::new("float32_field", DataType::Float32, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]), + ), + ), + Arc::new(float32_array) as ArrayRef, + ), + ]); + let struct_type = StructType::new(vec![ + Arc::new(NestedField::optional( + 1, + "int16_field", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "bool_field", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 3, + "int64_field", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 4, + "int32_field", + Type::Primitive(PrimitiveType::Int), + )), + ]); + let result = arrow_struct_to_literal_from_field_id(&struct_array, &struct_type).unwrap(); + assert_eq!(result, vec![ + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::int(1)), + Some(Literal::bool(true)), + Some(Literal::long(5)), + Some(Literal::int(3)), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::int(2)), + Some(Literal::bool(false)), + Some(Literal::long(6)), + Some(Literal::int(4)), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![ + None, None, None, None, + ]))), + ]); + } +} diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index b897d1574..bd301ad38 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -18,7 +18,7 @@ //! Catalog API for Apache Iceberg use std::collections::HashMap; -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use std::mem::take; use std::ops::Deref; @@ -149,6 +149,12 @@ impl NamespaceIdent { } } +impl Display for NamespaceIdent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0.join(".")) + } +} + impl AsRef> for NamespaceIdent { fn as_ref(&self) -> &Vec { &self.0 @@ -232,6 +238,12 @@ impl TableIdent { } } +impl Display for TableIdent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}.{}", self.namespace, self.name) + } +} + /// TableCreation represents the creation of a table in the catalog. #[derive(Debug, TypedBuilder)] pub struct TableCreation { diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 8365d622c..db35feb2f 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -323,6 +323,14 @@ impl OutputFile { Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?) } + /// Delete output file + pub async fn delete(&self) -> crate::Result<()> { + if self.exists().await? { + self.op.delete(&self.path[self.relative_path_pos..]).await? + } + Ok(()) + } + /// Converts into [`InputFile`]. pub fn to_input_file(self) -> InputFile { InputFile { diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index a868c7b11..a562fef9d 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter}; use bytes::Bytes; +use itertools::Itertools; use serde_derive::{Deserialize, Serialize}; use serde_json::to_vec; use serde_with::{DeserializeFromStr, SerializeDisplay}; @@ -31,7 +32,8 @@ use typed_builder::TypedBuilder; use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; use super::{ BoundPartitionSpec, Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile, - Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER, + PrimitiveLiteral, PrimitiveType, Schema, SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER, + UNASSIGNED_SEQUENCE_NUMBER, }; use crate::error::Result; use crate::io::OutputFile; @@ -128,7 +130,59 @@ pub struct ManifestWriter { key_metadata: Vec, - field_summary: HashMap, + partitions: Vec, +} + +struct PartitionFieldStats { + partition_type: PrimitiveType, + summary: FieldSummary, +} + +impl PartitionFieldStats { + pub(crate) fn new(partition_type: PrimitiveType) -> Self { + Self { + partition_type, + summary: Default::default(), + } + } + + pub(crate) fn update(&mut self, value: Option) -> Result<()> { + if let Some(value) = value { + if !self.partition_type.compatible(&value) { + return Err(Error::new( + ErrorKind::DataInvalid, + "value is not compatitable with type", + )); + } + let value = Datum::new(self.partition_type.clone(), value); + if value.is_nan() { + self.summary.contains_nan = Some(true); + } else { + if let Some(lower) = self.summary.lower_bound.as_mut() { + if value < *lower { + *lower = value.clone(); + } + } else { + self.summary.lower_bound = Some(value.clone()); + } + if let Some(upper) = self.summary.upper_bound.as_mut() { + if value < *upper { + *upper = value; + } + } else { + self.summary.lower_bound = Some(value); + } + } + } else { + self.summary.contains_null = true; + } + + Ok(()) + } + + pub(crate) fn finish(self) -> FieldSummary { + self.summary + } } impl ManifestWriter { @@ -145,62 +199,28 @@ impl ManifestWriter { deleted_rows: 0, min_seq_num: None, key_metadata, - field_summary: HashMap::new(), + partitions: vec![], } } - fn update_field_summary(&mut self, entry: &ManifestEntry) { - // Update field summary - for (&k, &v) in &entry.data_file.null_value_counts { - let field_summary = self.field_summary.entry(k).or_default(); - if v > 0 { - field_summary.contains_null = true; - } - } - - for (&k, &v) in &entry.data_file.nan_value_counts { - let field_summary = self.field_summary.entry(k).or_default(); - if v > 0 { - field_summary.contains_nan = Some(true); - } - if v == 0 { - field_summary.contains_nan = Some(false); - } - } - - for (&k, v) in &entry.data_file.lower_bounds { - let field_summary = self.field_summary.entry(k).or_default(); - if let Some(cur) = &field_summary.lower_bound { - if v < cur { - field_summary.lower_bound = Some(v.clone()); - } - } else { - field_summary.lower_bound = Some(v.clone()); - } - } - - for (&k, v) in &entry.data_file.upper_bounds { - let field_summary = self.field_summary.entry(k).or_default(); - if let Some(cur) = &field_summary.upper_bound { - if v > cur { - field_summary.upper_bound = Some(v.clone()); - } - } else { - field_summary.upper_bound = Some(v.clone()); + fn construct_partition_summaries( + &mut self, + partition_spec: &BoundPartitionSpec, + ) -> Result> { + let partitions = std::mem::take(&mut self.partitions); + let mut field_stats: Vec<_> = partition_spec + .partition_type() + .fields() + .iter() + .map(|f| PartitionFieldStats::new(f.field_type.as_primitive_type().unwrap().clone())) + .collect(); + for partition in partitions { + for (literal, stat) in partition.into_iter().zip_eq(field_stats.iter_mut()) { + let primitve_literal = literal.map(|v| v.as_primitive_literal().unwrap()); + stat.update(primitve_literal)?; } } - } - - fn get_field_summary_vec(&mut self, spec_fields: &[PartitionField]) -> Vec { - let mut partition_summary = Vec::with_capacity(self.field_summary.len()); - for field in spec_fields { - let entry = self - .field_summary - .remove(&field.source_id) - .unwrap_or_default(); - partition_summary.push(entry); - } - partition_summary + Ok(field_stats.into_iter().map(|stat| stat.finish()).collect()) } /// Write a manifest. @@ -276,7 +296,7 @@ impl ManifestWriter { } } - self.update_field_summary(&entry); + self.partitions.push(entry.data_file.partition.clone()); let value = match manifest.metadata.format_version { FormatVersion::V1 => to_value(_serde::ManifestEntryV1::try_from( @@ -299,7 +319,7 @@ impl ManifestWriter { self.output.write(Bytes::from(content)).await?; let partition_summary = - self.get_field_summary_vec(manifest.metadata.partition_spec.fields()); + self.construct_partition_summaries(&manifest.metadata.partition_spec)?; Ok(ManifestFile { manifest_path: self.output.location().to_string(), @@ -1157,6 +1177,10 @@ impl DataFile { pub fn sort_order_id(&self) -> Option { self.sort_order_id } + + pub(crate) fn rewrite_partition(&mut self, partition: Struct) { + self.partition = partition; + } } /// Type of content stored by the data file: data, equality deletes, or /// position deletes (all v1 files are data files) @@ -1223,6 +1247,8 @@ impl std::fmt::Display for DataFileFormat { } } +pub use _serde::DataFile as SerializedDataFile; + mod _serde { use std::collections::HashMap; @@ -1299,9 +1325,10 @@ mod _serde { } } + /// DataFile can used to be serialize. #[serde_as] #[derive(Serialize, Deserialize)] - pub(super) struct DataFile { + pub struct DataFile { #[serde(default)] content: i32, file_path: String, @@ -1325,6 +1352,7 @@ mod _serde { } impl DataFile { + /// Create a SerializedDataFile from a DataFile pub fn try_from( value: super::DataFile, partition_type: &StructType, @@ -1354,6 +1382,8 @@ mod _serde { sort_order_id: value.sort_order_id, }) } + + /// Convert a SerializedDataFile to a DataFile pub fn try_into( self, partition_type: &StructType, diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 5768b79d5..31ff9302e 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -857,12 +857,22 @@ pub(super) mod _serde { contains_nan: self.contains_nan, lower_bound: self .lower_bound - .map(|v| Datum::try_from_bytes(&v, r#type.clone())) - .transpose()?, + .as_ref() + .map(|v| Datum::try_from_bytes(v, r#type.clone())) + .transpose() + .map_err(|err| { + err.with_context("type", format!("{:?}", r#type)) + .with_context("bytes", format!("{:?}", self.lower_bound)) + })?, upper_bound: self .upper_bound + .as_ref() .map(|v| Datum::try_from_bytes(&v, r#type.clone())) - .transpose()?, + .transpose() + .map_err(|err| { + err.with_context("type", format!("{:?}", r#type)) + .with_context("bytes", format!("{:?}", self.upper_bound)) + })?, }) } } diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs index 6b7d03f11..29401f174 100644 --- a/crates/iceberg/src/spec/transform.rs +++ b/crates/iceberg/src/spec/transform.rs @@ -197,7 +197,27 @@ impl Transform { )) } } - Transform::Year | Transform::Month | Transform::Day => { + Transform::Year | Transform::Month => { + if let Type::Primitive(p) = input_type { + match p { + PrimitiveType::Timestamp + | PrimitiveType::Timestamptz + | PrimitiveType::TimestampNs + | PrimitiveType::TimestamptzNs + | PrimitiveType::Date => Ok(Type::Primitive(PrimitiveType::Int)), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!("{input_type} is not a valid input type of {self} transform",), + )), + } + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!("{input_type} is not a valid input type of {self} transform",), + )) + } + } + Transform::Day => { if let Type::Primitive(p) = input_type { match p { PrimitiveType::Timestamp diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 49748df0d..76adc46d5 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -1431,6 +1431,16 @@ impl Literal { Self::Primitive(PrimitiveLiteral::Long(value)) } + /// Creates a timestamp from unix epoch in nanoseconds. + pub fn timestamp_nano(value: i64) -> Self { + Self::Primitive(PrimitiveLiteral::Long(value)) + } + + /// Creates a timestamp with timezone from unix epoch in nanoseconds. + pub fn timestamptz_nano(value: i64) -> Self { + Self::Primitive(PrimitiveLiteral::Long(value)) + } + /// Creates a timestamp from [`DateTime`]. pub fn timestamp_from_datetime(dt: DateTime) -> Self { Self::timestamp(dt.with_timezone(&Utc).timestamp_micros()) @@ -2087,6 +2097,8 @@ mod timestamptz { } mod _serde { + use std::collections::HashMap; + use serde::de::Visitor; use serde::ser::{SerializeMap, SerializeSeq, SerializeStruct}; use serde::{Deserialize, Serialize}; @@ -2676,22 +2688,24 @@ mod _serde { optional: _, }) => match ty { Type::Struct(struct_ty) => { - let iters: Vec> = required - .into_iter() - .map(|(field_name, value)| { - let field = struct_ty - .field_by_name(field_name.as_str()) - .ok_or_else(|| { - invalid_err_with_reason( - "record", - &format!("field {} is not exist", &field_name), - ) - })?; - let value = value.try_into(&field.field_type)?; - Ok(value) + let mut value_map: HashMap = + required.into_iter().collect(); + let values = struct_ty + .fields() + .iter() + .map(|f| { + if let Some(raw_value) = value_map.remove(&f.name) { + let value = raw_value.try_into(&f.field_type)?; + Ok(value) + } else { + Err(invalid_err_with_reason( + "record", + &format!("field {} is not exist", &f.name), + )) + } }) - .collect::>()?; - Ok(Some(Literal::Struct(super::Struct::from_iter(iters)))) + .collect::, Error>>()?; + Ok(Some(Literal::Struct(super::Struct::from_iter(values)))) } Type::Map(map_ty) => { if *map_ty.key_field.field_type != Type::Primitive(PrimitiveType::String) { diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index edf1a8596..4f8af177e 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -28,10 +28,10 @@ use uuid::Uuid; use crate::error::Result; use crate::io::OutputFile; use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile, - ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation, Snapshot, - SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType, - Summary, Transform, MAIN_BRANCH, + DataContentType, DataFile, DataFileFormat, FormatVersion, Manifest, ManifestContentType, + ManifestEntry, ManifestFile, ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, + Operation, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, + Struct, StructType, Summary, Transform, MAIN_BRANCH, }; use crate::table::Table; use crate::TableUpdate::UpgradeFormatVersion; @@ -170,6 +170,17 @@ impl<'a> Transaction<'a> { catalog.update_table(table_commit).await } + + /// Commit transaction with dynamic catalog. + pub async fn commit_dyn(self, catalog: &dyn Catalog) -> Result { + let table_commit = TableCommit::builder() + .ident(self.table.identifier().clone()) + .updates(self.updates) + .requirements(self.requirements) + .build(); + + catalog.update_table(table_commit).await + } } /// FastAppendAction is a transaction action for fast append data files to the table. @@ -284,6 +295,7 @@ struct SnapshotProduceAction<'a> { commit_uuid: Uuid, snapshot_properties: HashMap, added_data_files: Vec, + added_delete_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). @@ -304,6 +316,7 @@ impl<'a> SnapshotProduceAction<'a> { commit_uuid, snapshot_properties, added_data_files: vec![], + added_delete_files: vec![], manifest_counter: (0..), key_metadata, }) @@ -335,7 +348,12 @@ impl<'a> SnapshotProduceAction<'a> { return Err(Error::new( ErrorKind::DataInvalid, "Partition value is not compatitable partition type", - )); + ) + .with_context( + "partition value", + format!("{:?}", &value.as_primitive_literal().unwrap()), + ) + .with_context("partition type", format!("{:?}", field.field_type))); } } Ok(()) @@ -347,13 +365,7 @@ impl<'a> SnapshotProduceAction<'a> { data_files: impl IntoIterator, ) -> Result<&mut Self> { let data_files: Vec = data_files.into_iter().collect(); - for data_file in &data_files { - if data_file.content_type() != crate::spec::DataContentType::Data { - return Err(Error::new( - ErrorKind::DataInvalid, - "Only data content type is allowed for fast append", - )); - } + for data_file in data_files { Self::validate_partition_value( data_file.partition(), self.tx @@ -362,8 +374,12 @@ impl<'a> SnapshotProduceAction<'a> { .default_partition_spec() .partition_type(), )?; + if data_file.content_type() == DataContentType::Data { + self.added_data_files.push(data_file); + } else { + self.added_delete_files.push(data_file); + } } - self.added_data_files.extend(data_files); Ok(self) } @@ -380,8 +396,31 @@ impl<'a> SnapshotProduceAction<'a> { } // Write manifest file for added data files and return the ManifestFile for ManifestList. - async fn write_added_manifest(&mut self) -> Result { - let added_data_files = std::mem::take(&mut self.added_data_files); + async fn write_added_manifest( + &mut self, + added_data_files: Vec, + ) -> Result { + let content_type = { + let mut data_num = 0; + let mut delete_num = 0; + for f in &added_data_files { + match f.content_type() { + DataContentType::Data => data_num = data_num + 1, + DataContentType::PositionDeletes => delete_num = delete_num + 1, + DataContentType::EqualityDeletes => delete_num = delete_num + 1, + } + } + if data_num == added_data_files.len() { + ManifestContentType::Data + } else if delete_num == added_data_files.len() { + ManifestContentType::Deletes + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "added DataFile for a ManifestFile should be same type (Data or Delete)", + )); + } + }; let manifest_entries = added_data_files .into_iter() .map(|data_file| { @@ -410,7 +449,7 @@ impl<'a> SnapshotProduceAction<'a> { .as_ref() .clone(), ) - .content(crate::spec::ManifestContentType::Data) + .content(content_type) .build(); let manifest = Manifest::new(manifest_meta, manifest_entries); let writer = ManifestWriter::new( @@ -426,12 +465,13 @@ impl<'a> SnapshotProduceAction<'a> { snapshot_produce_operation: &OP, manifest_process: &MP, ) -> Result> { - let added_manifest = self.write_added_manifest().await?; + let data_files = std::mem::take(&mut self.added_data_files); + let delete_files = std::mem::take(&mut self.added_delete_files); + let added_manifest = self.write_added_manifest(data_files).await?; + let added_delete_manifest = self.write_added_manifest(delete_files).await?; let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; - // # TODO - // Support process delete entries. - let mut manifest_files = vec![added_manifest]; + let mut manifest_files = vec![added_manifest, added_delete_manifest]; manifest_files.extend(existing_manifests); let manifest_files = manifest_process.process_manifeset(manifest_files); Ok(manifest_files) diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs index f326cfed6..7528e0735 100644 --- a/crates/iceberg/src/transform/temporal.rs +++ b/crates/iceberg/src/transform/temporal.rs @@ -272,7 +272,7 @@ impl Day { impl TransformFunction for Day { fn transform(&self, input: ArrayRef) -> Result { - let res: Int32Array = match input.data_type() { + let res: Date32Array = match input.data_type() { DataType::Timestamp(TimeUnit::Microsecond, _) => input .as_any() .downcast_ref::() @@ -324,7 +324,7 @@ impl TransformFunction for Day { )) } }; - Ok(Some(Datum::int(val))) + Ok(Some(Datum::date(val))) } } diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs index c32c98bbc..4f3337116 100644 --- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -20,7 +20,7 @@ use arrow_array::RecordBatch; use itertools::Itertools; -use crate::spec::{DataContentType, DataFile, Struct}; +use crate::spec::{DataContentType, DataFile, SchemaRef, Struct}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; use crate::Result; @@ -29,25 +29,17 @@ use crate::Result; #[derive(Clone)] pub struct DataFileWriterBuilder { inner: B, + partition_value: Option, + schema: SchemaRef, } impl DataFileWriterBuilder { /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`. - pub fn new(inner: B) -> Self { - Self { inner } - } -} - -/// Config for `DataFileWriter`. -pub struct DataFileWriterConfig { - partition_value: Struct, -} - -impl DataFileWriterConfig { - /// Create a new `DataFileWriterConfig` with partition value. - pub fn new(partition_value: Option) -> Self { + pub fn new(schema: SchemaRef, inner: B, partition_value: Option) -> Self { Self { - partition_value: partition_value.unwrap_or(Struct::empty()), + inner, + partition_value, + schema, } } } @@ -55,12 +47,11 @@ impl DataFileWriterConfig { #[async_trait::async_trait] impl IcebergWriterBuilder for DataFileWriterBuilder { type R = DataFileWriter; - type C = DataFileWriterConfig; - async fn build(self, config: Self::C) -> Result { + async fn build(self) -> Result { Ok(DataFileWriter { - inner_writer: Some(self.inner.clone().build().await?), - partition_value: config.partition_value, + inner_writer: Some(self.inner.clone().build(self.schema).await?), + partition_value: self.partition_value.unwrap_or(Struct::empty()), }) } } @@ -104,6 +95,10 @@ impl CurrentFileStatus for DataFileWriter { fn current_written_size(&self) -> usize { self.inner_writer.as_ref().unwrap().current_written_size() } + + fn current_schema(&self) -> SchemaRef { + self.inner_writer.as_ref().unwrap().current_schema() + } } #[cfg(test)] @@ -115,9 +110,7 @@ mod test { use crate::io::FileIOBuilder; use crate::spec::{DataContentType, DataFileFormat, Schema, Struct}; - use crate::writer::base_writer::data_file_writer::{ - DataFileWriterBuilder, DataFileWriterConfig, - }; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; use crate::writer::file_writer::location_generator::test::MockLocationGenerator; use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; use crate::writer::file_writer::ParquetWriterBuilder; @@ -135,14 +128,14 @@ mod test { let pw = ParquetWriterBuilder::new( WriterProperties::builder().build(), - Arc::new(Schema::builder().build().unwrap()), file_io.clone(), location_gen, file_name_gen, ); - let mut data_file_writer = DataFileWriterBuilder::new(pw) - .build(DataFileWriterConfig::new(None)) - .await?; + let mut data_file_writer = + DataFileWriterBuilder::new(Arc::new(Schema::builder().build().unwrap()), pw, None) + .build() + .await?; let data_file = data_file_writer.close().await.unwrap(); assert_eq!(data_file.len(), 1); diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 222961fc4..5c3b4473f 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -20,12 +20,12 @@ use std::sync::Arc; use arrow_array::RecordBatch; -use arrow_schema::{DataType, Field, SchemaRef as ArrowSchemaRef}; +use arrow_schema::{DataType, Field}; use itertools::Itertools; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::record_batch_projector::RecordBatchProjector; -use crate::arrow::schema_to_arrow_schema; +use crate::arrow::{arrow_schema_to_schema, schema_to_arrow_schema}; use crate::spec::{DataFile, SchemaRef, Struct}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; use crate::writer::{IcebergWriter, IcebergWriterBuilder}; @@ -35,17 +35,6 @@ use crate::{Error, ErrorKind, Result}; #[derive(Clone)] pub struct EqualityDeleteFileWriterBuilder { inner: B, -} - -impl EqualityDeleteFileWriterBuilder { - /// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`. - pub fn new(inner: B) -> Self { - Self { inner } - } -} - -/// Config for `EqualityDeleteWriter`. -pub struct EqualityDeleteWriterConfig { // Field ids used to determine row equality in equality delete files. equality_ids: Vec, // Projector used to project the data chunk into specific fields. @@ -53,25 +42,25 @@ pub struct EqualityDeleteWriterConfig { partition_value: Struct, } -impl EqualityDeleteWriterConfig { - /// Create a new `DataFileWriterConfig` with equality ids. +impl EqualityDeleteFileWriterBuilder { + /// Create a new `EqualityDeleteFileWriterBuilder` using a `FileWriterBuilder`. pub fn new( + inner: B, equality_ids: Vec, original_schema: SchemaRef, partition_value: Option, ) -> Result { let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?); let projector = RecordBatchProjector::new( - original_arrow_schema, + &original_arrow_schema, &equality_ids, // The following rule comes from https://iceberg.apache.org/spec/#identifier-field-ids + // and https://iceberg.apache.org/spec/#equality-delete-files // - The identifier field ids must be used for primitive types. // - The identifier field ids must not be used for floating point types or nullable fields. - // - The identifier field ids can be nested field of struct but not nested field of nullable struct. |field| { // Only primitive type is allowed to be used for identifier field ids - if field.is_nullable() - || !field.data_type().is_primitive() + if field.data_type().is_nested() || matches!( field.data_type(), DataType::Float16 | DataType::Float32 | DataType::Float64 @@ -90,32 +79,30 @@ impl EqualityDeleteWriterConfig { .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?, )) }, - |field: &Field| !field.is_nullable(), + |_field: &Field| true, )?; Ok(Self { + inner, equality_ids, projector, partition_value: partition_value.unwrap_or(Struct::empty()), }) } - - /// Return projected Schema - pub fn projected_arrow_schema_ref(&self) -> &ArrowSchemaRef { - self.projector.projected_schema_ref() - } } #[async_trait::async_trait] impl IcebergWriterBuilder for EqualityDeleteFileWriterBuilder { type R = EqualityDeleteFileWriter; - type C = EqualityDeleteWriterConfig; - async fn build(self, config: Self::C) -> Result { + async fn build(self) -> Result { + let schema = Arc::new(arrow_schema_to_schema( + self.projector.projected_schema_ref(), + )?); Ok(EqualityDeleteFileWriter { - inner_writer: Some(self.inner.clone().build().await?), - projector: config.projector, - equality_ids: config.equality_ids, - partition_value: config.partition_value, + inner_writer: Some(self.inner.clone().build(schema).await?), + projector: self.projector, + equality_ids: self.equality_ids, + partition_value: self.partition_value, }) } } @@ -169,7 +156,7 @@ mod test { use std::sync::Arc; use arrow_array::types::Int32Type; - use arrow_array::{ArrayRef, Int32Array, RecordBatch, StructArray}; + use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StructArray}; use arrow_schema::DataType; use arrow_select::concat::concat_batches; use itertools::Itertools; @@ -177,15 +164,13 @@ mod test { use parquet::file::properties::WriterProperties; use tempfile::TempDir; - use crate::arrow::{arrow_schema_to_schema, schema_to_arrow_schema}; + use crate::arrow::schema_to_arrow_schema; use crate::io::{FileIO, FileIOBuilder}; use crate::spec::{ DataFile, DataFileFormat, ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type, }; - use crate::writer::base_writer::equality_delete_writer::{ - EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, - }; + use crate::writer::base_writer::equality_delete_writer::EqualityDeleteFileWriterBuilder; use crate::writer::file_writer::location_generator::test::MockLocationGenerator; use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; use crate::writer::file_writer::ParquetWriterBuilder; @@ -380,25 +365,19 @@ mod test { let columns = vec![col0, col1, col2, col3, col4]; let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap(); - let equality_ids = vec![0_i32, 8]; - let equality_config = - EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap(); - let delete_schema = - arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap(); - let projector = equality_config.projector.clone(); - // prepare writer let pb = ParquetWriterBuilder::new( WriterProperties::builder().build(), - Arc::new(delete_schema), file_io.clone(), location_gen, file_name_gen, ); - - let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb) - .build(equality_config) - .await?; + let mut equality_delete_writer = + EqualityDeleteFileWriterBuilder::new(pb, vec![0_i32, 8], Arc::new(schema), None) + .unwrap() + .build() + .await?; + let projector = equality_delete_writer.projector.clone(); // write equality_delete_writer.write(to_write.clone()).await?; @@ -419,6 +398,19 @@ mod test { #[tokio::test] async fn test_equality_delete_unreachable_column() -> Result<(), anyhow::Error> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + let pb = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + file_io.clone(), + location_gen, + file_name_gen, + ); + let schema = Arc::new( Schema::builder() .with_schema_id(1) @@ -480,23 +472,42 @@ mod test { .unwrap(), ); // Float and Double are not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None).is_err()); - assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None).is_err()); - // Int is nullable, not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![2], schema.clone(), None).is_err()); + assert!( + EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![0], schema.clone(), None) + .is_err() + ); + assert!( + EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![1], schema.clone(), None) + .is_err() + ); // Struct is not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None).is_err()); + assert!( + EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![3], schema.clone(), None) + .is_err() + ); // Nested field of struct is allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None).is_ok()); - // Nested field of optional struct is not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![6], schema.clone(), None).is_err()); + assert!( + EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![4], schema.clone(), None).is_ok() + ); // Nested field of map is not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None).is_err()); - assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None).is_err()); - assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None).is_err()); + assert!( + EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![7], schema.clone(), None) + .is_err() + ); + assert!( + EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![8], schema.clone(), None) + .is_err() + ); + assert!( + EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![9], schema.clone(), None) + .is_err() + ); // Nested field of list is not allowed to be used for equality delete - assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), None).is_err()); - assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), None).is_err()); + assert!( + EqualityDeleteFileWriterBuilder::new(pb.clone(), vec![10], schema.clone(), None) + .is_err() + ); + assert!(EqualityDeleteFileWriterBuilder::new(pb, vec![11], schema.clone(), None).is_err()); Ok(()) } diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs index 37ab97eb6..ff1b8a463 100644 --- a/crates/iceberg/src/writer/base_writer/mod.rs +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -19,3 +19,4 @@ pub mod data_file_writer; pub mod equality_delete_writer; +pub mod sort_position_delete_writer; diff --git a/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs b/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs new file mode 100644 index 000000000..b10f24b35 --- /dev/null +++ b/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Sort position delete file writer. +use std::collections::BTreeMap; +use std::sync::Arc; + +use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray}; +use arrow_schema::SchemaRef as ArrowSchemaRef; +use once_cell::sync::Lazy; + +use crate::arrow::schema_to_arrow_schema; +use crate::spec::{DataFile, NestedField, PrimitiveType, Schema, SchemaRef, Struct, Type}; +use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::Result; + +/// Builder for `MemoryPositionDeleteWriter`. +#[derive(Clone)] +pub struct SortPositionDeleteWriterBuilder { + inner: B, + cache_num: usize, + partition_value: Option, +} + +impl SortPositionDeleteWriterBuilder { + /// Create a new `SortPositionDeleteWriterBuilder` using a `FileWriterBuilder`. + pub fn new(inner: B, cache_num: usize, partition_value: Option) -> Self { + Self { + inner, + cache_num, + partition_value, + } + } +} + +/// Schema for position delete file. +pub static POSITION_DELETE_SCHEMA: Lazy = Lazy::new(|| { + Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 2147483546, + "file_path", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::required( + 2147483545, + "pos", + Type::Primitive(PrimitiveType::Long), + )), + ]) + .build() + .unwrap(), + ) +}); + +/// Arrow schema for position delete file. +pub static POSITION_DELETE_ARROW_SCHEMA: Lazy = + Lazy::new(|| Arc::new(schema_to_arrow_schema(&POSITION_DELETE_SCHEMA).unwrap())); + +#[async_trait::async_trait] +impl IcebergWriterBuilder> + for SortPositionDeleteWriterBuilder +{ + type R = SortPositionDeleteWriter; + + async fn build(self) -> Result { + Ok(SortPositionDeleteWriter { + inner_writer_builder: self.inner.clone(), + cache_num: self.cache_num, + cache: BTreeMap::new(), + data_files: Vec::new(), + partition_value: self.partition_value.unwrap_or(Struct::empty()), + }) + } +} + +/// Position delete input. +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] +pub struct PositionDeleteInput { + /// The path of the file. + pub path: String, + /// The offset of the position delete. + pub offset: i64, +} + +/// The memory position delete writer. +pub struct SortPositionDeleteWriter { + inner_writer_builder: B, + cache_num: usize, + cache: BTreeMap>, + data_files: Vec, + partition_value: Struct, +} + +impl SortPositionDeleteWriter { + /// Get the current number of cache rows. + pub fn current_cache_number(&self) -> usize { + self.cache.len() + } +} + +impl SortPositionDeleteWriter { + async fn write_cache_out(&mut self) -> Result<()> { + let mut keys = Vec::new(); + let mut values = Vec::new(); + let mut cache = std::mem::take(&mut self.cache); + for (key, offsets) in cache.iter_mut() { + offsets.sort(); + let key_ref = key.as_str(); + for offset in offsets { + keys.push(key_ref); + values.push(*offset); + } + } + let key_array = Arc::new(StringArray::from(keys)) as ArrayRef; + let value_array = Arc::new(Int64Array::from(values)) as ArrayRef; + let record_batch = RecordBatch::try_new(POSITION_DELETE_ARROW_SCHEMA.clone(), vec![ + key_array, + value_array, + ])?; + let mut writer = self + .inner_writer_builder + .clone() + .build(POSITION_DELETE_SCHEMA.clone()) + .await?; + writer.write(&record_batch).await?; + self.data_files + .extend(writer.close().await?.into_iter().map(|mut res| { + res.content(crate::spec::DataContentType::PositionDeletes); + res.partition(self.partition_value.clone()); + res.build().expect("Guaranteed to be valid") + })); + Ok(()) + } +} + +/// Implement `IcebergWriter` for `PositionDeleteWriter`. +#[async_trait::async_trait] +impl IcebergWriter for SortPositionDeleteWriter { + async fn write(&mut self, input: PositionDeleteInput) -> Result<()> { + if let Some(v) = self.cache.get_mut(&input.path) { + v.push(input.offset); + } else { + self.cache + .insert(input.path.to_string(), vec![input.offset]); + } + + if self.cache.len() >= self.cache_num { + self.write_cache_out().await?; + } + Ok(()) + } + + async fn close(&mut self) -> Result> { + self.write_cache_out().await?; + Ok(std::mem::take(&mut self.data_files)) + } +} + +#[cfg(test)] +mod test { + use arrow_array::{Int64Array, StringArray}; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use crate::io::FileIOBuilder; + use crate::spec::{DataContentType, DataFileFormat, Struct}; + use crate::writer::base_writer::sort_position_delete_writer::{ + PositionDeleteInput, SortPositionDeleteWriterBuilder, + }; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + use crate::Result; + + #[tokio::test] + async fn test_position_delete_writer() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + file_io.clone(), + location_gen, + file_name_gen, + ); + let mut position_delete_writer = SortPositionDeleteWriterBuilder::new(pw, 10, None) + .build() + .await?; + + // Write some position delete inputs + let mut inputs = [ + PositionDeleteInput { + path: "file2.parquet".to_string(), + offset: 2, + }, + PositionDeleteInput { + path: "file2.parquet".to_string(), + offset: 1, + }, + PositionDeleteInput { + path: "file2.parquet".to_string(), + offset: 3, + }, + PositionDeleteInput { + path: "file3.parquet".to_string(), + offset: 2, + }, + PositionDeleteInput { + path: "file1.parquet".to_string(), + offset: 5, + }, + PositionDeleteInput { + path: "file1.parquet".to_string(), + offset: 4, + }, + PositionDeleteInput { + path: "file1.parquet".to_string(), + offset: 1, + }, + ]; + for input in inputs.iter() { + position_delete_writer.write(input.clone()).await?; + } + + let data_files = position_delete_writer.close().await.unwrap(); + assert_eq!(data_files.len(), 1); + assert_eq!(data_files[0].file_format, DataFileFormat::Parquet); + assert_eq!(data_files[0].content, DataContentType::PositionDeletes); + assert_eq!(data_files[0].partition, Struct::empty()); + + let parquet_file = file_io + .new_input(&data_files[0].file_path)? + .read() + .await + .unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(parquet_file).unwrap(); + let reader = builder.build().unwrap(); + let batches = reader.map(|x| x.unwrap()).collect::>(); + + let path_column = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let offset_column = batches[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + inputs.sort_by(|a, b| a.path.cmp(&b.path).then_with(|| a.offset.cmp(&b.offset))); + for (i, input) in inputs.iter().enumerate() { + assert_eq!(path_column.value(i), input.path); + assert_eq!(offset_column.value(i), input.offset); + } + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 4a0fffcc1..3d00af72a 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -21,7 +21,7 @@ use arrow_array::RecordBatch; use futures::Future; use super::CurrentFileStatus; -use crate::spec::DataFileBuilder; +use crate::spec::{DataFileBuilder, SchemaRef}; use crate::Result; mod parquet_writer; @@ -37,7 +37,7 @@ pub trait FileWriterBuilder: Send + Clone + 'static { /// The associated file writer type. type R: FileWriter; /// Build file writer. - fn build(self) -> impl Future> + Send; + fn build(self, schema: SchemaRef) -> impl Future> + Send; } /// File writer focus on writing record batch to different physical file format.(Such as parquet. orc) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index f4cec63ed..e2b7ce5f8 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -49,8 +49,6 @@ use crate::{Error, ErrorKind, Result}; #[derive(Clone)] pub struct ParquetWriterBuilder { props: WriterProperties, - schema: SchemaRef, - file_io: FileIO, location_generator: T, file_name_generator: F, @@ -61,14 +59,12 @@ impl ParquetWriterBuilder { /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field. pub fn new( props: WriterProperties, - schema: SchemaRef, file_io: FileIO, location_generator: T, file_name_generator: F, ) -> Self { Self { props, - schema, file_io, location_generator, file_name_generator, @@ -79,8 +75,7 @@ impl ParquetWriterBuilder { impl FileWriterBuilder for ParquetWriterBuilder { type R = ParquetWriter; - async fn build(self) -> crate::Result { - let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?); + async fn build(self, schema: SchemaRef) -> crate::Result { let written_size = Arc::new(AtomicI64::new(0)); let out_file = self.file_io.new_output( self.location_generator @@ -88,15 +83,15 @@ impl FileWriterBuilder for ParquetWr )?; let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone()); let async_writer = AsyncFileWriter::new(inner_writer); - let writer = - AsyncArrowWriter::try_new(async_writer, arrow_schema.clone(), Some(self.props)) - .map_err(|err| { - Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.") - .with_source(err) - })?; + let arrow_schema: ArrowSchemaRef = Arc::new(schema.as_ref().try_into()?); + let writer = AsyncArrowWriter::try_new(async_writer, arrow_schema, Some(self.props)) + .map_err(|err| { + Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.") + .with_source(err) + })?; Ok(ParquetWriter { - schema: self.schema.clone(), + schema: schema.clone(), writer, written_size, current_row_num: 0, @@ -411,6 +406,11 @@ impl FileWriter for ParquetWriter { Error::new(ErrorKind::Unexpected, "Failed to close parquet writer.").with_source(err) })?; + if self.current_row_num == 0 { + self.out_file.delete().await?; + return Ok(vec![]); + } + let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed); Ok(vec![Self::to_data_file_builder( @@ -434,6 +434,10 @@ impl CurrentFileStatus for ParquetWriter { fn current_written_size(&self) -> usize { self.written_size.load(std::sync::atomic::Ordering::Relaxed) as usize } + + fn current_schema(&self) -> SchemaRef { + self.schema.clone() + } } /// AsyncFileWriter is a wrapper of FileWrite to make it compatible with tokio::io::AsyncWrite. @@ -538,6 +542,17 @@ mod tests { NestedField::optional(14, "uuid", Type::Primitive(PrimitiveType::Uuid)).into(), NestedField::optional(15, "fixed", Type::Primitive(PrimitiveType::Fixed(10))) .into(), + // Parquet Statistics will use different representation for Decimal with precision 38 and scale 5, + // so we need to add a new field for it. + NestedField::optional( + 16, + "decimal_38", + Type::Primitive(PrimitiveType::Decimal { + precision: 38, + scale: 5, + }), + ) + .into(), ]) .build() .unwrap() @@ -656,12 +671,11 @@ mod tests { // write data let mut pw = ParquetWriterBuilder::new( WriterProperties::builder().build(), - Arc::new(to_write.schema().as_ref().try_into().unwrap()), file_io.clone(), location_gen, file_name_gen, ) - .build() + .build(Arc::new(to_write.schema().as_ref().try_into().unwrap())) .await?; pw.write(&to_write).await?; pw.write(&to_write_null).await?; @@ -852,12 +866,11 @@ mod tests { // write data let mut pw = ParquetWriterBuilder::new( WriterProperties::builder().build(), - Arc::new(schema), file_io.clone(), location_gen, file_name_gen, ) - .build() + .build(Arc::new(schema)) .await?; pw.write(&to_write).await?; let res = pw.close().await?; @@ -1028,21 +1041,25 @@ mod tests { ) .unwrap(), ) as ArrayRef; + let col16 = Arc::new( + arrow_array::Decimal128Array::from(vec![Some(1), Some(2), None, Some(100)]) + .with_precision_and_scale(38, 5) + .unwrap(), + ) as ArrayRef; let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![ col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, - col14, col15, + col14, col15, col16, ]) .unwrap(); // write data let mut pw = ParquetWriterBuilder::new( WriterProperties::builder().build(), - Arc::new(schema), file_io.clone(), loccation_gen, file_name_gen, ) - .build() + .build(Arc::new(schema)) .await?; pw.write(&to_write).await?; let res = pw.close().await?; @@ -1092,6 +1109,16 @@ mod tests { ), (14, Datum::uuid(Uuid::from_u128(0))), (15, Datum::fixed(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10])), + ( + 16, + Datum::new( + PrimitiveType::Decimal { + precision: 38, + scale: 5 + }, + PrimitiveLiteral::Int128(1) + ) + ), ]) ); assert_eq!( @@ -1125,6 +1152,16 @@ mod tests { 15, Datum::fixed(vec![21, 22, 23, 24, 25, 26, 27, 28, 29, 30]) ), + ( + 16, + Datum::new( + PrimitiveType::Decimal { + precision: 38, + scale: 5 + }, + PrimitiveLiteral::Int128(100) + ) + ), ]) ); diff --git a/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs b/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs new file mode 100644 index 000000000..b29107306 --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs @@ -0,0 +1,436 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the equality delta writer. + +use std::collections::HashMap; + +use arrow_array::builder::BooleanBuilder; +use arrow_array::{Int32Array, RecordBatch}; +use arrow_ord::partition::partition; +use arrow_row::{OwnedRow, RowConverter, Rows, SortField}; +use arrow_select::filter::filter_record_batch; +use itertools::Itertools; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::arrow::record_batch_projector::RecordBatchProjector; +use crate::arrow::schema_to_arrow_schema; +use crate::spec::DataFile; +use crate::writer::base_writer::sort_position_delete_writer::PositionDeleteInput; +use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// Insert operation. +pub const INSERT_OP: i32 = 1; +/// Delete operation. +pub const DELETE_OP: i32 = 2; + +/// Builder for `EqualityDeltaWriter`. +#[derive(Clone)] +pub struct EqualityDeltaWriterBuilder { + data_writer_builder: DB, + position_delete_writer_builder: PDB, + equality_delete_writer_builder: EDB, + unique_column_ids: Vec, +} + +impl EqualityDeltaWriterBuilder { + /// Create a new `EqualityDeltaWriterBuilder`. + pub fn new( + data_writer_builder: DB, + position_delete_writer_builder: PDB, + equality_delete_writer_builder: EDB, + unique_column_ids: Vec, + ) -> Self { + Self { + data_writer_builder, + position_delete_writer_builder, + equality_delete_writer_builder, + unique_column_ids, + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder for EqualityDeltaWriterBuilder +where + DB: IcebergWriterBuilder, + PDB: IcebergWriterBuilder, + EDB: IcebergWriterBuilder, + DB::R: CurrentFileStatus, +{ + type R = EqualityDeltaWriter; + + async fn build(self) -> Result { + Self::R::try_new( + self.data_writer_builder.build().await?, + self.position_delete_writer_builder.build().await?, + self.equality_delete_writer_builder.build().await?, + self.unique_column_ids, + ) + } +} + +/// Equality delta writer. +pub struct EqualityDeltaWriter { + data_writer: D, + position_delete_writer: PD, + equality_delete_writer: ED, + projector: RecordBatchProjector, + inserted_row: HashMap, + row_converter: RowConverter, +} + +impl EqualityDeltaWriter +where + D: IcebergWriter + CurrentFileStatus, + PD: IcebergWriter, + ED: IcebergWriter, +{ + pub(crate) fn try_new( + data_writer: D, + position_delete_writer: PD, + equality_delete_writer: ED, + unique_column_ids: Vec, + ) -> Result { + let projector = RecordBatchProjector::new( + &schema_to_arrow_schema(&data_writer.current_schema())?, + &unique_column_ids, + |field| { + if field.data_type().is_nested() { + return Ok(None); + } + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .map(|s| { + s.parse::() + .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string())) + }) + .transpose() + }, + |_| true, + )?; + let row_converter = RowConverter::new( + projector + .projected_schema_ref() + .fields() + .iter() + .map(|field| SortField::new(field.data_type().clone())) + .collect(), + )?; + Ok(Self { + data_writer, + position_delete_writer, + equality_delete_writer, + projector, + inserted_row: HashMap::new(), + row_converter, + }) + } + /// Write the batch. + /// 1. If a row with the same unique column is not written, then insert it. + /// 2. If a row with the same unique column is written, then delete the previous row and insert the new row. + async fn insert(&mut self, batch: RecordBatch) -> Result<()> { + let rows = self.extract_unique_column(&batch)?; + let current_file_path = self.data_writer.current_file_path(); + let current_file_offset = self.data_writer.current_row_num(); + for (idx, row) in rows.iter().enumerate() { + let previous_input = self.inserted_row.insert(row.owned(), PositionDeleteInput { + path: current_file_path.clone(), + offset: (current_file_offset + idx) as i64, + }); + if let Some(previous_input) = previous_input { + self.position_delete_writer.write(previous_input).await?; + } + } + + self.data_writer.write(batch).await?; + + Ok(()) + } + + async fn delete(&mut self, batch: RecordBatch) -> Result<()> { + let rows = self.extract_unique_column(&batch)?; + let mut delete_row = BooleanBuilder::new(); + for row in rows.iter() { + if let Some(previous_input) = self.inserted_row.remove(&row.owned()) { + self.position_delete_writer.write(previous_input).await?; + delete_row.append_value(false); + } else { + delete_row.append_value(true); + } + } + let delete_batch = filter_record_batch(&batch, &delete_row.finish()).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to filter record batch, error: {}", err), + ) + })?; + self.equality_delete_writer.write(delete_batch).await?; + Ok(()) + } + + fn extract_unique_column(&mut self, batch: &RecordBatch) -> Result { + self.row_converter + .convert_columns(&self.projector.project_column(batch.columns())?) + .map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to convert columns, error: {}", err), + ) + }) + } +} + +#[async_trait::async_trait] +impl IcebergWriter for EqualityDeltaWriter +where + D: IcebergWriter + CurrentFileStatus, + PD: IcebergWriter, + ED: IcebergWriter, +{ + async fn write(&mut self, batch: RecordBatch) -> Result<()> { + // check the last column is int32 array. + let ops = batch + .column(batch.num_columns() - 1) + .as_any() + .downcast_ref::() + .ok_or(Error::new(ErrorKind::DataInvalid, ""))?; + + // partition the ops. + let partitions = + partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to partition ops, error: {}", err), + ) + })?; + for range in partitions.ranges() { + let batch = batch + .project(&(0..batch.num_columns() - 1).collect_vec()) + .unwrap() + .slice(range.start, range.end - range.start); + match ops.value(range.start) { + // Insert + INSERT_OP => self.insert(batch).await?, + // Delete + DELETE_OP => self.delete(batch).await?, + op => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid ops: {op}"), + )) + } + } + } + Ok(()) + } + + async fn close(&mut self) -> Result> { + let data_files = self.data_writer.close().await?; + let position_delete_files = self.position_delete_writer.close().await?; + let equality_delete_files = self.equality_delete_writer.close().await?; + Ok(data_files + .into_iter() + .chain(position_delete_files) + .chain(equality_delete_files) + .collect()) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use arrow_select::concat::concat_batches; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use crate::io::FileIOBuilder; + use crate::spec::{DataContentType, DataFileFormat, NestedField, PrimitiveType, Schema, Type}; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::base_writer::equality_delete_writer::EqualityDeleteFileWriterBuilder; + use crate::writer::base_writer::sort_position_delete_writer::{ + SortPositionDeleteWriterBuilder, POSITION_DELETE_ARROW_SCHEMA, + }; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::function_writer::equality_delta_writer::{ + EqualityDeltaWriterBuilder, DELETE_OP, INSERT_OP, + }; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + use crate::Result; + + #[tokio::test] + async fn test_equality_delta_writer() -> Result<()> { + // prepare writer + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 1, + "id".to_string(), + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::required( + 2, + "name".to_string(), + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + let pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + file_io.clone(), + location_gen, + file_name_gen, + ); + let data_file_writer_builder = DataFileWriterBuilder::new(schema.clone(), pw.clone(), None); + let position_delete_writer_builder = + SortPositionDeleteWriterBuilder::new(pw.clone(), 100, None); + let equality_delete_writer_builder = + EqualityDeleteFileWriterBuilder::new(pw, vec![1, 2], schema, None).unwrap(); + let mut equality_delta_writer = EqualityDeltaWriterBuilder::new( + data_file_writer_builder, + position_delete_writer_builder, + equality_delete_writer_builder, + vec![1, 2], + ) + .build() + .await + .unwrap(); + + // write data + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("data", DataType::Utf8, true), + Field::new("op", DataType::Int32, false), + ])); + { + let id_array = Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(data_array), + Arc::new(Int32Array::from(vec![INSERT_OP; 7])), + ]) + .expect("Failed to create RecordBatch"); + equality_delta_writer.write(batch).await?; + } + { + let id_array = Int64Array::from(vec![1, 2, 3, 4]); + let data_array = StringArray::from(vec!["a", "b", "k", "l"]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(data_array), + Arc::new(Int32Array::from(vec![ + DELETE_OP, DELETE_OP, DELETE_OP, INSERT_OP, + ])), + ]) + .expect("Failed to create RecordBatch"); + equality_delta_writer.write(batch).await?; + } + + let data_files = equality_delta_writer.close().await?; + assert_eq!(data_files.len(), 3); + // data file + let data_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("data", DataType::Utf8, true), + ])); + let data_file = data_files + .iter() + .find(|file| file.content == DataContentType::Data) + .unwrap(); + let data_file_path = data_file.file_path().to_string(); + let input_file = file_io.new_input(&data_file_path).unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&data_schema, &batches).unwrap(); + let expected_batches = RecordBatch::try_new(data_schema.clone(), vec![ + Arc::new(Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1, 4])), + Arc::new(StringArray::from(vec![ + "a", "b", "c", "d", "e", "f", "g", "l", + ])), + ]) + .unwrap(); + assert_eq!(expected_batches, res); + + // position delete file + let position_delete_file = data_files + .iter() + .find(|file| file.content == DataContentType::PositionDeletes) + .unwrap(); + let input_file = file_io + .new_input(position_delete_file.file_path.clone()) + .unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&POSITION_DELETE_ARROW_SCHEMA, &batches).unwrap(); + let expected_batches = RecordBatch::try_new(POSITION_DELETE_ARROW_SCHEMA.clone(), vec![ + Arc::new(StringArray::from(vec![ + data_file_path.clone(), + data_file_path.clone(), + ])), + Arc::new(Int64Array::from(vec![0, 1])), + ]) + .unwrap(); + assert_eq!(expected_batches, res); + + // equality delete file + let equality_delete_file = data_files + .iter() + .find(|file| file.content == DataContentType::EqualityDeletes) + .unwrap(); + let input_file = file_io + .new_input(equality_delete_file.file_path.clone()) + .unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&data_schema, &batches).unwrap(); + let expected_batches = RecordBatch::try_new(data_schema.clone(), vec![ + Arc::new(Int64Array::from(vec![3])), + Arc::new(StringArray::from(vec!["k"])), + ]) + .unwrap(); + assert_eq!(expected_batches, res); + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs b/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs new file mode 100644 index 000000000..c089607c8 --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs @@ -0,0 +1,272 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the fanout partition writer. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_row::OwnedRow; +use arrow_schema::SchemaRef as ArrowSchemaRef; +use itertools::Itertools; + +use crate::arrow::{schema_to_arrow_schema, RecordBatchPartitionSpliter}; +use crate::spec::{BoundPartitionSpecRef, DataFile}; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::Result; + +/// The builder for `FanoutPartitionWriter`. +#[derive(Clone)] +pub struct FanoutPartitionWriterBuilder { + inner_builder: B, + partition_specs: BoundPartitionSpecRef, + arrow_schema: ArrowSchemaRef, +} + +impl FanoutPartitionWriterBuilder { + /// Create a new `FanoutPartitionWriterBuilder` with the default arrow schema. + pub fn new(inner_builder: B, partition_specs: BoundPartitionSpecRef) -> Result { + let arrow_schema = Arc::new(schema_to_arrow_schema(partition_specs.schema())?); + Ok(Self::new_with_custom_schema( + inner_builder, + partition_specs, + arrow_schema, + )) + } + + /// Create a new `FanoutPartitionWriterBuilder` with a custom arrow schema. + /// This function is useful for the user who has the input with extral columns. + pub fn new_with_custom_schema( + inner_builder: B, + partition_specs: BoundPartitionSpecRef, + arrow_schema: ArrowSchemaRef, + ) -> Self { + Self { + inner_builder, + partition_specs, + arrow_schema, + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder for FanoutPartitionWriterBuilder { + type R = FanoutPartitionWriter; + + async fn build(self) -> Result { + let partition_splitter = + RecordBatchPartitionSpliter::new(&self.arrow_schema, self.partition_specs)?; + Ok(FanoutPartitionWriter { + inner_writer_builder: self.inner_builder, + partition_splitter, + partition_writers: HashMap::new(), + }) + } +} + +/// The fanout partition writer. +/// It will split the input record batch by the partition specs, and write the splitted record batches to the inner writers. +pub struct FanoutPartitionWriter { + inner_writer_builder: B, + partition_splitter: RecordBatchPartitionSpliter, + partition_writers: HashMap, +} + +impl FanoutPartitionWriter { + /// Get the current number of partition writers. + pub fn partition_num(&self) -> usize { + self.partition_writers.len() + } +} + +#[async_trait::async_trait] +impl IcebergWriter for FanoutPartitionWriter { + async fn write(&mut self, input: RecordBatch) -> Result<()> { + let splits = self.partition_splitter.split(&input)?; + + for (partition, record_batch) in splits { + match self.partition_writers.entry(partition) { + Entry::Occupied(entry) => { + entry.into_mut().write(record_batch).await?; + } + Entry::Vacant(entry) => { + let writer = entry.insert(self.inner_writer_builder.clone().build().await?); + writer.write(record_batch).await?; + } + } + } + + Ok(()) + } + + async fn close(&mut self) -> Result> { + let (partition_rows, writers): (Vec<_>, Vec<_>) = self.partition_writers.drain().unzip(); + let partition_values = self.partition_splitter.convert_row(partition_rows)?; + + let mut result = Vec::new(); + for (partition_value, mut writer) in partition_values.into_iter().zip_eq(writers) { + let mut data_files = writer.close().await?; + for data_file in data_files.iter_mut() { + data_file.rewrite_partition(partition_value.clone()); + } + result.append(&mut data_files); + } + + Ok(result) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use arrow_select::concat::concat_batches; + use itertools::Itertools; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use crate::io::FileIOBuilder; + use crate::spec::{ + BoundPartitionSpec, DataFileFormat, Literal, NestedField, PrimitiveLiteral, PrimitiveType, + Schema, Struct, Transform, Type, UnboundPartitionField, + }; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + use crate::Result; + + #[tokio::test] + async fn test_fanout_partition_writer() -> Result<()> { + // prepare writer + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 1, + "id".to_string(), + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::required( + 2, + "name".to_string(), + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); + let partition_spec = BoundPartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + let pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + file_io.clone(), + location_gen, + file_name_gen, + ); + let data_file_writer_builder = DataFileWriterBuilder::new(schema, pw, None); + let mut fanout_partition_writer = + FanoutPartitionWriterBuilder::new(data_file_writer_builder, Arc::new(partition_spec)) + .unwrap() + .build() + .await + .unwrap(); + + // prepare data + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("data", DataType::Utf8, true), + ])); + let id_array = Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(data_array), + ]) + .expect("Failed to create RecordBatch"); + + fanout_partition_writer.write(batch).await?; + let data_files = fanout_partition_writer.close().await?; + assert_eq!(data_files.len(), 3); + let expected_partitions = vec![ + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(1)))]), + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(2)))]), + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(3)))]), + ]; + let expected_batches = vec![ + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![1, 1, 1])), + Arc::new(StringArray::from(vec!["a", "c", "g"])), + ]) + .unwrap(), + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![2, 2])), + Arc::new(StringArray::from(vec!["b", "e"])), + ]) + .unwrap(), + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![3, 3])), + Arc::new(StringArray::from(vec!["d", "f"])), + ]) + .unwrap(), + ]; + for (partition, batch) in expected_partitions + .into_iter() + .zip_eq(expected_batches.into_iter()) + { + assert!(data_files.iter().any(|file| file.partition == partition)); + let data_file = data_files + .iter() + .find(|file| file.partition == partition) + .unwrap(); + let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + + // check data + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&batch.schema(), &batches).unwrap(); + assert_eq!(batch, res); + } + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/function_writer/mod.rs b/crates/iceberg/src/writer/function_writer/mod.rs new file mode 100644 index 000000000..58e2f7671 --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/mod.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the functional writer. + +pub mod equality_delta_writer; +pub mod fanout_partition_writer; +pub mod precompute_partition_writer; diff --git a/crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs b/crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs new file mode 100644 index 000000000..6896ee854 --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs @@ -0,0 +1,277 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the precompute partition writer. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; + +use arrow_array::{RecordBatch, StructArray}; +use arrow_row::{OwnedRow, RowConverter, SortField}; +use arrow_schema::DataType; +use itertools::Itertools; + +use crate::arrow::{convert_row_to_struct, split_with_partition, type_to_arrow_type}; +use crate::spec::{BoundPartitionSpecRef, DataFile, Type}; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// The builder for precompute partition writer. +#[derive(Clone)] +pub struct PrecomputePartitionWriterBuilder { + inner_writer_builder: B, + partition_spec: BoundPartitionSpecRef, +} + +impl PrecomputePartitionWriterBuilder { + /// Create a new precompute partition writer builder. + pub fn new(inner_writer_builder: B, partition_spec: BoundPartitionSpecRef) -> Self { + Self { + inner_writer_builder, + partition_spec, + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder<(StructArray, RecordBatch)> + for PrecomputePartitionWriterBuilder +{ + type R = PrecomputePartitionWriter; + + async fn build(self) -> Result { + let arrow_type = + type_to_arrow_type(&Type::Struct(self.partition_spec.partition_type().clone()))?; + let DataType::Struct(fields) = &arrow_type else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The partition type is not a struct", + )); + }; + let partition_row_converter = RowConverter::new( + fields + .iter() + .map(|f| SortField::new(f.data_type().clone())) + .collect(), + )?; + Ok(PrecomputePartitionWriter { + inner_writer_builder: self.inner_writer_builder, + partition_row_converter, + partition_spec: self.partition_spec, + partition_writers: HashMap::new(), + }) + } +} + +/// The precompute partition writer. +pub struct PrecomputePartitionWriter { + inner_writer_builder: B, + partition_writers: HashMap, + partition_row_converter: RowConverter, + partition_spec: BoundPartitionSpecRef, +} + +#[async_trait::async_trait] +impl IcebergWriter<(StructArray, RecordBatch)> + for PrecomputePartitionWriter +{ + async fn write(&mut self, input: (StructArray, RecordBatch)) -> Result<()> { + let splits = + split_with_partition(&self.partition_row_converter, input.0.columns(), &input.1)?; + + for (partition, record_batch) in splits { + match self.partition_writers.entry(partition) { + Entry::Occupied(entry) => { + entry.into_mut().write(record_batch).await?; + } + Entry::Vacant(entry) => { + let writer = entry.insert(self.inner_writer_builder.clone().build().await?); + writer.write(record_batch).await?; + } + } + } + + Ok(()) + } + + async fn close(&mut self) -> Result> { + let (partition_rows, writers): (Vec<_>, Vec<_>) = self.partition_writers.drain().unzip(); + let partition_values = convert_row_to_struct( + &self.partition_row_converter, + self.partition_spec.partition_type(), + partition_rows, + )?; + + let mut result = Vec::new(); + for (partition_value, mut writer) in partition_values.into_iter().zip_eq(writers) { + let mut data_files = writer.close().await?; + for data_file in data_files.iter_mut() { + data_file.rewrite_partition(partition_value.clone()); + } + result.append(&mut data_files); + } + + Ok(result) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, StructArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use arrow_select::concat::concat_batches; + use itertools::Itertools; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use crate::io::FileIOBuilder; + use crate::spec::{ + BoundPartitionSpec, DataFileFormat, Literal, NestedField, PrimitiveLiteral, PrimitiveType, + Schema, Struct, Transform, Type, UnboundPartitionField, + }; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::function_writer::precompute_partition_writer::PrecomputePartitionWriterBuilder; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + use crate::Result; + + #[tokio::test] + async fn test_precompute_partition_writer() -> Result<()> { + // prepare writer + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 1, + "id".to_string(), + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::required( + 2, + "name".to_string(), + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); + let partition_spec = BoundPartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + let pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + file_io.clone(), + location_gen, + file_name_gen, + ); + let data_file_writer_builder = DataFileWriterBuilder::new(schema, pw, None); + let mut precompute_partition_writer = PrecomputePartitionWriterBuilder::new( + data_file_writer_builder, + Arc::new(partition_spec), + ) + .build() + .await + .unwrap(); + + // prepare data + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("data", DataType::Utf8, true), + ])); + let id_array = Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(data_array), + ]) + .expect("Failed to create RecordBatch"); + let id_bucket_array = Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let partition_batch = StructArray::from(vec![( + Arc::new(Field::new("id_bucket", DataType::Int64, true)), + Arc::new(id_bucket_array) as ArrayRef, + )]); + + precompute_partition_writer + .write((partition_batch, batch)) + .await?; + let data_files = precompute_partition_writer.close().await?; + assert_eq!(data_files.len(), 3); + let expected_partitions = vec![ + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(1)))]), + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(2)))]), + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(3)))]), + ]; + let expected_batches = vec![ + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![1, 1, 1])), + Arc::new(StringArray::from(vec!["a", "c", "g"])), + ]) + .unwrap(), + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![2, 2])), + Arc::new(StringArray::from(vec!["b", "e"])), + ]) + .unwrap(), + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![3, 3])), + Arc::new(StringArray::from(vec!["d", "f"])), + ]) + .unwrap(), + ]; + for (partition, batch) in expected_partitions + .into_iter() + .zip_eq(expected_batches.into_iter()) + { + assert!(data_files.iter().any(|file| file.partition == partition)); + let data_file = data_files + .iter() + .find(|file| file.partition == partition) + .unwrap(); + let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + + // check data + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&batch.schema(), &batches).unwrap(); + assert_eq!(batch, res); + } + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 6cb9aaee6..745c521eb 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -47,10 +47,11 @@ pub mod base_writer; pub mod file_writer; +pub mod function_writer; use arrow_array::RecordBatch; -use crate::spec::DataFile; +use crate::spec::{DataFile, SchemaRef}; use crate::Result; type DefaultInput = RecordBatch; @@ -63,10 +64,8 @@ pub trait IcebergWriterBuilder: { /// The associated writer type. type R: IcebergWriter; - /// The associated writer config type used to build the writer. - type C; /// Build the iceberg writer. - async fn build(self, config: Self::C) -> Result; + async fn build(self) -> Result; } /// The iceberg writer used to write data to iceberg table. @@ -90,6 +89,8 @@ pub trait CurrentFileStatus { fn current_row_num(&self) -> usize; /// Get the current file written size. fn current_written_size(&self) -> usize; + /// Get the schema of the current file. + fn current_schema(&self) -> SchemaRef; } #[cfg(test)] diff --git a/crates/integration_tests/tests/append_data_file_test.rs b/crates/integration_tests/tests/append_data_file_test.rs index 87e805c23..a0ce73a05 100644 --- a/crates/integration_tests/tests/append_data_file_test.rs +++ b/crates/integration_tests/tests/append_data_file_test.rs @@ -24,7 +24,7 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::transaction::Transaction; -use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig}; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; @@ -92,16 +92,16 @@ async fn test_append_data_file() { ); let parquet_writer_builder = ParquetWriterBuilder::new( WriterProperties::default(), - table.metadata().current_schema().clone(), table.file_io().clone(), location_generator.clone(), file_name_generator.clone(), ); - let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder); - let mut data_file_writer = data_file_writer_builder - .build(DataFileWriterConfig::new(None)) - .await - .unwrap(); + let data_file_writer_builder = DataFileWriterBuilder::new( + table.metadata().current_schema().clone(), + parquet_writer_builder, + None, + ); + let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); diff --git a/crates/integration_tests/tests/conflict_commit_test.rs b/crates/integration_tests/tests/conflict_commit_test.rs index f3dd70f16..f5dce3a56 100644 --- a/crates/integration_tests/tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/conflict_commit_test.rs @@ -24,7 +24,7 @@ use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::transaction::Transaction; -use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig}; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::location_generator::{ DefaultFileNameGenerator, DefaultLocationGenerator, }; @@ -91,16 +91,16 @@ async fn test_append_data_file_conflict() { ); let parquet_writer_builder = ParquetWriterBuilder::new( WriterProperties::default(), - table.metadata().current_schema().clone(), table.file_io().clone(), location_generator.clone(), file_name_generator.clone(), ); - let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder); - let mut data_file_writer = data_file_writer_builder - .build(DataFileWriterConfig::new(None)) - .await - .unwrap(); + let data_file_writer_builder = DataFileWriterBuilder::new( + table.metadata().current_schema().clone(), + parquet_writer_builder, + None, + ); + let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);