
Commit 4442c54

Merge branch 'main' into case_reduce_filtering

2 parents: f22e7bc + 35b2e35

File tree

21 files changed: +405 -283 lines

.github/workflows/rust.yml

Lines changed: 13 additions & 0 deletions
@@ -353,6 +353,19 @@ jobs:
         with:
           save-if: ${{ github.ref_name == 'main' }}
           shared-key: "amd-ci-linux-test-example"
+      - name: Remove unnecessary preinstalled software
+        run: |
+          echo "Disk space before cleanup:"
+          df -h
+          apt-get clean
+          rm -rf /__t/CodeQL
+          rm -rf /__t/PyPy
+          rm -rf /__t/Java_Temurin-Hotspot_jdk
+          rm -rf /__t/Python
+          rm -rf /__t/go
+          rm -rf /__t/Ruby
+          echo "Disk space after cleanup:"
+          df -h
       - name: Run examples
         run: |
           # test datafusion-sql examples

Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default.

benchmarks/src/imdb/run.rs

Lines changed: 1 addition & 1 deletion
@@ -534,7 +534,7 @@ mod tests {
         let plan = ctx.sql(&query).await?;
         let plan = plan.into_optimized_plan()?;
         let bytes = logical_plan_to_bytes(&plan)?;
-        let plan2 = logical_plan_from_bytes(&bytes, &ctx)?;
+        let plan2 = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
         let plan_formatted = format!("{}", plan.display_indent());
         let plan2_formatted = format!("{}", plan2.display_indent());
         assert_eq!(plan_formatted, plan2_formatted);

benchmarks/src/tpch/run.rs

Lines changed: 1 addition & 1 deletion
@@ -387,7 +387,7 @@ mod tests {
         let plan = ctx.sql(&query).await?;
         let plan = plan.into_optimized_plan()?;
         let bytes = logical_plan_to_bytes(&plan)?;
-        let plan2 = logical_plan_from_bytes(&bytes, &ctx)?;
+        let plan2 = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
         let plan_formatted = format!("{}", plan.display_indent());
         let plan2_formatted = format!("{}", plan2.display_indent());
         assert_eq!(plan_formatted, plan2_formatted);
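Both benchmark tests change the same way: `logical_plan_from_bytes` now takes the task context rather than the whole `SessionContext`. A minimal round-trip sketch under that post-change signature; the `t` table and `data.csv` path are illustrative, not from this commit:

    use datafusion::prelude::{CsvReadOptions, SessionContext};
    use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        let ctx = SessionContext::new();
        // illustrative table registration; `data.csv` is a placeholder path
        ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;

        let plan = ctx.sql("SELECT * FROM t").await?.into_optimized_plan()?;
        let bytes = logical_plan_to_bytes(&plan)?;

        // post-change: deserialization needs only the task context
        let plan2 = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
        assert_eq!(
            format!("{}", plan.display_indent()),
            format!("{}", plan2.display_indent())
        );
        Ok(())
    }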

datafusion/common/src/lib.rs

Lines changed: 6 additions & 0 deletions
@@ -108,6 +108,12 @@ pub use error::{
 // The HashMap and HashSet implementations that should be used as the uniform defaults
 pub type HashMap<K, V, S = DefaultHashBuilder> = hashbrown::HashMap<K, V, S>;
 pub type HashSet<T, S = DefaultHashBuilder> = hashbrown::HashSet<T, S>;
+pub mod hash_map {
+    pub use hashbrown::hash_map::Entry;
+}
+pub mod hash_set {
+    pub use hashbrown::hash_set::Entry;
+}
 
 /// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is
 /// not possible. In normal usage of DataFusion the downcast should always succeed.
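The new `hash_map` / `hash_set` modules re-export hashbrown's `Entry` types so downstream crates can use the entry API through `datafusion-common` instead of depending on hashbrown directly. A minimal usage sketch against the re-export; the `count` helper is illustrative, not part of the change:

    use datafusion_common::hash_map::Entry;
    use datafusion_common::HashMap;

    /// Count occurrences via the re-exported hashbrown entry API
    fn count<'a>(words: &[&'a str]) -> HashMap<&'a str, usize> {
        let mut counts = HashMap::default();
        for &word in words {
            match counts.entry(word) {
                Entry::Occupied(mut e) => *e.get_mut() += 1,
                Entry::Vacant(e) => {
                    e.insert(1);
                }
            }
        }
        counts
    }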

datafusion/functions-aggregate/src/min_max/min_max_bytes.rs

Lines changed: 20 additions & 30 deletions
@@ -20,7 +20,8 @@ use arrow::array::{
     LargeBinaryBuilder, LargeStringBuilder, StringBuilder, StringViewBuilder,
 };
 use arrow::datatypes::DataType;
-use datafusion_common::{internal_err, Result};
+use datafusion_common::hash_map::Entry;
+use datafusion_common::{internal_err, HashMap, Result};
 use datafusion_expr::{EmitTo, GroupsAccumulator};
 use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::apply_filter_as_nulls;
 use std::mem::size_of;
@@ -391,14 +392,6 @@ struct MinMaxBytesState {
     total_data_bytes: usize,
 }
 
-#[derive(Debug, Clone, Copy)]
-enum MinMaxLocation<'a> {
-    /// the min/max value is stored in the existing `min_max` array
-    ExistingMinMax,
-    /// the min/max value is stored in the input array at the given index
-    Input(&'a [u8]),
-}
-
 /// Implement the MinMaxBytesAccumulator with a comparison function
 /// for comparing strings
 impl MinMaxBytesState {
@@ -450,7 +443,7 @@ impl MinMaxBytesState {
         // Minimize value copies by calculating the new min/maxes for each group
         // in this batch (either the existing min/max or the new input value)
         // and updating the owned values in `self.min_maxes` at most once
-        let mut locations = vec![MinMaxLocation::ExistingMinMax; total_num_groups];
+        let mut locations = HashMap::<usize, &[u8]>::with_capacity(group_indices.len());
 
         // Figure out the new min value for each group
         for (new_val, group_index) in iter.into_iter().zip(group_indices.iter()) {
@@ -459,32 +452,29 @@ impl MinMaxBytesState {
                 continue; // skip nulls
             };
 
-            let existing_val = match locations[group_index] {
-                // previous input value was the min/max, so compare it
-                MinMaxLocation::Input(existing_val) => existing_val,
-                MinMaxLocation::ExistingMinMax => {
-                    let Some(existing_val) = self.min_max[group_index].as_ref() else {
-                        // no existing min/max, so this is the new min/max
-                        locations[group_index] = MinMaxLocation::Input(new_val);
-                        continue;
-                    };
-                    existing_val.as_ref()
+            match locations.entry(group_index) {
+                Entry::Occupied(mut occupied_entry) => {
+                    if cmp(new_val, occupied_entry.get()) {
+                        occupied_entry.insert(new_val);
+                    }
+                }
+                Entry::Vacant(vacant_entry) => {
+                    if let Some(old_val) = self.min_max[group_index].as_ref() {
+                        if cmp(new_val, old_val) {
+                            vacant_entry.insert(new_val);
+                        }
+                    } else {
+                        vacant_entry.insert(new_val);
+                    }
                 }
             };
-
-            // Compare the new value to the existing value, replacing if necessary
-            if cmp(new_val, existing_val) {
-                locations[group_index] = MinMaxLocation::Input(new_val);
-            }
         }
 
         // Update self.min_max with any new min/max values we found in the input
-        for (group_index, location) in locations.iter().enumerate() {
-            match location {
-                MinMaxLocation::ExistingMinMax => {}
-                MinMaxLocation::Input(new_val) => self.set_value(group_index, new_val),
-            }
+        for (group_index, location) in locations.iter() {
+            self.set_value(*group_index, location);
         }
+
         Ok(())
     }
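The rewrite replaces a dense `Vec` sized to `total_num_groups` with a map keyed only by the group indices the batch actually touches, so per-batch allocation scales with batch size instead of total group count. A self-contained sketch of that pattern for a byte-slice minimum; `batch_minima` and its names are illustrative, not the accumulator's API:

    use std::collections::hash_map::Entry;
    use std::collections::HashMap;

    /// For one batch, find the candidate minimum per touched group.
    /// Memory is proportional to the groups seen in this batch, not to
    /// the total number of groups the accumulator tracks.
    fn batch_minima<'a>(
        values: &[&'a [u8]],
        group_indices: &[usize],
    ) -> HashMap<usize, &'a [u8]> {
        let mut locations: HashMap<usize, &'a [u8]> =
            HashMap::with_capacity(group_indices.len());
        for (&val, &group) in values.iter().zip(group_indices) {
            match locations.entry(group) {
                // group already seen in this batch: keep the smaller value
                Entry::Occupied(mut e) => {
                    if val < *e.get() {
                        e.insert(val);
                    }
                }
                // first time this batch touches the group
                Entry::Vacant(e) => {
                    e.insert(val);
                }
            }
        }
        locations
    }

    fn main() {
        let vals: Vec<&[u8]> = vec![b"b", b"a", b"c"];
        let groups = vec![0, 0, 7];
        let minima = batch_minima(&vals, &groups);
        assert_eq!(minima[&0], b"a".as_slice()); // min of "b" and "a"
        assert_eq!(minima[&7], b"c".as_slice());
        println!("{} groups touched", minima.len());
    }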

datafusion/functions-nested/src/set_ops.rs

Lines changed: 53 additions & 10 deletions
@@ -29,9 +29,7 @@ use arrow::datatypes::{DataType, Field, FieldRef};
 use arrow::row::{RowConverter, SortField};
 use datafusion_common::cast::{as_large_list_array, as_list_array};
 use datafusion_common::utils::ListCoercion;
-use datafusion_common::{
-    exec_err, internal_err, plan_err, utils::take_function_args, Result,
-};
+use datafusion_common::{exec_err, internal_err, utils::take_function_args, Result};
 use datafusion_expr::{
     ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
 };
@@ -289,13 +287,7 @@ impl ScalarUDFImpl for ArrayDistinct {
     }
 
     fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
-        match &arg_types[0] {
-            List(field) => Ok(DataType::new_list(field.data_type().clone(), true)),
-            LargeList(field) => {
-                Ok(DataType::new_large_list(field.data_type().clone(), true))
-            }
-            arg_type => plan_err!("{} does not support type {arg_type}", self.name()),
-        }
+        Ok(arg_types[0].clone())
     }
 
     fn invoke_with_args(
@@ -563,3 +555,54 @@ fn general_array_distinct<OffsetSize: OffsetSizeTrait>(
         array.nulls().cloned(),
     )?))
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow::{
+        array::{Int32Array, ListArray},
+        buffer::OffsetBuffer,
+        datatypes::{DataType, Field},
+    };
+    use datafusion_common::{config::ConfigOptions, DataFusionError};
+    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs};
+
+    use crate::set_ops::array_distinct_udf;
+
+    #[test]
+    fn test_array_distinct_inner_nullability_result_type_match_return_type(
+    ) -> Result<(), DataFusionError> {
+        let udf = array_distinct_udf();
+
+        for inner_nullable in [true, false] {
+            let inner_field = Field::new_list_field(DataType::Int32, inner_nullable);
+            let input_field =
+                Field::new_list("input", Arc::new(inner_field.clone()), true);
+
+            // [[1, 1, 2]]
+            let input_array = ListArray::new(
+                inner_field.into(),
+                OffsetBuffer::new(vec![0, 3].into()),
+                Arc::new(Int32Array::new(vec![1, 1, 2].into(), None)),
+                None,
+            );
+
+            let input_array = ColumnarValue::Array(Arc::new(input_array));
+
+            let result = udf.invoke_with_args(ScalarFunctionArgs {
+                args: vec![input_array],
+                arg_fields: vec![input_field.clone().into()],
+                number_rows: 1,
+                return_field: input_field.clone().into(),
+                config_options: Arc::new(ConfigOptions::default()),
+            })?;
+
+            assert_eq!(
+                result.data_type(),
+                udf.return_type(&[input_field.data_type().clone()])?
+            );
+        }
+        Ok(())
+    }
+}
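The `return_type` fix makes `array_distinct` echo its argument type back instead of rebuilding a list type with a hard-coded nullable inner field, and the new test pins the planned type to the runtime type. A small sketch of the mismatch the old code produced, using plain arrow type constructors; `old_return_type` is an illustrative stand-in for the removed branch:

    use arrow::datatypes::DataType;

    /// What the old implementation did: rebuild the list type and
    /// force the inner field to be nullable.
    fn old_return_type(arg: &DataType) -> DataType {
        match arg {
            DataType::List(field) => DataType::new_list(field.data_type().clone(), true),
            other => other.clone(),
        }
    }

    fn main() {
        // a list whose items are declared non-nullable
        let input = DataType::new_list(DataType::Int32, false);

        // old behavior: inner nullability flipped to `true`, so the planned
        // type disagreed with the array the kernel actually returned
        assert_ne!(old_return_type(&input), input);
        // new behavior is `Ok(arg_types[0].clone())`, which matches by construction
    }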

datafusion/optimizer/src/replace_distinct_aggregate.rs

Lines changed: 39 additions & 4 deletions
@@ -25,7 +25,7 @@ use datafusion_common::tree_node::Transformed;
 use datafusion_common::{Column, Result};
 use datafusion_expr::expr_rewriter::normalize_cols;
 use datafusion_expr::utils::expand_wildcard;
-use datafusion_expr::{col, ExprFunctionExt, LogicalPlanBuilder};
+use datafusion_expr::{col, lit, ExprFunctionExt, Limit, LogicalPlanBuilder};
 use datafusion_expr::{Aggregate, Distinct, DistinctOn, Expr, LogicalPlan};
 
 /// Optimizer that replaces logical [[Distinct]] with a logical [[Aggregate]]
@@ -54,6 +54,17 @@ use datafusion_expr::{Aggregate, Distinct, DistinctOn, Expr, LogicalPlan};
 /// )
 /// ORDER BY a DESC
 /// ```
+///
+/// In case there are no columns, the [[Distinct]] is replaced by a [[Limit]]
+///
+/// ```text
+/// SELECT DISTINCT * FROM empty_table
+/// ```
+///
+/// Into
+/// ```text
+/// SELECT * FROM empty_table LIMIT 1
+/// ```
 #[derive(Default, Debug)]
 pub struct ReplaceDistinctWithAggregate {}
 
@@ -78,6 +89,16 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
             LogicalPlan::Distinct(Distinct::All(input)) => {
                 let group_expr = expand_wildcard(input.schema(), &input, None)?;
 
+                if group_expr.is_empty() {
+                    // Special case: there are no columns to group by, so we can't replace it by a group by
+                    // however, we can replace it by LIMIT 1 because there is either no output or a single empty row
+                    return Ok(Transformed::yes(LogicalPlan::Limit(Limit {
+                        skip: None,
+                        fetch: Some(Box::new(lit(1i64))),
+                        input,
+                    })));
+                }
+
                 let field_count = input.schema().fields().len();
                 for dep in input.schema().functional_dependencies().iter() {
                     // If distinct is exactly the same with a previous GROUP BY, we can
@@ -184,15 +205,17 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-
     use crate::assert_optimized_plan_eq_snapshot;
     use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
     use crate::test::*;
+    use arrow::datatypes::{Fields, Schema};
+    use std::sync::Arc;
 
     use crate::OptimizerContext;
     use datafusion_common::Result;
-    use datafusion_expr::{col, logical_plan::builder::LogicalPlanBuilder, Expr};
+    use datafusion_expr::{
+        col, logical_plan::builder::LogicalPlanBuilder, table_scan, Expr,
+    };
     use datafusion_functions_aggregate::sum::sum;
 
     macro_rules! assert_optimized_plan_equal {
@@ -274,4 +297,16 @@ mod tests {
         TableScan: test
         ")
     }
+
+    #[test]
+    fn use_limit_1_when_no_columns() -> Result<()> {
+        let plan = table_scan(Some("test"), &Schema::new(Fields::empty()), None)?
+            .distinct()?
+            .build()?;
+
+        assert_optimized_plan_equal!(plan, @r"
+        Limit: skip=0, fetch=1
+        TableScan: test
+        ")
+    }
 }
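The zero-column special case is sound because every row of a zero-column relation is the same empty tuple, so `DISTINCT` yields at most one row and `LIMIT 1` is equivalent. A hedged sketch of driving the rule directly, assuming the derived `Default` constructor and the `OptimizerRule::rewrite` entry point:

    use arrow::datatypes::{Fields, Schema};
    use datafusion_expr::table_scan;
    use datafusion_optimizer::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
    use datafusion_optimizer::{OptimizerContext, OptimizerRule};

    fn main() -> datafusion_common::Result<()> {
        // DISTINCT over a relation with no columns...
        let plan = table_scan(Some("test"), &Schema::new(Fields::empty()), None)?
            .distinct()?
            .build()?;

        // ...should rewrite to LIMIT 1 rather than an empty GROUP BY
        let rewritten = ReplaceDistinctWithAggregate::default()
            .rewrite(plan, &OptimizerContext::new())?;
        println!("{}", rewritten.data.display_indent());
        // expected:
        //   Limit: skip=0, fetch=1
        //     TableScan: test
        Ok(())
    }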

datafusion/proto/Cargo.toml

Lines changed: 19 additions & 3 deletions
@@ -40,15 +40,31 @@ name = "datafusion_proto"
 [features]
 default = ["parquet"]
 json = ["pbjson", "serde", "serde_json", "datafusion-proto-common/json"]
-parquet = ["datafusion/parquet", "datafusion-common/parquet"]
-avro = ["datafusion/avro", "datafusion-common/avro"]
+parquet = ["datafusion-datasource-parquet", "datafusion-common/parquet", "datafusion/parquet"]
+avro = ["datafusion-datasource-avro", "datafusion-common/avro"]
+
+# Note to developers: do *not* add `datafusion` as a dependency in
+# this crate. See https://github.com/apache/datafusion/issues/17713
+# for additional information.
 
 [dependencies]
 arrow = { workspace = true }
 chrono = { workspace = true }
-datafusion = { workspace = true, default-features = false }
+datafusion-catalog = { workspace = true }
+datafusion-catalog-listing = { workspace = true }
 datafusion-common = { workspace = true }
+datafusion-datasource = { workspace = true }
+datafusion-datasource-arrow = { workspace = true }
+datafusion-datasource-avro = { workspace = true, optional = true }
+datafusion-datasource-csv = { workspace = true }
+datafusion-datasource-json = { workspace = true }
+datafusion-datasource-parquet = { workspace = true, optional = true }
+datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-functions-table = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
+datafusion-physical-plan = { workspace = true }
 datafusion-proto-common = { workspace = true }
 object_store = { workspace = true }
 pbjson = { workspace = true, optional = true }