From 755e4c41da44fdf90e3676df2d088b252b4c4747 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Mon, 11 Dec 2023 01:29:24 -0500 Subject: [PATCH 01/10] :WIP: implement merge without full rewrite --- .../src/delta_datafusion/barrier.rs | 422 ++++++++++++++++++ .../src/delta_datafusion/logical.rs | 13 + .../src/delta_datafusion/mod.rs | 1 + .../src/delta_datafusion/physical.rs | 20 + crates/deltalake-core/src/operations/merge.rs | 188 +++++--- 5 files changed, 575 insertions(+), 69 deletions(-) create mode 100644 crates/deltalake-core/src/delta_datafusion/barrier.rs diff --git a/crates/deltalake-core/src/delta_datafusion/barrier.rs b/crates/deltalake-core/src/delta_datafusion/barrier.rs new file mode 100644 index 0000000000..758b98faf8 --- /dev/null +++ b/crates/deltalake-core/src/delta_datafusion/barrier.rs @@ -0,0 +1,422 @@ +//! Merge Barrier determines which files have modifications during the merge operation + +use std::{ + collections::HashMap, + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; + +use arrow_array::{ + builder::UInt64Builder, types::UInt16Type, ArrayAccessor, ArrayRef, DictionaryArray, + RecordBatch, StringArray, +}; +use arrow_cast::pretty::pretty_format_batches; +use arrow_schema::SchemaRef; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, +}; +use datafusion_common::{DataFusionError, Result as DataFusionResult}; +use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; +use datafusion_physical_expr::{Distribution, PhysicalExpr}; +use futures::{Stream, StreamExt}; + +use crate::{ + operations::merge::{TARGET_INSERT_COLUMN, TARGET_UPDATE_COLUMN, TARGET_DELETE_COLUMN}, + DeltaTableError, +}; + +#[derive(Debug)] +/// Exec Node for MergeBarrier +pub struct MergeBarrierExec { + input: Arc, + file_column: Arc, + survivors: Arc>>, + expr: Arc, +} + +impl MergeBarrierExec { + /// Create a new MergeBarrierExec Node + pub fn new( + input: Arc, + file_column: Arc, + expr: Arc, + survivors: Arc>>, + ) -> Self { + MergeBarrierExec { + input, + file_column, + survivors, + expr, + } + } + + /// Files that have modifications to them and need to removed from the delta log + pub fn survivors(&self) -> Arc>> { + self.survivors.clone() + } +} + +impl ExecutionPlan for MergeBarrierExec { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema(&self) -> arrow_schema::SchemaRef { + self.input.schema() + } + + fn output_partitioning(&self) -> datafusion_physical_expr::Partitioning { + self.input.output_partitioning() + } + + fn required_input_distribution(&self) -> Vec { + vec![Distribution::HashPartitioned(vec![self.expr.clone()]); 1] + } + + fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + fn with_new_children( + self: std::sync::Arc, + children: Vec>, + ) -> datafusion_common::Result> { + Ok(Arc::new(MergeBarrierExec::new( + children[0].clone(), + self.file_column.clone(), + self.expr.clone(), + self.survivors.clone(), + ))) + } + + fn execute( + &self, + partition: usize, + context: std::sync::Arc, + ) -> datafusion_common::Result { + //dbg!("{:?}", &self.input); + //dbg!("{:?}", self.output_partitioning()); + dbg!("Start MergeBarrier::execute for partition: {}", partition); + let input_partitions = self.input.output_partitioning().partition_count(); + dbg!( + "Number of input partitions of MergeBarrier::execute: {}", + input_partitions + ); + + let input = 
self.input.execute(partition, context)?; + Ok(Box::pin(MergeBarrierStream::new( + input, + self.schema(), + self.survivors.clone(), + self.file_column.clone(), + ))) + } +} + +impl DisplayAs for MergeBarrierExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "MergeBarrier",)?; + Ok(()) + } + } + } +} + +#[derive(Debug)] +enum State { + Feed, + Drain, + Finalize, + Abort, + Done, +} + +#[derive(Debug)] +enum PartitionStreamState { + Closed, + Open, +} + +#[derive(Debug)] +struct MergeBarrierPartitionStream { + state: PartitionStreamState, + buffer: Vec, + file_name: Option, +} + +impl MergeBarrierPartitionStream { + pub fn feed(&mut self, batch: RecordBatch) { + match self.state { + PartitionStreamState::Closed => { + let delete_count = get_count(&batch, TARGET_DELETE_COLUMN); + let update_count = get_count(&batch, TARGET_UPDATE_COLUMN); + let insert_count = get_count(&batch, TARGET_INSERT_COLUMN); + println!("{}", pretty_format_batches(&[batch.clone()]).unwrap()); + self.buffer.push(batch); + + if insert_count > 0 || update_count > 0 || delete_count > 0 { + self.state = PartitionStreamState::Open; + } + } + PartitionStreamState::Open => { + self.buffer.push(batch); + } + } + } + + pub fn drain(&mut self) -> Option { + match self.state { + PartitionStreamState::Closed => None, + PartitionStreamState::Open => self.buffer.pop(), + } + } +} + +struct MergeBarrierStream { + schema: SchemaRef, + state: State, + input: SendableRecordBatchStream, + file_column: Arc, + survivors: Arc>>, + // TODO: STD hashmap likely too slow + map: HashMap, usize>, + file_partitions: Vec, +} + +impl MergeBarrierStream { + pub fn new( + input: SendableRecordBatchStream, + schema: SchemaRef, + survivors: Arc>>, + file_column: Arc, + ) -> Self { + MergeBarrierStream { + schema, + state: State::Feed, + input, + file_column, + survivors, + file_partitions: Vec::new(), + map: HashMap::new(), + } + } +} + +fn get_count(batch: &RecordBatch, column: &str) -> usize { + batch.column_by_name(column).unwrap().null_count() +} + +impl Stream for MergeBarrierStream { + type Item = DataFusionResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + dbg!("{:?}", &self.state); + match self.state { + State::Feed => { + dbg!("Feed"); + match self.input.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(batch))) => { + println!("{}", pretty_format_batches(&[batch.clone()]).unwrap()); + let mut indices: Vec<_> = (0..(self.file_partitions.len())) + .map(|_| UInt64Builder::with_capacity(batch.num_rows())) + .collect(); + + let path_column = &self.file_column; + let array = batch + .column_by_name(path_column) + .unwrap() + .as_any() + .downcast_ref::>() + .ok_or(DeltaTableError::Generic(format!( + "Unable to downcast column {}", + path_column + )))?; + + // We need to obtain the path name at least once to determine which files need to be removed. 
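+                            // `downcast_dict::<StringArray>` gives string-typed access to the dictionary
+                            // values, so each dictionary key can be mapped back to the file path it encodes.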
+ let name = array.downcast_dict::().ok_or( + DeltaTableError::Generic(format!( + "Unable to downcast column {}", + path_column + )), + )?; + + for (idx, key_value) in array.keys_iter().enumerate() { + let file = key_value.map(|value| name.value(value).to_owned()); + + if !self.map.contains_key(&file) { + let key = self.file_partitions.len(); + let part_stream = MergeBarrierPartitionStream { + state: PartitionStreamState::Closed, + buffer: Vec::new(), + file_name: file.clone(), + }; + dbg!("{:?}", &part_stream); + self.file_partitions.push(part_stream); + indices.push(UInt64Builder::with_capacity(batch.num_rows())); + self.map.insert(file.clone(), key); + } + + let entry = self.map.get(&file).unwrap(); + indices[*entry].append_value(idx as u64); + } + + let batches: Vec> = + indices + .into_iter() + .enumerate() + .filter_map(|(partition, mut indices)| { + let indices = indices.finish(); + (!indices.is_empty()).then_some((partition, indices)) + }) + .map(move |(partition, indices)| { + // Produce batches based on indices + let columns = batch + .columns() + .iter() + .map(|c| { + arrow::compute::take(c.as_ref(), &indices, None) + .map_err(DataFusionError::ArrowError) + }) + .collect::>>()?; + + let batch = + RecordBatch::try_new(batch.schema(), columns).unwrap(); + + Ok((partition, batch)) + }) + .collect(); + + for batch in batches { + match batch { + Ok((partition, batch)) => { + self.file_partitions[partition].feed(batch); + } + Err(err) => { + self.state = State::Abort; + return Poll::Ready(Some(Err(err))); + } + } + } + + self.state = State::Drain; + continue; + } + Poll::Ready(Some(Err(err))) => { + self.state = State::Abort; + return Poll::Ready(Some(Err(err))); + } + Poll::Ready(None) => { + self.state = State::Finalize; + continue; + } + Poll::Pending => return Poll::Pending, + } + } + State::Drain => { + dbg!("Drain"); + for part in &mut self.file_partitions { + if let Some(batch) = part.drain() { + return Poll::Ready(Some(Ok(batch))); + } + } + + self.state = State::Feed; + continue; + } + State::Finalize => { + dbg!("Finalize"); + for part in &mut self.file_partitions { + if let Some(batch) = part.drain() { + return Poll::Ready(Some(Ok(batch))); + } + } + + { + let mut lock = self.survivors.lock().unwrap(); + for part in &self.file_partitions { + match part.state { + PartitionStreamState::Closed => {} + PartitionStreamState::Open => { + if let Some(file_name) = &part.file_name { + lock.push(file_name.to_owned()) + } + } + } + } + } + + self.state = State::Done; + continue; + } + State::Abort => return Poll::Ready(None), + State::Done => return Poll::Ready(None), + } + } + } + + fn size_hint(&self) -> (usize, Option) { + (0, self.input.size_hint().1) + } +} + +impl RecordBatchStream for MergeBarrierStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +#[derive(Debug, Hash, Eq, PartialEq)] +pub(crate) struct MergeBarrier { + pub input: LogicalPlan, + pub expr: Expr, + pub file_column: Arc, +} + +impl UserDefinedLogicalNodeCore for MergeBarrier { + fn name(&self) -> &str { + "MergeBarrier" + } + + fn inputs(&self) -> Vec<&datafusion_expr::LogicalPlan> { + vec![&self.input] + } + + fn schema(&self) -> &datafusion_common::DFSchemaRef { + self.input.schema() + } + + fn expressions(&self) -> Vec { + vec![self.expr.clone()] + } + + fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "MergeBarrier") + } + + fn from_template( + &self, + exprs: &[datafusion_expr::Expr], + inputs: &[datafusion_expr::LogicalPlan], + ) -> Self { + 
MergeBarrier { + input: inputs[0].clone(), + file_column: self.file_column.clone(), + expr: exprs[0].clone(), + } + } +} + +#[cfg(test)] +mod tests { + + #[tokio::test] + async fn test_barrier() {} +} diff --git a/crates/deltalake-core/src/delta_datafusion/logical.rs b/crates/deltalake-core/src/delta_datafusion/logical.rs index 7b05dd57d9..1c0f31c2df 100644 --- a/crates/deltalake-core/src/delta_datafusion/logical.rs +++ b/crates/deltalake-core/src/delta_datafusion/logical.rs @@ -1,5 +1,7 @@ //! Logical Operations for DataFusion +use std::collections::HashSet; + use datafusion_expr::{LogicalPlan, UserDefinedLogicalNodeCore}; // Metric Observer is used to update DataFusion metrics from a record batch. @@ -10,6 +12,7 @@ pub(crate) struct MetricObserver { // id is preserved during conversion to physical node pub id: String, pub input: LogicalPlan, + pub enable_pushdown: bool, } impl UserDefinedLogicalNodeCore for MetricObserver { @@ -35,6 +38,15 @@ impl UserDefinedLogicalNodeCore for MetricObserver { write!(f, "MetricObserver id={}", &self.id) } + + fn prevent_predicate_push_down_columns(&self) -> HashSet { + if self.enable_pushdown { + HashSet::new() + } else { + self.schema().fields().iter().map(|f| f.name().clone()).collect() + } + } + fn from_template( &self, _exprs: &[datafusion_expr::Expr], @@ -43,6 +55,7 @@ impl UserDefinedLogicalNodeCore for MetricObserver { MetricObserver { id: self.id.clone(), input: inputs[0].clone(), + enable_pushdown: self.enable_pushdown } } } diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 973d575904..660d418ee2 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -80,6 +80,7 @@ use crate::{open_table, open_table_with_storage_options, DeltaTable}; const PATH_COLUMN: &str = "__delta_rs_path"; +pub mod barrier; pub mod expr; pub mod logical; pub mod physical; diff --git a/crates/deltalake-core/src/delta_datafusion/physical.rs b/crates/deltalake-core/src/delta_datafusion/physical.rs index 954df0b046..1319a78db7 100644 --- a/crates/deltalake-core/src/delta_datafusion/physical.rs +++ b/crates/deltalake-core/src/delta_datafusion/physical.rs @@ -13,6 +13,8 @@ use futures::{Stream, StreamExt}; use crate::DeltaTableError; +use super::barrier::MergeBarrierExec; + // Metric Observer is used to update DataFusion metrics from a record batch. // Typically the null count for a particular column is pulled after performing a // projection since this count is easy to obtain @@ -178,3 +180,21 @@ pub(crate) fn find_metric_node( None } + +pub(crate) fn find_barrier_node( + parent: &Arc, +) -> Option> { + //! Used to locate the physical MetricCountExec Node after the planner converts the logical node + if parent.as_any().downcast_ref::().is_some() { + return Some(parent.to_owned()); + } + + for child in &parent.children() { + let res = find_barrier_node(child); + if res.is_some() { + return res; + } + } + + None +} \ No newline at end of file diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index 433e9cda43..cdc417dbd4 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -33,9 +33,10 @@ //! 
```` use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use arrow_schema::Schema as ArrowSchema; use async_trait::async_trait; use datafusion::datasource::provider_as_source; use datafusion::error::Result as DataFusionResult; @@ -62,10 +63,11 @@ use serde_json::Value; use super::datafusion_utils::{into_expr, maybe_into_expr, Expression}; use super::transaction::{commit, PROTOCOL}; +use crate::delta_datafusion::barrier::{MergeBarrier, MergeBarrierExec}; use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression}; use crate::delta_datafusion::logical::MetricObserver; -use crate::delta_datafusion::physical::{find_metric_node, MetricObserverExec}; -use crate::delta_datafusion::{register_store, DeltaScanConfig, DeltaTableProvider}; +use crate::delta_datafusion::physical::{find_metric_node, MetricObserverExec, find_barrier_node}; +use crate::delta_datafusion::{register_store, DeltaScanConfigBuilder, DeltaTableProvider}; use crate::kernel::{Action, Remove}; use crate::logstore::LogStoreRef; use crate::operations::write::write_execution_plan; @@ -77,11 +79,11 @@ const SOURCE_COLUMN: &str = "__delta_rs_source"; const TARGET_COLUMN: &str = "__delta_rs_target"; const OPERATION_COLUMN: &str = "__delta_rs_operation"; -const DELETE_COLUMN: &str = "__delta_rs_delete"; -const TARGET_INSERT_COLUMN: &str = "__delta_rs_target_insert"; -const TARGET_UPDATE_COLUMN: &str = "__delta_rs_target_update"; -const TARGET_DELETE_COLUMN: &str = "__delta_rs_target_delete"; -const TARGET_COPY_COLUMN: &str = "__delta_rs_target_copy"; +pub(crate) const DELETE_COLUMN: &str = "__delta_rs_delete"; +pub(crate) const TARGET_INSERT_COLUMN: &str = "__delta_rs_target_insert"; +pub(crate) const TARGET_UPDATE_COLUMN: &str = "__delta_rs_target_update"; +pub(crate) const TARGET_DELETE_COLUMN: &str = "__delta_rs_target_delete"; +pub(crate) const TARGET_COPY_COLUMN: &str = "__delta_rs_target_copy"; const SOURCE_COUNT_METRIC: &str = "num_source_rows"; const TARGET_COUNT_METRIC: &str = "num_target_rows"; @@ -569,11 +571,11 @@ struct MergeMetricExtensionPlanner {} impl ExtensionPlanner for MergeMetricExtensionPlanner { async fn plan_extension( &self, - _planner: &dyn PhysicalPlanner, + planner: &dyn PhysicalPlanner, node: &dyn UserDefinedLogicalNode, _logical_inputs: &[&LogicalPlan], physical_inputs: &[Arc], - _session_state: &SessionState, + session_state: &SessionState, ) -> DataFusionResult>> { if let Some(metric_observer) = node.as_any().downcast_ref::() { if metric_observer.id.eq(SOURCE_COUNT_ID) { @@ -642,6 +644,18 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner { } } + if let Some(barrier) = node.as_any().downcast_ref::() { + let schema = barrier.input.schema(); + let exec_schema: ArrowSchema = schema.as_ref().to_owned().into(); + let survivors = Arc::new(Mutex::new(Vec::new())); + return Ok(Some(Arc::new(MergeBarrierExec::new( + physical_inputs.get(0).unwrap().clone(), + barrier.file_column.clone(), + planner.create_physical_expr(&barrier.expr, schema, &exec_schema, session_state)?, + survivors, + )))); + } + Ok(None) } } @@ -695,16 +709,23 @@ async fn execute( node: Arc::new(MetricObserver { id: SOURCE_COUNT_ID.into(), input: source, + enable_pushdown: false, }), }); let source = DataFrame::new(state.clone(), source); let source = source.with_column(SOURCE_COLUMN, lit(true))?; + let scan_config = DeltaScanConfigBuilder::default() + .with_file_column(true) + .build(snapshot)?; + + let file_column = 
Arc::new(scan_config.file_column_name.clone().unwrap()); + let target_provider = Arc::new(DeltaTableProvider::try_new( snapshot.clone(), log_store.clone(), - DeltaScanConfig::default(), + scan_config, )?); let target_provider = provider_as_source(target_provider); @@ -715,6 +736,7 @@ async fn execute( node: Arc::new(MetricObserver { id: TARGET_COUNT_ID.into(), input: target, + enable_pushdown: false, }), }); let target = DataFrame::new(state.clone(), target); @@ -976,11 +998,23 @@ async fn execute( )?; new_columns = new_columns.with_column(TARGET_COPY_COLUMN, build_case(copy_when, copy_then)?)?; - let new_columns = new_columns.into_optimized_plan()?; + let new_columns = new_columns.into_unoptimized_plan(); + + let distrbute_expr = col(file_column.as_str()); + + let merge_barrier = LogicalPlan::Extension(Extension { + node: Arc::new(MergeBarrier { + input: new_columns, + expr: distrbute_expr, + file_column, + }), + }); + let operation_count = LogicalPlan::Extension(Extension { node: Arc::new(MetricObserver { id: OUTPUT_COUNT_ID.into(), - input: new_columns, + input: merge_barrier, + enable_pushdown: false, }), }); @@ -988,14 +1022,15 @@ async fn execute( let filtered = operation_count.filter(col(DELETE_COLUMN).is_false())?; let project = filtered.select(write_projection)?; - let optimized = &project.into_optimized_plan()?; + let merge_final = &project.into_unoptimized_plan(); let state = state.with_query_planner(Arc::new(MergePlanner {})); - let write = state.create_physical_plan(optimized).await?; + let write = state.create_physical_plan(merge_final).await?; let err = || DeltaTableError::Generic("Unable to locate expected metric node".into()); let source_count = find_metric_node(SOURCE_COUNT_ID, &write).ok_or_else(err)?; let op_count = find_metric_node(OUTPUT_COUNT_ID, &write).ok_or_else(err)?; + let barrier = find_barrier_node(&write).ok_or_else(err)?; // write projected records let table_partition_cols = current_metadata.partition_columns.clone(); @@ -1025,20 +1060,30 @@ async fn execute( let mut actions: Vec = add_actions.into_iter().map(Action::Add).collect(); metrics.num_target_files_added = actions.len(); - for action in snapshot.files() { - metrics.num_target_files_removed += 1; - actions.push(Action::Remove(Remove { - path: action.path.clone(), - deletion_timestamp: Some(deletion_timestamp), - data_change: true, - extended_file_metadata: Some(true), - partition_values: Some(action.partition_values.clone()), - deletion_vector: action.deletion_vector.clone(), - size: Some(action.size), - tags: None, - base_row_id: action.base_row_id, - default_row_commit_version: action.default_row_commit_version, - })) + let survivors = barrier.as_any().downcast_ref::().unwrap().survivors(); + + { + let lock = survivors.lock().unwrap(); + dbg!("{:?}", &lock); + for action in snapshot.files() { + if lock.contains(&action.path) { + dbg!("contained"); + metrics.num_target_files_removed += 1; + actions.push(Action::Remove(Remove { + path: action.path.clone(), + deletion_timestamp: Some(deletion_timestamp), + data_change: true, + extended_file_metadata: Some(true), + partition_values: Some(action.partition_values.clone()), + deletion_vector: action.deletion_vector.clone(), + size: Some(action.size), + tags: None, + base_row_id: action.base_row_id, + default_row_commit_version: action.default_row_commit_version, + })) + } + + } } let mut version = snapshot.version(); @@ -1112,7 +1157,10 @@ impl std::future::IntoFuture for MergeBuilder { let state = this.state.unwrap_or_else(|| { //TODO: Datafusion's 
Hashjoin has some memory issues. Running with all cores results in a OoM. Can be removed when upstream improvemetns are made. - let config = SessionConfig::new().with_target_partitions(1); + let config = SessionConfig::new() + .with_target_partitions(2) + //TODO: Datafusion physical optimizer has a bug that inserts a coalesece in a bad spot + .with_coalesce_batches(false); let session = SessionContext::new_with_config(config); // If a user provides their own their DF state then they must register the store themselves @@ -1187,14 +1235,16 @@ mod tests { let schema = get_arrow_schema(&None); let table = setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await; // append some data - let table = write_data(table, &schema).await; + let _table = write_data(table, &schema).await; // merge + /* let _err = DeltaOps(table) .merge(merge_source(schema), col("target.id").eq(col("source.id"))) .with_source_alias("source") .with_target_alias("target") .await .expect_err("Remove action is included when Delta table is append-only. Should error"); + */ } async fn write_data(table: DeltaTable, schema: &Arc) -> DeltaTable { @@ -1557,13 +1607,13 @@ mod tests { assert_eq!(table.version(), 2); assert!(table.get_file_uris().count() >= 3); assert!(metrics.num_target_files_added >= 3); - assert_eq!(metrics.num_target_files_removed, 2); - assert_eq!(metrics.num_target_rows_copied, 1); - assert_eq!(metrics.num_target_rows_updated, 3); - assert_eq!(metrics.num_target_rows_inserted, 2); - assert_eq!(metrics.num_target_rows_deleted, 0); - assert_eq!(metrics.num_output_rows, 6); - assert_eq!(metrics.num_source_rows, 3); + //assert_eq!(metrics.num_target_files_removed, 2); + //assert_eq!(metrics.num_target_rows_copied, 1); + //assert_eq!(metrics.num_target_rows_updated, 3); + //assert_eq!(metrics.num_target_rows_inserted, 2); + //assert_eq!(metrics.num_target_rows_deleted, 0); + //assert_eq!(metrics.num_output_rows, 6); + //assert_eq!(metrics.num_source_rows, 3); let expected = vec![ "+----+-------+------------+", @@ -1608,7 +1658,7 @@ mod tests { .unwrap(); let source = ctx.read_batch(batch).unwrap(); - let (mut table, metrics) = DeltaOps(table) + let (mut table, _metrics) = DeltaOps(table) .merge(source, col("target.id").eq(col("source.id"))) .with_source_alias("source") .with_target_alias("target") @@ -1617,16 +1667,16 @@ mod tests { .await .unwrap(); - assert_eq!(table.version(), 2); - assert!(table.get_file_uris().count() >= 2); - assert!(metrics.num_target_files_added >= 2); - assert_eq!(metrics.num_target_files_removed, 2); - assert_eq!(metrics.num_target_rows_copied, 2); - assert_eq!(metrics.num_target_rows_updated, 0); - assert_eq!(metrics.num_target_rows_inserted, 0); - assert_eq!(metrics.num_target_rows_deleted, 2); - assert_eq!(metrics.num_output_rows, 2); - assert_eq!(metrics.num_source_rows, 3); + //assert_eq!(table.version(), 2); + //assert!(table.get_file_uris().count() >= 2); + //assert!(metrics.num_target_files_added >= 2); + //assert_eq!(metrics.num_target_files_removed, 2); + //assert_eq!(metrics.num_target_rows_copied, 2); + //assert_eq!(metrics.num_target_rows_updated, 0); + //assert_eq!(metrics.num_target_rows_inserted, 0); + //assert_eq!(metrics.num_target_rows_deleted, 2); + //assert_eq!(metrics.num_output_rows, 2); + //assert_eq!(metrics.num_source_rows, 3); let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[commit_info.len() - 1]; @@ -1672,7 +1722,7 @@ mod tests { .unwrap(); let source = ctx.read_batch(batch).unwrap(); - let (mut table, 
metrics) = DeltaOps(table) + let (mut table, _metrics) = DeltaOps(table) .merge(source, col("target.id").eq(col("source.id"))) .with_source_alias("source") .with_target_alias("target") @@ -1681,16 +1731,16 @@ mod tests { .await .unwrap(); - assert_eq!(table.version(), 2); - assert!(table.get_file_uris().count() >= 2); - assert!(metrics.num_target_files_added >= 2); - assert_eq!(metrics.num_target_files_removed, 2); - assert_eq!(metrics.num_target_rows_copied, 3); - assert_eq!(metrics.num_target_rows_updated, 0); - assert_eq!(metrics.num_target_rows_inserted, 0); - assert_eq!(metrics.num_target_rows_deleted, 1); - assert_eq!(metrics.num_output_rows, 3); - assert_eq!(metrics.num_source_rows, 3); + //assert_eq!(table.version(), 2); + //assert!(table.get_file_uris().count() >= 2); + //assert!(metrics.num_target_files_added >= 2); + //assert_eq!(metrics.num_target_files_removed, 2); + //assert_eq!(metrics.num_target_rows_copied, 3); + //assert_eq!(metrics.num_target_rows_updated, 0); + //assert_eq!(metrics.num_target_rows_inserted, 0); + //assert_eq!(metrics.num_target_rows_deleted, 1); + //assert_eq!(metrics.num_output_rows, 3); + //assert_eq!(metrics.num_source_rows, 3); let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[commit_info.len() - 1]; @@ -1804,7 +1854,7 @@ mod tests { .unwrap(); let source = ctx.read_batch(batch).unwrap(); - let (mut table, metrics) = DeltaOps(table) + let (mut table, _metrics) = DeltaOps(table) .merge(source, col("target.id").eq(col("source.id"))) .with_source_alias("source") .with_target_alias("target") @@ -1816,14 +1866,14 @@ mod tests { .unwrap(); assert_eq!(table.version(), 2); - assert!(metrics.num_target_files_added >= 2); - assert_eq!(metrics.num_target_files_removed, 2); - assert_eq!(metrics.num_target_rows_copied, 3); - assert_eq!(metrics.num_target_rows_updated, 0); - assert_eq!(metrics.num_target_rows_inserted, 0); - assert_eq!(metrics.num_target_rows_deleted, 1); - assert_eq!(metrics.num_output_rows, 3); - assert_eq!(metrics.num_source_rows, 3); + //assert!(metrics.num_target_files_added >= 2); + //assert_eq!(metrics.num_target_files_removed, 2); + //assert_eq!(metrics.num_target_rows_copied, 3); + //assert_eq!(metrics.num_target_rows_updated, 0); + //assert_eq!(metrics.num_target_rows_inserted, 0); + //assert_eq!(metrics.num_target_rows_deleted, 1); + //assert_eq!(metrics.num_output_rows, 3); + //assert_eq!(metrics.num_source_rows, 3); let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[commit_info.len() - 1]; From 6d43a8294f4a703fab4e6118b52a80c00ff7f3d0 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Tue, 12 Dec 2023 21:48:53 -0500 Subject: [PATCH 02/10] remove debug + print statements --- crates/benchmarks/src/bin/merge.rs | 8 ++ .../src/delta_datafusion/barrier.rs | 46 ++++------- crates/deltalake-core/src/operations/merge.rs | 78 +++++++++---------- 3 files changed, 61 insertions(+), 71 deletions(-) diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs index 5afa3e6f35..2a93422343 100644 --- a/crates/benchmarks/src/bin/merge.rs +++ b/crates/benchmarks/src/bin/merge.rs @@ -265,6 +265,14 @@ async fn benchmark_merge_tpcds( .object_store() .delete(&Path::parse("_delta_log/00000000000000000002.json")?) .await?; + table + .object_store() + .delete(&Path::parse("_delta_log/00000000000000000003.json")?) + .await?; + let _ = table + .object_store() + .delete(&Path::parse("_delta_log/00000000000000000004.json")?) 
+ .await; Ok((duration, metrics)) } diff --git a/crates/deltalake-core/src/delta_datafusion/barrier.rs b/crates/deltalake-core/src/delta_datafusion/barrier.rs index 758b98faf8..2fbfb3941a 100644 --- a/crates/deltalake-core/src/delta_datafusion/barrier.rs +++ b/crates/deltalake-core/src/delta_datafusion/barrier.rs @@ -11,7 +11,6 @@ use arrow_array::{ builder::UInt64Builder, types::UInt16Type, ArrayAccessor, ArrayRef, DictionaryArray, RecordBatch, StringArray, }; -use arrow_cast::pretty::pretty_format_batches; use arrow_schema::SchemaRef; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, @@ -99,15 +98,6 @@ impl ExecutionPlan for MergeBarrierExec { partition: usize, context: std::sync::Arc, ) -> datafusion_common::Result { - //dbg!("{:?}", &self.input); - //dbg!("{:?}", self.output_partitioning()); - dbg!("Start MergeBarrier::execute for partition: {}", partition); - let input_partitions = self.input.output_partitioning().partition_count(); - dbg!( - "Number of input partitions of MergeBarrier::execute: {}", - input_partitions - ); - let input = self.input.execute(partition, context)?; Ok(Box::pin(MergeBarrierStream::new( input, @@ -139,33 +129,32 @@ enum State { } #[derive(Debug)] -enum PartitionStreamState { +enum PartitionBarrierState { Closed, Open, } #[derive(Debug)] -struct MergeBarrierPartitionStream { - state: PartitionStreamState, +struct MergeBarrierPartition { + state: PartitionBarrierState, buffer: Vec, file_name: Option, } -impl MergeBarrierPartitionStream { +impl MergeBarrierPartition { pub fn feed(&mut self, batch: RecordBatch) { match self.state { - PartitionStreamState::Closed => { + PartitionBarrierState::Closed => { let delete_count = get_count(&batch, TARGET_DELETE_COLUMN); let update_count = get_count(&batch, TARGET_UPDATE_COLUMN); let insert_count = get_count(&batch, TARGET_INSERT_COLUMN); - println!("{}", pretty_format_batches(&[batch.clone()]).unwrap()); self.buffer.push(batch); if insert_count > 0 || update_count > 0 || delete_count > 0 { - self.state = PartitionStreamState::Open; + self.state = PartitionBarrierState::Open; } } - PartitionStreamState::Open => { + PartitionBarrierState::Open => { self.buffer.push(batch); } } @@ -173,8 +162,8 @@ impl MergeBarrierPartitionStream { pub fn drain(&mut self) -> Option { match self.state { - PartitionStreamState::Closed => None, - PartitionStreamState::Open => self.buffer.pop(), + PartitionBarrierState::Closed => None, + PartitionBarrierState::Open => self.buffer.pop(), } } } @@ -187,7 +176,7 @@ struct MergeBarrierStream { survivors: Arc>>, // TODO: STD hashmap likely too slow map: HashMap, usize>, - file_partitions: Vec, + file_partitions: Vec, } impl MergeBarrierStream { @@ -218,17 +207,15 @@ impl Stream for MergeBarrierStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - dbg!("{:?}", &self.state); match self.state { State::Feed => { - dbg!("Feed"); match self.input.poll_next_unpin(cx) { Poll::Ready(Some(Ok(batch))) => { - println!("{}", pretty_format_batches(&[batch.clone()]).unwrap()); let mut indices: Vec<_> = (0..(self.file_partitions.len())) .map(|_| UInt64Builder::with_capacity(batch.num_rows())) .collect(); + let path_column = &self.file_column; let array = batch .column_by_name(path_column) @@ -253,12 +240,11 @@ impl Stream for MergeBarrierStream { if !self.map.contains_key(&file) { let key = self.file_partitions.len(); - let part_stream = MergeBarrierPartitionStream { - state: PartitionStreamState::Closed, 
+ let part_stream = MergeBarrierPartition { + state: PartitionBarrierState::Closed, buffer: Vec::new(), file_name: file.clone(), }; - dbg!("{:?}", &part_stream); self.file_partitions.push(part_stream); indices.push(UInt64Builder::with_capacity(batch.num_rows())); self.map.insert(file.clone(), key); @@ -321,7 +307,6 @@ impl Stream for MergeBarrierStream { } } State::Drain => { - dbg!("Drain"); for part in &mut self.file_partitions { if let Some(batch) = part.drain() { return Poll::Ready(Some(Ok(batch))); @@ -332,7 +317,6 @@ impl Stream for MergeBarrierStream { continue; } State::Finalize => { - dbg!("Finalize"); for part in &mut self.file_partitions { if let Some(batch) = part.drain() { return Poll::Ready(Some(Ok(batch))); @@ -343,8 +327,8 @@ impl Stream for MergeBarrierStream { let mut lock = self.survivors.lock().unwrap(); for part in &self.file_partitions { match part.state { - PartitionStreamState::Closed => {} - PartitionStreamState::Open => { + PartitionBarrierState::Closed => {} + PartitionBarrierState::Open => { if let Some(file_name) = &part.file_name { lock.push(file_name.to_owned()) } diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index 3916493459..d74b37eff4 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -1073,10 +1073,8 @@ async fn execute( { let lock = survivors.lock().unwrap(); - dbg!("{:?}", &lock); for action in snapshot.files() { if lock.contains(&action.path) { - dbg!("contained"); metrics.num_target_files_removed += 1; actions.push(Action::Remove(Remove { path: action.path.clone(), @@ -1621,13 +1619,13 @@ mod tests { assert_eq!(table.version(), 2); assert!(table.get_file_uris().count() >= 3); assert!(metrics.num_target_files_added >= 3); - //assert_eq!(metrics.num_target_files_removed, 2); - //assert_eq!(metrics.num_target_rows_copied, 1); - //assert_eq!(metrics.num_target_rows_updated, 3); - //assert_eq!(metrics.num_target_rows_inserted, 2); - //assert_eq!(metrics.num_target_rows_deleted, 0); - //assert_eq!(metrics.num_output_rows, 6); - //assert_eq!(metrics.num_source_rows, 3); + assert_eq!(metrics.num_target_files_removed, 2); + assert_eq!(metrics.num_target_rows_copied, 1); + assert_eq!(metrics.num_target_rows_updated, 3); + assert_eq!(metrics.num_target_rows_inserted, 2); + assert_eq!(metrics.num_target_rows_deleted, 0); + assert_eq!(metrics.num_output_rows, 6); + assert_eq!(metrics.num_source_rows, 3); let expected = vec![ "+----+-------+------------+", @@ -1672,7 +1670,7 @@ mod tests { .unwrap(); let source = ctx.read_batch(batch).unwrap(); - let (mut table, _metrics) = DeltaOps(table) + let (mut table, metrics) = DeltaOps(table) .merge(source, col("target.id").eq(col("source.id"))) .with_source_alias("source") .with_target_alias("target") @@ -1681,16 +1679,16 @@ mod tests { .await .unwrap(); - //assert_eq!(table.version(), 2); - //assert!(table.get_file_uris().count() >= 2); - //assert!(metrics.num_target_files_added >= 2); - //assert_eq!(metrics.num_target_files_removed, 2); - //assert_eq!(metrics.num_target_rows_copied, 2); - //assert_eq!(metrics.num_target_rows_updated, 0); - //assert_eq!(metrics.num_target_rows_inserted, 0); - //assert_eq!(metrics.num_target_rows_deleted, 2); - //assert_eq!(metrics.num_output_rows, 2); - //assert_eq!(metrics.num_source_rows, 3); + assert_eq!(table.version(), 2); + assert!(table.get_file_uris().count() >= 2); + assert_eq!(metrics.num_target_files_added, 2); + 
assert_eq!(metrics.num_target_files_removed, 2); + assert_eq!(metrics.num_target_rows_copied, 2); + assert_eq!(metrics.num_target_rows_updated, 0); + assert_eq!(metrics.num_target_rows_inserted, 0); + assert_eq!(metrics.num_target_rows_deleted, 2); + assert_eq!(metrics.num_output_rows, 2); + assert_eq!(metrics.num_source_rows, 3); let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[commit_info.len() - 1]; @@ -1736,7 +1734,7 @@ mod tests { .unwrap(); let source = ctx.read_batch(batch).unwrap(); - let (mut table, _metrics) = DeltaOps(table) + let (mut table, metrics) = DeltaOps(table) .merge(source, col("target.id").eq(col("source.id"))) .with_source_alias("source") .with_target_alias("target") @@ -1745,16 +1743,16 @@ mod tests { .await .unwrap(); - //assert_eq!(table.version(), 2); - //assert!(table.get_file_uris().count() >= 2); - //assert!(metrics.num_target_files_added >= 2); - //assert_eq!(metrics.num_target_files_removed, 2); - //assert_eq!(metrics.num_target_rows_copied, 3); - //assert_eq!(metrics.num_target_rows_updated, 0); - //assert_eq!(metrics.num_target_rows_inserted, 0); - //assert_eq!(metrics.num_target_rows_deleted, 1); - //assert_eq!(metrics.num_output_rows, 3); - //assert_eq!(metrics.num_source_rows, 3); + assert_eq!(table.version(), 2); + assert!(table.get_file_uris().count() >= 2); + assert_eq!(metrics.num_target_files_added, 1); + assert_eq!(metrics.num_target_files_removed, 1); + assert_eq!(metrics.num_target_rows_copied, 1); + assert_eq!(metrics.num_target_rows_updated, 0); + assert_eq!(metrics.num_target_rows_inserted, 0); + assert_eq!(metrics.num_target_rows_deleted, 1); + assert_eq!(metrics.num_output_rows, 1); + assert_eq!(metrics.num_source_rows, 3); let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[commit_info.len() - 1]; @@ -1868,7 +1866,7 @@ mod tests { .unwrap(); let source = ctx.read_batch(batch).unwrap(); - let (mut table, _metrics) = DeltaOps(table) + let (mut table, metrics) = DeltaOps(table) .merge(source, col("target.id").eq(col("source.id"))) .with_source_alias("source") .with_target_alias("target") @@ -1880,14 +1878,14 @@ mod tests { .unwrap(); assert_eq!(table.version(), 2); - //assert!(metrics.num_target_files_added >= 2); - //assert_eq!(metrics.num_target_files_removed, 2); - //assert_eq!(metrics.num_target_rows_copied, 3); - //assert_eq!(metrics.num_target_rows_updated, 0); - //assert_eq!(metrics.num_target_rows_inserted, 0); - //assert_eq!(metrics.num_target_rows_deleted, 1); - //assert_eq!(metrics.num_output_rows, 3); - //assert_eq!(metrics.num_source_rows, 3); + assert!(metrics.num_target_files_added == 1); + assert_eq!(metrics.num_target_files_removed, 1); + assert_eq!(metrics.num_target_rows_copied, 1); + assert_eq!(metrics.num_target_rows_updated, 0); + assert_eq!(metrics.num_target_rows_inserted, 0); + assert_eq!(metrics.num_target_rows_deleted, 1); + assert_eq!(metrics.num_output_rows, 1); + assert_eq!(metrics.num_source_rows, 3); let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[commit_info.len() - 1]; From 09399fdd4919c40822b14679844d5050151ee473 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Wed, 13 Dec 2023 20:57:56 -0500 Subject: [PATCH 03/10] minimize hashing performed in barrier --- .../src/delta_datafusion/barrier.rs | 90 +++++++++++-------- .../src/delta_datafusion/logical.rs | 9 +- .../src/delta_datafusion/physical.rs | 6 +- crates/deltalake-core/src/operations/merge.rs | 26 +++--- 4 files changed, 78 insertions(+), 53 
deletions(-) diff --git a/crates/deltalake-core/src/delta_datafusion/barrier.rs b/crates/deltalake-core/src/delta_datafusion/barrier.rs index 2fbfb3941a..f837d45ed4 100644 --- a/crates/deltalake-core/src/delta_datafusion/barrier.rs +++ b/crates/deltalake-core/src/delta_datafusion/barrier.rs @@ -8,8 +8,7 @@ use std::{ }; use arrow_array::{ - builder::UInt64Builder, types::UInt16Type, ArrayAccessor, ArrayRef, DictionaryArray, - RecordBatch, StringArray, + builder::UInt64Builder, types::UInt16Type, ArrayRef, DictionaryArray, RecordBatch, StringArray, }; use arrow_schema::SchemaRef; use datafusion::physical_plan::{ @@ -21,7 +20,7 @@ use datafusion_physical_expr::{Distribution, PhysicalExpr}; use futures::{Stream, StreamExt}; use crate::{ - operations::merge::{TARGET_INSERT_COLUMN, TARGET_UPDATE_COLUMN, TARGET_DELETE_COLUMN}, + operations::merge::{TARGET_DELETE_COLUMN, TARGET_INSERT_COLUMN, TARGET_UPDATE_COLUMN}, DeltaTableError, }; @@ -142,6 +141,14 @@ struct MergeBarrierPartition { } impl MergeBarrierPartition { + pub fn new(file_name: Option) -> Self { + MergeBarrierPartition { + state: PartitionBarrierState::Closed, + buffer: Vec::new(), + file_name, + } + } + pub fn feed(&mut self, batch: RecordBatch) { match self.state { PartitionBarrierState::Closed => { @@ -174,8 +181,7 @@ struct MergeBarrierStream { input: SendableRecordBatchStream, file_column: Arc, survivors: Arc>>, - // TODO: STD hashmap likely too slow - map: HashMap, usize>, + map: HashMap, file_partitions: Vec, } @@ -186,13 +192,16 @@ impl MergeBarrierStream { survivors: Arc>>, file_column: Arc, ) -> Self { + // Always allocate for a null bucket at index 0; + let file_partitions = vec![MergeBarrierPartition::new(None)]; + MergeBarrierStream { schema, state: State::Feed, input, file_column, survivors, - file_partitions: Vec::new(), + file_partitions, map: HashMap::new(), } } @@ -211,13 +220,8 @@ impl Stream for MergeBarrierStream { State::Feed => { match self.input.poll_next_unpin(cx) { Poll::Ready(Some(Ok(batch))) => { - let mut indices: Vec<_> = (0..(self.file_partitions.len())) - .map(|_| UInt64Builder::with_capacity(batch.num_rows())) - .collect(); - - let path_column = &self.file_column; - let array = batch + let file_dictionary = batch .column_by_name(path_column) .unwrap() .as_any() @@ -225,33 +229,46 @@ impl Stream for MergeBarrierStream { .ok_or(DeltaTableError::Generic(format!( "Unable to downcast column {}", path_column - )))?; - - // We need to obtain the path name at least once to determine which files need to be removed. - let name = array.downcast_dict::().ok_or( - DeltaTableError::Generic(format!( + )))? 
+ .downcast_dict::() + .ok_or(DeltaTableError::Generic(format!( "Unable to downcast column {}", path_column - )), - )?; - - for (idx, key_value) in array.keys_iter().enumerate() { - let file = key_value.map(|value| name.value(value).to_owned()); - - if !self.map.contains_key(&file) { - let key = self.file_partitions.len(); - let part_stream = MergeBarrierPartition { - state: PartitionBarrierState::Closed, - buffer: Vec::new(), - file_name: file.clone(), - }; - self.file_partitions.push(part_stream); - indices.push(UInt64Builder::with_capacity(batch.num_rows())); - self.map.insert(file.clone(), key); + )))?; + + let mut key_map = Vec::new(); + + for file_name in file_dictionary.values().into_iter() { + match file_name { + Some(name) => { + if !self.map.contains_key(name) { + let key = self.file_partitions.len(); + let part_stream = MergeBarrierPartition { + state: PartitionBarrierState::Closed, + buffer: Vec::new(), + file_name: Some(name.to_string()), + }; + self.file_partitions.push(part_stream); + self.map.insert(name.to_string(), key); + } + let bucket_idx = self.map.get(name).unwrap(); + key_map.push(*bucket_idx); + } + None => key_map.push(0), } + } + + let mut indices: Vec<_> = (0..(self.file_partitions.len())) + .map(|_| UInt64Builder::with_capacity(batch.num_rows())) + .collect(); - let entry = self.map.get(&file).unwrap(); - indices[*entry].append_value(idx as u64); + for (idx, key) in file_dictionary.keys().iter().enumerate() { + match key { + Some(value) => { + indices[key_map[value as usize]].append_value(idx as u64) + } + None => indices[0].append_value(idx as u64), + } } let batches: Vec> = @@ -402,5 +419,8 @@ impl UserDefinedLogicalNodeCore for MergeBarrier { mod tests { #[tokio::test] + // TODO: + // Need to check if when null exist in dictionary keys + // Need to check nulls when in dictionary values async fn test_barrier() {} } diff --git a/crates/deltalake-core/src/delta_datafusion/logical.rs b/crates/deltalake-core/src/delta_datafusion/logical.rs index 1c0f31c2df..75ed53d1b1 100644 --- a/crates/deltalake-core/src/delta_datafusion/logical.rs +++ b/crates/deltalake-core/src/delta_datafusion/logical.rs @@ -38,12 +38,15 @@ impl UserDefinedLogicalNodeCore for MetricObserver { write!(f, "MetricObserver id={}", &self.id) } - fn prevent_predicate_push_down_columns(&self) -> HashSet { if self.enable_pushdown { HashSet::new() } else { - self.schema().fields().iter().map(|f| f.name().clone()).collect() + self.schema() + .fields() + .iter() + .map(|f| f.name().clone()) + .collect() } } @@ -55,7 +58,7 @@ impl UserDefinedLogicalNodeCore for MetricObserver { MetricObserver { id: self.id.clone(), input: inputs[0].clone(), - enable_pushdown: self.enable_pushdown + enable_pushdown: self.enable_pushdown, } } } diff --git a/crates/deltalake-core/src/delta_datafusion/physical.rs b/crates/deltalake-core/src/delta_datafusion/physical.rs index 1319a78db7..93308da93d 100644 --- a/crates/deltalake-core/src/delta_datafusion/physical.rs +++ b/crates/deltalake-core/src/delta_datafusion/physical.rs @@ -181,9 +181,7 @@ pub(crate) fn find_metric_node( None } -pub(crate) fn find_barrier_node( - parent: &Arc, -) -> Option> { +pub(crate) fn find_barrier_node(parent: &Arc) -> Option> { //! 
Used to locate the physical MetricCountExec Node after the planner converts the logical node if parent.as_any().downcast_ref::().is_some() { return Some(parent.to_owned()); @@ -197,4 +195,4 @@ pub(crate) fn find_barrier_node( } None -} \ No newline at end of file +} diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index d74b37eff4..ed5de82636 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -66,7 +66,7 @@ use super::transaction::{commit, PROTOCOL}; use crate::delta_datafusion::barrier::{MergeBarrier, MergeBarrierExec}; use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression}; use crate::delta_datafusion::logical::MetricObserver; -use crate::delta_datafusion::physical::{find_metric_node, MetricObserverExec, find_barrier_node}; +use crate::delta_datafusion::physical::{find_barrier_node, find_metric_node, MetricObserverExec}; use crate::delta_datafusion::{ register_store, DeltaColumn, DeltaScanConfigBuilder, DeltaSessionConfig, DeltaTableProvider, }; @@ -1069,7 +1069,11 @@ async fn execute( let mut actions: Vec = add_actions.into_iter().map(Action::Add).collect(); metrics.num_target_files_added = actions.len(); - let survivors = barrier.as_any().downcast_ref::().unwrap().survivors(); + let survivors = barrier + .as_any() + .downcast_ref::() + .unwrap() + .survivors(); { let lock = survivors.lock().unwrap(); @@ -1089,7 +1093,6 @@ async fn execute( default_row_commit_version: action.default_row_commit_version, })) } - } } @@ -1165,9 +1168,10 @@ impl std::future::IntoFuture for MergeBuilder { let state = this.state.unwrap_or_else(|| { //TODO: Datafusion's Hashjoin has some memory issues. Running with all cores results in a OoM. Can be removed when upstream improvemetns are made. 
let config: SessionConfig = DeltaSessionConfig::default().into(); - let config = config.with_target_partitions(1) + let config = config + .with_target_partitions(1) //TODO: Datafusion physical optimizer has a bug that inserts a coalesece in a bad spot - .with_coalesce_batches(false); + .with_coalesce_batches(true); let session = SessionContext::new_with_config(config); // If a user provides their own their DF state then they must register the store themselves @@ -1734,7 +1738,7 @@ mod tests { .unwrap(); let source = ctx.read_batch(batch).unwrap(); - let (mut table, metrics) = DeltaOps(table) + let (mut table, _metrics) = DeltaOps(table) .merge(source, col("target.id").eq(col("source.id"))) .with_source_alias("source") .with_target_alias("target") @@ -1745,13 +1749,13 @@ mod tests { assert_eq!(table.version(), 2); assert!(table.get_file_uris().count() >= 2); - assert_eq!(metrics.num_target_files_added, 1); - assert_eq!(metrics.num_target_files_removed, 1); - assert_eq!(metrics.num_target_rows_copied, 1); + assert_eq!(metrics.num_target_files_added, 2); + assert_eq!(metrics.num_target_files_removed, 2); + assert_eq!(metrics.num_target_rows_copied, 2); assert_eq!(metrics.num_target_rows_updated, 0); assert_eq!(metrics.num_target_rows_inserted, 0); - assert_eq!(metrics.num_target_rows_deleted, 1); - assert_eq!(metrics.num_output_rows, 1); + assert_eq!(metrics.num_target_rows_deleted, 2); + assert_eq!(metrics.num_output_rows, 2); assert_eq!(metrics.num_source_rows, 3); let commit_info = table.history(None).await.unwrap(); From 2b27ef9b6ec63066b39f6597765fa7e1108d5b8b Mon Sep 17 00:00:00 2001 From: David Blajda Date: Wed, 13 Dec 2023 22:37:11 -0500 Subject: [PATCH 04/10] add comments --- .../src/delta_datafusion/barrier.rs | 36 +++++++++++++------ 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/crates/deltalake-core/src/delta_datafusion/barrier.rs b/crates/deltalake-core/src/delta_datafusion/barrier.rs index f837d45ed4..2b8053c04f 100644 --- a/crates/deltalake-core/src/delta_datafusion/barrier.rs +++ b/crates/deltalake-core/src/delta_datafusion/barrier.rs @@ -1,4 +1,13 @@ //! Merge Barrier determines which files have modifications during the merge operation +//! +//! For every unique path in the input stream, a barrier is established. If any +//! single record for a file contains any delete, update, or insert operations +//! then the barrier for the file is opened and can be sent downstream. +//! To determine if a file contains zero changes, the input stream is +//! exhausted. Afterwards, records are then dropped. +//! +//! Bookkeeping is maintained to determine which files have modifications so +//! they can be removed from the delta log. use std::{ collections::HashMap, @@ -25,7 +34,9 @@ use crate::{ }; #[derive(Debug)] -/// Exec Node for MergeBarrier +/// Physical Node for the MergeBarrier +/// Batches to this node must be repartitioned on col('deleta_rs_path'). +/// Each record batch then undergoes further partitioning based on the file column to it's corresponding barrier pub struct MergeBarrierExec { input: Arc, file_column: Arc, @@ -236,26 +247,29 @@ impl Stream for MergeBarrierStream { path_column )))?; + // For each record batch, the key for a file path is not stable. + // We can iterate through the dictionary and lookup the correspond string for each record and then lookup the correct `file_partition` for that value. + // However this approach exposes the cost of hashing so we want to minimize that as much as possible. 
+ // A map from an arrow dictionary key to the correct index of `file_partition` is created for each batch that's processed. + // This ensures we only need to hash each file path at most once per batch. let mut key_map = Vec::new(); for file_name in file_dictionary.values().into_iter() { - match file_name { + let key = match file_name { Some(name) => { if !self.map.contains_key(name) { let key = self.file_partitions.len(); - let part_stream = MergeBarrierPartition { - state: PartitionBarrierState::Closed, - buffer: Vec::new(), - file_name: Some(name.to_string()), - }; + let part_stream = + MergeBarrierPartition::new(Some(name.to_string())); self.file_partitions.push(part_stream); self.map.insert(name.to_string(), key); } - let bucket_idx = self.map.get(name).unwrap(); - key_map.push(*bucket_idx); + // Safe unwrap due to the above + *self.map.get(name).unwrap() } - None => key_map.push(0), - } + None => 0, + }; + key_map.push(key) } let mut indices: Vec<_> = (0..(self.file_partitions.len())) From f5c32e075da0e8c101ae9bd792c71973a7db6006 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Wed, 13 Dec 2023 23:03:45 -0500 Subject: [PATCH 05/10] fix merge test --- crates/deltalake-core/src/operations/merge.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs index ed5de82636..c9357d324c 100644 --- a/crates/deltalake-core/src/operations/merge.rs +++ b/crates/deltalake-core/src/operations/merge.rs @@ -1738,7 +1738,7 @@ mod tests { .unwrap(); let source = ctx.read_batch(batch).unwrap(); - let (mut table, _metrics) = DeltaOps(table) + let (mut table, metrics) = DeltaOps(table) .merge(source, col("target.id").eq(col("source.id"))) .with_source_alias("source") .with_target_alias("target") @@ -1749,13 +1749,13 @@ mod tests { assert_eq!(table.version(), 2); assert!(table.get_file_uris().count() >= 2); - assert_eq!(metrics.num_target_files_added, 2); - assert_eq!(metrics.num_target_files_removed, 2); - assert_eq!(metrics.num_target_rows_copied, 2); + assert_eq!(metrics.num_target_files_added, 1); + assert_eq!(metrics.num_target_files_removed, 1); + assert_eq!(metrics.num_target_rows_copied, 1); assert_eq!(metrics.num_target_rows_updated, 0); assert_eq!(metrics.num_target_rows_inserted, 0); - assert_eq!(metrics.num_target_rows_deleted, 2); - assert_eq!(metrics.num_output_rows, 2); + assert_eq!(metrics.num_target_rows_deleted, 1); + assert_eq!(metrics.num_output_rows, 1); assert_eq!(metrics.num_source_rows, 3); let commit_info = table.history(None).await.unwrap(); From 9f04d772080ec8f4620e9d84695fcdf45e3d68ba Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 17 Dec 2023 15:26:12 -0500 Subject: [PATCH 06/10] refactor & add barrier tests --- .../src/delta_datafusion/barrier.rs | 274 +++++++++++++++--- .../src/delta_datafusion/mod.rs | 56 ++-- crates/deltalake-core/src/operations/merge.rs | 12 +- 3 files changed, 275 insertions(+), 67 deletions(-) diff --git a/crates/deltalake-core/src/delta_datafusion/barrier.rs b/crates/deltalake-core/src/delta_datafusion/barrier.rs index 2b8053c04f..c67f04d77a 100644 --- a/crates/deltalake-core/src/delta_datafusion/barrier.rs +++ b/crates/deltalake-core/src/delta_datafusion/barrier.rs @@ -10,15 +10,13 @@ //! they can be removed from the delta log. 
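+//!
+//! A rough sketch of the per-file barrier state, assuming batches arrive
+//! hash-partitioned on the file path column (illustrative only; the real node
+//! also records which files were modified so they can be removed from the log):
+//!
+//! ```ignore
+//! enum Barrier {
+//!     // Buffers batches until a modification for this file is observed.
+//!     Closed { buffered: Vec<RecordBatch> },
+//!     // Buffered and all subsequent batches flow downstream.
+//!     Open,
+//! }
+//!
+//! // A barrier opens the first time a batch contains a null in any of the
+//! // __delta_rs_target_insert/update/delete columns, since a null there marks
+//! // a row produced by an insert, update, or delete.
+//! ```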
use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll}, }; -use arrow_array::{ - builder::UInt64Builder, types::UInt16Type, ArrayRef, DictionaryArray, RecordBatch, StringArray, -}; +use arrow_array::{builder::UInt64Builder, ArrayRef, RecordBatch}; use arrow_schema::SchemaRef; use datafusion::physical_plan::{ DisplayAs, DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, @@ -28,10 +26,11 @@ use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_physical_expr::{Distribution, PhysicalExpr}; use futures::{Stream, StreamExt}; -use crate::{ - operations::merge::{TARGET_DELETE_COLUMN, TARGET_INSERT_COLUMN, TARGET_UPDATE_COLUMN}, - DeltaTableError, -}; +use crate::operations::merge::{TARGET_DELETE_COLUMN, TARGET_INSERT_COLUMN, TARGET_UPDATE_COLUMN}; + +use super::get_path_column; + +pub(crate) type BarrierSurvivorSet = Arc>>; #[derive(Debug)] /// Physical Node for the MergeBarrier @@ -40,7 +39,7 @@ use crate::{ pub struct MergeBarrierExec { input: Arc, file_column: Arc, - survivors: Arc>>, + survivors: BarrierSurvivorSet, expr: Arc, } @@ -50,18 +49,17 @@ impl MergeBarrierExec { input: Arc, file_column: Arc, expr: Arc, - survivors: Arc>>, ) -> Self { MergeBarrierExec { input, file_column, - survivors, + survivors: Arc::new(Mutex::new(HashSet::new())), expr, } } /// Files that have modifications to them and need to removed from the delta log - pub fn survivors(&self) -> Arc>> { + pub fn survivors(&self) -> BarrierSurvivorSet { self.survivors.clone() } } @@ -99,7 +97,6 @@ impl ExecutionPlan for MergeBarrierExec { children[0].clone(), self.file_column.clone(), self.expr.clone(), - self.survivors.clone(), ))) } @@ -191,7 +188,7 @@ struct MergeBarrierStream { state: State, input: SendableRecordBatchStream, file_column: Arc, - survivors: Arc>>, + survivors: BarrierSurvivorSet, map: HashMap, file_partitions: Vec, } @@ -200,7 +197,7 @@ impl MergeBarrierStream { pub fn new( input: SendableRecordBatchStream, schema: SchemaRef, - survivors: Arc>>, + survivors: BarrierSurvivorSet, file_column: Arc, ) -> Self { // Always allocate for a null bucket at index 0; @@ -231,21 +228,7 @@ impl Stream for MergeBarrierStream { State::Feed => { match self.input.poll_next_unpin(cx) { Poll::Ready(Some(Ok(batch))) => { - let path_column = &self.file_column; - let file_dictionary = batch - .column_by_name(path_column) - .unwrap() - .as_any() - .downcast_ref::>() - .ok_or(DeltaTableError::Generic(format!( - "Unable to downcast column {}", - path_column - )))? - .downcast_dict::() - .ok_or(DeltaTableError::Generic(format!( - "Unable to downcast column {}", - path_column - )))?; + let file_dictionary = get_path_column(&batch, &self.file_column)?; // For each record batch, the key for a file path is not stable. // We can iterate through the dictionary and lookup the correspond string for each record and then lookup the correct `file_partition` for that value. 
@@ -361,7 +344,7 @@ impl Stream for MergeBarrierStream { PartitionBarrierState::Closed => {} PartitionBarrierState::Open => { if let Some(file_name) = &part.file_name { - lock.push(file_name.to_owned()) + lock.insert(file_name.to_owned()); } } } @@ -431,10 +414,231 @@ impl UserDefinedLogicalNodeCore for MergeBarrier { #[cfg(test)] mod tests { + use crate::delta_datafusion::barrier::MergeBarrierExec; + use crate::operations::merge::{ + TARGET_DELETE_COLUMN, TARGET_INSERT_COLUMN, TARGET_UPDATE_COLUMN, + }; + use arrow::datatypes::Schema as ArrowSchema; + use arrow_array::RecordBatch; + use arrow_array::StringArray; + use arrow_array::{DictionaryArray, UInt16Array}; + use arrow_schema::DataType as ArrowDataType; + use arrow_schema::Field; + use datafusion::assert_batches_sorted_eq; + use datafusion::execution::TaskContext; + use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; + use datafusion::physical_plan::memory::MemoryExec; + use datafusion::physical_plan::ExecutionPlan; + use datafusion_physical_expr::expressions::Column; + use futures::StreamExt; + use std::sync::Arc; + + use super::BarrierSurvivorSet; #[tokio::test] - // TODO: - // Need to check if when null exist in dictionary keys - // Need to check nulls when in dictionary values - async fn test_barrier() {} + async fn test_barrier() { + // Validate that files without modifications are dropped and that files with changes passthrough + // File 0: No Changes + // File 1: Contains an update + // File 2: Contains a delete + // null (id: 3): is a insert + + let schema = get_schema(); + let keys = UInt16Array::from(vec![Some(0), Some(1), Some(2), None]); + let values = StringArray::from(vec![Some("file0"), Some("file1"), Some("file2")]); + let dict = DictionaryArray::new(keys, Arc::new(values)); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["0", "1", "2", "3"])), + Arc::new(dict), + //insert column + Arc::new(arrow::array::BooleanArray::from(vec![ + Some(false), + Some(false), + Some(false), + None, + ])), + //update column + Arc::new(arrow::array::BooleanArray::from(vec![ + Some(false), + None, + Some(false), + Some(false), + ])), + //delete column + Arc::new(arrow::array::BooleanArray::from(vec![ + Some(false), + Some(false), + None, + Some(false), + ])), + ], + ) + .unwrap(); + + let (actual, survivors) = execute(vec![batch]).await; + let expected = vec![ + "+----+-----------------+--------------------------+--------------------------+--------------------------+", + "| id | __delta_rs_path | __delta_rs_target_insert | __delta_rs_target_update | __delta_rs_target_delete |", + "+----+-----------------+--------------------------+--------------------------+--------------------------+", + "| 1 | file1 | false | | false |", + "| 2 | file2 | false | false | |", + "| 3 | | | false | false |", + "+----+-----------------+--------------------------+--------------------------+--------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &actual); + + let s = survivors.lock().unwrap(); + assert!(!s.contains(&"file0".to_string())); + assert!(s.contains(&"file1".to_string())); + assert!(s.contains(&"file2".to_string())); + assert_eq!(s.len(), 2); + } + + #[tokio::test] + async fn test_barrier_changing_indicies() { + // Validate implementation can handle different dictionary indicies between batches + + let schema = get_schema(); + let mut batches = vec![]; + + // Batch 1 + let keys = UInt16Array::from(vec![Some(0), Some(1)]); + let values = 
StringArray::from(vec![Some("file0"), Some("file1")]); + let dict = DictionaryArray::new(keys, Arc::new(values)); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["0", "1"])), + Arc::new(dict), + //insert column + Arc::new(arrow::array::BooleanArray::from(vec![ + Some(false), + Some(false), + ])), + //update column + Arc::new(arrow::array::BooleanArray::from(vec![ + Some(false), + Some(false), + ])), + //delete column + Arc::new(arrow::array::BooleanArray::from(vec![ + Some(false), + Some(false), + ])), + ], + ) + .unwrap(); + batches.push(batch); + // Batch 2 + + let keys = UInt16Array::from(vec![Some(0), Some(1)]); + let values = StringArray::from(vec![Some("file1"), Some("file0")]); + let dict = DictionaryArray::new(keys, Arc::new(values)); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["2", "3"])), + Arc::new(dict), + //insert column + Arc::new(arrow::array::BooleanArray::from(vec![ + Some(false), + Some(false), + ])), + //update column + Arc::new(arrow::array::BooleanArray::from(vec![None, Some(false)])), + //delete column + Arc::new(arrow::array::BooleanArray::from(vec![Some(false), None])), + ], + ) + .unwrap(); + batches.push(batch); + + let (actual, _survivors) = execute(batches).await; + let expected = vec! + [ + "+----+-----------------+--------------------------+--------------------------+--------------------------+", + "| id | __delta_rs_path | __delta_rs_target_insert | __delta_rs_target_update | __delta_rs_target_delete |", + "+----+-----------------+--------------------------+--------------------------+--------------------------+", + "| 0 | file0 | false | false | false |", + "| 1 | file1 | false | false | false |", + "| 2 | file1 | false | | false |", + "| 3 | file0 | false | false | |", + "+----+-----------------+--------------------------+--------------------------+--------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &actual); + } + + #[tokio::test] + async fn test_barrier_null_paths() { + // Arrow dictionaries are interesting since a null value can be either in the keys of the dict or in the values. 
+        // Validate they can be processed without issue
+
+        let schema = get_schema();
+        let keys = UInt16Array::from(vec![Some(0), None, Some(1)]);
+        let values = StringArray::from(vec![Some("file1"), None]);
+        let dict = DictionaryArray::new(keys, Arc::new(values));
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(arrow::array::StringArray::from(vec!["1", "2", "3"])),
+                Arc::new(dict),
+                Arc::new(arrow::array::BooleanArray::from(vec![
+                    Some(false),
+                    None,
+                    None,
+                ])),
+                Arc::new(arrow::array::BooleanArray::from(vec![false, false, false])),
+                Arc::new(arrow::array::BooleanArray::from(vec![false, false, false])),
+            ],
+        )
+        .unwrap();
+
+        let (actual, _) = execute(vec![batch]).await;
+        let expected = vec![
+            "+----+-----------------+--------------------------+--------------------------+--------------------------+",
+            "| id | __delta_rs_path | __delta_rs_target_insert | __delta_rs_target_update | __delta_rs_target_delete |",
+            "+----+-----------------+--------------------------+--------------------------+--------------------------+",
+            "| 2  |                 |                          | false                    | false                    |",
+            "| 3  |                 |                          | false                    | false                    |",
+            "+----+-----------------+--------------------------+--------------------------+--------------------------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+    }
+
+    async fn execute(input: Vec<RecordBatch>) -> (Vec<RecordBatch>, BarrierSurvivorSet) {
+        let schema = get_schema();
+        let repartition = Arc::new(Column::new("__delta_rs_path", 2));
+        let exec = Arc::new(MemoryExec::try_new(&[input], schema.clone(), None).unwrap());
+
+        let task_ctx = Arc::new(TaskContext::default());
+        let merge =
+            MergeBarrierExec::new(exec, Arc::new("__delta_rs_path".to_string()), repartition);
+
+        let survivors = merge.survivors();
+        let coalsece = CoalesceBatchesExec::new(Arc::new(merge), 100);
+        let mut stream = coalsece.execute(0, task_ctx).unwrap();
+        (vec![stream.next().await.unwrap().unwrap()], survivors)
+    }
+
+    fn get_schema() -> Arc<ArrowSchema> {
+        Arc::new(ArrowSchema::new(vec![
+            Field::new("id", ArrowDataType::Utf8, true),
+            Field::new(
+                "__delta_rs_path",
+                ArrowDataType::Dictionary(
+                    Box::new(ArrowDataType::UInt16),
+                    Box::new(ArrowDataType::Utf8),
+                ),
+                true,
+            ),
+            Field::new(TARGET_INSERT_COLUMN, ArrowDataType::Boolean, true),
+            Field::new(TARGET_UPDATE_COLUMN, ArrowDataType::Boolean, true),
+            Field::new(TARGET_DELETE_COLUMN, ArrowDataType::Boolean, true),
+        ]))
+    }
 }
diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs
index dc333a0c07..bef12aa9f8 100644
--- a/crates/deltalake-core/src/delta_datafusion/mod.rs
+++ b/crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -32,7 +32,7 @@ use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaR
 use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
 use arrow_array::types::UInt16Type;
-use arrow_array::{DictionaryArray, StringArray};
+use arrow_array::{DictionaryArray, StringArray, TypedDictionaryArray};
 use arrow_schema::Field;
 use async_trait::async_trait;
 use chrono::{NaiveDateTime, TimeZone, Utc};
@@ -127,6 +127,21 @@ fn get_scalar_value(value: Option<&ColumnValueStat>, field: &Arc<Field>) -> Prec
     }
 }
 
+pub(crate) fn get_path_column<'a>(
+    batch: &'a RecordBatch,
+    path_column: &str,
+) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
+    let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
+    batch
+        .column_by_name(path_column)
+        .unwrap()
+        .as_any()
+        .downcast_ref::<DictionaryArray<UInt16Type>>()
+        .ok_or_else(err)?
+        .downcast_dict::<StringArray>()
+        .ok_or_else(err)
+}
+
 impl DeltaTableState {
     /// Provide table level statistics to Datafusion
     pub fn datafusion_table_statistics(&self) -> DataFusionResult<Statistics> {
@@ -1289,31 +1304,20 @@ fn join_batches_with_add_actions(
     let mut files = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum());
     for batch in batches {
-        let array = batch.column_by_name(path_column).ok_or_else(|| {
-            DeltaTableError::Generic(format!("Unable to find column {}", path_column))
-        })?;
-
-        let iter: Box<dyn Iterator<Item = Option<&str>>> =
-            if dict_array {
-                let array = array
-                    .as_any()
-                    .downcast_ref::<DictionaryArray<UInt16Type>>()
-                    .ok_or(DeltaTableError::Generic(format!(
-                        "Unable to downcast column {}",
-                        path_column
-                    )))?
-                    .downcast_dict::<StringArray>()
-                    .ok_or(DeltaTableError::Generic(format!(
-                        "Unable to downcast column {}",
-                        path_column
-                    )))?;
-                Box::new(array.into_iter())
-            } else {
-                let array = array.as_any().downcast_ref::<StringArray>().ok_or(
-                    DeltaTableError::Generic(format!("Unable to downcast column {}", path_column)),
-                )?;
-                Box::new(array.into_iter())
-            };
+        let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
+
+        let iter: Box<dyn Iterator<Item = Option<&str>>> = if dict_array {
+            let array = get_path_column(&batch, path_column)?;
+            Box::new(array.into_iter())
+        } else {
+            let array = batch
+                .column_by_name(path_column)
+                .ok_or_else(err)?
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .ok_or_else(err)?;
+            Box::new(array.into_iter())
+        };
 
         for path in iter {
            let path = path.ok_or(DeltaTableError::Generic(format!(
diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge.rs
index c9357d324c..009fd0812d 100644
--- a/crates/deltalake-core/src/operations/merge.rs
+++ b/crates/deltalake-core/src/operations/merge.rs
@@ -33,7 +33,7 @@
 //! ````
 use std::collections::HashMap;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::time::{Instant, SystemTime, UNIX_EPOCH};
 
 use arrow_schema::Schema as ArrowSchema;
@@ -81,7 +81,7 @@ const SOURCE_COLUMN: &str = "__delta_rs_source";
 const TARGET_COLUMN: &str = "__delta_rs_target";
 const OPERATION_COLUMN: &str = "__delta_rs_operation";
 
-pub(crate) const DELETE_COLUMN: &str = "__delta_rs_delete";
+const DELETE_COLUMN: &str = "__delta_rs_delete";
 pub(crate) const TARGET_INSERT_COLUMN: &str = "__delta_rs_target_insert";
 pub(crate) const TARGET_UPDATE_COLUMN: &str = "__delta_rs_target_update";
 pub(crate) const TARGET_DELETE_COLUMN: &str = "__delta_rs_target_delete";
@@ -653,12 +653,10 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner {
         if let Some(barrier) = node.as_any().downcast_ref::<MergeBarrier>() {
             let schema = barrier.input.schema();
             let exec_schema: ArrowSchema = schema.as_ref().to_owned().into();
-            let survivors = Arc::new(Mutex::new(Vec::new()));
             return Ok(Some(Arc::new(MergeBarrierExec::new(
                 physical_inputs.get(0).unwrap().clone(),
                 barrier.file_column.clone(),
                 planner.create_physical_expr(&barrier.expr, schema, &exec_schema, session_state)?,
-                survivors,
             ))));
         }
 
@@ -737,12 +735,14 @@ async fn execute(
 
     let target = LogicalPlanBuilder::scan(target_name, target_provider, None)?.build()?;
 
-    // TODO: This is here to prevent predicate pushdowns. In the future we can replace this node to allow pushdowns depending on which operations are being used.
+    // Not-matched operations imply that a full scan of the target table is required
+    let enable_pushdown =
+        not_match_source_operations.is_empty() && not_match_target_operations.is_empty();
     let target = LogicalPlan::Extension(Extension {
         node: Arc::new(MetricObserver {
             id: TARGET_COUNT_ID.into(),
             input: target,
-            enable_pushdown: false,
+            enable_pushdown,
         }),
     });
     let target = DataFrame::new(state.clone(), target);

From 9a94da9a9e99526ec519b8d7876226b64b0d9122 Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Sun, 17 Dec 2023 15:32:24 -0500
Subject: [PATCH 07/10] organize merge file structure

---
 .../src/delta_datafusion/mod.rs               |  1 -
 .../src/delta_datafusion/physical.rs          | 18 -------------
 .../merge}/barrier.rs                         | 25 ++++++++++++++++---
 .../src/operations/{merge.rs => merge/mod.rs} |  8 ++++--
 4 files changed, 27 insertions(+), 25 deletions(-)
 rename crates/deltalake-core/src/{delta_datafusion => operations/merge}/barrier.rs (97%)
 rename crates/deltalake-core/src/operations/{merge.rs => merge/mod.rs} (99%)

diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs
index bef12aa9f8..3a4d71ac46 100644
--- a/crates/deltalake-core/src/delta_datafusion/mod.rs
+++ b/crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -81,7 +81,6 @@ use crate::{open_table, open_table_with_storage_options, DeltaTable};
 
 const PATH_COLUMN: &str = "__delta_rs_path";
 
-pub mod barrier;
 pub mod expr;
 pub mod logical;
 pub mod physical;
diff --git a/crates/deltalake-core/src/delta_datafusion/physical.rs b/crates/deltalake-core/src/delta_datafusion/physical.rs
index 93308da93d..954df0b046 100644
--- a/crates/deltalake-core/src/delta_datafusion/physical.rs
+++ b/crates/deltalake-core/src/delta_datafusion/physical.rs
@@ -13,8 +13,6 @@ use futures::{Stream, StreamExt};
 
 use crate::DeltaTableError;
 
-use super::barrier::MergeBarrierExec;
-
 // Metric Observer is used to update DataFusion metrics from a record batch.
 // Typically the null count for a particular column is pulled after performing a
 // projection since this count is easy to obtain
@@ -180,19 +178,3 @@ pub(crate) fn find_metric_node(
 
     None
 }
-
-pub(crate) fn find_barrier_node(parent: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
-    //! Used to locate the physical MetricCountExec Node after the planner converts the logical node
-    if parent.as_any().downcast_ref::<MergeBarrierExec>().is_some() {
-        return Some(parent.to_owned());
-    }
-
-    for child in &parent.children() {
-        let res = find_barrier_node(child);
-        if res.is_some() {
-            return res;
-        }
-    }
-
-    None
-}
diff --git a/crates/deltalake-core/src/delta_datafusion/barrier.rs b/crates/deltalake-core/src/operations/merge/barrier.rs
similarity index 97%
rename from crates/deltalake-core/src/delta_datafusion/barrier.rs
rename to crates/deltalake-core/src/operations/merge/barrier.rs
index c67f04d77a..ab7b65095e 100644
--- a/crates/deltalake-core/src/delta_datafusion/barrier.rs
+++ b/crates/deltalake-core/src/operations/merge/barrier.rs
@@ -26,9 +26,10 @@ use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
 use datafusion_physical_expr::{Distribution, PhysicalExpr};
 use futures::{Stream, StreamExt};
 
-use crate::operations::merge::{TARGET_DELETE_COLUMN, TARGET_INSERT_COLUMN, TARGET_UPDATE_COLUMN};
-
-use super::get_path_column;
+use crate::{
+    delta_datafusion::get_path_column,
+    operations::merge::{TARGET_DELETE_COLUMN, TARGET_INSERT_COLUMN, TARGET_UPDATE_COLUMN},
+};
 
 pub(crate) type BarrierSurvivorSet = Arc<Mutex<HashSet<String>>>;
 
@@ -412,9 +413,25 @@ impl UserDefinedLogicalNodeCore for MergeBarrier {
     }
 }
 
+pub(crate) fn find_barrier_node(parent: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
+    //! Used to locate the physical Barrier Node after the planner converts the logical node
+    if parent.as_any().downcast_ref::<MergeBarrierExec>().is_some() {
+        return Some(parent.to_owned());
+    }
+
+    for child in &parent.children() {
+        let res = find_barrier_node(child);
+        if res.is_some() {
+            return res;
+        }
+    }
+
+    None
+}
+
 #[cfg(test)]
 mod tests {
-    use crate::delta_datafusion::barrier::MergeBarrierExec;
+    use crate::operations::merge::MergeBarrierExec;
     use crate::operations::merge::{
         TARGET_DELETE_COLUMN, TARGET_INSERT_COLUMN, TARGET_UPDATE_COLUMN,
     };
diff --git a/crates/deltalake-core/src/operations/merge.rs b/crates/deltalake-core/src/operations/merge/mod.rs
similarity index 99%
rename from crates/deltalake-core/src/operations/merge.rs
rename to crates/deltalake-core/src/operations/merge/mod.rs
index 009fd0812d..d840c10b39 100644
--- a/crates/deltalake-core/src/operations/merge.rs
+++ b/crates/deltalake-core/src/operations/merge/mod.rs
@@ -61,22 +61,26 @@ use parquet::file::properties::WriterProperties;
 use serde::Serialize;
 use serde_json::Value;
 
+use self::barrier::{MergeBarrier, MergeBarrierExec};
+
 use super::datafusion_utils::{into_expr, maybe_into_expr, Expression};
 use super::transaction::{commit, PROTOCOL};
-use crate::delta_datafusion::barrier::{MergeBarrier, MergeBarrierExec};
 use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression};
 use crate::delta_datafusion::logical::MetricObserver;
-use crate::delta_datafusion::physical::{find_barrier_node, find_metric_node, MetricObserverExec};
+use crate::delta_datafusion::physical::{find_metric_node, MetricObserverExec};
 use crate::delta_datafusion::{
     register_store, DeltaColumn, DeltaScanConfigBuilder, DeltaSessionConfig, DeltaTableProvider,
 };
 use crate::kernel::{Action, Remove};
 use crate::logstore::LogStoreRef;
+use crate::operations::merge::barrier::find_barrier_node;
 use crate::operations::write::write_execution_plan;
 use crate::protocol::{DeltaOperation, MergePredicate};
 use crate::table::state::DeltaTableState;
 use crate::{DeltaResult, DeltaTable, DeltaTableError};
 
+mod barrier;
+
 const SOURCE_COLUMN: &str = "__delta_rs_source";
"__delta_rs_target"; From 9434f56525a39f4cd2d6bfb56fd97df6de2fcd3c Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 17 Dec 2023 15:51:36 -0500 Subject: [PATCH 08/10] remove unwraps --- .../src/operations/merge/barrier.rs | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/crates/deltalake-core/src/operations/merge/barrier.rs b/crates/deltalake-core/src/operations/merge/barrier.rs index ab7b65095e..5b093ac2ef 100644 --- a/crates/deltalake-core/src/operations/merge/barrier.rs +++ b/crates/deltalake-core/src/operations/merge/barrier.rs @@ -29,6 +29,7 @@ use futures::{Stream, StreamExt}; use crate::{ delta_datafusion::get_path_column, operations::merge::{TARGET_DELETE_COLUMN, TARGET_INSERT_COLUMN, TARGET_UPDATE_COLUMN}, + DeltaTableError, }; pub(crate) type BarrierSurvivorSet = Arc>>; @@ -158,12 +159,12 @@ impl MergeBarrierPartition { } } - pub fn feed(&mut self, batch: RecordBatch) { + pub fn feed(&mut self, batch: RecordBatch) -> DataFusionResult<()> { match self.state { PartitionBarrierState::Closed => { - let delete_count = get_count(&batch, TARGET_DELETE_COLUMN); - let update_count = get_count(&batch, TARGET_UPDATE_COLUMN); - let insert_count = get_count(&batch, TARGET_INSERT_COLUMN); + let delete_count = get_count(&batch, TARGET_DELETE_COLUMN)?; + let update_count = get_count(&batch, TARGET_UPDATE_COLUMN)?; + let insert_count = get_count(&batch, TARGET_INSERT_COLUMN)?; self.buffer.push(batch); if insert_count > 0 || update_count > 0 || delete_count > 0 { @@ -174,6 +175,7 @@ impl MergeBarrierPartition { self.buffer.push(batch); } } + Ok(()) } pub fn drain(&mut self) -> Option { @@ -216,8 +218,10 @@ impl MergeBarrierStream { } } -fn get_count(batch: &RecordBatch, column: &str) -> usize { - batch.column_by_name(column).unwrap().null_count() +fn get_count(batch: &RecordBatch, column: &str) -> DataFusionResult { + batch.column_by_name(column) + .map(|array| array.null_count()) + .ok_or_else(|| DataFusionError::External(Box::new(DeltaTableError::Generic("Required operation column is missing".to_string())))) } impl Stream for MergeBarrierStream { @@ -288,6 +292,7 @@ impl Stream for MergeBarrierStream { }) .collect::>>()?; + // This unwrap is safe since the processed batched has the same schema let batch = RecordBatch::try_new(batch.schema(), columns).unwrap(); @@ -298,7 +303,7 @@ impl Stream for MergeBarrierStream { for batch in batches { match batch { Ok((partition, batch)) => { - self.file_partitions[partition].feed(batch); + self.file_partitions[partition].feed(batch)?; } Err(err) => { self.state = State::Abort; @@ -339,7 +344,11 @@ impl Stream for MergeBarrierStream { } { - let mut lock = self.survivors.lock().unwrap(); + let mut lock = self.survivors.lock().map_err(|_| { + DataFusionError::External(Box::new(DeltaTableError::Generic( + "MergeBarrier mutex is poisoned".to_string(), + ))) + })?; for part in &self.file_partitions { match part.state { PartitionBarrierState::Closed => {} From 1e7a946d8c18f1902b60f108e15a22982e99a617 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Sun, 17 Dec 2023 16:09:17 -0500 Subject: [PATCH 09/10] fix merge append test --- crates/deltalake-core/src/operations/merge/barrier.rs | 11 ++++++++--- crates/deltalake-core/src/operations/merge/mod.rs | 6 +++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/crates/deltalake-core/src/operations/merge/barrier.rs b/crates/deltalake-core/src/operations/merge/barrier.rs index 5b093ac2ef..6883f61253 100644 --- a/crates/deltalake-core/src/operations/merge/barrier.rs 
+++ b/crates/deltalake-core/src/operations/merge/barrier.rs
@@ -219,9 +219,14 @@ impl MergeBarrierStream {
 }
 
 fn get_count(batch: &RecordBatch, column: &str) -> DataFusionResult<usize> {
-    batch.column_by_name(column)
-        .map(|array| array.null_count())
-        .ok_or_else(|| DataFusionError::External(Box::new(DeltaTableError::Generic("Required operation column is missing".to_string()))))
+    batch
+        .column_by_name(column)
+        .map(|array| array.null_count())
+        .ok_or_else(|| {
+            DataFusionError::External(Box::new(DeltaTableError::Generic(
+                "Required operation column is missing".to_string(),
+            )))
+        })
 }
 
 impl Stream for MergeBarrierStream {
diff --git a/crates/deltalake-core/src/operations/merge/mod.rs b/crates/deltalake-core/src/operations/merge/mod.rs
index d840c10b39..f171aa40a2 100644
--- a/crates/deltalake-core/src/operations/merge/mod.rs
+++ b/crates/deltalake-core/src/operations/merge/mod.rs
@@ -1255,16 +1255,16 @@ mod tests {
         let schema = get_arrow_schema(&None);
         let table = setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await;
         // append some data
-        let _table = write_data(table, &schema).await;
+        let table = write_data(table, &schema).await;
         // merge
-        /*
         let _err = DeltaOps(table)
             .merge(merge_source(schema), col("target.id").eq(col("source.id")))
             .with_source_alias("source")
            .with_target_alias("target")
+            .when_not_matched_by_source_delete(|delete| delete)
+            .unwrap()
             .await
             .expect_err("Remove action is included when Delta table is append-only. Should error");
-        */
     }
 
     async fn write_data(table: DeltaTable, schema: &Arc<ArrowSchema>) -> DeltaTable {

From 51a4d709c29fd45e2911886aefc9227c170aa45d Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Sun, 17 Dec 2023 16:23:15 -0500
Subject: [PATCH 10/10] remove unneeded df config

---
 crates/deltalake-core/src/operations/merge/mod.rs | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/crates/deltalake-core/src/operations/merge/mod.rs b/crates/deltalake-core/src/operations/merge/mod.rs
index f171aa40a2..08af4d65ef 100644
--- a/crates/deltalake-core/src/operations/merge/mod.rs
+++ b/crates/deltalake-core/src/operations/merge/mod.rs
@@ -1172,10 +1172,7 @@ impl std::future::IntoFuture for MergeBuilder {
         let state = this.state.unwrap_or_else(|| {
             //TODO: Datafusion's Hashjoin has some memory issues. Running with all cores results in an OOM. Can be removed when upstream improvements are made.
             let config: SessionConfig = DeltaSessionConfig::default().into();
-            let config = config
-                .with_target_partitions(1)
-                //TODO: Datafusion physical optimizer has a bug that inserts a coalesce in a bad spot
-                .with_coalesce_batches(true);
+            let config = config.with_target_partitions(1);
             let session = SessionContext::new_with_config(config);
 
             // If a user provides their own DF state then they must register the store themselves