
Fix conflicts
gokselk committed Dec 23, 2024
2 parents 7062d9e + 8fd792f commit 40c97bd
Showing 148 changed files with 5,435 additions and 2,418 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -147,7 +147,7 @@ recursive = "0.1.1"
 regex = "1.8"
 rstest = "0.23.0"
 serde_json = "1"
-sqlparser = { version = "0.52.0", features = ["visitor"] }
+sqlparser = { version = "0.53.0", features = ["visitor"] }
 tempfile = "3"
 tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
 url = "2.2"
3 changes: 2 additions & 1 deletion README.md
@@ -112,7 +112,8 @@ Default features:
 - `parquet`: support for reading the [Apache Parquet] format
 - `regex_expressions`: regular expression functions, such as `regexp_match`
 - `unicode_expressions`: Include unicode aware functions such as `character_length`
-- `unparser` : enables support to reverse LogicalPlans back into SQL
+- `unparser`: enables support to reverse LogicalPlans back into SQL
+- `recursive-protection`: uses [recursive](https://docs.rs/recursive/latest/recursive/) for stack overflow protection.

 Optional features:

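For context on the new `recursive-protection` feature: the [recursive](https://docs.rs/recursive/latest/recursive/) crate guards deeply recursive code (such as planning deeply nested SQL expressions) by growing the stack on demand. A minimal sketch of the pattern, not taken from this diff; `depth` is a hypothetical function:

```rust
// Sketch: `#[recursive::recursive]` checks remaining stack space at each
// call and continues on a new heap-allocated stack segment when it runs
// low, instead of overflowing.
#[recursive::recursive]
fn depth(n: u64) -> u64 {
    if n == 0 {
        0
    } else {
        depth(n - 1) + 1
    }
}

fn main() {
    // Deep enough to overflow a default thread stack without the attribute.
    println!("{}", depth(5_000_000));
}
```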
27 changes: 18 additions & 9 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

14 changes: 12 additions & 2 deletions datafusion-cli/src/exec.rs
@@ -33,11 +33,12 @@ use crate::{
 };

 use datafusion::common::instant::Instant;
-use datafusion::common::plan_datafusion_err;
+use datafusion::common::{plan_datafusion_err, plan_err};
 use datafusion::config::ConfigFileType;
 use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_expr::{DdlStatement, LogicalPlan};
+use datafusion::physical_plan::execution_plan::EmissionType;
 use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
 use datafusion::sql::parser::{DFParser, Statement};
 use datafusion::sql::sqlparser::dialect::dialect_from_str;
@@ -234,10 +235,19 @@ pub(super) async fn exec_and_print(
     let df = ctx.execute_logical_plan(plan).await?;
     let physical_plan = df.create_physical_plan().await?;

-    if physical_plan.execution_mode().is_unbounded() {
+    if physical_plan.boundedness().is_unbounded() {
+        if physical_plan.pipeline_behavior() == EmissionType::Final {
+            return plan_err!(
+                "The given query can generate a valid result only once \
+                the source finishes, but the source is unbounded"
+            );
+        }
+        // As the input stream comes, we can generate results.
+        // However, memory safety is not guaranteed.
         let stream = execute_stream(physical_plan, task_ctx.clone())?;
         print_options.print_stream(stream, now).await?;
     } else {
+        // Bounded stream; collected results are printed after all input consumed.
         let schema = physical_plan.schema();
         let results = collect(physical_plan, task_ctx.clone()).await?;
         adjusted.into_inner().print_batches(schema, &results, now)?;
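The change above reflects the split of the old `ExecutionMode` into two orthogonal plan properties: `boundedness()` (does the input ever finish?) and `pipeline_behavior()` (when can results be emitted?). A hedged sketch of the decision the new code makes, factored into a hypothetical standalone helper:

```rust
use std::sync::Arc;

use datafusion::common::plan_err;
use datafusion::error::Result;
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// Hypothetical helper mirroring the logic above: Ok(true) means stream
/// results incrementally, Ok(false) means collect everything first, and
/// Err means no complete result can ever be produced.
fn should_stream(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
    if plan.boundedness().is_unbounded() {
        if plan.pipeline_behavior() == EmissionType::Final {
            // A Final emitter waits for all of its input, but unbounded
            // input never ends, so such a query can never return.
            return plan_err!("unbounded source with Final emission cannot complete");
        }
        Ok(true) // emit results as they arrive
    } else {
        Ok(false) // bounded input: collect, then print
    }
}
```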
9 changes: 5 additions & 4 deletions datafusion-cli/src/object_storage.rs
@@ -472,12 +472,13 @@ mod tests {

     #[tokio::test]
     async fn s3_object_store_builder() -> Result<()> {
-        let access_key_id = "fake_access_key_id";
-        let secret_access_key = "fake_secret_access_key";
+        // "fake" is uppercase to ensure the values are not lowercased when parsed
+        let access_key_id = "FAKE_access_key_id";
+        let secret_access_key = "FAKE_secret_access_key";
         let region = "fake_us-east-2";
         let endpoint = "endpoint33";
-        let session_token = "fake_session_token";
-        let location = "s3://bucket/path/file.parquet";
+        let session_token = "FAKE_session_token";
+        let location = "s3://bucket/path/FAKE/file.parquet";

         let table_url = ListingTableUrl::parse(location)?;
         let scheme = table_url.scheme();
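For readers unfamiliar with what this test exercises: the credential strings must survive option parsing without being lowercased. A sketch of building an S3 store with such values using the `object_store` builder directly (the bucket and endpoint here are made up; the actual test goes through DataFusion's table-option parsing instead):

```rust
use object_store::aws::AmazonS3Builder;

fn main() -> Result<(), object_store::Error> {
    // Mixed-case values such as "FAKE_access_key_id" must round-trip
    // through configuration unchanged.
    let s3 = AmazonS3Builder::new()
        .with_bucket_name("bucket") // assumed bucket name
        .with_region("fake_us-east-2")
        .with_access_key_id("FAKE_access_key_id")
        .with_secret_access_key("FAKE_secret_access_key")
        .with_token("FAKE_session_token")
        .with_endpoint("http://localhost:9000") // assumed endpoint
        .with_allow_http(true)
        .build()?;
    println!("{s3}");
    Ok(())
}
```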
3 changes: 1 addition & 2 deletions datafusion-examples/README.md
@@ -57,8 +57,7 @@ cargo run --example dataframe
 - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
 - [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
-- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data
-- [`dataframe_output.rs`](examples/dataframe_output.rs): Examples of methods which write data out from a DataFrame
+- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
 - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
 - [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
8 changes: 5 additions & 3 deletions datafusion-examples/examples/custom_datasource.rs
@@ -30,10 +30,11 @@ use datafusion::error::Result;
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::LogicalPlanBuilder;
 use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
-    project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
-    Partitioning, PlanProperties, SendableRecordBatchStream,
+    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+    PlanProperties, SendableRecordBatchStream,
 };
 use datafusion::prelude::*;

@@ -214,7 +215,8 @@ impl CustomExec {
         PlanProperties::new(
             eq_properties,
             Partitioning::UnknownPartitioning(1),
-            ExecutionMode::Bounded,
+            EmissionType::Incremental,
+            Boundedness::Bounded,
         )
     }
 }
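The `PlanProperties` change is the mechanical part of the migration: the single `ExecutionMode::Bounded` value becomes the pair `EmissionType::Incremental` plus `Boundedness::Bounded`. A sketch of the new constructor call in isolation (hypothetical free function; the argument choices match the diff above):

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{Partitioning, PlanProperties};

/// Hypothetical helper showing what CustomExec's properties boil down to
/// after the migration.
fn bounded_incremental_properties(schema: SchemaRef) -> PlanProperties {
    PlanProperties::new(
        EquivalenceProperties::new(schema),
        Partitioning::UnknownPartitioning(1),
        // Formerly expressed as a single ExecutionMode::Bounded:
        EmissionType::Incremental, // rows can be produced as input is read
        Boundedness::Bounded,      // the source is finite
    )
}
```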
67 changes: 67 additions & 0 deletions datafusion-examples/examples/dataframe.rs
@@ -17,8 +17,12 @@

 use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::error::Result;
 use datafusion::prelude::*;
+use datafusion_common::config::CsvOptions;
+use datafusion_common::parsers::CompressionTypeVariant;
+use datafusion_common::DataFusionError;
 use std::fs::File;
 use std::io::Write;
 use std::sync::Arc;
@@ -29,13 +33,19 @@ use tempfile::tempdir;
 /// * [read_parquet]: execute queries against parquet files
 /// * [read_csv]: execute queries against csv files
 /// * [read_memory]: execute queries against in-memory arrow data
+///
+/// This example demonstrates the various methods to write out a DataFrame to local storage.
+/// See datafusion-examples/examples/external_dependency/dataframe-to-s3.rs for an example
+/// using a remote object store.
+/// * [write_out]: write out a DataFrame to a table, parquet file, csv file, or json file
 #[tokio::main]
 async fn main() -> Result<()> {
     // The SessionContext is the main high level API for interacting with DataFusion
     let ctx = SessionContext::new();
     read_parquet(&ctx).await?;
     read_csv(&ctx).await?;
     read_memory(&ctx).await?;
+    write_out(&ctx).await?;
     Ok(())
 }

@@ -139,3 +149,60 @@ async fn read_memory(ctx: &SessionContext) -> Result<()> {

     Ok(())
 }
+
+/// Use the DataFrame API to:
+/// 1. Write out a DataFrame to a table
+/// 2. Write out a DataFrame to a parquet file
+/// 3. Write out a DataFrame to a csv file
+/// 4. Write out a DataFrame to a json file
+async fn write_out(ctx: &SessionContext) -> std::result::Result<(), DataFusionError> {
+    let mut df = ctx.sql("values ('a'), ('b'), ('c')").await.unwrap();
+
+    // Ensure the column names and types match the target table
+    df = df.with_column_renamed("column1", "tablecol1").unwrap();
+
+    ctx.sql(
+        "create external table
+        test(tablecol1 varchar)
+        stored as parquet
+        location './datafusion-examples/test_table/'",
+    )
+    .await?
+    .collect()
+    .await?;
+
+    // This is equivalent to INSERT INTO test VALUES ('a'), ('b'), ('c').
+    // The behavior of write_table depends on the TableProvider's implementation
+    // of the insert_into method.
+    df.clone()
+        .write_table("test", DataFrameWriteOptions::new())
+        .await?;
+
+    df.clone()
+        .write_parquet(
+            "./datafusion-examples/test_parquet/",
+            DataFrameWriteOptions::new(),
+            None,
+        )
+        .await?;
+
+    df.clone()
+        .write_csv(
+            "./datafusion-examples/test_csv/",
+            // DataFrameWriteOptions contains options which control how data is written
+            // such as compression codec
+            DataFrameWriteOptions::new(),
+            Some(CsvOptions::default().with_compression(CompressionTypeVariant::GZIP)),
+        )
+        .await?;
+
+    df.clone()
+        .write_json(
+            "./datafusion-examples/test_json/",
+            DataFrameWriteOptions::new(),
+            None,
+        )
+        .await?;
+
+    Ok(())
+}
78 changes: 0 additions & 78 deletions datafusion-examples/examples/dataframe_output.rs

This file was deleted.

5 changes: 4 additions & 1 deletion datafusion/common/Cargo.toml
@@ -36,10 +36,12 @@ name = "datafusion_common"
 path = "src/lib.rs"

 [features]
+default = ["recursive-protection"]
 avro = ["apache-avro"]
 backtrace = []
 pyarrow = ["pyo3", "arrow/pyarrow", "parquet"]
 force_hash_collisions = []
+recursive-protection = ["dep:recursive"]

 [dependencies]
 ahash = { workspace = true }
@@ -57,11 +59,12 @@ half = { workspace = true }
 hashbrown = { workspace = true }
 indexmap = { workspace = true }
 libc = "0.2.140"
+log = { workspace = true }
 object_store = { workspace = true, optional = true }
 parquet = { workspace = true, optional = true, default-features = true }
 paste = "1.0.15"
 pyo3 = { version = "0.22.0", optional = true }
-recursive = { workspace = true }
+recursive = { workspace = true, optional = true }
 sqlparser = { workspace = true }
 tokio = { workspace = true }

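With `recursive` now optional, call sites are expected to apply the attribute only when the feature is on. A sketch of the usual gating pattern (assumed; the call sites themselves are not shown in this diff):

```rust
// Assumed pattern for an optional `recursive` dependency: the attribute
// is applied only when the "recursive-protection" feature is enabled, so
// builds with --no-default-features compile without the crate.
#[cfg_attr(feature = "recursive-protection", recursive::recursive)]
fn visit(depth: usize) {
    if depth > 0 {
        visit(depth - 1);
    }
}
```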
