feat: add insert support for iceberg-datafusion #833

Draft · wants to merge 1 commit into base: main
2 changes: 1 addition & 1 deletion crates/iceberg/src/transaction.rs
@@ -161,7 +161,7 @@ impl<'a> Transaction<'a> {
}

/// Commit transaction.
-    pub async fn commit(self, catalog: &impl Catalog) -> Result<Table> {
+    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
let table_commit = TableCommit::builder()
.ident(self.table.identifier().clone())
.updates(self.updates)
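Note on this change: switching the parameter from `&impl Catalog` to `&dyn Catalog` moves from static to dynamic dispatch, so `commit` can now be called through a trait object such as the `Arc<dyn Catalog>` held by the new sink. A minimal sketch (the helper name is illustrative):

```rust
use std::sync::Arc;

use iceberg::table::Table;
use iceberg::transaction::Transaction;
use iceberg::{Catalog, Result};

// With `&impl Catalog` the concrete catalog type had to be known at compile
// time; `&dyn Catalog` also accepts a trait object, which is what
// `IcebergSink` stores.
async fn commit_via_trait_object(table: &Table, catalog: Arc<dyn Catalog>) -> Result<Table> {
    let tx = Transaction::new(table);
    tx.commit(catalog.as_ref()).await
}
```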
12 changes: 12 additions & 0 deletions crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -218,6 +218,17 @@ pub struct ParquetWriter {
current_row_num: usize,
}

impl std::fmt::Debug for ParquetWriter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ParquetWriter")
.field("schema", &self.schema)
.field("out_file", &self.out_file)
.field("written_size", &self.written_size)
.field("current_row_num", &self.current_row_num)
.finish()
}
}

/// Used to aggregate min and max value of each column.
struct MinMaxColAggregator {
lower_bounds: HashMap<i32, Datum>,
@@ -441,6 +452,7 @@ impl CurrentFileStatus for ParquetWriter {
/// # NOTES
///
/// This wrapper is kept for internal use only.
#[derive(Debug)]
struct AsyncFileWriter<W: FileWrite>(W);

impl<W: FileWrite> AsyncFileWriter<W> {
6 changes: 6 additions & 0 deletions crates/iceberg/src/writer/file_writer/track_writer.rs
@@ -29,6 +29,12 @@ pub(crate) struct TrackWriter {
written_size: Arc<AtomicI64>,
}

impl std::fmt::Debug for TrackWriter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TrackWriter").finish()
}
}

impl TrackWriter {
pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicI64>) -> Self {
Self {
2 changes: 2 additions & 0 deletions crates/integrations/datafusion/src/lib.rs
@@ -26,3 +26,5 @@ mod schema;
mod table;
pub use table::table_provider_factory::IcebergTableProviderFactory;
pub use table::*;

mod sink;
118 changes: 118 additions & 0 deletions crates/integrations/datafusion/src/sink/mod.rs
@@ -0,0 +1,118 @@
use std::any::Any;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::insert::DataSink;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::DisplayAs;
use futures::StreamExt;
use iceberg::table::Table;
use iceberg::transaction::Transaction;
use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::IcebergWriter;
use iceberg::Catalog;
use tokio::sync::Mutex;

#[derive(Debug)]
pub struct IcebergSink {
data_file_writer: Mutex<
DataFileWriter<ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>>,
>,
row_count: AtomicU64,
table: Table,
catalog: Arc<dyn Catalog>,
}

impl IcebergSink {
pub(crate) fn new(
data_file_writer: DataFileWriter<
ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>,
>,
catalog: Arc<dyn Catalog>,
table: Table,
) -> Self {
Self {
data_file_writer: Mutex::new(data_file_writer),
row_count: AtomicU64::new(0),
table,
catalog,
}
}

pub(crate) async fn commit(&self) -> Result<()> {
let mut writer = self.data_file_writer.lock().await;
let data_files = writer
.close()
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let mut append_action = Transaction::new(&self.table)
.fast_append(None, vec![])
.map_err(|e| DataFusionError::External(Box::new(e)))?;
append_action
.add_data_files(data_files)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let tx = append_action
.apply()
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
tx.commit(self.catalog.as_ref())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(())
}
}

impl DisplayAs for IcebergSink {
fn fmt_as(
&self,
_t: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
f.write_str("IcebergSink")
}
}

#[async_trait]
impl DataSink for IcebergSink {
fn as_any(&self) -> &dyn Any {
self
}

fn metrics(&self) -> Option<MetricsSet> {
// TODO: implement metrics for IcebergSink
None
}

async fn write_all(
&self,
mut data: SendableRecordBatchStream,
_context: &Arc<TaskContext>,
) -> Result<u64> {
while let Some(data_chunk) = data.as_mut().next().await {
if let Ok(data_chunk) = data_chunk {
let row_num = data_chunk.num_rows();
self.data_file_writer
.lock()
.await
.write(data_chunk)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
self.row_count
.fetch_add(row_num as u64, std::sync::atomic::Ordering::Relaxed);
} else {
// TODO: log a warning for the failed batch here
}
}
self.commit().await?;
Ok(self.row_count.load(std::sync::atomic::Ordering::Relaxed))
}
}
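For reference, the append path that `IcebergSink::commit` drives, shown standalone without the DataFusion error mapping (a sketch; `data_files` would come from a closed `DataFileWriter`):

```rust
use iceberg::spec::DataFile;
use iceberg::table::Table;
use iceberg::transaction::Transaction;
use iceberg::{Catalog, Result};

// Stage the freshly written data files as a fast append, apply the action to
// obtain the updated transaction, then commit it through the catalog.
async fn append_data_files(
    table: &Table,
    catalog: &dyn Catalog,
    data_files: Vec<DataFile>,
) -> Result<Table> {
    let mut append = Transaction::new(table).fast_append(None, vec![])?;
    append.add_data_files(data_files)?;
    let tx = append.apply().await?;
    tx.commit(catalog).await
}
```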
65 changes: 64 additions & 1 deletion crates/integrations/datafusion/src/table/mod.rs
@@ -24,19 +24,33 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::catalog::Session;
use datafusion::datasource::{TableProvider, TableType};
-use datafusion::error::Result as DFResult;
+use datafusion::error::{DataFusionError as DFError, Result as DFResult};
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::insert::DataSinkExec;
use datafusion::physical_plan::ExecutionPlan;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::spec::DataFileFormat;
use iceberg::table::Table;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::IcebergWriterBuilder;
use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};

use crate::physical_plan::scan::IcebergTableScan;
use crate::sink::IcebergSink;

/// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
/// managing access to a [`Table`].
#[derive(Debug, Clone)]
pub struct IcebergTableProvider {
/// Catalog for this table provider; optional because it is only
/// required for insert operations.
catalog: Option<Arc<dyn Catalog>>,
/// A table in the catalog.
table: Table,
/// Table snapshot id that will be queried via this provider.
@@ -48,11 +62,13 @@ pub struct IcebergTableProvider {
impl IcebergTableProvider {
pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
IcebergTableProvider {
catalog: None,
table,
snapshot_id: None,
schema,
}
}

/// Asynchronously tries to construct a new [`IcebergTableProvider`]
/// using the given client and table name to fetch an actual [`Table`]
/// in the provided namespace.
@@ -67,6 +83,7 @@ impl IcebergTableProvider {
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

Ok(IcebergTableProvider {
catalog: Some(client),
table,
snapshot_id: None,
schema,
@@ -78,6 +95,7 @@ impl IcebergTableProvider {
pub async fn try_new_from_table(table: Table) -> Result<Self> {
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
Ok(IcebergTableProvider {
catalog: None,
table,
snapshot_id: None,
schema,
@@ -102,6 +120,7 @@ impl IcebergTableProvider {
let schema = snapshot.schema(table.metadata())?;
let schema = Arc::new(schema_to_arrow_schema(&schema)?);
Ok(IcebergTableProvider {
catalog: None,
table,
snapshot_id: Some(snapshot_id),
schema,
@@ -147,6 +166,50 @@ impl TableProvider for IcebergTableProvider {
// Push down all filters; as the single source of truth, the scanner will drop any filters that cannot be pushed down
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
}

async fn insert_into(
&self,
_state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
insert_op: InsertOp,
) -> DFResult<Arc<dyn ExecutionPlan>> {
let Some(catalog) = &self.catalog else {
return Err(DFError::Internal(
"Static table can't not be inserted".to_string(),
));
};

if !matches!(insert_op, InsertOp::Append) {
return Err(DFError::Internal(
"Only support append only table".to_string(),
));
}

// create data file writer
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::default(),
self.table.metadata().current_schema().clone(),
self.table.file_io().clone(),
DefaultLocationGenerator::new(self.table.metadata().clone())
.map_err(|err| DFError::External(Box::new(err)))?,
DefaultFileNameGenerator::new("".to_string(), None, DataFileFormat::Parquet),
);
let data_file_writer = DataFileWriterBuilder::new(parquet_writer_builder, None)
.build()
.await
.map_err(|err| DFError::External(Box::new(err)))?;

let iceberg_sink = IcebergSink::new(data_file_writer, catalog.clone(), self.table.clone());

Ok(Arc::new(DataSinkExec::new(
input,
Arc::new(iceberg_sink),
schema_to_arrow_schema(self.table.metadata().current_schema())
.map_err(|err| DFError::External(Box::new(err)))?
.into(),
None,
)))
}
}

#[cfg(test)]
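With these pieces in place, an insert can be driven end to end from DataFusion SQL. A minimal sketch, assuming an already-configured catalog and an existing table `default.t1` (the helper name and identifiers are illustrative); note that only catalog-backed providers accept inserts, since `try_new_from_table` leaves `catalog` as `None`:

```rust
use std::sync::Arc;

use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::SessionContext;
use iceberg::{Catalog, NamespaceIdent};
use iceberg_datafusion::IcebergTableProvider;

async fn insert_into_iceberg(catalog: Arc<dyn Catalog>) -> Result<()> {
    // `try_new` stores the catalog in the provider, which `insert_into`
    // requires; providers built without a catalog reject inserts.
    let provider = IcebergTableProvider::try_new(
        catalog,
        NamespaceIdent::new("default".to_string()),
        "t1",
    )
    .await
    .map_err(|e| DataFusionError::External(Box::new(e)))?;

    let ctx = SessionContext::new();
    ctx.register_table("t1", Arc::new(provider))?;

    // DataFusion plans this through `TableProvider::insert_into`, which wraps
    // an `IcebergSink` in a `DataSinkExec` and fast-append commits on success.
    ctx.sql("INSERT INTO t1 VALUES (1, 'a')").await?.collect().await?;
    Ok(())
}
```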