From df461eb87a5c7ebe6fb3c8f39d1e5c37c2e35521 Mon Sep 17 00:00:00 2001
From: ZENOTME
Date: Mon, 23 Dec 2024 15:59:55 +0800
Subject: [PATCH] init insert support for iceberg-datafusion

---
 crates/iceberg/src/transaction.rs             |   2 +-
 .../src/writer/file_writer/parquet_writer.rs  |  12 ++
 .../src/writer/file_writer/track_writer.rs    |   6 +
 crates/integrations/datafusion/src/lib.rs     |   2 +
 .../integrations/datafusion/src/sink/mod.rs   | 114 ++++++++++++++++++
 .../integrations/datafusion/src/table/mod.rs  |  65 +++++++++-
 6 files changed, 199 insertions(+), 2 deletions(-)
 create mode 100644 crates/integrations/datafusion/src/sink/mod.rs

diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index cfd6a8381..01806d937 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -161,7 +161,7 @@ impl<'a> Transaction<'a> {
     }
 
     /// Commit transaction.
-    pub async fn commit(self, catalog: &impl Catalog) -> Result<Table> {
+    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
         let table_commit = TableCommit::builder()
             .ident(self.table.identifier().clone())
             .updates(self.updates)
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 596228f7c..5fa9bc96a 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -218,6 +218,17 @@ pub struct ParquetWriter {
     current_row_num: usize,
 }
 
+impl std::fmt::Debug for ParquetWriter {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ParquetWriter")
+            .field("schema", &self.schema)
+            .field("out_file", &self.out_file)
+            .field("written_size", &self.written_size)
+            .field("current_row_num", &self.current_row_num)
+            .finish()
+    }
+}
+
 /// Used to aggregate min and max value of each column.
 struct MinMaxColAggregator {
     lower_bounds: HashMap<i32, Datum>,
@@ -441,6 +452,7 @@ impl CurrentFileStatus for ParquetWriter {
 /// # NOTES
 ///
 /// We keep this wrapper been used inside only.
+#[derive(Debug)]
 struct AsyncFileWriter<W>(W);
 
 impl<W: FileWrite> AsyncFileWriter<W> {
diff --git a/crates/iceberg/src/writer/file_writer/track_writer.rs b/crates/iceberg/src/writer/file_writer/track_writer.rs
index 6c60a1aa7..c74f884fc 100644
--- a/crates/iceberg/src/writer/file_writer/track_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/track_writer.rs
@@ -29,6 +29,12 @@ pub(crate) struct TrackWriter {
     written_size: Arc<AtomicI64>,
 }
 
+impl std::fmt::Debug for TrackWriter {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TrackWriter").finish()
+    }
+}
+
 impl TrackWriter {
     pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicI64>) -> Self {
         Self {
diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index b7b927fdd..4ed3c4355 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -26,3 +26,5 @@ mod schema;
 mod table;
 pub use table::table_provider_factory::IcebergTableProviderFactory;
 pub use table::*;
+
+mod sink;
diff --git a/crates/integrations/datafusion/src/sink/mod.rs b/crates/integrations/datafusion/src/sink/mod.rs
new file mode 100644
index 000000000..2f9968ee1
--- /dev/null
+++ b/crates/integrations/datafusion/src/sink/mod.rs
@@ -0,0 +1,114 @@
+use std::any::Any;
+use std::sync::atomic::AtomicU64;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_plan::insert::DataSink;
+use datafusion::physical_plan::metrics::MetricsSet;
+use datafusion::physical_plan::DisplayAs;
+use futures::StreamExt;
+use iceberg::table::Table;
+use iceberg::transaction::Transaction;
+use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::IcebergWriter;
+use iceberg::Catalog;
+use tokio::sync::Mutex;
+
+#[derive(Debug)]
+pub struct IcebergSink {
+    data_file_writer: Mutex<
+        DataFileWriter<ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>>,
+    >,
+    row_count: AtomicU64,
+    table: Table,
+    catalog: Arc<dyn Catalog>,
+}
+
+impl IcebergSink {
+    pub(crate) fn new(
+        data_file_writer: DataFileWriter<
+            ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
+        >,
+        catalog: Arc<dyn Catalog>,
+        table: Table,
+    ) -> Self {
+        Self {
+            data_file_writer: Mutex::new(data_file_writer),
+            row_count: AtomicU64::new(0),
+            table,
+            catalog,
+        }
+    }
+
+    pub(crate) async fn commit(&self) -> Result<()> {
+        let mut writer = self.data_file_writer.lock().await;
+        let data_files = writer
+            .close()
+            .await
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let mut append_action = Transaction::new(&self.table)
+            .fast_append(None, vec![])
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        append_action
+            .add_data_files(data_files)
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        let tx = append_action
+            .apply()
+            .await
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        tx.commit(self.catalog.as_ref())
+            .await
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        Ok(())
+    }
+}
+
+impl DisplayAs for IcebergSink {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        f.write_str("IcebergSink")
+    }
+}
+
+#[async_trait]
+impl DataSink for IcebergSink {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        // # TODO: Implement metrics for IcebergSink
+        None
+    }
+
+    async fn write_all(
+        &self,
+        mut data: SendableRecordBatchStream,
+        _context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        while let Some(data_chunk) = data.as_mut().next().await {
+            let data_chunk = data_chunk?;
+            let row_num = data_chunk.num_rows();
+            self.data_file_writer
+                .lock()
+                .await
+                .write(data_chunk)
+                .await
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+            self.row_count
+                .fetch_add(row_num as u64, std::sync::atomic::Ordering::Relaxed);
+        }
+        self.commit().await?;
+        Ok(self.row_count.load(std::sync::atomic::Ordering::Relaxed))
+    }
+}
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 00c9e1322..f672d10a0 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -24,19 +24,33 @@ use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use datafusion::catalog::Session;
 use datafusion::datasource::{TableProvider, TableType};
-use datafusion::error::Result as DFResult;
+use datafusion::error::{DataFusionError as DFError, Result as DFResult};
+use datafusion::logical_expr::dml::InsertOp;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
+use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::physical_plan::insert::DataSinkExec;
 use datafusion::physical_plan::ExecutionPlan;
 use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::spec::DataFileFormat;
 use iceberg::table::Table;
+use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::IcebergWriterBuilder;
 use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
 
 use crate::physical_plan::scan::IcebergTableScan;
+use crate::sink::IcebergSink;
 
 /// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
 /// managing access to a [`Table`].
 #[derive(Debug, Clone)]
 pub struct IcebergTableProvider {
+    /// The catalog for this table provider.
+    /// Optional; it is only needed for insert operations.
+    catalog: Option<Arc<dyn Catalog>>,
     /// A table in the catalog.
     table: Table,
     /// Table snapshot id that will be queried via this provider.
@@ -48,11 +62,13 @@ pub struct IcebergTableProvider {
 impl IcebergTableProvider {
     pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
         IcebergTableProvider {
+            catalog: None,
             table,
             snapshot_id: None,
             schema,
         }
     }
+
     /// Asynchronously tries to construct a new [`IcebergTableProvider`]
     /// using the given client and table name to fetch an actual [`Table`]
     /// in the provided namespace.
@@ -67,6 +83,7 @@ impl IcebergTableProvider {
         let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
 
         Ok(IcebergTableProvider {
+            catalog: Some(client),
             table,
             snapshot_id: None,
             schema,
@@ -78,6 +95,7 @@
     pub async fn try_new_from_table(table: Table) -> Result<Self> {
         let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
         Ok(IcebergTableProvider {
+            catalog: None,
             table,
             snapshot_id: None,
             schema,
@@ -102,6 +120,7 @@
         let schema = snapshot.schema(table.metadata())?;
         let schema = Arc::new(schema_to_arrow_schema(&schema)?);
         Ok(IcebergTableProvider {
+            catalog: None,
             table,
             snapshot_id: Some(snapshot_id),
             schema,
@@ -147,6 +166,50 @@ impl TableProvider for IcebergTableProvider {
         // Push down all filters, as a single source of truth, the scanner will drop the filters which couldn't be push down
         Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
     }
+
+    async fn insert_into(
+        &self,
+        _state: &dyn Session,
+        input: Arc<dyn ExecutionPlan>,
+        insert_op: InsertOp,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let Some(catalog) = &self.catalog else {
+            return Err(DFError::Internal(
+                "Cannot insert into a static table: no catalog is set".to_string(),
+            ));
+        };
+
+        if !matches!(insert_op, InsertOp::Append) {
+            return Err(DFError::Internal(
+                "Only InsertOp::Append is supported".to_string(),
+            ));
+        }
+
+        // Create the data file writer.
+        let parquet_writer_builder = ParquetWriterBuilder::new(
+            WriterProperties::default(),
+            self.table.metadata().current_schema().clone(),
+            self.table.file_io().clone(),
+            DefaultLocationGenerator::new(self.table.metadata().clone())
+                .map_err(|err| DFError::External(Box::new(err)))?,
+            DefaultFileNameGenerator::new("".to_string(), None, DataFileFormat::Parquet),
+        );
+        let data_file_writer = DataFileWriterBuilder::new(parquet_writer_builder, None)
+            .build()
+            .await
+            .map_err(|err| DFError::External(Box::new(err)))?;
+
+        let iceberg_sink = IcebergSink::new(data_file_writer, catalog.clone(), self.table.clone());
+
+        Ok(Arc::new(DataSinkExec::new(
+            input,
+            Arc::new(iceberg_sink),
+            schema_to_arrow_schema(self.table.metadata().current_schema())
+                .map_err(|err| DFError::External(Box::new(err)))?
+                .into(),
+            None,
+        )))
+    }
 }
 
 #[cfg(test)]
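
For reviewers, a minimal sketch of how this insert path is expected to be exercised end to end. It assumes an already-constructed `Catalog` implementation and an existing table `default.t1` (both hypothetical here); since `IcebergCatalogProvider` builds its table providers via `IcebergTableProvider::try_new`, they should carry the catalog handle that `insert_into` requires:

    use std::sync::Arc;

    use datafusion::error::DataFusionError;
    use datafusion::prelude::SessionContext;
    use iceberg::Catalog;
    use iceberg_datafusion::IcebergCatalogProvider;

    // `catalog` may be any concrete Catalog implementation (REST, memory, ...).
    async fn insert_example(catalog: Arc<dyn Catalog>) -> datafusion::error::Result<()> {
        let provider = IcebergCatalogProvider::try_new(catalog)
            .await
            .map_err(|e| DataFusionError::External(Box::new(e)))?;

        let ctx = SessionContext::new();
        ctx.register_catalog("iceberg", Arc::new(provider));

        // DataFusion plans the INSERT as a DataSinkExec wrapping the new
        // IcebergSink: batches are written out as Parquet data files, then
        // fast-appended and committed through the catalog once the input
        // stream is exhausted.
        ctx.sql("INSERT INTO iceberg.default.t1 VALUES (1, 'a'), (2, 'b')")
            .await?
            .collect()
            .await?;
        Ok(())
    }

Note that `DataFileWriterBuilder::new(parquet_writer_builder, None)` builds a single unpartitioned writer, so this first cut only targets unpartitioned tables; partitioned fan-out writing is left for a follow-up.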