From 98cd34dc03cd87b330c7bff8fe9f3241746062ac Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Fri, 29 Nov 2024 16:23:53 +0800 Subject: [PATCH] support fanout partition writer --- Cargo.toml | 1 + crates/iceberg/Cargo.toml | 1 + crates/iceberg/src/arrow/mod.rs | 2 + .../arrow/record_batch_partition_spliter.rs | 432 ++++++++++++++++++ .../src/arrow/record_batch_projector.rs | 14 +- crates/iceberg/src/arrow/value.rs | 18 +- crates/iceberg/src/spec/manifest.rs | 4 + .../fanout_partition_writer.rs | 126 +++++ .../iceberg/src/writer/function_writer/mod.rs | 22 + 9 files changed, 602 insertions(+), 18 deletions(-) create mode 100644 crates/iceberg/src/arrow/record_batch_partition_spliter.rs create mode 100644 crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs create mode 100644 crates/iceberg/src/writer/function_writer/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 95057c59a..a1d661db6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ arrow-ord = { version = "53" } arrow-schema = { version = "53" } arrow-select = { version = "53" } arrow-string = { version = "53" } +arrow-row = { version = "53" } async-stream = "0.3.5" async-trait = "0.1" async-std = "1.12" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 68a8658b0..a712fdb32 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -48,6 +48,7 @@ arrow-arith = { workspace = true } arrow-array = { workspace = true } arrow-cast = { workspace = true } arrow-ord = { workspace = true } +arrow-row = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs index 0c885e65f..c4d96d59d 100644 --- a/crates/iceberg/src/arrow/mod.rs +++ b/crates/iceberg/src/arrow/mod.rs @@ -25,3 +25,5 @@ pub(crate) mod record_batch_transformer; mod value; pub use reader::*; pub use value::*; +mod record_batch_partition_spliter; +pub(crate) use record_batch_partition_spliter::*; diff --git a/crates/iceberg/src/arrow/record_batch_partition_spliter.rs b/crates/iceberg/src/arrow/record_batch_partition_spliter.rs new file mode 100644 index 000000000..eb5fba7d6 --- /dev/null +++ b/crates/iceberg/src/arrow/record_batch_partition_spliter.rs @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::collections::HashMap; + +use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StructArray}; +use arrow_row::{OwnedRow, RowConverter, SortField}; +use arrow_schema::{DataType, Schema}; +use arrow_select::filter::filter_record_batch; +use itertools::Itertools; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use super::record_batch_projector::RecordBatchProjector; +use crate::arrow::{arrow_struct_to_iceberg_struct, type_to_arrow_type}; +use crate::spec::{BoundPartitionSpecRef, Literal, Struct, StructType, Type}; +use crate::transform::{create_transform_function, BoxedTransformFunction}; +use crate::{Error, ErrorKind, Result}; + +/// A helper function to split the record batch into multiple record batches using computed partition columns. +pub(crate) fn split_with_partition( + row_converter: &RowConverter, + partition_columns: &[ArrayRef], + batch: &RecordBatch, +) -> Result<Vec<(OwnedRow, RecordBatch)>> { + let rows = row_converter + .convert_columns(partition_columns) + .map_err(|e| Error::new(ErrorKind::DataInvalid, format!("{}", e)))?; + + // Group the batch by row value. + let mut group_ids = HashMap::new(); + rows.into_iter().enumerate().for_each(|(row_id, row)| { + group_ids.entry(row.owned()).or_insert(vec![]).push(row_id); + }); + + // Partition the batch into sub-batches that share the same partition values. + let mut partition_batches = Vec::with_capacity(group_ids.len()); + for (row, row_ids) in group_ids.into_iter() { + // generate the bool filter array from row_ids + let filter_array: BooleanArray = { + let mut filter = vec![false; batch.num_rows()]; + row_ids.into_iter().for_each(|row_id| { + filter[row_id] = true; + }); + filter.into() + }; + + // filter the RecordBatch + partition_batches.push(( + row, + filter_record_batch(batch, &filter_array) + .expect("We should guarantee the filter array is valid"), + )); + } + + Ok(partition_batches) +} + +pub(crate) fn convert_row_to_struct( + row_converter: &RowConverter, + struct_type: &StructType, + rows: Vec<OwnedRow>, +) -> Result<Vec<Struct>> { + let arrow_struct_array = { + let partition_columns = row_converter + .convert_rows(rows.iter().map(|row| row.row())) + .map_err(|e| Error::new(ErrorKind::DataInvalid, format!("{e}")))?; + let partition_arrow_fields = { + let partition_arrow_type = type_to_arrow_type(&Type::Struct(struct_type.clone()))?; + let DataType::Struct(fields) = partition_arrow_type else { + return Err(Error::new( + ErrorKind::DataInvalid, + "The partition arrow type is not a struct type", + )); + }; + fields + }; + StructArray::try_new(partition_arrow_fields, partition_columns, None)? + }; + let struct_array = { + let struct_array = arrow_struct_to_iceberg_struct(&arrow_struct_array, struct_type)?; + struct_array + .into_iter() + .map(|s| { + if let Some(s) = s { + if let Literal::Struct(s) = s { + Ok(s) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + "The struct is not a struct literal", + )) + } + } else { + Err(Error::new(ErrorKind::DataInvalid, "The struct is null")) + } + }) + .collect::<Result<Vec<_>>>()? + }; + + Ok(struct_array) +} + +/// The splitter used to split a record batch into multiple record batches by the partition spec.
+pub(crate) struct RecordBatchPartitionSpliter { + partition_spec: BoundPartitionSpecRef, + projector: RecordBatchProjector, + transform_functions: Vec<BoxedTransformFunction>, + row_converter: RowConverter, +} + +impl RecordBatchPartitionSpliter { + pub(crate) fn new( + original_schema: &Schema, + partition_spec: BoundPartitionSpecRef, + ) -> Result<Self> { + let projector = RecordBatchProjector::new( + original_schema, + &partition_spec + .fields() + .iter() + .map(|field| field.source_id) + .collect::<Vec<_>>(), + // The source columns, selected by ids, must be primitive types and cannot be contained in a map or list, but may be nested in a struct. + // ref: https://iceberg.apache.org/spec/#partitioning + |field| { + if !field.data_type().is_primitive() { + return Ok(None); + } + field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .map(|s| { + s.parse::<i64>() + .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string())) + }) + .transpose() + }, + |_| true, + )?; + let transform_functions = partition_spec + .fields() + .iter() + .map(|field| create_transform_function(&field.transform)) + .collect::<Result<Vec<_>>>()?; + let row_converter = RowConverter::new( + projector + .projected_schema_ref() + .fields() + .iter() + .map(|field| SortField::new(field.data_type().clone())) + .collect(), + )?; + Ok(Self { + partition_spec, + projector, + transform_functions, + row_converter, + }) + } + + /// Split the record batch into multiple record batches by the partition spec. + pub(crate) fn split(&self, batch: &RecordBatch) -> Result<Vec<(OwnedRow, RecordBatch)>> { + // Compute the partition columns by applying the transforms to the projected source columns. + let source_columns = self.projector.project_column(batch.columns())?; + let partition_columns = source_columns + .into_iter() + .zip_eq(self.transform_functions.iter()) + .map(|(source_column, transform_function)| transform_function.transform(source_column)) + .collect::<Result<Vec<_>>>()?; + + split_with_partition(&self.row_converter, &partition_columns, batch) + } + + pub(crate) fn convert_row(&self, rows: Vec<OwnedRow>) -> Result<Vec<Struct>> { + convert_row_to_struct( + &self.row_converter, + self.partition_spec.partition_type(), + rows, + ) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::{Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema as ArrowSchema}; + + use super::*; + use crate::arrow::schema_to_arrow_schema; + use crate::spec::{BoundPartitionSpec, NestedField, Schema, Transform, UnboundPartitionField}; + + #[test] + fn test_record_batch_partition_spliter() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = BoundPartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + let partition_spliter = RecordBatchPartitionSpliter::new( + &schema_to_arrow_schema(&schema).unwrap(), + Arc::new(partition_spec), + ) + .expect("Failed to create splitter"); + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("data", DataType::Utf8, true), + ])); + let id_array = Int32Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let batch = RecordBatch::try_new(schema.clone(), vec![
Arc::new(id_array), + Arc::new(data_array), + ]) + .expect("Failed to create RecordBatch"); + + let mut partitioned_batches = partition_spliter + .split(&batch) + .expect("Failed to split RecordBatch"); + assert_eq!(partitioned_batches.len(), 3); + partitioned_batches.sort_by_key(|(row, _)| row.clone()); + { + // check the first partition + let expected_id_array = Int32Array::from(vec![1, 1, 1]); + let expected_data_array = StringArray::from(vec!["a", "c", "g"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_id_array), + Arc::new(expected_data_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[0].1, expected_batch); + } + { + // check the second partition + let expected_id_array = Int32Array::from(vec![2, 2]); + let expected_data_array = StringArray::from(vec!["b", "e"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_id_array), + Arc::new(expected_data_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[1].1, expected_batch); + } + { + // check the third partition + let expected_id_array = Int32Array::from(vec![3, 3]); + let expected_data_array = StringArray::from(vec!["d", "f"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_id_array), + Arc::new(expected_data_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[2].1, expected_batch); + } + + let partition_values = partition_spliter + .convert_row( + partitioned_batches + .iter() + .map(|(row, _)| row.clone()) + .collect(), + ) + .unwrap(); + // check partition value is struct(1), struct(2), struct(3) + assert_eq!(partition_values, vec![ + Struct::from_iter(vec![Some(Literal::int(1))]), + Struct::from_iter(vec![Some(Literal::int(2))]), + Struct::from_iter(vec![Some(Literal::int(3))]), + ]); + } + + #[test] + fn test_record_batch_partition_spliter_with_extra_columns() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = BoundPartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("extra_column1", DataType::Utf8, true), + Field::new("id", DataType::Int32, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("data", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + Field::new("extra_column2", DataType::Utf8, true), + ])); + let extra_column1_array = StringArray::from(vec![ + "extra1", "extra2", "extra1", "extra3", "extra2", "extra3", "extra1", + ]); + let id_array = Int32Array::from(vec![1, 2, 1, 3, 2, 3, 1]); + let data_array = StringArray::from(vec!["a", "b", "c", "d", "e", "f", "g"]); + let extra_column2_array = StringArray::from(vec![ + "extra1", "extra2", "extra1", "extra3", "extra2", "extra3", "extra1", + ]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(extra_column1_array), + Arc::new(id_array), + Arc::new(data_array), + 
Arc::new(extra_column2_array), + ]) + .expect("Failed to create RecordBatch"); + let partition_spliter = RecordBatchPartitionSpliter::new(&schema, Arc::new(partition_spec)) + .expect("Failed to create spliter"); + + let mut partitioned_batches = partition_spliter + .split(&batch) + .expect("Failed to split RecordBatch"); + assert_eq!(partitioned_batches.len(), 3); + partitioned_batches.sort_by_key(|(row, _)| row.clone()); + { + // check the first partition + let expected_extra_column1_array = + StringArray::from(vec!["extra1", "extra1", "extra1"]); + let expected_id_array = Int32Array::from(vec![1, 1, 1]); + let expected_data_array = StringArray::from(vec!["a", "c", "g"]); + let expected_extra_column2_array = + StringArray::from(vec!["extra1", "extra1", "extra1"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_extra_column1_array), + Arc::new(expected_id_array), + Arc::new(expected_data_array), + Arc::new(expected_extra_column2_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[0].1, expected_batch); + } + { + // check the second partition + let expected_extra_column1_array = StringArray::from(vec!["extra2", "extra2"]); + let expected_id_array = Int32Array::from(vec![2, 2]); + let expected_data_array = StringArray::from(vec!["b", "e"]); + let expected_extra_column2_array = StringArray::from(vec!["extra2", "extra2"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_extra_column1_array), + Arc::new(expected_id_array), + Arc::new(expected_data_array), + Arc::new(expected_extra_column2_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[1].1, expected_batch); + } + { + // check the third partition + let expected_id_array = Int32Array::from(vec![3, 3]); + let expected_data_array = StringArray::from(vec!["d", "f"]); + let expected_extra_column1_array = StringArray::from(vec!["extra3", "extra3"]); + let expected_extra_column2_array = StringArray::from(vec!["extra3", "extra3"]); + let expected_batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(expected_extra_column1_array), + Arc::new(expected_id_array), + Arc::new(expected_data_array), + Arc::new(expected_extra_column2_array), + ]) + .expect("Failed to create expected RecordBatch"); + assert_eq!(partitioned_batches[2].1, expected_batch); + } + + let partition_values = partition_spliter + .convert_row( + partitioned_batches + .iter() + .map(|(row, _)| row.clone()) + .collect(), + ) + .unwrap(); + // check partition value is struct(1), struct(2), struct(3) + assert_eq!(partition_values, vec![ + Struct::from_iter(vec![Some(Literal::int(1))]), + Struct::from_iter(vec![Some(Literal::int(2))]), + Struct::from_iter(vec![Some(Literal::int(3))]), + ]); + } +} diff --git a/crates/iceberg/src/arrow/record_batch_projector.rs b/crates/iceberg/src/arrow/record_batch_projector.rs index f218983aa..4f2267db6 100644 --- a/crates/iceberg/src/arrow/record_batch_projector.rs +++ b/crates/iceberg/src/arrow/record_batch_projector.rs @@ -42,7 +42,7 @@ impl RecordBatchProjector { /// This function will iterate through the nested fields if the field is a struct, `searchable_field_func` can be used to control whether /// iterate into the nested fields. 
pub(crate) fn new( - original_schema: SchemaRef, + original_schema: &Schema, field_ids: &[i32], field_id_fetch_func: F1, searchable_field_func: F2, @@ -187,8 +187,7 @@ mod test { _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; let projector = - RecordBatchProjector::new(schema.clone(), &[1, 3], field_id_fetch_func, |_| true) - .unwrap(); + RecordBatchProjector::new(&schema, &[1, 3], field_id_fetch_func, |_| true).unwrap(); assert!(projector.field_indices.len() == 2); assert_eq!(projector.field_indices[0], vec![0]); @@ -250,8 +249,7 @@ mod test { "inner_field2" => Ok(Some(4)), _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; - let projector = - RecordBatchProjector::new(schema.clone(), &[1, 5], field_id_fetch_func, |_| true); + let projector = RecordBatchProjector::new(&schema, &[1, 5], field_id_fetch_func, |_| true); assert!(projector.is_err()); } @@ -280,12 +278,10 @@ mod test { "inner_field2" => Ok(Some(4)), _ => Err(Error::new(ErrorKind::Unexpected, "Field id not found")), }; - let projector = - RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| false); + let projector = RecordBatchProjector::new(&schema, &[3], field_id_fetch_func, |_| false); assert!(projector.is_err()); - let projector = - RecordBatchProjector::new(schema.clone(), &[3], field_id_fetch_func, |_| true); + let projector = RecordBatchProjector::new(&schema, &[3], field_id_fetch_func, |_| true); assert!(projector.is_ok()); } } diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 95712018b..62241ecf7 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -546,19 +546,19 @@ impl ToIcebergLiteralArray for StructArray { let mut columns = Vec::with_capacity(self.columns().len()); - for ((array, arrow_type), iceberg_field) in self + for ((array, arrow_field), iceberg_field) in self .columns() .iter() - .zip_eq(arrow_struct_fields.iter().map(|field| field.data_type())) + .zip_eq(arrow_struct_fields.iter()) .zip_eq(iceberg_type.fields().iter()) { - if array.is_nullable() == iceberg_field.required { + if arrow_field.is_nullable() == iceberg_field.required { return Err(Error::new( ErrorKind::DataInvalid, "The nullable field of arrow struct array is not compatitable with iceberg type", )); } - match (arrow_type, iceberg_field.field_type.as_ref()) { + match (arrow_field.data_type(), iceberg_field.field_type.as_ref()) { (DataType::Null, _) => { if iceberg_field.required { return Err(Error::new( @@ -682,9 +682,9 @@ impl ToIcebergLiteralArray for StructArray { /// Convert arrow struct array to iceberg struct value array. 
pub fn arrow_struct_to_iceberg_struct( struct_array: &StructArray, - ty: StructType, + ty: &StructType, ) -> Result>> { - struct_array.to_struct_literal_array(struct_array.data_type(), &ty) + struct_array.to_struct_literal_array(struct_array.data_type(), ty) } #[cfg(test)] @@ -870,7 +870,7 @@ mod test { )), ]); - let result = arrow_struct_to_iceberg_struct(&struct_array, iceberg_struct_type).unwrap(); + let result = arrow_struct_to_iceberg_struct(&struct_array, &iceberg_struct_type).unwrap(); assert_eq!(result, vec![ Some(Literal::Struct(Struct::from_iter(vec![ @@ -920,7 +920,7 @@ mod test { "bool_field", Type::Primitive(PrimitiveType::Boolean), ))]); - let result = arrow_struct_to_iceberg_struct(&struct_array, iceberg_struct_type).unwrap(); + let result = arrow_struct_to_iceberg_struct(&struct_array, &iceberg_struct_type).unwrap(); assert_eq!(result, vec![None; 3]); } @@ -928,7 +928,7 @@ mod test { fn test_empty_struct() { let struct_array = StructArray::new_null(Fields::empty(), 3); let iceberg_struct_type = StructType::new(vec![]); - let result = arrow_struct_to_iceberg_struct(&struct_array, iceberg_struct_type).unwrap(); + let result = arrow_struct_to_iceberg_struct(&struct_array, &iceberg_struct_type).unwrap(); assert_eq!(result, vec![None; 0]); } } diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index a868c7b11..cad7221cf 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -1157,6 +1157,10 @@ impl DataFile { pub fn sort_order_id(&self) -> Option { self.sort_order_id } + + pub(crate) fn rewrite_partition(&mut self, partition: Struct) { + self.partition = partition; + } } /// Type of content stored by the data file: data, equality deletes, or /// position deletes (all v1 files are data files) diff --git a/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs b/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs new file mode 100644 index 000000000..70fbcdada --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/fanout_partition_writer.rs @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the fanout partition writer. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_row::OwnedRow; +use arrow_schema::SchemaRef as ArrowSchemaRef; +use itertools::Itertools; + +use crate::arrow::{schema_to_arrow_schema, RecordBatchPartitionSpliter}; +use crate::spec::{BoundPartitionSpecRef, DataFile}; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::Result; + +/// The builder for `FanoutPartitionWriter`. 
+#[derive(Clone)] +pub struct FanoutPartitionWriterBuilder<B: IcebergWriterBuilder> { + inner_builder: B, + partition_specs: BoundPartitionSpecRef, + arrow_schema: ArrowSchemaRef, +} + +impl<B: IcebergWriterBuilder> FanoutPartitionWriterBuilder<B> { + /// Create a new `FanoutPartitionWriterBuilder` with the default arrow schema. + pub fn new(inner_builder: B, partition_specs: BoundPartitionSpecRef) -> Result<Self> { + let arrow_schema = Arc::new(schema_to_arrow_schema(partition_specs.schema())?); + Ok(Self::new_with_custom_schema( + inner_builder, + partition_specs, + arrow_schema, + )) + } + + /// Create a new `FanoutPartitionWriterBuilder` with a custom arrow schema. + /// This function is useful when the input contains extra columns beyond the table schema. + pub fn new_with_custom_schema( + inner_builder: B, + partition_specs: BoundPartitionSpecRef, + arrow_schema: ArrowSchemaRef, + ) -> Self { + Self { + inner_builder, + partition_specs, + arrow_schema, + } + } +} + +#[async_trait::async_trait] +impl<B: IcebergWriterBuilder> IcebergWriterBuilder for FanoutPartitionWriterBuilder<B> { + type R = FanoutPartitionWriter<B>; + + async fn build(self) -> Result<Self::R> { + let partition_splitter = + RecordBatchPartitionSpliter::new(&self.arrow_schema, self.partition_specs)?; + Ok(FanoutPartitionWriter { + inner_writer_builder: self.inner_builder, + partition_splitter, + partition_writers: HashMap::new(), + }) + } +} + +/// The fanout partition writer. +/// It splits the input record batch by the partition spec and writes the split record batches to the inner writers. +pub struct FanoutPartitionWriter<B: IcebergWriterBuilder> { + inner_writer_builder: B, + partition_splitter: RecordBatchPartitionSpliter, + partition_writers: HashMap<OwnedRow, B::R>, +} + +#[async_trait::async_trait] +impl<B: IcebergWriterBuilder> IcebergWriter for FanoutPartitionWriter<B> { + async fn write(&mut self, input: RecordBatch) -> Result<()> { + let splits = self.partition_splitter.split(&input)?; + + for (partition, record_batch) in splits { + match self.partition_writers.entry(partition) { + Entry::Occupied(entry) => { + entry.into_mut().write(record_batch).await?; + } + Entry::Vacant(entry) => { + let writer = entry.insert(self.inner_writer_builder.clone().build().await?); + writer.write(record_batch).await?; + } + } + } + + Ok(()) + } + + async fn close(&mut self) -> Result<Vec<DataFile>> { + let (partition_rows, writers): (Vec<_>, Vec<_>) = self.partition_writers.drain().unzip(); + let partition_values = self.partition_splitter.convert_row(partition_rows)?; + + let mut result = Vec::new(); + for (partition_value, mut writer) in partition_values.into_iter().zip_eq(writers) { + let mut data_files = writer.close().await?; + for data_file in data_files.iter_mut() { + data_file.rewrite_partition(partition_value.clone()); + } + result.append(&mut data_files); + } + + Ok(result) + } +} diff --git a/crates/iceberg/src/writer/function_writer/mod.rs b/crates/iceberg/src/writer/function_writer/mod.rs new file mode 100644 index 000000000..58e2f7671 --- /dev/null +++ b/crates/iceberg/src/writer/function_writer/mod.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains the function writers, which wrap inner writers to add behavior such as fanout partitioning. + +pub mod equality_delta_writer; +pub mod fanout_partition_writer; +pub mod precompute_partition_writer;
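
A minimal usage sketch of the new fanout partition writer follows (illustrative only, not part of the patch): the helper name `write_partitioned`, the choice of inner builder, and the `iceberg::writer::function_writer` module path are assumptions; the patch itself does not show the re-export from the parent `writer` module.

use arrow_array::RecordBatch;
use iceberg::spec::{BoundPartitionSpecRef, DataFile};
use iceberg::writer::function_writer::fanout_partition_writer::FanoutPartitionWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::Result;

/// Write pre-computed record batches through the fanout partition writer.
/// `inner_builder` can be any `IcebergWriterBuilder`, e.g. a data file writer builder (assumed here).
async fn write_partitioned<B: IcebergWriterBuilder>(
    inner_builder: B,
    partition_spec: BoundPartitionSpecRef,
    batches: Vec<RecordBatch>,
) -> Result<Vec<DataFile>> {
    // The default constructor derives the partition source columns from the table schema;
    // `new_with_custom_schema` is the alternative when the input batches carry extra columns.
    let mut writer = FanoutPartitionWriterBuilder::new(inner_builder, partition_spec)?
        .build()
        .await?;
    for batch in batches {
        // Each batch is split by partition value and routed to a per-partition inner writer.
        writer.write(batch).await?;
    }
    // Closing flushes every inner writer and rewrites the partition value of the returned data files.
    writer.close().await
}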