From 96dbeeca59a349884fbfb68c535acf43db12b015 Mon Sep 17 00:00:00 2001 From: Mike Seddon Date: Tue, 16 Feb 2021 05:54:20 +0100 Subject: [PATCH] ARROW-11616: [Rust][DataFusion] Add collect_partitioned on DataFrame The DataFrame API has a `collect` method which invokes the `collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>>` function which will collect records into a single vector of RecordBatches removing any partitioning via `MergeExec`. This PR adds the DataFrame `collect_partitioned` method so that partitioning can be maintained. This allows easy passing into a new `MemTable`. @andygrove Closes #9485 from seddonm1/expose_collect_partitioned Authored-by: Mike Seddon Signed-off-by: Jorge C. Leitao --- rust/datafusion/src/dataframe.rs | 16 ++++++++++++++++ rust/datafusion/src/execution/dataframe_impl.rs | 17 +++++++++++++++-- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/dataframe.rs b/rust/datafusion/src/dataframe.rs index ca4ecc89dfb96..ceb5ca65f5edd 100644 --- a/rust/datafusion/src/dataframe.rs +++ b/rust/datafusion/src/dataframe.rs @@ -206,6 +206,22 @@ pub trait DataFrame: Send + Sync { /// ``` async fn collect(&self) -> Result<Vec<RecordBatch>>; + /// Executes this DataFrame and collects all results into a vector of vector of RecordBatch + /// maintaining the input partitioning. + /// + /// ``` + /// # use datafusion::prelude::*; + /// # use datafusion::error::Result; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let mut ctx = ExecutionContext::new(); + /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?; + /// let batches = df.collect_partitioned().await?; + /// # Ok(()) + /// # } + /// ``` + async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>>; + /// Returns the schema describing the output of this DataFrame in terms of columns returned, /// where each column has a name, data type, and nullability attribute. 
diff --git a/rust/datafusion/src/execution/dataframe_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs index c9a1ff9dd261a..3a0931d8ccc49 100644 --- a/rust/datafusion/src/execution/dataframe_impl.rs +++ b/rust/datafusion/src/execution/dataframe_impl.rs @@ -19,14 +19,17 @@ use std::sync::{Arc, Mutex}; -use crate::dataframe::*; +use crate::arrow::record_batch::RecordBatch; use crate::error::Result; use crate::execution::context::{ExecutionContext, ExecutionContextState}; use crate::logical_plan::{ col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning, }; -use crate::{arrow::record_batch::RecordBatch, physical_plan::collect}; +use crate::{ + dataframe::*, + physical_plan::{collect, collect_partitioned}, +}; use async_trait::async_trait; @@ -137,6 +140,16 @@ impl DataFrame for DataFrameImpl { Ok(collect(plan).await?) } + // Convert the logical plan represented by this DataFrame into a physical plan and + // execute it + async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>> { + let state = self.ctx_state.lock().unwrap().clone(); + let ctx = ExecutionContext::from(Arc::new(Mutex::new(state))); + let plan = ctx.optimize(&self.plan)?; + let plan = ctx.create_physical_plan(&plan)?; + Ok(collect_partitioned(plan).await?) + } + /// Returns the schema from the logical plan fn schema(&self) -> &DFSchema { self.plan.schema()