diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index e9ad8db95e47..745c4496a67c 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -42,7 +42,9 @@ use crate::physical_plan::{ collect, collect_partitioned, execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream, }; -use crate::prelude::{CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, SessionContext}; +use crate::prelude::{ + CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, SessionContext, +}; use arrow::array::{Array, ArrayRef, Int64Array, StringArray}; use arrow::compute::{cast, concat}; @@ -50,7 +52,10 @@ use arrow::datatypes::{DataType, Field}; use arrow_array::Int32Array; use arrow_schema::{Schema, SchemaRef}; use datafusion_common::config::{CsvOptions, JsonOptions}; -use datafusion_common::{assert_batches_eq, exec_err, not_impl_err, plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions}; +use datafusion_common::{ + assert_batches_eq, exec_err, not_impl_err, plan_err, Column, DFSchema, + DataFusionError, ParamValues, SchemaError, UnnestOptions, +}; use datafusion_expr::dml::InsertOp; use datafusion_expr::{case, is_null, lit, SortExpr}; use datafusion_expr::{ @@ -61,8 +66,8 @@ use datafusion_functions_aggregate::expr_fn::{ }; use async_trait::async_trait; -use tempfile::TempDir; use datafusion_catalog::Session; +use tempfile::TempDir; /// Contains options that control how data is /// written out from a DataFrame @@ -1530,7 +1535,9 @@ impl DataFrame { let plan = if write_options.sort_by.is_empty() { self.plan } else { - LogicalPlanBuilder::from(self.plan).sort(write_options.sort_by)?.build()? + LogicalPlanBuilder::from(self.plan) + .sort(write_options.sort_by)? + .build()? }; let plan = LogicalPlanBuilder::insert_into( @@ -1596,7 +1603,9 @@ impl DataFrame { let plan = if options.sort_by.is_empty() { self.plan } else { - LogicalPlanBuilder::from(self.plan).sort(options.sort_by)?.build()? + LogicalPlanBuilder::from(self.plan) + .sort(options.sort_by)? + .build()? }; let plan = LogicalPlanBuilder::copy_to( @@ -1663,7 +1672,9 @@ impl DataFrame { let plan = if options.sort_by.is_empty() { self.plan } else { - LogicalPlanBuilder::from(self.plan).sort(options.sort_by)?.build()? + LogicalPlanBuilder::from(self.plan) + .sort(options.sort_by)? + .build()? }; let plan = LogicalPlanBuilder::copy_to( @@ -4100,19 +4111,29 @@ async fn write_parquet_with_order() -> Result<()> { let write_df = ctx.read_batch(RecordBatch::try_new( schema.clone(), vec![ - Arc::new(Int32Array::from(vec![1, 5, 7, 3,2])), - Arc::new(Int32Array::from(vec![2, 3, 4, 5,6])), + Arc::new(Int32Array::from(vec![1, 5, 7, 3, 2])), + Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])), ], )?)?; let test_path = tmp_dir.path().join("test.parquet"); - write_df.clone().write_parquet(test_path.to_str().unwrap(), DataFrameWriteOptions::new().with_sort_by(vec![col("a").sort(true, true)]), - None,).await?; + write_df + .clone() + .write_parquet( + test_path.to_str().unwrap(), + DataFrameWriteOptions::new().with_sort_by(vec![col("a").sort(true, true)]), + None, + ) + .await?; let ctx = SessionContext::new(); - ctx.register_parquet("data", test_path.to_str().unwrap(), ParquetReadOptions::default()) - .await?; + ctx.register_parquet( + "data", + test_path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; let df = ctx.sql("SELECT * FROM data").await?; let results = df.collect().await?; @@ -4147,19 +4168,29 @@ async fn write_csv_with_order() -> Result<()> { let write_df = ctx.read_batch(RecordBatch::try_new( schema.clone(), vec![ - Arc::new(Int32Array::from(vec![1, 5, 7, 3,2])), - Arc::new(Int32Array::from(vec![2, 3, 4, 5,6])), + Arc::new(Int32Array::from(vec![1, 5, 7, 3, 2])), + Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])), ], )?)?; let test_path = tmp_dir.path().join("test.csv"); - write_df.clone().write_csv(test_path.to_str().unwrap(), DataFrameWriteOptions::new().with_sort_by(vec![col("a").sort(true, true)]), - None,).await?; + write_df + .clone() + .write_csv( + test_path.to_str().unwrap(), + DataFrameWriteOptions::new().with_sort_by(vec![col("a").sort(true, true)]), + None, + ) + .await?; let ctx = SessionContext::new(); - ctx.register_csv("data", test_path.to_str().unwrap(), CsvReadOptions::new().schema(&schema)) - .await?; + ctx.register_csv( + "data", + test_path.to_str().unwrap(), + CsvReadOptions::new().schema(&schema), + ) + .await?; let df = ctx.sql("SELECT * FROM data").await?; let results = df.collect().await?; @@ -4194,19 +4225,29 @@ async fn write_json_with_order() -> Result<()> { let write_df = ctx.read_batch(RecordBatch::try_new( schema.clone(), vec![ - Arc::new(Int32Array::from(vec![1, 5, 7, 3,2])), - Arc::new(Int32Array::from(vec![2, 3, 4, 5,6])), + Arc::new(Int32Array::from(vec![1, 5, 7, 3, 2])), + Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])), ], )?)?; let test_path = tmp_dir.path().join("test.json"); - write_df.clone().write_json(test_path.to_str().unwrap(), DataFrameWriteOptions::new().with_sort_by(vec![col("a").sort(true, true)]), - None,).await?; + write_df + .clone() + .write_json( + test_path.to_str().unwrap(), + DataFrameWriteOptions::new().with_sort_by(vec![col("a").sort(true, true)]), + None, + ) + .await?; let ctx = SessionContext::new(); - ctx.register_json("data", test_path.to_str().unwrap(), NdJsonReadOptions::default().schema(&schema)) - .await?; + ctx.register_json( + "data", + test_path.to_str().unwrap(), + NdJsonReadOptions::default().schema(&schema), + ) + .await?; let df = ctx.sql("SELECT * FROM data").await?; let results = df.collect().await?; diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 0f3d1b72264d..1dd4d68fca6b 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -77,7 +77,9 @@ impl DataFrame { let plan = if options.sort_by.is_empty() { self.plan } else { - LogicalPlanBuilder::from(self.plan).sort(options.sort_by)?.build()? + LogicalPlanBuilder::from(self.plan) + .sort(options.sort_by)? + .build()? }; let plan = LogicalPlanBuilder::copy_to(