diff --git a/Cargo.lock b/Cargo.lock
index 8a456f274..87bf36bd2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2855,8 +2855,10 @@ dependencies = [
  "array-init",
  "arrow-arith",
  "arrow-array",
+ "arrow-buffer",
  "arrow-cast",
  "arrow-ord",
+ "arrow-row",
  "arrow-schema",
  "arrow-select",
  "arrow-string",
@@ -3041,6 +3043,7 @@ dependencies = [
  "arrow-array",
  "arrow-schema",
  "futures",
+ "futures-util",
  "iceberg",
  "iceberg-catalog-rest",
  "iceberg_test_utils",
diff --git a/Cargo.toml b/Cargo.toml
index b796308be..6289ba235 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -43,11 +43,13 @@ apache-avro = "0.17"
 array-init = "2"
 arrow-arith = { version = "53" }
 arrow-array = { version = "53" }
+arrow-buffer = { version = "53" }
 arrow-cast = { version = "53" }
 arrow-ord = { version = "53" }
 arrow-schema = { version = "53" }
 arrow-select = { version = "53" }
 arrow-string = { version = "53" }
+arrow-row = { version = "53" }
 async-stream = "0.3.5"
 async-trait = "0.1"
 async-std = "1.12"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index baa2401f4..41b754340 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -46,8 +46,10 @@ apache-avro = { workspace = true }
 array-init = { workspace = true }
 arrow-arith = { workspace = true }
 arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
 arrow-cast = { workspace = true }
 arrow-ord = { workspace = true }
+arrow-row = { workspace = true }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 arrow-string = { workspace = true }
diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs
index 0f01324cb..c4d96d59d 100644
--- a/crates/iceberg/src/arrow/mod.rs
+++ b/crates/iceberg/src/arrow/mod.rs
@@ -22,5 +22,8 @@ pub use schema::*;
 mod reader;
 pub(crate) mod record_batch_projector;
 pub(crate) mod record_batch_transformer;
-
+mod value;
 pub use reader::*;
+pub use value::*;
+mod record_batch_partition_spliter;
+pub(crate) use record_batch_partition_spliter::*;
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 7fba2179a..47aca5244 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -39,6 +39,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI
 use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
+use super::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
@@ -51,8 +52,6 @@
 use crate::spec::{DataContentType, Datum, PrimitiveType, Schema};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
-use super::record_batch_transformer::RecordBatchTransformer;
-
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
@@ -345,14 +344,19 @@ impl ArrowReader {
             if iceberg_field.is_none() || parquet_iceberg_field.is_none() {
                 return;
             }
-
-            if !type_promotion_is_valid(
-                parquet_iceberg_field
-                    .unwrap()
-                    .field_type
-                    .as_primitive_type(),
-                iceberg_field.unwrap().field_type.as_primitive_type(),
-            ) {
+            if iceberg_field
+                .unwrap()
+                .field_type
+                .as_primitive_type()
+                .is_some()
+                && !type_promotion_is_valid(
+                    parquet_iceberg_field
+                        .unwrap()
+                        .field_type
+                        .as_primitive_type(),
+                    iceberg_field.unwrap().field_type.as_primitive_type(),
+                )
+            {
                 return;
             }
diff --git a/crates/iceberg/src/arrow/record_batch_partition_spliter.rs b/crates/iceberg/src/arrow/record_batch_partition_spliter.rs
new file mode 100644
index 000000000..2494a63ca
--- /dev/null
+++ b/crates/iceberg/src/arrow/record_batch_partition_spliter.rs
@@ -0,0 +1,473 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+
+use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StructArray};
+use arrow_row::{OwnedRow, RowConverter, SortField};
+use arrow_schema::{DataType, Schema as ArrowSchema};
+use arrow_select::filter::filter_record_batch;
+use itertools::Itertools;
+use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+
+use super::record_batch_projector::RecordBatchProjector;
+use crate::arrow::{arrow_struct_to_literal, type_to_arrow_type};
+use crate::spec::{Literal, PartitionSpecRef, SchemaRef, Struct, StructType, Type};
+use crate::transform::{create_transform_function, BoxedTransformFunction};
+use crate::{Error, ErrorKind, Result};
+
+/// A helper function to split the record batch into multiple record batches using computed partition columns.
+pub(crate) fn split_with_partition(
+    row_converter: &RowConverter,
+    partition_columns: &[ArrayRef],
+    batch: &RecordBatch,
+) -> Result<Vec<(OwnedRow, RecordBatch)>> {
+    let rows = row_converter
+        .convert_columns(partition_columns)
+        .map_err(|e| Error::new(ErrorKind::DataInvalid, format!("{}", e)))?;
+
+    // Group the batch by row value.
+    let mut group_ids = HashMap::new();
+    rows.into_iter().enumerate().for_each(|(row_id, row)| {
+        group_ids.entry(row.owned()).or_insert(vec![]).push(row_id);
+    });
+
+    // Partition the batch by rows that share the same partition values.
+    let mut partition_batches = Vec::with_capacity(group_ids.len());
+    for (row, row_ids) in group_ids.into_iter() {
+        // generate the boolean filter array from row_ids
+        let filter_array: BooleanArray = {
+            let mut filter = vec![false; batch.num_rows()];
+            row_ids.into_iter().for_each(|row_id| {
+                filter[row_id] = true;
+            });
+            filter.into()
+        };
+
+        // filter the RecordBatch
+        partition_batches.push((
+            row,
+            filter_record_batch(batch, &filter_array)
+                .expect("We should guarantee the filter array is valid"),
+        ));
+    }
+
+    Ok(partition_batches)
+}
+
+pub(crate) fn convert_row_to_struct(
+    row_converter: &RowConverter,
+    struct_type: &StructType,
+    rows: Vec<OwnedRow>,
+) -> Result<Vec<Struct>> {
+    let arrow_struct_array = {
+        let partition_columns = row_converter
+            .convert_rows(rows.iter().map(|row| row.row()))
+            .map_err(|e| Error::new(ErrorKind::DataInvalid, format!("{e}")))?;
+        let partition_arrow_fields = {
+            let partition_arrow_type = type_to_arrow_type(&Type::Struct(struct_type.clone()))?;
+            let DataType::Struct(fields) = partition_arrow_type else {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "The partition arrow type is not a struct type",
+                ));
+            };
+            fields
+        };
+        StructArray::try_new(partition_arrow_fields, partition_columns, None)?
+    };
+    let struct_array = {
+        let struct_array = arrow_struct_to_literal(&arrow_struct_array, struct_type)?;
+        struct_array
+            .into_iter()
+            .map(|s| {
+                if let Some(s) = s {
+                    if let Literal::Struct(s) = s {
+                        Ok(s)
+                    } else {
+                        Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            "The struct is not a struct literal",
+                        ))
+                    }
+                } else {
+                    Err(Error::new(ErrorKind::DataInvalid, "The struct is null"))
+                }
+            })
+            .collect::<Result<Vec<_>>>()?
+    };
+
+    Ok(struct_array)
+}
+
+/// The splitter used to split a record batch into multiple record batches by the partition spec.
+pub(crate) struct RecordBatchPartitionSpliter {
+    partition_spec: PartitionSpecRef,
+    schema: SchemaRef,
+    projector: RecordBatchProjector,
+    transform_functions: Vec<BoxedTransformFunction>,
+    row_converter: RowConverter,
+}
+
+impl RecordBatchPartitionSpliter {
+    pub(crate) fn new(
+        arrow_schema: &ArrowSchema,
+        table_schema: SchemaRef,
+        partition_spec: PartitionSpecRef,
+    ) -> Result<Self> {
+        if partition_spec.fields().is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Fail to create partition splitter using an empty partition spec",
+            ));
+        }
+        let projector = RecordBatchProjector::new(
+            arrow_schema,
+            &partition_spec
+                .fields()
+                .iter()
+                .map(|field| field.source_id)
+                .collect::<Vec<_>>(),
+            // The source columns, selected by ids, must be a primitive type and cannot be contained in a map or list, but may be nested in a struct.
+            // ref: https://iceberg.apache.org/spec/#partitioning
+            |field| {
+                if field.data_type().is_nested() {
+                    return Ok(None);
+                }
+                field
+                    .metadata()
+                    .get(PARQUET_FIELD_ID_META_KEY)
+                    .map(|s| {
+                        s.parse::<i64>()
+                            .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))
+                    })
+                    .transpose()
+            },
+            |_| true,
+        )?;
+        let transform_functions = partition_spec
+            .fields()
+            .iter()
+            .map(|field| create_transform_function(&field.transform))
+            .collect::<Result<Vec<_>>>()?;
+        let row_converter = RowConverter::new(
+            partition_spec
+                .partition_type(&table_schema)?
+ .fields() + .iter() + .map(|f| Ok(SortField::new(type_to_arrow_type(&f.field_type)?))) + .collect::>>()?, + )?; + Ok(Self { + partition_spec, + schema: table_schema, + projector, + transform_functions, + row_converter, + }) + } + + /// Split the record batch into multiple record batches by the partition spec. + pub(crate) fn split(&self, batch: &RecordBatch) -> Result> { + // get array using partition spec + let source_columns = self.projector.project_column(batch.columns())?; + let partition_columns = source_columns + .into_iter() + .zip_eq(self.transform_functions.iter()) + .map(|(source_column, transform_function)| transform_function.transform(source_column)) + .collect::>>()?; + + split_with_partition(&self.row_converter, &partition_columns, batch) + } + + pub(crate) fn convert_row(&self, rows: Vec) -> Result> { + convert_row_to_struct( + &self.row_converter, + &self.partition_spec.partition_type(&self.schema)?, + rows, + ) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::{Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + + use super::*; + use crate::arrow::schema_to_arrow_schema; + use crate::spec::{NestedField, PartitionSpec, Schema, Transform, UnboundPartitionField}; + + #[test] + fn test_record_batch_partition_spliter() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + let partition_spliter = RecordBatchPartitionSpliter::new( + &schema_to_arrow_schema(&schema).unwrap(), + Arc::new(schema), + Arc::new(partition_spec), + ) + .expect("Failed to create spliter"); + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("data", DataType::Utf8, false), + ])); + let id_array = Int32Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(data_array), + ]) + .expect("Failed to create RecordBatch"); + + let mut partitioned_batches = partition_spliter + .split(&batch) + .expect("Failed to split RecordBatch"); + assert_eq!(partitioned_batches.len(), 3); + partitioned_batches.sort_by_key(|(row, _)| row.clone()); + { + // check the first partition + let expected_id_array = Int32Array::from(vec![1, 1, 1]); + let expected_data_array = StringArray::from(vec!["a", "c", "g"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_id_array), + Arc::new(expected_data_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[0].1, expected_batch); + } + { + // check the second partition + let expected_id_array = Int32Array::from(vec![2, 2]); + let expected_data_array = StringArray::from(vec!["b", "e"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_id_array), + Arc::new(expected_data_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[1].1, expected_batch); + } + { + 
// check the third partition + let expected_id_array = Int32Array::from(vec![3, 3]); + let expected_data_array = StringArray::from(vec!["d", "f"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_id_array), + Arc::new(expected_data_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[2].1, expected_batch); + } + + let partition_values = partition_spliter + .convert_row( + partitioned_batches + .iter() + .map(|(row, _)| row.clone()) + .collect(), + ) + .unwrap(); + // check partition value is struct(1), struct(2), struct(3) + assert_eq!(partition_values, vec![ + Struct::from_iter(vec![Some(Literal::int(1))]), + Struct::from_iter(vec![Some(Literal::int(2))]), + Struct::from_iter(vec![Some(Literal::int(3))]), + ]); + } + + #[test] + fn test_record_batch_partition_spliter_with_extra_columns() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("extra_column1", DataType::Utf8, true), + Field::new("id", DataType::Int32, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("data", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + Field::new("extra_column2", DataType::Utf8, true), + ])); + let extra_column1_array = StringArray::from(vec![ + "extra1", "extra2", "extra1", "extra3", "extra2", "extra3", "extra1", + ]); + let id_array = Int32Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let extra_column2_array = StringArray::from(vec![ + "extra1", "extra2", "extra1", "extra3", "extra2", "extra3", "extra1", + ]); + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(extra_column1_array), + Arc::new(id_array), + Arc::new(data_array), + Arc::new(extra_column2_array), + ]) + .expect("Failed to create RecordBatch"); + let partition_spliter = RecordBatchPartitionSpliter::new( + &arrow_schema, + Arc::new(schema), + Arc::new(partition_spec), + ) + .expect("Failed to create spliter"); + + let mut partitioned_batches = partition_spliter + .split(&batch) + .expect("Failed to split RecordBatch"); + assert_eq!(partitioned_batches.len(), 3); + partitioned_batches.sort_by_key(|(row, _)| row.clone()); + { + // check the first partition + let expected_extra_column1_array = + StringArray::from(vec!["extra1", "extra1", "extra1"]); + let expected_id_array = Int32Array::from(vec![1, 1, 1]); + let expected_data_array = StringArray::from(vec!["a", "c", "g"]); + let expected_extra_column2_array = + StringArray::from(vec!["extra1", "extra1", "extra1"]); + let expected_batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(expected_extra_column1_array), + Arc::new(expected_id_array), + Arc::new(expected_data_array), + Arc::new(expected_extra_column2_array), + ]) + .expect("Failed to create expected RecordBatch"); + 
assert_eq!(partitioned_batches[0].1, expected_batch); + } + { + // check the second partition + let expected_extra_column1_array = StringArray::from(vec!["extra2", "extra2"]); + let expected_id_array = Int32Array::from(vec![2, 2]); + let expected_data_array = StringArray::from(vec!["b", "e"]); + let expected_extra_column2_array = StringArray::from(vec!["extra2", "extra2"]); + let expected_batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(expected_extra_column1_array), + Arc::new(expected_id_array), + Arc::new(expected_data_array), + Arc::new(expected_extra_column2_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[1].1, expected_batch); + } + { + // check the third partition + let expected_id_array = Int32Array::from(vec![3, 3]); + let expected_data_array = StringArray::from(vec!["d", "f"]); + let expected_extra_column1_array = StringArray::from(vec!["extra3", "extra3"]); + let expected_extra_column2_array = StringArray::from(vec!["extra3", "extra3"]); + let expected_batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(expected_extra_column1_array), + Arc::new(expected_id_array), + Arc::new(expected_data_array), + Arc::new(expected_extra_column2_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[2].1, expected_batch); + } + + let partition_values = partition_spliter + .convert_row( + partitioned_batches + .iter() + .map(|(row, _)| row.clone()) + .collect(), + ) + .unwrap(); + // check partition value is struct(1), struct(2), struct(3) + assert_eq!(partition_values, vec![ + Struct::from_iter(vec![Some(Literal::int(1))]), + Struct::from_iter(vec![Some(Literal::int(2))]), + Struct::from_iter(vec![Some(Literal::int(3))]), + ]); + } + + #[test] + fn test_empty_partition() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .build() + .unwrap(); + assert!(RecordBatchPartitionSpliter::new( + &schema_to_arrow_schema(&schema).unwrap(), + Arc::new(schema), + Arc::new(partition_spec), + ) + .is_err()) + } +} diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs index c311da1f1..2758bae3e 100644 --- a/crates/iceberg/src/arrow/record_batch_projector.rs +++ b/crates/iceberg/src/arrow/record_batch_projector.rs @@ -17,7 +17,8 @@ use std::sync::Arc; -use arrow_array::{ArrayRef, RecordBatch, StructArray}; +use arrow_array::{make_array, ArrayRef, RecordBatch, StructArray}; +use arrow_buffer::NullBuffer; use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaRef}; use crate::error::Result; @@ -42,7 +43,7 @@ impl RecordBatchProjector { /// This function will iterate through the nested fields if the field is a struct, `searchable_field_func` can be used to control whether /// iterate into the nested fields. 
pub(crate) fn new( - original_schema: SchemaRef, + original_schema: &Schema, field_ids: &[i32], field_id_fetch_func: F1, searchable_field_func: F2, @@ -138,6 +139,7 @@ impl RecordBatchProjector { fn get_column_by_field_index(batch: &[ArrayRef], field_index: &[usize]) -> Result { let mut rev_iterator = field_index.iter().rev(); let mut array = batch[*rev_iterator.next().unwrap()].clone(); + let mut null_buffer = array.logical_nulls(); for idx in rev_iterator { array = array .as_any() @@ -148,8 +150,11 @@ impl RecordBatchProjector { ))? .column(*idx) .clone(); + null_buffer = NullBuffer::union(null_buffer.as_ref(), array.logical_nulls().as_ref()); } - Ok(array) + Ok(make_array( + array.to_data().into_builder().nulls(null_buffer).build()?, + )) } } @@ -187,8 +192,7 @@ mod test { _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; let projector = - RecordBatchProjector::new(schema.clone(), &[1, 3], field_id_fetch_func, |_| true) - .unwrap(); + RecordBatchProjector::new(&schema, &[1, 3], field_id_fetch_func, |_| true).unwrap(); assert!(projector.field_indices.len() == 2); assert_eq!(projector.field_indices[0], vec![0]); @@ -250,8 +254,7 @@ mod test { "inner_field2" => Ok(Some(4)), _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; - let projector = - RecordBatchProjector::new(schema.clone(), &[1, 5], field_id_fetch_func, |_| true); + let projector = RecordBatchProjector::new(&schema, &[1, 5], field_id_fetch_func, |_| true); assert!(projector.is_err()); } @@ -280,12 +283,10 @@ mod test { "inner_field2" => Ok(Some(4)), _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; - let projector = - RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| false); + let projector = RecordBatchProjector::new(&schema, &[3], field_id_fetch_func, |_| false); assert!(projector.is_err()); - let projector = - RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| true); + let projector = RecordBatchProjector::new(&schema, &[3], field_id_fetch_func, |_| true); assert!(projector.is_ok()); } } diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index 91dfe85e9..41c3b5a59 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -30,6 +30,7 @@ use arrow_array::{ use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; use bitvec::macros::internal::funty::Fundamental; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use parquet::data_type::ByteArray; use parquet::file::statistics::Statistics; use rust_decimal::prelude::ToPrimitive; use uuid::Uuid; @@ -680,6 +681,31 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result Result<[u8; 16]> { + if array.len() > 16 { + return Err(Error::new( + ErrorKind::DataInvalid, + "fail to extend array with len > 16 to array with 16", + )); + } + + // Check the sign bit: if the first byte's MSB is 1, it's negative + let is_negative = array.data().first().map_or(false, |&b| b & 0x80 != 0); + + // Create a buffer of 16 bytes filled with the sign extension value + let mut extended = if is_negative { + [0xFF; 16] // Fill with 0xFF for negative numbers + } else { + [0x00; 16] // Fill with 0x00 for positive numbers + }; + + // Copy the Vec into the rightmost part of the buffer + let start = 16 - array.len(); + extended[start..].copy_from_slice(array.data()); + + Ok(extended) +} + macro_rules! get_parquet_stat_as_datum { ($limit_type:tt) => { paste::paste! { @@ -741,7 +767,7 @@ macro_rules! 
get_parquet_stat_as_datum { }; Some(Datum::new( primitive_type.clone(), - PrimitiveLiteral::Int128(i128::from_be_bytes(bytes.try_into()?)), + PrimitiveLiteral::Int128(i128::from_be_bytes(extend_to_i128_big_endian(bytes.into())?)), )) } ( diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs new file mode 100644 index 000000000..702beff4d --- /dev/null +++ b/crates/iceberg/src/arrow/value.rs @@ -0,0 +1,1015 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_array::{ + Array, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, + Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, NullArray, StringArray, + StructArray, Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, +}; +use arrow_schema::{DataType, TimeUnit}; +use itertools::Itertools; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::spec::{Literal, PrimitiveType, Struct, StructType, Type}; +use crate::{Error, ErrorKind, Result}; + +/// A post order arrow array visitor. 
+/// # TODO +/// - Add support for ListArray, MapArray +trait ArrowArrayVistor { + type T; + fn null(&self, array: &NullArray, iceberg_type: &PrimitiveType) -> Result>; + fn boolean(&self, array: &BooleanArray, iceberg_type: &PrimitiveType) -> Result>; + fn int16(&self, array: &Int16Array, iceberg_type: &PrimitiveType) -> Result>; + fn int32(&self, array: &Int32Array, iceberg_type: &PrimitiveType) -> Result>; + fn int64(&self, array: &Int64Array, iceberg_type: &PrimitiveType) -> Result>; + fn float(&self, array: &Float32Array, iceberg_type: &PrimitiveType) -> Result>; + fn double(&self, array: &Float64Array, iceberg_type: &PrimitiveType) -> Result>; + fn decimal( + &self, + array: &Decimal128Array, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn date(&self, array: &Date32Array, iceberg_type: &PrimitiveType) -> Result>; + fn time( + &self, + array: &Time64MicrosecondArray, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn timestamp( + &self, + array: &TimestampMicrosecondArray, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn timestamp_nano( + &self, + array: &TimestampNanosecondArray, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn string(&self, array: &StringArray, iceberg_type: &PrimitiveType) -> Result>; + fn large_string( + &self, + array: &LargeStringArray, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn binary(&self, array: &BinaryArray, iceberg_type: &PrimitiveType) -> Result>; + fn large_binary( + &self, + array: &LargeBinaryArray, + iceberg_type: &PrimitiveType, + ) -> Result>; + fn r#struct( + &self, + array: &StructArray, + iceberg_type: &StructType, + columns: Vec>, + ) -> Result>; +} + +struct ArrowArrayConvert; + +impl ArrowArrayVistor for ArrowArrayConvert { + type T = Option; + + fn null(&self, array: &NullArray, _iceberg_type: &PrimitiveType) -> Result> { + Ok(vec![None; array.len()]) + } + + fn boolean(&self, array: &BooleanArray, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Boolean => Ok(array.iter().map(|v| v.map(Literal::bool)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow boolean array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn int16(&self, array: &Int16Array, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Int => Ok(array.iter().map(|v| v.map(Literal::int)).collect()), + PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow int16 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn int32(&self, array: &Int32Array, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Int => Ok(array.iter().map(|v| v.map(Literal::int)).collect()), + PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow int32 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn int64(&self, array: &Int64Array, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Long => Ok(array.iter().map(|v| v.map(Literal::long)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow int64 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn float(&self, array: &Float32Array, iceberg_type: &PrimitiveType) -> Result> { + 
match iceberg_type { + PrimitiveType::Float => Ok(array.iter().map(|v| v.map(Literal::float)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow float16 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn double(&self, array: &Float64Array, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Double => Ok(array.iter().map(|v| v.map(Literal::double)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow float64 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn decimal( + &self, + array: &Decimal128Array, + iceberg_type: &PrimitiveType, + ) -> Result> { + let DataType::Decimal128(arrow_precision, arrow_scale) = array.data_type() else { + unreachable!() + }; + match iceberg_type { + PrimitiveType::Decimal { precision, scale } => { + if *arrow_precision as u32 != *precision || *arrow_scale as u32 != *scale { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The precision or scale ({},{}) of arrow decimal128 array is not compatitable with iceberg decimal type ({},{})", + arrow_precision, arrow_scale, precision, scale + ), + )); + } + Ok(array.iter().map(|v| v.map(Literal::decimal)).collect()) + } + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow decimal128 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn date(&self, array: &Date32Array, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Date => Ok(array.iter().map(|v| v.map(Literal::date)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow date32 array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn time( + &self, + array: &Time64MicrosecondArray, + iceberg_type: &PrimitiveType, + ) -> Result> { + match iceberg_type { + PrimitiveType::Time => Ok(array + .iter() + .map(|v| v.map(Literal::time)) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow time64 microsecond array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn timestamp( + &self, + array: &TimestampMicrosecondArray, + iceberg_type: &PrimitiveType, + ) -> Result> { + match iceberg_type { + PrimitiveType::Timestamp => Ok(array + .iter() + .map(|v| v.map(Literal::timestamp)) + .collect()), + PrimitiveType::Timestamptz => Ok(array + .iter() + .map(|v| v.map(Literal::timestamptz)) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow timestamp microsecond array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn timestamp_nano( + &self, + array: &TimestampNanosecondArray, + iceberg_type: &PrimitiveType, + ) -> Result> { + match iceberg_type { + PrimitiveType::TimestampNs => Ok(array + .iter() + .map(|v| v.map(Literal::timestamp_nano)) + .collect()), + PrimitiveType::TimestamptzNs => Ok(array + .iter() + .map(|v| v.map(Literal::timestamptz_nano)) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow timestamp nanosecond array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn string(&self, array: &StringArray, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::String => Ok(array.iter().map(|v| v.map(Literal::string)).collect()), + _ => Err(Error::new( + 
ErrorKind::DataInvalid, + format!( + "The type of arrow string array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn large_string( + &self, + array: &LargeStringArray, + iceberg_type: &PrimitiveType, + ) -> Result> { + match iceberg_type { + PrimitiveType::String => Ok(array.iter().map(|v| v.map(Literal::string)).collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow large string array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn binary(&self, array: &BinaryArray, iceberg_type: &PrimitiveType) -> Result> { + match iceberg_type { + PrimitiveType::Binary => Ok(array + .iter() + .map(|v| v.map(|v| Literal::binary(v.to_vec()))) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow binary array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn large_binary( + &self, + array: &LargeBinaryArray, + iceberg_type: &PrimitiveType, + ) -> Result> { + match iceberg_type { + PrimitiveType::Binary => Ok(array + .iter() + .map(|v| v.map(|v| Literal::binary(v.to_vec()))) + .collect()), + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The type of arrow binary array is not compatitable with iceberg type {}", + iceberg_type + ), + )), + } + } + + fn r#struct( + &self, + array: &StructArray, + _iceberg_type: &StructType, + columns: Vec>, + ) -> Result> { + let struct_literal_len = columns.first().map(|column| column.len()).unwrap_or(0); + let mut struct_literals = Vec::with_capacity(struct_literal_len); + let mut columns_iters = columns + .into_iter() + .map(|column| column.into_iter()) + .collect::>(); + + for row_idx in 0..struct_literal_len { + if array.is_null(row_idx) { + struct_literals.push(None); + continue; + } + let mut literals = Vec::with_capacity(columns_iters.len()); + for column_iter in columns_iters.iter_mut() { + literals.push(column_iter.next().unwrap()); + } + struct_literals.push(Some(Literal::Struct(Struct::from_iter(literals)))); + } + + Ok(struct_literals) + } +} + +fn visit_arrow_struct_array( + array: &StructArray, + iceberg_type: &StructType, + visitor: &V, +) -> Result> { + let DataType::Struct(arrow_struct_fields) = array.data_type() else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not a struct type", + )); + }; + + if array.columns().len() != iceberg_type.fields().len() + || arrow_struct_fields.len() != iceberg_type.fields().len() + { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not compatitable with iceberg struct type", + )); + } + + let mut columns = Vec::with_capacity(array.columns().len()); + + for ((array, arrow_field), iceberg_field) in array + .columns() + .iter() + .zip_eq(arrow_struct_fields.iter()) + .zip_eq(iceberg_type.fields().iter()) + { + let arrow_type = arrow_field.data_type(); + if arrow_field.is_nullable() == iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "The nullable field of arrow struct array is not compatitable with iceberg type", + )); + } + match (arrow_type, iceberg_field.field_type.as_ref()) { + (DataType::Null, Type::Primitive(primitive_type)) => { + if iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "column in arrow array should not be optional", + )); + } + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.null(array, primitive_type)?); + } + (DataType::Boolean, 
Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.boolean(array, primitive_type)?); + } + (DataType::Int16, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int16(array, primitive_type)?); + } + (DataType::Int32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int32(array, primitive_type)?); + } + (DataType::Int64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int64(array, primitive_type)?); + } + (DataType::Float32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.float(array, primitive_type)?); + } + (DataType::Float64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.double(array, primitive_type)?); + } + (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.decimal(array, primitive_type)?); + } + (DataType::Date32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.date(array, primitive_type)?); + } + (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.time(array, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Microsecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.timestamp(array, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.timestamp_nano(array, primitive_type)?); + } + (DataType::Utf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.string(array, primitive_type)?); + } + (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.large_string(array, primitive_type)?); + } + (DataType::Binary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.binary(array, primitive_type)?); + } + (DataType::LargeBinary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.large_binary(array, primitive_type)?); + } + (DataType::Struct(_), Type::Struct(struct_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visit_arrow_struct_array(array, struct_type, visitor)?); + } + (arrow_type, iceberg_field_type) => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Unsupported convert arrow type {} to iceberg type: {}", + arrow_type, iceberg_field_type + ), + )) + } + } + } + + visitor.r#struct(array, iceberg_type, columns) +} + +// # TODO +// Add support for fulfill the missing field in arrow struct array +fn visit_arrow_struct_array_from_field_id( + array: &StructArray, + iceberg_type: &StructType, + visitor: &V, +) -> Result> { + let DataType::Struct(arrow_struct_fields) = array.data_type() else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of 
arrow struct array is not a struct type", + )); + }; + + if array.columns().len() < iceberg_type.fields().len() + || arrow_struct_fields.len() < iceberg_type.fields().len() + { + return Err(Error::new( + ErrorKind::DataInvalid, + "The type of arrow struct array is not compatitable with iceberg struct type", + )); + } + + let mut columns = Vec::with_capacity(array.columns().len()); + + for iceberg_field in iceberg_type.fields() { + let Some((idx, field)) = arrow_struct_fields.iter().enumerate().find(|(_idx, f)| { + f.metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|id| id.parse::().ok().map(|id: i32| id == iceberg_field.id)) + .unwrap_or(false) + }) else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "The field {} in iceberg struct type is not found in arrow struct type", + iceberg_field.name + ), + )); + }; + let array = array.column(idx); + let arrow_type = field.data_type(); + if array.is_nullable() == iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "The nullable field of arrow struct array is not compatitable with iceberg type", + )); + } + match (arrow_type, iceberg_field.field_type.as_ref()) { + (DataType::Null, Type::Primitive(primitive_type)) => { + if iceberg_field.required { + return Err(Error::new( + ErrorKind::DataInvalid, + "column in arrow array should not be optional", + )); + } + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.null(array, primitive_type)?); + } + (DataType::Boolean, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.boolean(array, primitive_type)?); + } + (DataType::Int16, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int16(array, primitive_type)?); + } + (DataType::Int32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int32(array, primitive_type)?); + } + (DataType::Int64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.int64(array, primitive_type)?); + } + (DataType::Float32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.float(array, primitive_type)?); + } + (DataType::Float64, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.double(array, primitive_type)?); + } + (DataType::Decimal128(_, _), Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.decimal(array, primitive_type)?); + } + (DataType::Date32, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.date(array, primitive_type)?); + } + (DataType::Time64(TimeUnit::Microsecond), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.time(array, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Microsecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.timestamp(array, primitive_type)?); + } + (DataType::Timestamp(TimeUnit::Nanosecond, _), Type::Primitive(primitive_type)) => { + let array = array + .as_any() + .downcast_ref::() + .unwrap(); + columns.push(visitor.timestamp_nano(array, primitive_type)?); + } + (DataType::Utf8, 
Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.string(array, primitive_type)?); + } + (DataType::LargeUtf8, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.large_string(array, primitive_type)?); + } + (DataType::Binary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.binary(array, primitive_type)?); + } + (DataType::LargeBinary, Type::Primitive(primitive_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visitor.large_binary(array, primitive_type)?); + } + (DataType::Struct(_), Type::Struct(struct_type)) => { + let array = array.as_any().downcast_ref::().unwrap(); + columns.push(visit_arrow_struct_array(array, struct_type, visitor)?); + } + (arrow_type, iceberg_field_type) => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + format!( + "Unsupported convert arrow type {} to iceberg type: {}", + arrow_type, iceberg_field_type + ), + )) + } + } + } + + visitor.r#struct(array, iceberg_type, columns) +} + +/// Convert arrow struct array to iceberg struct value array. +/// This function will assume the schema of arrow struct array is the same as iceberg struct type. +pub fn arrow_struct_to_literal( + struct_array: &StructArray, + ty: &StructType, +) -> Result>> { + visit_arrow_struct_array(struct_array, ty, &ArrowArrayConvert) +} + +/// Convert arrow struct array to iceberg struct value array. +/// This function will use field id to find the corresponding field in arrow struct array. +pub fn arrow_struct_to_literal_from_field_id( + struct_array: &StructArray, + ty: &StructType, +) -> Result>> { + visit_arrow_struct_array_from_field_id(struct_array, ty, &ArrowArrayConvert) +} + +#[cfg(test)] +mod test { + use std::collections::HashMap; + use std::sync::Arc; + + use arrow_array::{ + ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, + Float64Array, Int16Array, Int32Array, Int64Array, StringArray, StructArray, + Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, + }; + use arrow_schema::{DataType, Field, Fields, TimeUnit}; + + use super::*; + use crate::spec::{Literal, NestedField, PrimitiveType, StructType, Type}; + + #[test] + fn test_arrow_struct_to_iceberg_struct() { + let bool_array = BooleanArray::from(vec![Some(true), Some(false), None]); + let int16_array = Int16Array::from(vec![Some(1), Some(2), None]); + let int32_array = Int32Array::from(vec![Some(3), Some(4), None]); + let int64_array = Int64Array::from(vec![Some(5), Some(6), None]); + let float32_array = Float32Array::from(vec![Some(1.1), Some(2.2), None]); + let float64_array = Float64Array::from(vec![Some(3.3), Some(4.4), None]); + let decimal_array = Decimal128Array::from(vec![Some(1000), Some(2000), None]) + .with_precision_and_scale(10, 2) + .unwrap(); + let date_array = Date32Array::from(vec![Some(18628), Some(18629), None]); + let time_array = Time64MicrosecondArray::from(vec![Some(123456789), Some(987654321), None]); + let timestamp_micro_array = TimestampMicrosecondArray::from(vec![ + Some(1622548800000000), + Some(1622635200000000), + None, + ]); + let timestamp_nano_array = TimestampNanosecondArray::from(vec![ + Some(1622548800000000000), + Some(1622635200000000000), + None, + ]); + let string_array = StringArray::from(vec![Some("a"), Some("b"), None]); + let binary_array = + BinaryArray::from(vec![Some(b"abc".as_ref()), 
Some(b"def".as_ref()), None]); + + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("bool_field", DataType::Boolean, true)), + Arc::new(bool_array) as ArrayRef, + ), + ( + Arc::new(Field::new("int16_field", DataType::Int16, true)), + Arc::new(int16_array) as ArrayRef, + ), + ( + Arc::new(Field::new("int32_field", DataType::Int32, true)), + Arc::new(int32_array) as ArrayRef, + ), + ( + Arc::new(Field::new("int64_field", DataType::Int64, true)), + Arc::new(int64_array) as ArrayRef, + ), + ( + Arc::new(Field::new("float32_field", DataType::Float32, true)), + Arc::new(float32_array) as ArrayRef, + ), + ( + Arc::new(Field::new("float64_field", DataType::Float64, true)), + Arc::new(float64_array) as ArrayRef, + ), + ( + Arc::new(Field::new( + "decimal_field", + DataType::Decimal128(10, 2), + true, + )), + Arc::new(decimal_array) as ArrayRef, + ), + ( + Arc::new(Field::new("date_field", DataType::Date32, true)), + Arc::new(date_array) as ArrayRef, + ), + ( + Arc::new(Field::new( + "time_field", + DataType::Time64(TimeUnit::Microsecond), + true, + )), + Arc::new(time_array) as ArrayRef, + ), + ( + Arc::new(Field::new( + "timestamp_micro_field", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + )), + Arc::new(timestamp_micro_array) as ArrayRef, + ), + ( + Arc::new(Field::new( + "timestamp_nano_field", + DataType::Timestamp(TimeUnit::Nanosecond, None), + true, + )), + Arc::new(timestamp_nano_array) as ArrayRef, + ), + ( + Arc::new(Field::new("string_field", DataType::Utf8, true)), + Arc::new(string_array) as ArrayRef, + ), + ( + Arc::new(Field::new("binary_field", DataType::Binary, true)), + Arc::new(binary_array) as ArrayRef, + ), + ]); + + let iceberg_struct_type = StructType::new(vec![ + Arc::new(NestedField::optional( + 0, + "bool_field", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 1, + "int16_field", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "int32_field", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 3, + "int64_field", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 4, + "float32_field", + Type::Primitive(PrimitiveType::Float), + )), + Arc::new(NestedField::optional( + 5, + "float64_field", + Type::Primitive(PrimitiveType::Double), + )), + Arc::new(NestedField::optional( + 6, + "decimal_field", + Type::Primitive(PrimitiveType::Decimal { + precision: 10, + scale: 2, + }), + )), + Arc::new(NestedField::optional( + 7, + "date_field", + Type::Primitive(PrimitiveType::Date), + )), + Arc::new(NestedField::optional( + 8, + "time_field", + Type::Primitive(PrimitiveType::Time), + )), + Arc::new(NestedField::optional( + 9, + "timestamp_micro_field", + Type::Primitive(PrimitiveType::Timestamp), + )), + Arc::new(NestedField::optional( + 10, + "timestamp_nao_field", + Type::Primitive(PrimitiveType::TimestampNs), + )), + Arc::new(NestedField::optional( + 11, + "string_field", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 12, + "binary_field", + Type::Primitive(PrimitiveType::Binary), + )), + ]); + + let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap(); + + assert_eq!(result, vec![ + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::bool(true)), + Some(Literal::int(1)), + Some(Literal::int(3)), + Some(Literal::long(5)), + Some(Literal::float(1.1)), + Some(Literal::double(3.3)), + Some(Literal::decimal(1000)), + Some(Literal::date(18628)), + 
Some(Literal::time(123456789)), + Some(Literal::timestamp(1622548800000000)), + Some(Literal::timestamp_nano(1622548800000000000)), + Some(Literal::string("a".to_string())), + Some(Literal::binary(b"abc".to_vec())), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::bool(false)), + Some(Literal::int(2)), + Some(Literal::int(4)), + Some(Literal::long(6)), + Some(Literal::float(2.2)), + Some(Literal::double(4.4)), + Some(Literal::decimal(2000)), + Some(Literal::date(18629)), + Some(Literal::time(987654321)), + Some(Literal::timestamp(1622635200000000)), + Some(Literal::timestamp_nano(1622635200000000000)), + Some(Literal::string("b".to_string())), + Some(Literal::binary(b"def".to_vec())), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![ + None, None, None, None, None, None, None, None, None, None, None, None, None, + ]))), + ]); + } + + #[test] + fn test_single_column_nullable_struct() { + let struct_array = StructArray::new_null( + Fields::from(vec![Field::new("bool_field", DataType::Boolean, true)]), + 3, + ); + let iceberg_struct_type = StructType::new(vec![Arc::new(NestedField::optional( + 0, + "bool_field", + Type::Primitive(PrimitiveType::Boolean), + ))]); + let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap(); + assert_eq!(result, vec![None; 3]); + } + + #[test] + fn test_empty_struct() { + let struct_array = StructArray::new_null(Fields::empty(), 3); + let iceberg_struct_type = StructType::new(vec![]); + let result = arrow_struct_to_literal(&struct_array, &iceberg_struct_type).unwrap(); + assert_eq!(result, vec![None; 0]); + } + + #[test] + fn test_arrow_struct_to_iceberg_struct_from_field_id() { + let bool_array = BooleanArray::from(vec![Some(true), Some(false), None]); + let int16_array = Int16Array::from(vec![Some(1), Some(2), None]); + let int32_array = Int32Array::from(vec![Some(3), Some(4), None]); + let int64_array = Int64Array::from(vec![Some(5), Some(6), None]); + let float32_array = Float32Array::from(vec![Some(1.1), Some(2.2), None]); + let struct_array = StructArray::from(vec![ + ( + Arc::new( + Field::new("bool_field", DataType::Boolean, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())], + )), + ), + Arc::new(bool_array) as ArrayRef, + ), + ( + Arc::new( + Field::new("int16_field", DataType::Int16, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())], + )), + ), + Arc::new(int16_array) as ArrayRef, + ), + ( + Arc::new( + Field::new("int32_field", DataType::Int32, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string())], + )), + ), + Arc::new(int32_array) as ArrayRef, + ), + ( + Arc::new( + Field::new("int64_field", DataType::Int64, true).with_metadata(HashMap::from( + [(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string())], + )), + ), + Arc::new(int64_array) as ArrayRef, + ), + ( + Arc::new( + Field::new("float32_field", DataType::Float32, true).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "5".to_string())]), + ), + ), + Arc::new(float32_array) as ArrayRef, + ), + ]); + let struct_type = StructType::new(vec![ + Arc::new(NestedField::optional( + 1, + "int16_field", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "bool_field", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 3, + "int64_field", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 4, + 
"int32_field", + Type::Primitive(PrimitiveType::Int), + )), + ]); + let result = arrow_struct_to_literal_from_field_id(&struct_array, &struct_type).unwrap(); + assert_eq!(result, vec![ + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::int(1)), + Some(Literal::bool(true)), + Some(Literal::long(5)), + Some(Literal::int(3)), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![ + Some(Literal::int(2)), + Some(Literal::bool(false)), + Some(Literal::long(6)), + Some(Literal::int(4)), + ]))), + Some(Literal::Struct(Struct::from_iter(vec![ + None, None, None, None, + ]))), + ]); + } +} diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index cbda6c905..917a0fff3 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -18,7 +18,7 @@ //! Catalog API for Apache Iceberg use std::collections::HashMap; -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use std::mem::take; use std::ops::Deref; @@ -149,6 +149,12 @@ impl NamespaceIdent { } } +impl Display for NamespaceIdent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0.join(".")) + } +} + impl AsRef> for NamespaceIdent { fn as_ref(&self) -> &Vec { &self.0 @@ -232,6 +238,12 @@ impl TableIdent { } } +impl Display for TableIdent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}.{}", self.namespace, self.name) + } +} + /// TableCreation represents the creation of a table in the catalog. #[derive(Debug, TypedBuilder)] pub struct TableCreation { diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 330f210c6..5efbffba9 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -334,6 +334,14 @@ impl OutputFile { Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?) } + /// Delete output file + pub async fn delete(&self) -> crate::Result<()> { + if self.exists().await? { + self.op.delete(&self.path[self.relative_path_pos..]).await? + } + Ok(()) + } + /// Converts into [`InputFile`]. pub fn to_input_file(self) -> InputFile { InputFile { diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index 086c63080..7e7078e5e 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -1188,6 +1188,10 @@ impl DataFile { pub fn sort_order_id(&self) -> Option { self.sort_order_id } + + pub(crate) fn rewrite_partition(&mut self, partition: Struct) { + self.partition = partition; + } } /// Type of content stored by the data file: data, equality deletes, or /// position deletes (all v1 files are data files) @@ -1254,6 +1258,8 @@ impl std::fmt::Display for DataFileFormat { } } +pub use _serde::DataFile as SerializedDataFile; + mod _serde { use std::collections::HashMap; @@ -1330,9 +1336,10 @@ mod _serde { } } + /// DataFile can used to be serialize. 
#[serde_as] #[derive(Serialize, Deserialize)] - pub(super) struct DataFile { + pub struct DataFile { #[serde(default)] content: i32, file_path: String, @@ -1356,6 +1363,7 @@ mod _serde { } impl DataFile { + /// Create a SerializedDataFile from a DataFile pub fn try_from( value: super::DataFile, partition_type: &StructType, @@ -1386,6 +1394,7 @@ mod _serde { }) } + /// Convert a SerializedDataFile to a DataFile pub fn try_into( self, partition_type: &StructType, diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index f92ca263d..d3c776464 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -1564,6 +1564,16 @@ impl Literal { Self::Primitive(PrimitiveLiteral::Long(value)) } + /// Creates a timestamp from unix epoch in nanoseconds. + pub fn timestamp_nano(value: i64) -> Self { + Self::Primitive(PrimitiveLiteral::Long(value)) + } + + /// Creates a timestamp with timezone from unix epoch in nanoseconds. + pub fn timestamptz_nano(value: i64) -> Self { + Self::Primitive(PrimitiveLiteral::Long(value)) + } + /// Creates a timestamp from [`DateTime`]. pub fn timestamp_from_datetime(dt: DateTime) -> Self { Self::timestamp(dt.with_timezone(&Utc).timestamp_micros()) @@ -2221,6 +2231,8 @@ mod timestamptz { } mod _serde { + use std::collections::HashMap; + use serde::de::Visitor; use serde::ser::{SerializeMap, SerializeSeq, SerializeStruct}; use serde::{Deserialize, Serialize}; @@ -2816,22 +2828,24 @@ mod _serde { optional: _, }) => match ty { Type::Struct(struct_ty) => { - let iters: Vec> = required - .into_iter() - .map(|(field_name, value)| { - let field = struct_ty - .field_by_name(field_name.as_str()) - .ok_or_else(|| { - invalid_err_with_reason( - "record", - &format!("field {} is not exist", &field_name), - ) - })?; - let value = value.try_into(&field.field_type)?; - Ok(value) + let mut value_map: HashMap = + required.into_iter().collect(); + let values = struct_ty + .fields() + .iter() + .map(|f| { + if let Some(raw_value) = value_map.remove(&f.name) { + let value = raw_value.try_into(&f.field_type)?; + Ok(value) + } else { + Err(invalid_err_with_reason( + "record", + &format!("field {} is not exist", &f.name), + )) + } }) - .collect::>()?; - Ok(Some(Literal::Struct(super::Struct::from_iter(iters)))) + .collect::, Error>>()?; + Ok(Some(Literal::Struct(super::Struct::from_iter(values)))) } Type::Map(map_ty) => { if *map_ty.key_field.field_type != Type::Primitive(PrimitiveType::String) { diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index cfd6a8381..5dac1ce5a 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -28,10 +28,10 @@ use uuid::Uuid; use crate::error::Result; use crate::io::OutputFile; use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile, - ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation, Snapshot, - SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType, - Summary, Transform, MAIN_BRANCH, + DataContentType, DataFile, DataFileFormat, FormatVersion, Manifest, ManifestContentType, + ManifestEntry, ManifestFile, ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, + Operation, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, + Struct, StructType, Summary, Transform, MAIN_BRANCH, }; use crate::table::Table; use crate::TableUpdate::UpgradeFormatVersion; @@ -170,6 +170,17 @@ impl<'a> 
Transaction<'a> { catalog.update_table(table_commit).await } + + /// Commit transaction with dynamic catalog. + pub async fn commit_dyn(self, catalog: &dyn Catalog) -> Result { + let table_commit = TableCommit::builder() + .ident(self.table.identifier().clone()) + .updates(self.updates) + .requirements(self.requirements) + .build(); + + catalog.update_table(table_commit).await + } } /// FastAppendAction is a transaction action for fast append data files to the table. @@ -284,6 +295,7 @@ struct SnapshotProduceAction<'a> { commit_uuid: Uuid, snapshot_properties: HashMap, added_data_files: Vec, + added_delete_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). @@ -304,6 +316,7 @@ impl<'a> SnapshotProduceAction<'a> { commit_uuid, snapshot_properties, added_data_files: vec![], + added_delete_files: vec![], manifest_counter: (0..), key_metadata, }) @@ -347,19 +360,17 @@ impl<'a> SnapshotProduceAction<'a> { data_files: impl IntoIterator, ) -> Result<&mut Self> { let data_files: Vec = data_files.into_iter().collect(); - for data_file in &data_files { - if data_file.content_type() != crate::spec::DataContentType::Data { - return Err(Error::new( - ErrorKind::DataInvalid, - "Only data content type is allowed for fast append", - )); - } + for data_file in data_files { Self::validate_partition_value( data_file.partition(), self.tx.table.metadata().default_partition_type(), )?; + if data_file.content_type() == DataContentType::Data { + self.added_data_files.push(data_file); + } else { + self.added_delete_files.push(data_file); + } } - self.added_data_files.extend(data_files); Ok(self) } @@ -376,8 +387,31 @@ impl<'a> SnapshotProduceAction<'a> { } // Write manifest file for added data files and return the ManifestFile for ManifestList. 
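// A hedged, calling-side sketch (not part of this patch): with the changes in this
// hunk, `add_data_files` accepts data files and delete files alike and routes them
// by `content_type()`, while the new `Transaction::commit_dyn` commits through a
// `&dyn Catalog`. The `Transaction::new` / `fast_append` / `apply` flow below is
// assumed from the existing crate API; `table`, `catalog`, and `files` are placeholders.
async fn append_and_commit(
    table: iceberg::table::Table,
    catalog: &dyn iceberg::Catalog,
    files: Vec<iceberg::spec::DataFile>, // may mix Data with PositionDeletes / EqualityDeletes
) -> iceberg::Result<iceberg::table::Table> {
    let tx = iceberg::transaction::Transaction::new(&table);
    let mut append = tx.fast_append(None, vec![])?;
    // Data files and delete files are written into separate manifests (Data vs. Deletes).
    append.add_data_files(files)?;
    let tx = append.apply().await?;
    tx.commit_dyn(catalog).await
}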
- async fn write_added_manifest(&mut self) -> Result { - let added_data_files = std::mem::take(&mut self.added_data_files); + async fn write_added_manifest( + &mut self, + added_data_files: Vec, + ) -> Result { + let content_type = { + let mut data_num = 0; + let mut delete_num = 0; + for f in &added_data_files { + match f.content_type() { + DataContentType::Data => data_num += 1, + DataContentType::PositionDeletes => delete_num += 1, + DataContentType::EqualityDeletes => delete_num += 1, + } + } + if data_num == added_data_files.len() { + ManifestContentType::Data + } else if delete_num == added_data_files.len() { + ManifestContentType::Deletes + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "added DataFile for a ManifestFile should be same type (Data or Delete)", + )); + } + }; let manifest_entries = added_data_files .into_iter() .map(|data_file| { @@ -406,7 +440,7 @@ impl<'a> SnapshotProduceAction<'a> { .as_ref() .clone(), ) - .content(crate::spec::ManifestContentType::Data) + .content(content_type) .build(); let manifest = Manifest::new(manifest_meta, manifest_entries); let writer = ManifestWriter::new( @@ -422,12 +456,19 @@ impl<'a> SnapshotProduceAction<'a> { snapshot_produce_operation: &OP, manifest_process: &MP, ) -> Result> { - let added_manifest = self.write_added_manifest().await?; + let mut manifest_files = vec![]; + let data_files = std::mem::take(&mut self.added_data_files); + let delete_files = std::mem::take(&mut self.added_delete_files); + if data_files.len() > 0 { + let added_manifest = self.write_added_manifest(data_files).await?; + manifest_files.push(added_manifest); + } + if delete_files.len() > 0 { + let added_delete_manifest = self.write_added_manifest(delete_files).await?; + manifest_files.push(added_delete_manifest); + } let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; - // # TODO - // Support process delete entries. 
- let mut manifest_files = vec![added_manifest]; manifest_files.extend(existing_manifests); let manifest_files = manifest_process.process_manifeset(manifest_files); Ok(manifest_files) diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs index 940aa1584..69fe114a6 100644 --- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -20,7 +20,7 @@ use arrow_array::RecordBatch; use itertools::Itertools; -use crate::spec::{DataContentType, DataFile, Struct}; +use crate::spec::{DataContentType, DataFile, SchemaRef, Struct}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; use crate::Result; @@ -94,6 +94,10 @@ impl CurrentFileStatus for DataFileWriter { fn current_written_size(&self) -> usize { self.inner_writer.as_ref().unwrap().current_written_size() } + + fn current_schema(&self) -> SchemaRef { + self.inner_writer.as_ref().unwrap().current_schema() + } } #[cfg(test)] diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 069928fa8..b94b52edf 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -64,16 +64,14 @@ impl EqualityDeleteWriterConfig { ) -> Result { let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?); let projector = RecordBatchProjector::new( - original_arrow_schema, + &original_arrow_schema, &equality_ids, // The following rule comes from https://iceberg.apache.org/spec/#identifier-field-ids // - The identifier field ids must be used for primitive types. // - The identifier field ids must not be used for floating point types or nullable fields. - // - The identifier field ids can be nested field of struct but not nested field of nullable struct. |field| { // Only primitive type is allowed to be used for identifier field ids - if field.is_nullable() - || field.data_type().is_nested() + if field.data_type().is_nested() || matches!( field.data_type(), DataType::Float16 | DataType::Float32 | DataType::Float64 diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs index 37ab97eb6..ff1b8a463 100644 --- a/crates/iceberg/src/writer/base_writer/mod.rs +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -19,3 +19,4 @@ pub mod data_file_writer; pub mod equality_delete_writer; +pub mod sort_position_delete_writer; diff --git a/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs b/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs new file mode 100644 index 000000000..ecdb617bd --- /dev/null +++ b/crates/iceberg/src/writer/base_writer/sort_position_delete_writer.rs @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Sort position delete file writer. +use std::collections::BTreeMap; +use std::sync::Arc; + +use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray}; +use arrow_schema::SchemaRef as ArrowSchemaRef; +use once_cell::sync::Lazy; + +use crate::arrow::schema_to_arrow_schema; +use crate::spec::{DataFile, NestedField, PrimitiveType, Schema, SchemaRef, Struct, Type}; +use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::Result; + +/// Builder for `MemoryPositionDeleteWriter`. +#[derive(Clone)] +pub struct SortPositionDeleteWriterBuilder { + inner: B, + cache_num: usize, + partition_value: Option, +} + +impl SortPositionDeleteWriterBuilder { + /// Create a new `SortPositionDeleteWriterBuilder` using a `FileWriterBuilder`. + pub fn new(inner: B, cache_num: usize, partition_value: Option) -> Self { + Self { + inner, + cache_num, + partition_value, + } + } +} + +/// Schema for position delete file. +pub static POSITION_DELETE_SCHEMA: Lazy = Lazy::new(|| { + Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 2147483546, + "file_path", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::required( + 2147483545, + "pos", + Type::Primitive(PrimitiveType::Long), + )), + ]) + .build() + .unwrap(), + ) +}); + +/// Arrow schema for position delete file. +pub static POSITION_DELETE_ARROW_SCHEMA: Lazy = + Lazy::new(|| Arc::new(schema_to_arrow_schema(&POSITION_DELETE_SCHEMA).unwrap())); + +#[async_trait::async_trait] +impl IcebergWriterBuilder> + for SortPositionDeleteWriterBuilder +{ + type R = SortPositionDeleteWriter; + + async fn build(self) -> Result { + Ok(SortPositionDeleteWriter { + inner_writer_builder: self.inner.clone(), + cache_num: self.cache_num, + cache: BTreeMap::new(), + data_files: Vec::new(), + partition_value: self.partition_value.unwrap_or(Struct::empty()), + }) + } +} + +/// Position delete input. +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] +pub struct PositionDeleteInput { + /// The path of the file. + pub path: String, + /// The offset of the position delete. + pub offset: i64, +} + +/// The memory position delete writer. +pub struct SortPositionDeleteWriter { + inner_writer_builder: B, + cache_num: usize, + cache: BTreeMap>, + data_files: Vec, + partition_value: Struct, +} + +impl SortPositionDeleteWriter { + /// Get the current number of cache rows. 
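// Note: `cache` is keyed by data-file path, so the count returned below (and the
// `cache_num` flush threshold checked in `write`) measures the number of distinct
// file paths currently buffered rather than the number of buffered delete rows.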
+ pub fn current_cache_number(&self) -> usize { + self.cache.len() + } +} + +impl SortPositionDeleteWriter { + async fn write_cache_out(&mut self) -> Result<()> { + let mut keys = Vec::new(); + let mut values = Vec::new(); + let mut cache = std::mem::take(&mut self.cache); + for (key, offsets) in cache.iter_mut() { + offsets.sort(); + let key_ref = key.as_str(); + for offset in offsets { + keys.push(key_ref); + values.push(*offset); + } + } + let key_array = Arc::new(StringArray::from(keys)) as ArrayRef; + let value_array = Arc::new(Int64Array::from(values)) as ArrayRef; + let record_batch = RecordBatch::try_new(POSITION_DELETE_ARROW_SCHEMA.clone(), vec![ + key_array, + value_array, + ])?; + let mut writer = self.inner_writer_builder.clone().build().await?; + writer.write(&record_batch).await?; + self.data_files + .extend(writer.close().await?.into_iter().map(|mut res| { + res.content(crate::spec::DataContentType::PositionDeletes); + res.partition(self.partition_value.clone()); + res.build().expect("Guaranteed to be valid") + })); + Ok(()) + } +} + +/// Implement `IcebergWriter` for `PositionDeleteWriter`. +#[async_trait::async_trait] +impl IcebergWriter for SortPositionDeleteWriter { + async fn write(&mut self, input: PositionDeleteInput) -> Result<()> { + if let Some(v) = self.cache.get_mut(&input.path) { + v.push(input.offset); + } else { + self.cache + .insert(input.path.to_string(), vec![input.offset]); + } + + if self.cache.len() >= self.cache_num { + self.write_cache_out().await?; + } + Ok(()) + } + + async fn close(&mut self) -> Result> { + self.write_cache_out().await?; + Ok(std::mem::take(&mut self.data_files)) + } +} + +#[cfg(test)] +mod test { + use arrow_array::{Int64Array, StringArray}; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::POSITION_DELETE_SCHEMA; + use crate::io::FileIOBuilder; + use crate::spec::{DataContentType, DataFileFormat, Struct}; + use crate::writer::base_writer::sort_position_delete_writer::{ + PositionDeleteInput, SortPositionDeleteWriterBuilder, + }; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + use crate::Result; + + #[tokio::test] + async fn test_position_delete_writer() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + POSITION_DELETE_SCHEMA.clone(), + file_io.clone(), + location_gen, + file_name_gen, + ); + let mut position_delete_writer = SortPositionDeleteWriterBuilder::new(pw, 10, None) + .build() + .await?; + + // Write some position delete inputs + let mut inputs = [ + PositionDeleteInput { + path: "file2.parquet".to_string(), + offset: 2, + }, + PositionDeleteInput { + path: "file2.parquet".to_string(), + offset: 1, + }, + PositionDeleteInput { + path: "file2.parquet".to_string(), + offset: 3, + }, + PositionDeleteInput { + path: "file3.parquet".to_string(), + offset: 2, + }, + PositionDeleteInput { + path: "file1.parquet".to_string(), + offset: 5, + }, 
+ PositionDeleteInput { + path: "file1.parquet".to_string(), + offset: 4, + }, + PositionDeleteInput { + path: "file1.parquet".to_string(), + offset: 1, + }, + ]; + for input in inputs.iter() { + position_delete_writer.write(input.clone()).await?; + } + + let data_files = position_delete_writer.close().await.unwrap(); + assert_eq!(data_files.len(), 1); + assert_eq!(data_files[0].file_format, DataFileFormat::Parquet); + assert_eq!(data_files[0].content, DataContentType::PositionDeletes); + assert_eq!(data_files[0].partition, Struct::empty()); + + let parquet_file = file_io + .new_input(&data_files[0].file_path)? + .read() + .await + .unwrap(); + let builder = ParquetRecordBatchReaderBuilder::try_new(parquet_file).unwrap(); + let reader = builder.build().unwrap(); + let batches = reader.map(|x| x.unwrap()).collect::>(); + + let path_column = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let offset_column = batches[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + inputs.sort_by(|a, b| a.path.cmp(&b.path).then_with(|| a.offset.cmp(&b.offset))); + for (i, input) in inputs.iter().enumerate() { + assert_eq!(path_column.value(i), input.path); + assert_eq!(offset_column.value(i), input.offset); + } + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 596228f7c..f7db54d1d 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -411,6 +411,11 @@ impl FileWriter for ParquetWriter { Error::new(ErrorKind::Unexpected, "Failed to close parquet writer.").with_source(err) })?; + if self.current_row_num == 0 { + self.out_file.delete().await?; + return Ok(vec![]); + } + let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed); Ok(vec![Self::to_data_file_builder( @@ -434,6 +439,10 @@ impl CurrentFileStatus for ParquetWriter { fn current_written_size(&self) -> usize { self.written_size.load(std::sync::atomic::Ordering::Relaxed) as usize } + + fn current_schema(&self) -> SchemaRef { + self.schema.clone() + } } /// AsyncFileWriter is a wrapper of FileWrite to make it compatible with tokio::io::AsyncWrite. diff --git a/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs b/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs new file mode 100644 index 000000000..c988e3da6 --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/equality_delta_writer.rs @@ -0,0 +1,462 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the equality delta writer. 
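// A hedged input sketch (assumptions noted inline): every batch handed to this
// writer must carry a trailing Int32 "op" column holding the INSERT_OP / DELETE_OP
// constants defined below; the remaining columns are the actual row. The writer
// partitions consecutive op runs, strips the op column, and routes each slice to
// the insert or delete path. The "id"/"data" columns and the choice of "id" as the
// unique column are illustrative only.
fn example_delta_batch() -> arrow_array::RecordBatch {
    use std::sync::Arc;

    use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema as ArrowSchema};

    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Int64, true),
        Field::new("data", DataType::Utf8, true),
        Field::new("op", DataType::Int32, false),
    ]));
    // Insert id=1 and id=2, then delete id=1 again within the same batch.
    RecordBatch::try_new(schema, vec![
        Arc::new(Int64Array::from(vec![1, 2, 1])),
        Arc::new(StringArray::from(vec!["a", "b", "a"])),
        Arc::new(Int32Array::from(vec![1, 1, 2])), // INSERT_OP, INSERT_OP, DELETE_OP
    ])
    .expect("schema and columns are consistent")
}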
+ +use std::collections::HashMap; + +use arrow_array::builder::BooleanBuilder; +use arrow_array::{Int32Array, RecordBatch}; +use arrow_ord::partition::partition; +use arrow_row::{OwnedRow, RowConverter, Rows, SortField}; +use arrow_select::filter::filter_record_batch; +use itertools::Itertools; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::arrow::record_batch_projector::RecordBatchProjector; +use crate::arrow::schema_to_arrow_schema; +use crate::spec::DataFile; +use crate::writer::base_writer::sort_position_delete_writer::PositionDeleteInput; +use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// Insert operation. +pub const INSERT_OP: i32 = 1; +/// Delete operation. +pub const DELETE_OP: i32 = 2; + +/// Builder for `EqualityDeltaWriter`. +#[derive(Clone)] +pub struct EqualityDeltaWriterBuilder { + data_writer_builder: DB, + position_delete_writer_builder: PDB, + equality_delete_writer_builder: EDB, + unique_column_ids: Vec, +} + +impl EqualityDeltaWriterBuilder { + /// Create a new `EqualityDeltaWriterBuilder`. + pub fn new( + data_writer_builder: DB, + position_delete_writer_builder: PDB, + equality_delete_writer_builder: EDB, + unique_column_ids: Vec, + ) -> Self { + Self { + data_writer_builder, + position_delete_writer_builder, + equality_delete_writer_builder, + unique_column_ids, + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder for EqualityDeltaWriterBuilder +where + DB: IcebergWriterBuilder, + PDB: IcebergWriterBuilder, + EDB: IcebergWriterBuilder, + DB::R: CurrentFileStatus, +{ + type R = EqualityDeltaWriter; + + async fn build(self) -> Result { + Self::R::try_new( + self.data_writer_builder.build().await?, + self.position_delete_writer_builder.build().await?, + self.equality_delete_writer_builder.build().await?, + self.unique_column_ids, + ) + } +} + +/// Equality delta writer. +pub struct EqualityDeltaWriter { + data_writer: D, + position_delete_writer: PD, + equality_delete_writer: ED, + projector: RecordBatchProjector, + inserted_row: HashMap, + row_converter: RowConverter, +} + +impl EqualityDeltaWriter +where + D: IcebergWriter + CurrentFileStatus, + PD: IcebergWriter, + ED: IcebergWriter, +{ + pub(crate) fn try_new( + data_writer: D, + position_delete_writer: PD, + equality_delete_writer: ED, + unique_column_ids: Vec, + ) -> Result { + let projector = RecordBatchProjector::new( + &schema_to_arrow_schema(&data_writer.current_schema())?, + &unique_column_ids, + |field| { + if field.data_type().is_nested() { + return Ok(None); + } + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .map(|s| { + s.parse::() + .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string())) + }) + .transpose() + }, + |_| true, + )?; + let row_converter = RowConverter::new( + projector + .projected_schema_ref() + .fields() + .iter() + .map(|field| SortField::new(field.data_type().clone())) + .collect(), + )?; + Ok(Self { + data_writer, + position_delete_writer, + equality_delete_writer, + projector, + inserted_row: HashMap::new(), + row_converter, + }) + } + /// Write the batch. + /// 1. If a row with the same unique column is not written, then insert it. + /// 2. If a row with the same unique column is written, then delete the previous row and insert the new row. 
+ async fn insert(&mut self, batch: RecordBatch) -> Result<()> { + let rows = self.extract_unique_column(&batch)?; + let current_file_path = self.data_writer.current_file_path(); + let current_file_offset = self.data_writer.current_row_num(); + for (idx, row) in rows.iter().enumerate() { + let previous_input = self.inserted_row.insert(row.owned(), PositionDeleteInput { + path: current_file_path.clone(), + offset: (current_file_offset + idx) as i64, + }); + if let Some(previous_input) = previous_input { + self.position_delete_writer.write(previous_input).await?; + } + } + + self.data_writer.write(batch).await?; + + Ok(()) + } + + async fn delete(&mut self, batch: RecordBatch) -> Result<()> { + let rows = self.extract_unique_column(&batch)?; + let mut delete_row = BooleanBuilder::new(); + for row in rows.iter() { + if let Some(previous_input) = self.inserted_row.remove(&row.owned()) { + self.position_delete_writer.write(previous_input).await?; + delete_row.append_value(false); + } else { + delete_row.append_value(true); + } + } + let delete_batch = filter_record_batch(&batch, &delete_row.finish()).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to filter record batch, error: {}", err), + ) + })?; + self.equality_delete_writer.write(delete_batch).await?; + Ok(()) + } + + fn extract_unique_column(&mut self, batch: &RecordBatch) -> Result { + self.row_converter + .convert_columns(&self.projector.project_column(batch.columns())?) + .map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to convert columns, error: {}", err), + ) + }) + } +} + +#[async_trait::async_trait] +impl IcebergWriter for EqualityDeltaWriter +where + D: IcebergWriter + CurrentFileStatus, + PD: IcebergWriter, + ED: IcebergWriter, +{ + async fn write(&mut self, batch: RecordBatch) -> Result<()> { + // check the last column is int32 array. + let ops = batch + .column(batch.num_columns() - 1) + .as_any() + .downcast_ref::() + .ok_or(Error::new(ErrorKind::DataInvalid, ""))?; + + // partition the ops. 
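// `arrow_ord::partition::partition` groups consecutive rows with equal op values,
// so each range produced below is a pure run of inserts or a pure run of deletes;
// a single input batch can therefore interleave INSERT_OP and DELETE_OP rows freely.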
+ let partitions = + partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|err| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to partition ops, error: {}", err), + ) + })?; + for range in partitions.ranges() { + let batch = batch + .project(&(0..batch.num_columns() - 1).collect_vec()) + .unwrap() + .slice(range.start, range.end - range.start); + match ops.value(range.start) { + // Insert + INSERT_OP => self.insert(batch).await?, + // Delete + DELETE_OP => self.delete(batch).await?, + op => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid ops: {op}"), + )) + } + } + } + Ok(()) + } + + async fn close(&mut self) -> Result> { + let data_files = self.data_writer.close().await?; + let position_delete_files = self.position_delete_writer.close().await?; + let equality_delete_files = self.equality_delete_writer.close().await?; + Ok(data_files + .into_iter() + .chain(position_delete_files) + .chain(equality_delete_files) + .collect()) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use arrow_select::concat::concat_batches; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use crate::arrow::arrow_schema_to_schema; + use crate::io::FileIOBuilder; + use crate::spec::{DataContentType, DataFileFormat, NestedField, PrimitiveType, Schema, Type}; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::base_writer::equality_delete_writer::{ + EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, + }; + use crate::writer::base_writer::sort_position_delete_writer::{ + SortPositionDeleteWriterBuilder, POSITION_DELETE_ARROW_SCHEMA, POSITION_DELETE_SCHEMA, + }; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::function_writer::equality_delta_writer::{ + EqualityDeltaWriterBuilder, DELETE_OP, INSERT_OP, + }; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + use crate::Result; + + #[tokio::test] + async fn test_equality_delta_writer() -> Result<()> { + // prepare writer + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 1, + "id".to_string(), + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::required( + 2, + "name".to_string(), + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let data_file_writer_builder = { + let pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema.clone(), + file_io.clone(), + location_gen.clone(), + file_name_gen.clone(), + ); + DataFileWriterBuilder::new(pw.clone(), None) + }; + let position_delete_writer_builder = { + let pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + POSITION_DELETE_SCHEMA.clone(), + file_io.clone(), + location_gen.clone(), + file_name_gen.clone(), + ); + 
SortPositionDeleteWriterBuilder::new(pw.clone(), 100, None) + }; + let equality_delete_writer_builder = { + let config = EqualityDeleteWriterConfig::new(vec![1, 2], schema, None)?; + let pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + arrow_schema_to_schema(config.projected_arrow_schema_ref()) + .unwrap() + .into(), + file_io.clone(), + location_gen, + file_name_gen, + ); + EqualityDeleteFileWriterBuilder::new(pw, config) + }; + let mut equality_delta_writer = EqualityDeltaWriterBuilder::new( + data_file_writer_builder, + position_delete_writer_builder, + equality_delete_writer_builder, + vec![1, 2], + ) + .build() + .await + .unwrap(); + + // write data + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("data", DataType::Utf8, true), + Field::new("op", DataType::Int32, false), + ])); + { + let id_array = Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(data_array), + Arc::new(Int32Array::from(vec![INSERT_OP; 7])), + ]) + .expect("Failed to create RecordBatch"); + equality_delta_writer.write(batch).await?; + } + { + let id_array = Int64Array::from(vec![1, 2, 3, 4]); + let data_array = StringArray::from(vec!["a", "b", "k", "l"]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(data_array), + Arc::new(Int32Array::from(vec![ + DELETE_OP, DELETE_OP, DELETE_OP, INSERT_OP, + ])), + ]) + .expect("Failed to create RecordBatch"); + equality_delta_writer.write(batch).await?; + } + + let data_files = equality_delta_writer.close().await?; + assert_eq!(data_files.len(), 3); + // data file + let data_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("data", DataType::Utf8, true), + ])); + let data_file = data_files + .iter() + .find(|file| file.content == DataContentType::Data) + .unwrap(); + let data_file_path = data_file.file_path().to_string(); + let input_file = file_io.new_input(&data_file_path).unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&data_schema, &batches).unwrap(); + let expected_batches = RecordBatch::try_new(data_schema.clone(), vec![ + Arc::new(Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1, 4])), + Arc::new(StringArray::from(vec![ + "a", "b", "c", "d", "e", "f", "g", "l", + ])), + ]) + .unwrap(); + assert_eq!(expected_batches, res); + + // position delete file + let position_delete_file = data_files + .iter() + .find(|file| file.content == DataContentType::PositionDeletes) + .unwrap(); + let input_file = file_io + .new_input(position_delete_file.file_path.clone()) + .unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&POSITION_DELETE_ARROW_SCHEMA, &batches).unwrap(); + let expected_batches = RecordBatch::try_new(POSITION_DELETE_ARROW_SCHEMA.clone(), vec![ + Arc::new(StringArray::from(vec![ + data_file_path.clone(), + data_file_path.clone(), + ])), + 
Arc::new(Int64Array::from(vec![0, 1])), + ]) + .unwrap(); + assert_eq!(expected_batches, res); + + // equality delete file + let equality_delete_file = data_files + .iter() + .find(|file| file.content == DataContentType::EqualityDeletes) + .unwrap(); + let input_file = file_io + .new_input(equality_delete_file.file_path.clone()) + .unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&data_schema, &batches).unwrap(); + let expected_batches = RecordBatch::try_new(data_schema.clone(), vec![ + Arc::new(Int64Array::from(vec![3])), + Arc::new(StringArray::from(vec!["k"])), + ]) + .unwrap(); + assert_eq!(expected_batches, res); + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs b/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs new file mode 100644 index 000000000..8309b859f --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs @@ -0,0 +1,286 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the fanout partition writer. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_row::OwnedRow; +use arrow_schema::SchemaRef as ArrowSchemaRef; +use itertools::Itertools; + +use crate::arrow::{schema_to_arrow_schema, RecordBatchPartitionSpliter}; +use crate::spec::{DataFile, PartitionSpecRef, SchemaRef}; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::Result; + +/// The builder for `FanoutPartitionWriter`. +#[derive(Clone)] +pub struct FanoutPartitionWriterBuilder { + inner_builder: B, + partition_specs: PartitionSpecRef, + table_schema: SchemaRef, + arrow_schema: ArrowSchemaRef, +} + +impl FanoutPartitionWriterBuilder { + /// Create a new `FanoutPartitionWriterBuilder` with the default arrow schema. + pub fn new( + inner_builder: B, + partition_specs: PartitionSpecRef, + table_schema: SchemaRef, + ) -> Result { + Ok(Self::new_with_custom_schema( + inner_builder, + Arc::new(schema_to_arrow_schema(&table_schema)?), + partition_specs, + table_schema, + )) + } + + /// Create a new `FanoutPartitionWriterBuilder` with a custom arrow schema. + /// This function is useful for the user who has the input with extral columns. 
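// A hedged sketch of when the custom-schema constructor helps (not from the patch
// itself, and written as if it lived next to this module so the builder name is in
// scope): if incoming batches carry an input-only column that is not part of the
// Iceberg table schema, describe the real Arrow layout explicitly. Starting from
// `schema_to_arrow_schema(&table_schema)` keeps the field-id metadata the partition
// splitter is assumed to rely on; the trailing "op" column is a hypothetical extra column.
fn fanout_builder_with_extra_column<B>(
    inner_builder: B,
    partition_spec: iceberg::spec::PartitionSpecRef,
    table_schema: iceberg::spec::SchemaRef,
) -> iceberg::Result<FanoutPartitionWriterBuilder<B>> {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
    use iceberg::arrow::schema_to_arrow_schema;

    let table_arrow_schema = schema_to_arrow_schema(&table_schema)?;
    let mut fields: Vec<Field> = table_arrow_schema
        .fields()
        .iter()
        .map(|f| f.as_ref().clone())
        .collect();
    fields.push(Field::new("op", DataType::Int32, false));
    Ok(FanoutPartitionWriterBuilder::new_with_custom_schema(
        inner_builder,
        Arc::new(ArrowSchema::new(fields)),
        partition_spec,
        table_schema,
    ))
}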
+ pub fn new_with_custom_schema( + inner_builder: B, + arrow_schema: ArrowSchemaRef, + partition_specs: PartitionSpecRef, + table_schema: SchemaRef, + ) -> Self { + Self { + inner_builder, + partition_specs, + table_schema, + arrow_schema, + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder for FanoutPartitionWriterBuilder { + type R = FanoutPartitionWriter; + + async fn build(self) -> Result { + let partition_splitter = RecordBatchPartitionSpliter::new( + &self.arrow_schema, + self.table_schema.clone(), + self.partition_specs, + )?; + Ok(FanoutPartitionWriter { + inner_writer_builder: self.inner_builder, + partition_splitter, + partition_writers: HashMap::new(), + }) + } +} + +/// The fanout partition writer. +/// It will split the input record batch by the partition specs, and write the splitted record batches to the inner writers. +pub struct FanoutPartitionWriter { + inner_writer_builder: B, + partition_splitter: RecordBatchPartitionSpliter, + partition_writers: HashMap, +} + +impl FanoutPartitionWriter { + /// Get the current number of partition writers. + pub fn partition_num(&self) -> usize { + self.partition_writers.len() + } +} + +#[async_trait::async_trait] +impl IcebergWriter for FanoutPartitionWriter { + async fn write(&mut self, input: RecordBatch) -> Result<()> { + let splits = self.partition_splitter.split(&input)?; + + for (partition, record_batch) in splits { + match self.partition_writers.entry(partition) { + Entry::Occupied(entry) => { + entry.into_mut().write(record_batch).await?; + } + Entry::Vacant(entry) => { + let writer = entry.insert(self.inner_writer_builder.clone().build().await?); + writer.write(record_batch).await?; + } + } + } + + Ok(()) + } + + async fn close(&mut self) -> Result> { + let (partition_rows, writers): (Vec<_>, Vec<_>) = self.partition_writers.drain().unzip(); + let partition_values = self.partition_splitter.convert_row(partition_rows)?; + + let mut result = Vec::new(); + for (partition_value, mut writer) in partition_values.into_iter().zip_eq(writers) { + let mut data_files = writer.close().await?; + for data_file in data_files.iter_mut() { + data_file.rewrite_partition(partition_value.clone()); + } + result.append(&mut data_files); + } + + Ok(result) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{Int64Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use arrow_select::concat::concat_batches; + use itertools::Itertools; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use crate::io::FileIOBuilder; + use crate::spec::{ + DataFileFormat, Literal, NestedField, PartitionSpec, PrimitiveLiteral, PrimitiveType, + Schema, Struct, Transform, Type, UnboundPartitionField, + }; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder; + use crate::writer::{IcebergWriter, IcebergWriterBuilder}; + use crate::Result; + + #[tokio::test] + async fn test_fanout_partition_writer() -> Result<()> { + // prepare writer + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 1, + "id".to_string(), + 
Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::required( + 2, + "name".to_string(), + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + let pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema.clone(), + file_io.clone(), + location_gen, + file_name_gen, + ); + let data_file_writer_builder = DataFileWriterBuilder::new(pw, None); + let mut fanout_partition_writer = FanoutPartitionWriterBuilder::new( + data_file_writer_builder, + Arc::new(partition_spec), + schema, + ) + .unwrap() + .build() + .await + .unwrap(); + + // prepare data + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("data", DataType::Utf8, true), + ])); + let id_array = Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(data_array), + ]) + .expect("Failed to create RecordBatch"); + + fanout_partition_writer.write(batch).await?; + let data_files = fanout_partition_writer.close().await?; + assert_eq!(data_files.len(), 3); + let expected_partitions = vec![ + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(1)))]), + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(2)))]), + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(3)))]), + ]; + let expected_batches = vec![ + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![1, 1, 1])), + Arc::new(StringArray::from(vec!["a", "c", "g"])), + ]) + .unwrap(), + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![2, 2])), + Arc::new(StringArray::from(vec!["b", "e"])), + ]) + .unwrap(), + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![3, 3])), + Arc::new(StringArray::from(vec!["d", "f"])), + ]) + .unwrap(), + ]; + for (partition, batch) in expected_partitions + .into_iter() + .zip_eq(expected_batches.into_iter()) + { + assert!(data_files.iter().any(|file| file.partition == partition)); + let data_file = data_files + .iter() + .find(|file| file.partition == partition) + .unwrap(); + let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + + // check data + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&batch.schema(), &batches).unwrap(); + assert_eq!(batch, res); + } + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/function_writer/mod.rs b/crates/iceberg/src/writer/function_writer/mod.rs new file mode 100644 index 000000000..58e2f7671 --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/mod.rs @@ -0,0 +1,22 @@ +// Licensed 
to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the functional writer. + +pub mod equality_delta_writer; +pub mod fanout_partition_writer; +pub mod precompute_partition_writer; diff --git a/crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs b/crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs new file mode 100644 index 000000000..7426d50d2 --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/precompute_partition_writer.rs @@ -0,0 +1,288 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the precompute partition writer. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; + +use arrow_array::{RecordBatch, StructArray}; +use arrow_row::{OwnedRow, RowConverter, SortField}; +use arrow_schema::DataType; +use itertools::Itertools; + +use crate::arrow::{convert_row_to_struct, split_with_partition, type_to_arrow_type}; +use crate::spec::{DataFile, PartitionSpecRef, SchemaRef, Type}; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// The builder for precompute partition writer. +#[derive(Clone)] +pub struct PrecomputePartitionWriterBuilder { + inner_writer_builder: B, + partition_spec: PartitionSpecRef, + schema: SchemaRef, +} + +impl PrecomputePartitionWriterBuilder { + /// Create a new precompute partition writer builder. 
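// A hedged sketch of the input contract (not from the patch itself): unlike the
// fanout writer, this writer does not evaluate partition transforms. The caller
// supplies a StructArray whose fields match `partition_spec.partition_type(&schema)`
// and whose values are already transformed, alongside the row batch. The "id_bucket"
// column and the bucket(id) values below are illustrative.
fn example_precomputed_input(
    rows: arrow_array::RecordBatch,
    bucket_values: arrow_array::Int32Array, // hypothetical bucket(id) results, one per row
) -> (arrow_array::StructArray, arrow_array::RecordBatch) {
    use std::sync::Arc;

    use arrow_array::{ArrayRef, StructArray};
    use arrow_schema::{DataType, Field};

    let partition = StructArray::from(vec![(
        Arc::new(Field::new("id_bucket", DataType::Int32, true)),
        Arc::new(bucket_values) as ArrayRef,
    )]);
    (partition, rows)
}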
+ pub fn new( + inner_writer_builder: B, + partition_spec: PartitionSpecRef, + schema: SchemaRef, + ) -> Self { + Self { + inner_writer_builder, + partition_spec, + schema, + } + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder<(StructArray, RecordBatch)> + for PrecomputePartitionWriterBuilder +{ + type R = PrecomputePartitionWriter; + + async fn build(self) -> Result { + let arrow_type = type_to_arrow_type(&Type::Struct( + self.partition_spec.partition_type(&self.schema)?, + ))?; + let DataType::Struct(fields) = &arrow_type else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The partition type is not a struct", + )); + }; + let partition_row_converter = RowConverter::new( + fields + .iter() + .map(|f| SortField::new(f.data_type().clone())) + .collect(), + )?; + Ok(PrecomputePartitionWriter { + inner_writer_builder: self.inner_writer_builder, + partition_row_converter, + partition_spec: self.partition_spec, + partition_writers: HashMap::new(), + schema: self.schema, + }) + } +} + +/// The precompute partition writer. +pub struct PrecomputePartitionWriter { + inner_writer_builder: B, + partition_writers: HashMap, + partition_row_converter: RowConverter, + partition_spec: PartitionSpecRef, + schema: SchemaRef, +} + +#[async_trait::async_trait] +impl IcebergWriter<(StructArray, RecordBatch)> + for PrecomputePartitionWriter +{ + async fn write(&mut self, input: (StructArray, RecordBatch)) -> Result<()> { + let splits = + split_with_partition(&self.partition_row_converter, input.0.columns(), &input.1)?; + + for (partition, record_batch) in splits { + match self.partition_writers.entry(partition) { + Entry::Occupied(entry) => { + entry.into_mut().write(record_batch).await?; + } + Entry::Vacant(entry) => { + let writer = entry.insert(self.inner_writer_builder.clone().build().await?); + writer.write(record_batch).await?; + } + } + } + + Ok(()) + } + + async fn close(&mut self) -> Result> { + let (partition_rows, writers): (Vec<_>, Vec<_>) = self.partition_writers.drain().unzip(); + let partition_values = convert_row_to_struct( + &self.partition_row_converter, + &self.partition_spec.partition_type(&self.schema)?, + partition_rows, + )?; + + let mut result = Vec::new(); + for (partition_value, mut writer) in partition_values.into_iter().zip_eq(writers) { + let mut data_files = writer.close().await?; + for data_file in data_files.iter_mut() { + data_file.rewrite_partition(partition_value.clone()); + } + result.append(&mut data_files); + } + + Ok(result) + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, StructArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + use arrow_select::concat::concat_batches; + use itertools::Itertools; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use crate::io::FileIOBuilder; + use crate::spec::{ + DataFileFormat, Literal, NestedField, PartitionSpec, PrimitiveLiteral, PrimitiveType, + Schema, Struct, Transform, Type, UnboundPartitionField, + }; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::file_writer::location_generator::test::MockLocationGenerator; + use crate::writer::file_writer::location_generator::DefaultFileNameGenerator; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::function_writer::precompute_partition_writer::PrecomputePartitionWriterBuilder; + use 
crate::writer::{IcebergWriter, IcebergWriterBuilder}; + use crate::Result; + + #[tokio::test] + async fn test_precompute_partition_writer() -> Result<()> { + // prepare writer + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::required( + 1, + "id".to_string(), + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::required( + 2, + "name".to_string(), + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new("memory").build().unwrap(); + let location_gen = + MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string()); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + let pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + schema.clone(), + file_io.clone(), + location_gen, + file_name_gen, + ); + let data_file_writer_builder = DataFileWriterBuilder::new(pw, None); + let mut precompute_partition_writer = PrecomputePartitionWriterBuilder::new( + data_file_writer_builder, + Arc::new(partition_spec), + schema, + ) + .build() + .await + .unwrap(); + + // prepare data + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("data", DataType::Utf8, true), + ])); + let id_array = Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(id_array), + Arc::new(data_array), + ]) + .expect("Failed to create RecordBatch"); + let id_bucket_array = Int64Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let partition_batch = StructArray::from(vec![( + Arc::new(Field::new("id_bucket", DataType::Int64, true)), + Arc::new(id_bucket_array) as ArrayRef, + )]); + + precompute_partition_writer + .write((partition_batch, batch)) + .await?; + let data_files = precompute_partition_writer.close().await?; + assert_eq!(data_files.len(), 3); + let expected_partitions = vec![ + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(1)))]), + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(2)))]), + Struct::from_iter(vec![Some(Literal::Primitive(PrimitiveLiteral::Long(3)))]), + ]; + let expected_batches = vec![ + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![1, 1, 1])), + Arc::new(StringArray::from(vec!["a", "c", "g"])), + ]) + .unwrap(), + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![2, 2])), + Arc::new(StringArray::from(vec!["b", "e"])), + ]) + .unwrap(), + RecordBatch::try_new(schema.clone(), vec![ + Arc::new(Int64Array::from(vec![3, 3])), + Arc::new(StringArray::from(vec!["d", "f"])), + ]) + .unwrap(), + ]; + for (partition, batch) in expected_partitions + .into_iter() + .zip_eq(expected_batches.into_iter()) + { + assert!(data_files.iter().any(|file| file.partition == partition)); + let data_file = data_files + .iter() + .find(|file| file.partition == partition) + .unwrap(); + let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = + 
ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + + // check data + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let res = concat_batches(&batch.schema(), &batches).unwrap(); + assert_eq!(batch, res); + } + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 64357a0fe..745c521eb 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -47,10 +47,11 @@ pub mod base_writer; pub mod file_writer; +pub mod function_writer; use arrow_array::RecordBatch; -use crate::spec::DataFile; +use crate::spec::{DataFile, SchemaRef}; use crate::Result; type DefaultInput = RecordBatch; @@ -88,6 +89,8 @@ pub trait CurrentFileStatus { fn current_row_num(&self) -> usize; /// Get the current file written size. fn current_written_size(&self) -> usize; + /// Get the schema of the current file. + fn current_schema(&self) -> SchemaRef; } #[cfg(test)] diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml index a047d7580..7134aca45 100644 --- a/crates/integration_tests/Cargo.toml +++ b/crates/integration_tests/Cargo.toml @@ -28,6 +28,7 @@ rust-version = { workspace = true } arrow-array = { workspace = true } arrow-schema = { workspace = true } futures = { workspace = true } +futures-util = "0.3.31" iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
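// A small hedged sketch of the new `CurrentFileStatus::current_schema` method added
// above (implemented for ParquetWriter and DataFileWriter): wrapper writers such as
// EqualityDeltaWriter can derive their Arrow-level projections from the schema of the
// writer they wrap instead of having it threaded through separately. `arrow_schema_of`
// is a hypothetical helper, not part of the patch.
fn arrow_schema_of<W: iceberg::writer::CurrentFileStatus>(
    writer: &W,
) -> iceberg::Result<arrow_schema::Schema> {
    use iceberg::arrow::schema_to_arrow_schema;

    // `current_schema()` returns the Iceberg SchemaRef of the file currently being written.
    schema_to_arrow_schema(&writer.current_schema())
}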