Commit

Merge branch 'main' into df-34-arrow-49
ion-elgreco authored Dec 20, 2023
2 parents 198020e + 11ea2a5 commit f6cdaa6
Showing 2 changed files with 640 additions and 20 deletions.
26 changes: 26 additions & 0 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -68,6 +68,7 @@ use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use datafusion_sql::planner::ParserOptions;
use futures::TryStreamExt;

use itertools::Itertools;
use log::error;
@@ -1019,6 +1020,31 @@ pub(crate) fn logical_expr_to_physical_expr(
    create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
}

pub(crate) async fn execute_plan_to_batch(
    state: &SessionState,
    plan: Arc<dyn ExecutionPlan>,
) -> DeltaResult<arrow::record_batch::RecordBatch> {
    let data =
        futures::future::try_join_all((0..plan.output_partitioning().partition_count()).map(|p| {
            let plan_copy = plan.clone();
            let task_context = state.task_ctx().clone();
            async move {
                let batch_stream = plan_copy.execute(p, task_context)?;

                let schema = batch_stream.schema();

                let batches = batch_stream.try_collect::<Vec<_>>().await?;

                DataFusionResult::<_>::Ok(arrow::compute::concat_batches(&schema, batches.iter())?)
            }
        }))
        .await?;

    let batch = arrow::compute::concat_batches(&plan.schema(), data.iter())?;

    Ok(batch)
}

/// Responsible for checking batches of data conform to table's invariants.
#[derive(Clone)]
pub struct DeltaDataChecker {
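The new `execute_plan_to_batch` helper runs every output partition of a DataFusion `ExecutionPlan` concurrently via `try_join_all`, collects each partition's stream into batches, concatenates them once per partition, and then concatenates the per-partition results into a single `RecordBatch`. Below is a minimal usage sketch. Because the function is `pub(crate)`, it assumes a caller inside `deltalake-core` (for example a unit test); the `MemoryExec` plan, the test name, and the `crate::errors::DeltaResult` import are illustrative assumptions, not code from this commit.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;

use crate::delta_datafusion::execute_plan_to_batch;
use crate::errors::DeltaResult;

#[tokio::test]
async fn concatenates_all_partitions() -> DeltaResult<()> {
    // A small three-row batch to feed through the plan.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1_i64, 2, 3])) as ArrayRef],
    )
    .expect("valid record batch");

    // Two partitions holding one copy of the batch each, so the helper has
    // more than one partition stream to execute and concatenate.
    let plan: Arc<dyn ExecutionPlan> = Arc::new(
        MemoryExec::try_new(&[vec![batch.clone()], vec![batch]], schema, None)
            .expect("valid in-memory plan"),
    );

    let ctx = SessionContext::new();
    let state = ctx.state();

    // All partition streams are executed and merged into one RecordBatch.
    let combined = execute_plan_to_batch(&state, plan).await?;
    assert_eq!(combined.num_rows(), 6);
    Ok(())
}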

0 comments on commit f6cdaa6
