Skip to content

Commit

Permalink
Fix formatting (apply rustfmt)
Browse files (browse the repository at this point in the history)
  • Loading branch information
zhuqi-lucas committed Dec 23, 2024
1 parent 6eb83f7 commit 4a02a8f
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 25 deletions.
89 changes: 65 additions & 24 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,20 @@ use crate::physical_plan::{
collect, collect_partitioned, execute_stream, execute_stream_partitioned,
ExecutionPlan, SendableRecordBatchStream,
};
use crate::prelude::{CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, SessionContext};
use crate::prelude::{
CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, SessionContext,
};

use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field};
use arrow_array::Int32Array;
use arrow_schema::{Schema, SchemaRef};
use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{assert_batches_eq, exec_err, not_impl_err, plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions};
use datafusion_common::{
assert_batches_eq, exec_err, not_impl_err, plan_err, Column, DFSchema,
DataFusionError, ParamValues, SchemaError, UnnestOptions,
};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{case, is_null, lit, SortExpr};
use datafusion_expr::{
Expand All @@ -61,8 +66,8 @@ use datafusion_functions_aggregate::expr_fn::{
};

use async_trait::async_trait;
use tempfile::TempDir;
use datafusion_catalog::Session;
use tempfile::TempDir;

/// Contains options that control how data is
/// written out from a DataFrame
Expand Down Expand Up @@ -1530,7 +1535,9 @@ impl DataFrame {
let plan = if write_options.sort_by.is_empty() {
self.plan
} else {
LogicalPlanBuilder::from(self.plan).sort(write_options.sort_by)?.build()?
LogicalPlanBuilder::from(self.plan)
.sort(write_options.sort_by)?
.build()?
};

let plan = LogicalPlanBuilder::insert_into(
Expand Down Expand Up @@ -1596,7 +1603,9 @@ impl DataFrame {
let plan = if options.sort_by.is_empty() {
self.plan
} else {
LogicalPlanBuilder::from(self.plan).sort(options.sort_by)?.build()?
LogicalPlanBuilder::from(self.plan)
.sort(options.sort_by)?
.build()?
};

let plan = LogicalPlanBuilder::copy_to(
Expand Down Expand Up @@ -1663,7 +1672,9 @@ impl DataFrame {
let plan = if options.sort_by.is_empty() {
self.plan
} else {
LogicalPlanBuilder::from(self.plan).sort(options.sort_by)?.build()?
LogicalPlanBuilder::from(self.plan)
.sort(options.sort_by)?
.build()?
};

let plan = LogicalPlanBuilder::copy_to(
Expand Down Expand Up @@ -4100,19 +4111,29 @@ async fn write_parquet_with_order() -> Result<()> {
let write_df = ctx.read_batch(RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 5, 7, 3,2])),
Arc::new(Int32Array::from(vec![2, 3, 4, 5,6])),
Arc::new(Int32Array::from(vec![1, 5, 7, 3, 2])),
Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])),
],
)?)?;

let test_path = tmp_dir.path().join("test.parquet");

write_df.clone().write_parquet(test_path.to_str().unwrap(), DataFrameWriteOptions::new().with_sort_by(vec![col("a").sort(true, true)]),
None,).await?;
write_df
.clone()
.write_parquet(
test_path.to_str().unwrap(),
DataFrameWriteOptions::new().with_sort_by(vec![col("a").sort(true, true)]),
None,
)
.await?;

let ctx = SessionContext::new();
ctx.register_parquet("data", test_path.to_str().unwrap(), ParquetReadOptions::default())
.await?;
ctx.register_parquet(
"data",
test_path.to_str().unwrap(),
ParquetReadOptions::default(),
)
.await?;

let df = ctx.sql("SELECT * FROM data").await?;
let results = df.collect().await?;
Expand Down Expand Up @@ -4147,19 +4168,29 @@ async fn write_csv_with_order() -> Result<()> {
let write_df = ctx.read_batch(RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 5, 7, 3,2])),
Arc::new(Int32Array::from(vec![2, 3, 4, 5,6])),
Arc::new(Int32Array::from(vec![1, 5, 7, 3, 2])),
Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])),
],
)?)?;

let test_path = tmp_dir.path().join("test.csv");

write_df.clone().write_csv(test_path.to_str().unwrap(), DataFrameWriteOptions::new().with_sort_by(vec![col("a").sort(true, true)]),
None,).await?;
write_df
.clone()
.write_csv(
test_path.to_str().unwrap(),
DataFrameWriteOptions::new().with_sort_by(vec![col("a").sort(true, true)]),
None,
)
.await?;

let ctx = SessionContext::new();
ctx.register_csv("data", test_path.to_str().unwrap(), CsvReadOptions::new().schema(&schema))
.await?;
ctx.register_csv(
"data",
test_path.to_str().unwrap(),
CsvReadOptions::new().schema(&schema),
)
.await?;

let df = ctx.sql("SELECT * FROM data").await?;
let results = df.collect().await?;
Expand Down Expand Up @@ -4194,19 +4225,29 @@ async fn write_json_with_order() -> Result<()> {
let write_df = ctx.read_batch(RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 5, 7, 3,2])),
Arc::new(Int32Array::from(vec![2, 3, 4, 5,6])),
Arc::new(Int32Array::from(vec![1, 5, 7, 3, 2])),
Arc::new(Int32Array::from(vec![2, 3, 4, 5, 6])),
],
)?)?;

let test_path = tmp_dir.path().join("test.json");

write_df.clone().write_json(test_path.to_str().unwrap(), DataFrameWriteOptions::new().with_sort_by(vec![col("a").sort(true, true)]),
None,).await?;
write_df
.clone()
.write_json(
test_path.to_str().unwrap(),
DataFrameWriteOptions::new().with_sort_by(vec![col("a").sort(true, true)]),
None,
)
.await?;

let ctx = SessionContext::new();
ctx.register_json("data", test_path.to_str().unwrap(), NdJsonReadOptions::default().schema(&schema))
.await?;
ctx.register_json(
"data",
test_path.to_str().unwrap(),
NdJsonReadOptions::default().schema(&schema),
)
.await?;

let df = ctx.sql("SELECT * FROM data").await?;
let results = df.collect().await?;
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ impl DataFrame {
let plan = if options.sort_by.is_empty() {
self.plan
} else {
LogicalPlanBuilder::from(self.plan).sort(options.sort_by)?.build()?
LogicalPlanBuilder::from(self.plan)
.sort(options.sort_by)?
.build()?
};

let plan = LogicalPlanBuilder::copy_to(
Expand Down

0 comments on commit 4a02a8f

Please sign in to comment.