Commit

Merge branch 'main' of github.com:apache/datafusion into dev/xinli/value-normal
xinlifoobar committed Jul 17, 2024
2 parents 02b78fb + b0925c8 commit 6a02169
Showing 192 changed files with 18,733 additions and 2,162 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/dev_pr/labeler.yml
@@ -17,11 +17,11 @@

development-process:
- changed-files:
- any-glob-to-any-file: ['dev/**.*', '.github/**.*', 'ci/**.*', '.asf.yaml']
- any-glob-to-any-file: ['dev/**/*', '.github/**/*', 'ci/**/*', '.asf.yaml']

documentation:
- changed-files:
- any-glob-to-any-file: ['docs/**.*', 'README.md', './**/README.md', 'DEVELOPERS.md', 'datafusion/docs/**.*']
- any-glob-to-any-file: ['docs/**/*', 'README.md', './**/README.md', 'DEVELOPERS.md', 'datafusion/docs/**/*']

sql:
- changed-files:
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -123,7 +123,7 @@ rand = "0.8"
regex = "1.8"
rstest = "0.21.0"
serde_json = "1"
sqlparser = { version = "0.47", features = ["visitor"] }
sqlparser = { version = "0.48", features = ["visitor"] }
tempfile = "3"
thiserror = "1.0.44"
tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
5 changes: 3 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

11 changes: 9 additions & 2 deletions datafusion-cli/src/catalog.rs
@@ -29,6 +29,7 @@ use datafusion::datasource::listing::{
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::execution::session_state::SessionStateBuilder;

use async_trait::async_trait;
use dirs::home_dir;
@@ -162,6 +163,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
.ok_or_else(|| plan_datafusion_err!("locking error"))?
.read()
.clone();
let mut builder = SessionStateBuilder::from(state.clone());
let optimized_name = substitute_tilde(name.to_owned());
let table_url = ListingTableUrl::parse(optimized_name.as_str())?;
let scheme = table_url.scheme();
@@ -178,13 +180,18 @@ impl SchemaProvider for DynamicFileSchemaProvider {
// to any command options so the only choice is to use an empty collection
match scheme {
"s3" | "oss" | "cos" => {
state = state.add_table_options_extension(AwsOptions::default());
if let Some(table_options) = builder.table_options() {
table_options.extensions.insert(AwsOptions::default())
}
}
"gs" | "gcs" => {
state = state.add_table_options_extension(GcpOptions::default())
if let Some(table_options) = builder.table_options() {
table_options.extensions.insert(GcpOptions::default())
}
}
_ => {}
};
state = builder.build();
let store = get_object_store(
&state,
table_url.scheme(),
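Note: the catalog.rs change above replaces the removed `SessionState::add_table_options_extension` API with the new `SessionStateBuilder`. A minimal sketch of the same pattern in isolation, assuming the v40-era builder API (`SessionStateBuilder::from`, `table_options()`) and any options type implementing `ConfigExtension`:

```rust
use datafusion::execution::context::SessionState;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion_common::config::ConfigExtension;

/// Sketch (hypothetical helper): rebuild a `SessionState` with one extra
/// table-options extension, mirroring the `if let Some(..)` pattern above.
fn with_table_options_extension<T: ConfigExtension>(
    state: SessionState,
    extension: T,
) -> SessionState {
    let mut builder = SessionStateBuilder::from(state);
    // table_options() exposes the builder's optional, mutable TableOptions
    if let Some(table_options) = builder.table_options() {
        table_options.extensions.insert(extension);
    }
    builder.build()
}
```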
4 changes: 2 additions & 2 deletions datafusion-examples/README.md
@@ -71,8 +71,8 @@ cargo run --example dataframe
- [`parquet_index.rs`](examples/parquet_index.rs): Create a secondary index over several parquet files and use it to speed up queries
- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
- [`parquet_exec_visitor.rs`](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
- [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into Datafusion `Expr`.
- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from Datafusion `Expr` and `LogicalPlan`
- [`parse_sql_expr.rs`](examples/parse_sql_expr.rs): Parse SQL text into DataFusion `Expr`.
- [`plan_to_sql.rs`](examples/plan_to_sql.rs): Generate SQL from DataFusion `Expr` and `LogicalPlan`
- [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files via HTTP
80 changes: 40 additions & 40 deletions datafusion-examples/examples/composed_extension_codec.rs
@@ -30,18 +30,19 @@
//! DeltaScan
//! ```
use std::any::Any;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::Arc;

use datafusion::common::Result;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_common::internal_err;
use datafusion_common::{internal_err, DataFusionError};
use datafusion_expr::registry::FunctionRegistry;
use datafusion_expr::ScalarUDF;
use datafusion_expr::{AggregateUDF, ScalarUDF};
use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
use datafusion_proto::protobuf;
use std::any::Any;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::Arc;

#[tokio::main]
async fn main() {
@@ -239,53 +240,52 @@ struct ComposedPhysicalExtensionCodec {
codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
}

impl ComposedPhysicalExtensionCodec {
fn try_any<T>(
&self,
mut f: impl FnMut(&dyn PhysicalExtensionCodec) -> Result<T>,
) -> Result<T> {
let mut last_err = None;
for codec in &self.codecs {
match f(codec.as_ref()) {
Ok(node) => return Ok(node),
Err(err) => last_err = Some(err),
}
}

Err(last_err.unwrap_or_else(|| {
DataFusionError::NotImplemented("Empty list of composed codecs".to_owned())
}))
}
}

impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
fn try_decode(
&self,
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
registry: &dyn FunctionRegistry,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut last_err = None;
for codec in &self.codecs {
match codec.try_decode(buf, inputs, registry) {
Ok(plan) => return Ok(plan),
Err(e) => last_err = Some(e),
}
}
Err(last_err.unwrap())
self.try_any(|codec| codec.try_decode(buf, inputs, registry))
}

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
let mut last_err = None;
for codec in &self.codecs {
match codec.try_encode(node.clone(), buf) {
Ok(_) => return Ok(()),
Err(e) => last_err = Some(e),
}
}
Err(last_err.unwrap())
self.try_any(|codec| codec.try_encode(node.clone(), buf))
}

fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
let mut last_err = None;
for codec in &self.codecs {
match codec.try_decode_udf(name, _buf) {
Ok(plan) => return Ok(plan),
Err(e) => last_err = Some(e),
}
}
Err(last_err.unwrap())
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
self.try_any(|codec| codec.try_decode_udf(name, buf))
}

fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
let mut last_err = None;
for codec in &self.codecs {
match codec.try_encode_udf(_node, _buf) {
Ok(_) => return Ok(()),
Err(e) => last_err = Some(e),
}
}
Err(last_err.unwrap())
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
self.try_any(|codec| codec.try_encode_udf(node, buf))
}

fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
self.try_any(|codec| codec.try_decode_udaf(name, buf))
}

fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
self.try_any(|codec| codec.try_encode_udaf(node, buf))
}
}
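Note: the refactor above collapses four copies of the same first-success loop into `try_any`: each inner codec is tried in order, the first `Ok` wins, and the last error (or a `NotImplemented` error for an empty list) is returned. A sketch of the wiring, using `DefaultPhysicalExtensionCodec` from `datafusion_proto` in both slots purely for illustration (a real setup would compose distinct custom codecs):

```rust
use std::sync::Arc;

use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;

// Sketch: decode/encode calls will try the first codec and fall back to
// the second on error; here both are the default codec just to show the shape.
let codec = ComposedPhysicalExtensionCodec {
    codecs: vec![
        Arc::new(DefaultPhysicalExtensionCodec {}),
        Arc::new(DefaultPhysicalExtensionCodec {}),
    ],
};
```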
9 changes: 4 additions & 5 deletions datafusion-examples/examples/custom_file_format.rs
@@ -22,6 +22,7 @@ use arrow::{
datatypes::UInt64Type,
};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::{
datasource::{
file_format::{
@@ -32,9 +33,9 @@
MemTable,
},
error::Result,
execution::{context::SessionState, runtime_env::RuntimeEnv},
execution::context::SessionState,
physical_plan::ExecutionPlan,
prelude::{SessionConfig, SessionContext},
prelude::SessionContext,
};
use datafusion_common::{GetExt, Statistics};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
@@ -176,9 +177,7 @@ impl GetExt for TSVFileFactory {
#[tokio::main]
async fn main() -> Result<()> {
// Create a new context with the default configuration
let config = SessionConfig::new();
let runtime = RuntimeEnv::default();
let mut state = SessionState::new_with_config_rt(config, Arc::new(runtime));
let mut state = SessionStateBuilder::new().with_default_features().build();

// Register the custom file format
let file_format = Arc::new(TSVFileFactory::new());
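Note: the example now builds its `SessionState` via `SessionStateBuilder` rather than `SessionState::new_with_config_rt`. When a non-default config or runtime is still required, the builder accepts them explicitly; a sketch assuming the v40-era `with_config`/`with_runtime_env` methods:

```rust
use std::sync::Arc;

use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::SessionConfig;

// Sketch: equivalent of the old new_with_config_rt construction, plus the
// default feature set (catalog, optimizer rules, etc.) the example relies on.
let state = SessionStateBuilder::new()
    .with_config(SessionConfig::new())
    .with_runtime_env(Arc::new(RuntimeEnv::default()))
    .with_default_features()
    .build();
```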
2 changes: 1 addition & 1 deletion datafusion-examples/examples/expr_api.rs
@@ -83,7 +83,7 @@ async fn main() -> Result<()> {
Ok(())
}

/// Datafusion's `expr_fn` API makes it easy to create [`Expr`]s for the
/// DataFusion's `expr_fn` API makes it easy to create [`Expr`]s for the
/// full range of expression types such as aggregates and window functions.
fn expr_fn_demo() -> Result<()> {
// Let's say you want to call the "first_value" aggregate function
9 changes: 9 additions & 0 deletions datafusion-examples/examples/parse_sql_expr.rs
@@ -153,5 +153,14 @@ async fn round_trip_parse_sql_expr_demo() -> Result<()> {

assert_eq!(sql, round_trip_sql);

// enable pretty-unparsing. This makes the output more human-readable
// but can be problematic when passed to other SQL engines due to
// differences in precedence rules between DataFusion and target engines.
let unparser = Unparser::default().with_pretty(true);

let pretty = "int_col < 5 OR double_col = 8";
let pretty_round_trip_sql = unparser.expr_to_sql(&parsed_expr)?.to_string();
assert_eq!(pretty, pretty_round_trip_sql);

Ok(())
}
24 changes: 19 additions & 5 deletions datafusion-examples/examples/plan_to_sql.rs
@@ -19,7 +19,7 @@ use datafusion::error::Result;

use datafusion::prelude::*;
use datafusion::sql::unparser::expr_to_sql;
use datafusion_sql::unparser::dialect::CustomDialect;
use datafusion_sql::unparser::dialect::CustomDialectBuilder;
use datafusion_sql::unparser::{plan_to_sql, Unparser};

/// This example demonstrates the programmatic construction of SQL strings using
@@ -31,9 +31,9 @@ use datafusion_sql::unparser::{plan_to_sql, Unparser};
/// 1. [`simple_expr_to_sql_demo`]: Create a simple expression [`Exprs`] with
/// fluent API and convert to sql suitable for passing to another database
///
/// 2. [`simple_expr_to_sql_demo_no_escape`] Create a simple expression
/// [`Exprs`] with fluent API and convert to sql without escaping column names
/// more suitable for displaying to humans.
/// 2. [`simple_expr_to_pretty_sql_demo`]: Create a simple expression
/// [`Exprs`] with fluent API and convert to sql without extra parentheses,
/// suitable for displaying to humans.
///
/// 3. [`simple_expr_to_sql_demo_escape_mysql_style`]: Create a simple
/// expression [`Exprs`] with fluent API and convert to sql escaping column
@@ -49,6 +49,7 @@ use datafusion_sql::unparser::{plan_to_sql, Unparser};
async fn main() -> Result<()> {
// See how to evaluate expressions
simple_expr_to_sql_demo()?;
simple_expr_to_pretty_sql_demo()?;
simple_expr_to_sql_demo_escape_mysql_style()?;
simple_plan_to_sql_demo().await?;
round_trip_plan_to_sql_demo().await?;
@@ -64,11 +65,24 @@ fn simple_expr_to_sql_demo() -> Result<()> {
Ok(())
}

/// DataFusion can remove parentheses when converting an expression to SQL.
/// Note that the output is intended for humans, not for other SQL engines,
/// as differences in precedence rules can cause expressions to be parsed differently.
fn simple_expr_to_pretty_sql_demo() -> Result<()> {
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
let unparser = Unparser::default().with_pretty(true);
let sql = unparser.expr_to_sql(&expr)?.to_string();
assert_eq!(sql, r#"a < 5 OR a = 8"#);
Ok(())
}

/// DataFusion can convert expressions to SQL without escaping column names
/// by using a custom dialect and an explicit unparser
fn simple_expr_to_sql_demo_escape_mysql_style() -> Result<()> {
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
let dialect = CustomDialect::new(Some('`'));
let dialect = CustomDialectBuilder::new()
.with_identifier_quote_style('`')
.build();
let unparser = Unparser::new(&dialect);
let sql = unparser.expr_to_sql(&expr)?.to_string();
assert_eq!(sql, r#"((`a` < 5) OR (`a` = 8))"#);
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
@@ -312,7 +312,7 @@ config_namespace! {
/// Currently experimental
pub split_file_groups_by_statistics: bool, default = false

/// Should Datafusion keep the columns used for partition_by in the output RecordBatches
/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false
}
}
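Note: `keep_partition_by_columns` can be toggled through the normal config API; a sketch assuming the option lives under the usual `datafusion.execution.` prefix for `ExecutionOptions` keys:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

// Sketch: keep the partition_by columns in written output
// (key name assumes this option sits under ExecutionOptions).
let config = SessionConfig::new()
    .set_bool("datafusion.execution.keep_partition_by_columns", true);
let ctx = SessionContext::new_with_config(config);
```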
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/csv_writer.rs
@@ -63,6 +63,9 @@ impl TryFrom<&CsvOptions> for CsvWriterOptions {
if let Some(v) = &value.timestamp_format {
builder = builder.with_timestamp_format(v.into())
}
if let Some(v) = &value.timestamp_tz_format {
builder = builder.with_timestamp_tz_format(v.into())
}
if let Some(v) = &value.time_format {
builder = builder.with_time_format(v.into())
}
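Note: with the fix above, a configured `timestamp_tz_format` is no longer silently dropped when converting `CsvOptions` into the arrow CSV writer options. A sketch of the round trip, with an illustrative chrono-style format string:

```rust
use datafusion_common::config::CsvOptions;
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::Result;

// Sketch: the timezone-aware format now survives the TryFrom conversion.
fn tz_format_roundtrip() -> Result<()> {
    let options = CsvOptions {
        timestamp_tz_format: Some("%Y-%m-%dT%H:%M:%S%z".to_string()),
        ..Default::default()
    };
    let _writer_options = CsvWriterOptions::try_from(&options)?;
    Ok(())
}
```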
13 changes: 6 additions & 7 deletions datafusion/common/src/scalar/mod.rs
@@ -1682,8 +1682,10 @@ impl ScalarValue {
DataType::UInt16 => build_array_primitive!(UInt16Array, UInt16),
DataType::UInt32 => build_array_primitive!(UInt32Array, UInt32),
DataType::UInt64 => build_array_primitive!(UInt64Array, UInt64),
DataType::Utf8View => build_array_string!(StringViewArray, Utf8View),
DataType::Utf8 => build_array_string!(StringArray, Utf8),
DataType::LargeUtf8 => build_array_string!(LargeStringArray, LargeUtf8),
DataType::BinaryView => build_array_string!(BinaryViewArray, BinaryView),
DataType::Binary => build_array_string!(BinaryArray, Binary),
DataType::LargeBinary => build_array_string!(LargeBinaryArray, LargeBinary),
DataType::Date32 => build_array_primitive!(Date32Array, Date32),
@@ -1841,8 +1843,6 @@ impl ScalarValue {
| DataType::Time64(TimeUnit::Millisecond)
| DataType::Map(_, _)
| DataType::RunEndEncoded(_, _)
| DataType::Utf8View
| DataType::BinaryView
| DataType::ListView(_)
| DataType::LargeListView(_) => {
return _internal_err!(
@@ -2678,7 +2678,10 @@ impl ScalarValue {
DataType::Duration(TimeUnit::Nanosecond) => {
typed_cast!(array, index, DurationNanosecondArray, DurationNanosecond)?
}

DataType::Map(_, _) => {
let a = array.slice(index, 1);
Self::Map(Arc::new(a.as_map().to_owned()))
}
other => {
return _not_impl_err!(
"Can't create a scalar from array of type \"{other:?}\""
@@ -5692,16 +5695,12 @@ mod tests {
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
);

// needs https://github.com/apache/arrow-rs/issues/5893
/*
check_scalar_cast(ScalarValue::Utf8(None), DataType::Utf8View);
check_scalar_cast(ScalarValue::from("foo"), DataType::Utf8View);
check_scalar_cast(
ScalarValue::from("larger than 12 bytes string"),
DataType::Utf8View,
);
*/
}

// mimics how casting works on scalar values by `casting` `scalar` to `desired_type`
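Note: with `Utf8View`/`BinaryView` now handled in `iter_to_array` and the cast test re-enabled above, view-type scalars behave like the other string scalars. A small sketch, assuming the existing `ScalarValue::iter_to_array` and `ScalarValue::cast_to` APIs:

```rust
use arrow::datatypes::DataType;
use datafusion_common::{Result, ScalarValue};

// Sketch: build a StringViewArray from scalars, then cast a Utf8 scalar
// (longer than the 12-byte inline limit) to Utf8View.
fn utf8_view_demo() -> Result<()> {
    let array = ScalarValue::iter_to_array(vec![
        ScalarValue::Utf8View(Some("foo".to_string())),
        ScalarValue::Utf8View(None),
    ])?;
    assert_eq!(array.data_type(), &DataType::Utf8View);

    let scalar = ScalarValue::from("larger than 12 bytes string");
    let cast = scalar.cast_to(&DataType::Utf8View)?;
    assert_eq!(cast.data_type(), DataType::Utf8View);
    Ok(())
}
```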