Skip to content

Commit 7e21a19

Browse files
authored
Merge branch 'main' into case_reduce_filtering
2 parents f49d3ea + b5b7f9b commit 7e21a19

File tree

20 files changed

+3286
-117
lines changed

20 files changed

+3286
-117
lines changed

.github/workflows/audit.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ jobs:
4242
steps:
4343
- uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
4444
- name: Install cargo-audit
45-
uses: taiki-e/install-action@e43a5023a747770bfcb71ae048541a681714b951 # v2.62.33
45+
uses: taiki-e/install-action@80466ef8efa80486cdfbddf929453a4f3565c791 # v2.62.34
4646
with:
4747
tool: cargo-audit
4848
- name: Run audit check

.github/workflows/rust.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ jobs:
425425
sudo apt-get update -qq
426426
sudo apt-get install -y -qq clang
427427
- name: Setup wasm-pack
428-
uses: taiki-e/install-action@e43a5023a747770bfcb71ae048541a681714b951 # v2.62.33
428+
uses: taiki-e/install-action@80466ef8efa80486cdfbddf929453a4f3565c791 # v2.62.34
429429
with:
430430
tool: wasm-pack
431431
- name: Run tests with headless mode
@@ -752,7 +752,7 @@ jobs:
752752
- name: Setup Rust toolchain
753753
uses: ./.github/actions/setup-builder
754754
- name: Install cargo-msrv
755-
uses: taiki-e/install-action@e43a5023a747770bfcb71ae048541a681714b951 # v2.62.33
755+
uses: taiki-e/install-action@80466ef8efa80486cdfbddf929453a4f3565c791 # v2.62.34
756756
with:
757757
tool: cargo-msrv
758758

datafusion/common/src/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -938,6 +938,11 @@ config_namespace! {
938938
/// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
939939
pub prefer_hash_join: bool, default = true
940940

941+
/// When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently
942+
/// experimental. Physical planner will opt for PiecewiseMergeJoin when there is only
943+
/// one range filter.
944+
pub enable_piecewise_merge_join: bool, default = false
945+
941946
/// The maximum estimated size in bytes for one input side of a HashJoin
942947
/// will be collected into a single partition
943948
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

datafusion/core/src/physical_planner.rs

Lines changed: 155 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,11 @@ use datafusion_expr::expr::{
7878
};
7979
use datafusion_expr::expr_rewriter::unnormalize_cols;
8080
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
81+
use datafusion_expr::utils::split_conjunction;
8182
use datafusion_expr::{
82-
Analyze, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension, FetchType,
83-
Filter, JoinType, RecursiveQuery, SkipType, StringifiedPlan, WindowFrame,
84-
WindowFrameBound, WriteOp,
83+
Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension,
84+
FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType, StringifiedPlan,
85+
WindowFrame, WindowFrameBound, WriteOp,
8586
};
8687
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
8788
use datafusion_physical_expr::expressions::Literal;
@@ -91,6 +92,7 @@ use datafusion_physical_expr::{
9192
use datafusion_physical_optimizer::PhysicalOptimizerRule;
9293
use datafusion_physical_plan::empty::EmptyExec;
9394
use datafusion_physical_plan::execution_plan::InvariantLevel;
95+
use datafusion_physical_plan::joins::PiecewiseMergeJoinExec;
9496
use datafusion_physical_plan::metrics::MetricType;
9597
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
9698
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
@@ -1133,8 +1135,42 @@ impl DefaultPhysicalPlanner {
11331135
})
11341136
.collect::<Result<join_utils::JoinOn>>()?;
11351137

1138+
// TODO: `num_range_filters` can be used later on for ASOF joins (`num_range_filters > 1`)
1139+
let mut num_range_filters = 0;
1140+
let mut range_filters: Vec<Expr> = Vec::new();
1141+
let mut total_filters = 0;
1142+
11361143
let join_filter = match filter {
11371144
Some(expr) => {
1145+
let split_expr = split_conjunction(expr);
1146+
for expr in split_expr.iter() {
1147+
match *expr {
1148+
Expr::BinaryExpr(BinaryExpr {
1149+
left: _,
1150+
right: _,
1151+
op,
1152+
}) => {
1153+
if matches!(
1154+
op,
1155+
Operator::Lt
1156+
| Operator::LtEq
1157+
| Operator::Gt
1158+
| Operator::GtEq
1159+
) {
1160+
range_filters.push((**expr).clone());
1161+
num_range_filters += 1;
1162+
}
1163+
total_filters += 1;
1164+
}
1165+
// TODO: Want to deal with `Expr::Between` for IEJoins, it counts as two range predicates
1166+
// which is why it is not dealt with in PWMJ
1167+
// Expr::Between(_) => {},
1168+
_ => {
1169+
total_filters += 1;
1170+
}
1171+
}
1172+
}
1173+
11381174
// Extract columns from filter expression and saved in a HashSet
11391175
let cols = expr.column_refs();
11401176

@@ -1190,6 +1226,7 @@ impl DefaultPhysicalPlanner {
11901226
)?;
11911227
let filter_schema =
11921228
Schema::new_with_metadata(filter_fields, metadata);
1229+
11931230
let filter_expr = create_physical_expr(
11941231
expr,
11951232
&filter_df_schema,
@@ -1212,10 +1249,125 @@ impl DefaultPhysicalPlanner {
12121249
let prefer_hash_join =
12131250
session_state.config_options().optimizer.prefer_hash_join;
12141251

1252+
// TODO: Allow PWMJ to deal with residual equijoin conditions
12151253
let join: Arc<dyn ExecutionPlan> = if join_on.is_empty() {
12161254
if join_filter.is_none() && matches!(join_type, JoinType::Inner) {
12171255
// cross join if there is no join conditions and no join filter set
12181256
Arc::new(CrossJoinExec::new(physical_left, physical_right))
1257+
} else if num_range_filters == 1
1258+
&& total_filters == 1
1259+
&& !matches!(
1260+
join_type,
1261+
JoinType::LeftSemi
1262+
| JoinType::RightSemi
1263+
| JoinType::LeftAnti
1264+
| JoinType::RightAnti
1265+
| JoinType::LeftMark
1266+
| JoinType::RightMark
1267+
)
1268+
&& session_state
1269+
.config_options()
1270+
.optimizer
1271+
.enable_piecewise_merge_join
1272+
{
1273+
let Expr::BinaryExpr(be) = &range_filters[0] else {
1274+
return plan_err!(
1275+
"Unsupported expression for PWMJ: Expected `Expr::BinaryExpr`"
1276+
);
1277+
};
1278+
1279+
let mut op = be.op;
1280+
if !matches!(
1281+
op,
1282+
Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq
1283+
) {
1284+
return plan_err!(
1285+
"Unsupported operator for PWMJ: {:?}. Expected one of <, <=, >, >=",
1286+
op
1287+
);
1288+
}
1289+
1290+
fn reverse_ineq(op: Operator) -> Operator {
1291+
match op {
1292+
Operator::Lt => Operator::Gt,
1293+
Operator::LtEq => Operator::GtEq,
1294+
Operator::Gt => Operator::Lt,
1295+
Operator::GtEq => Operator::LtEq,
1296+
_ => op,
1297+
}
1298+
}
1299+
1300+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1301+
enum Side {
1302+
Left,
1303+
Right,
1304+
Both,
1305+
}
1306+
1307+
let side_of = |e: &Expr| -> Result<Side> {
1308+
let cols = e.column_refs();
1309+
let any_left = cols
1310+
.iter()
1311+
.any(|c| left_df_schema.index_of_column(c).is_ok());
1312+
let any_right = cols
1313+
.iter()
1314+
.any(|c| right_df_schema.index_of_column(c).is_ok());
1315+
1316+
Ok(match (any_left, any_right) {
1317+
(true, false) => Side::Left,
1318+
(false, true) => Side::Right,
1319+
(true, true) => Side::Both,
1320+
_ => unreachable!(),
1321+
})
1322+
};
1323+
1324+
let mut lhs_logical = &be.left;
1325+
let mut rhs_logical = &be.right;
1326+
1327+
let left_side = side_of(lhs_logical)?;
1328+
let right_side = side_of(rhs_logical)?;
1329+
if matches!(left_side, Side::Both)
1330+
|| matches!(right_side, Side::Both)
1331+
{
1332+
return Ok(Arc::new(NestedLoopJoinExec::try_new(
1333+
physical_left,
1334+
physical_right,
1335+
join_filter,
1336+
join_type,
1337+
None,
1338+
)?));
1339+
}
1340+
1341+
if left_side == Side::Right && right_side == Side::Left {
1342+
std::mem::swap(&mut lhs_logical, &mut rhs_logical);
1343+
op = reverse_ineq(op);
1344+
} else if !(left_side == Side::Left && right_side == Side::Right)
1345+
{
1346+
return plan_err!(
1347+
"Unsupported operator for PWMJ: {:?}. Expected one of <, <=, >, >=",
1348+
op
1349+
);
1350+
}
1351+
1352+
let on_left = create_physical_expr(
1353+
lhs_logical,
1354+
left_df_schema,
1355+
session_state.execution_props(),
1356+
)?;
1357+
let on_right = create_physical_expr(
1358+
rhs_logical,
1359+
right_df_schema,
1360+
session_state.execution_props(),
1361+
)?;
1362+
1363+
Arc::new(PiecewiseMergeJoinExec::try_new(
1364+
physical_left,
1365+
physical_right,
1366+
(on_left, on_right),
1367+
op,
1368+
*join_type,
1369+
session_state.config().target_partitions(),
1370+
)?)
12191371
} else {
12201372
// there is no equal join condition, use the nested loop join
12211373
Arc::new(NestedLoopJoinExec::try_new(

datafusion/functions/src/datetime/current_time.rs

Lines changed: 115 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,28 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use arrow::array::timezone::Tz;
1819
use arrow::datatypes::DataType;
1920
use arrow::datatypes::DataType::Time64;
2021
use arrow::datatypes::TimeUnit::Nanosecond;
21-
use std::any::Any;
22-
22+
use chrono::TimeZone;
23+
use chrono::Timelike;
2324
use datafusion_common::{internal_err, Result, ScalarValue};
2425
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
2526
use datafusion_expr::{
2627
ColumnarValue, Documentation, Expr, ScalarUDFImpl, Signature, Volatility,
2728
};
2829
use datafusion_macros::user_doc;
30+
use std::any::Any;
2931

3032
#[user_doc(
3133
doc_section(label = "Time and Date Functions"),
3234
description = r#"
33-
Returns the current UTC time.
35+
Returns the current time in the session time zone.
3436
3537
The `current_time()` return value is determined at query time and will return the same time, no matter when in the query plan the function executes.
38+
39+
The session time zone can be set using the statement 'SET datafusion.execution.time_zone = desired time zone'. The time zone can be a value like +00:00, 'Europe/London' etc.
3640
"#,
3741
syntax_example = "current_time()"
3842
)]
@@ -93,7 +97,20 @@ impl ScalarUDFImpl for CurrentTimeFunc {
9397
info: &dyn SimplifyInfo,
9498
) -> Result<ExprSimplifyResult> {
9599
let now_ts = info.execution_props().query_execution_start_time;
96-
let nano = now_ts.timestamp_nanos_opt().map(|ts| ts % 86400000000000);
100+
101+
// Try to get timezone from config and convert to local time
102+
let nano = info
103+
.execution_props()
104+
.config_options()
105+
.and_then(|config| config.execution.time_zone.parse::<Tz>().ok())
106+
.map_or_else(
107+
|| datetime_to_time_nanos(&now_ts),
108+
|tz| {
109+
let local_now = tz.from_utc_datetime(&now_ts.naive_utc());
110+
datetime_to_time_nanos(&local_now)
111+
},
112+
);
113+
97114
Ok(ExprSimplifyResult::Simplified(Expr::Literal(
98115
ScalarValue::Time64Nanosecond(nano),
99116
None,
@@ -104,3 +121,97 @@ impl ScalarUDFImpl for CurrentTimeFunc {
104121
self.doc()
105122
}
106123
}
124+
125+
// Helper function for conversion of datetime to a timestamp.
126+
fn datetime_to_time_nanos<Tz: TimeZone>(dt: &chrono::DateTime<Tz>) -> Option<i64> {
127+
let hour = dt.hour() as i64;
128+
let minute = dt.minute() as i64;
129+
let second = dt.second() as i64;
130+
let nanosecond = dt.nanosecond() as i64;
131+
Some((hour * 3600 + minute * 60 + second) * 1_000_000_000 + nanosecond)
132+
}
133+
134+
#[cfg(test)]
135+
mod tests {
136+
use super::*;
137+
use arrow::datatypes::{DataType, TimeUnit::Nanosecond};
138+
use chrono::{DateTime, Utc};
139+
use datafusion_common::{Result, ScalarValue};
140+
use datafusion_expr::execution_props::ExecutionProps;
141+
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
142+
use std::sync::Arc;
143+
144+
struct MockSimplifyInfo {
145+
execution_props: ExecutionProps,
146+
}
147+
148+
impl SimplifyInfo for MockSimplifyInfo {
149+
fn is_boolean_type(&self, _expr: &Expr) -> Result<bool> {
150+
Ok(false)
151+
}
152+
153+
fn nullable(&self, _expr: &Expr) -> Result<bool> {
154+
Ok(true)
155+
}
156+
157+
fn execution_props(&self) -> &ExecutionProps {
158+
&self.execution_props
159+
}
160+
161+
fn get_data_type(&self, _expr: &Expr) -> Result<DataType> {
162+
Ok(Time64(Nanosecond))
163+
}
164+
}
165+
166+
fn set_session_timezone_env(tz: &str, start_time: DateTime<Utc>) -> MockSimplifyInfo {
167+
let mut config = datafusion_common::config::ConfigOptions::default();
168+
config.execution.time_zone = tz.to_string();
169+
let mut execution_props =
170+
ExecutionProps::new().with_query_execution_start_time(start_time);
171+
execution_props.config_options = Some(Arc::new(config));
172+
MockSimplifyInfo { execution_props }
173+
}
174+
175+
#[test]
176+
fn test_current_time_timezone_offset() {
177+
// Use a fixed start time for consistent testing
178+
let start_time = Utc.with_ymd_and_hms(2025, 1, 1, 12, 0, 0).unwrap();
179+
180+
// Test with UTC+05:00
181+
let info_plus_5 = set_session_timezone_env("+05:00", start_time);
182+
let result_plus_5 = CurrentTimeFunc::new()
183+
.simplify(vec![], &info_plus_5)
184+
.unwrap();
185+
186+
// Test with UTC-05:00
187+
let info_minus_5 = set_session_timezone_env("-05:00", start_time);
188+
let result_minus_5 = CurrentTimeFunc::new()
189+
.simplify(vec![], &info_minus_5)
190+
.unwrap();
191+
192+
// Extract nanoseconds from results
193+
let nanos_plus_5 = match result_plus_5 {
194+
ExprSimplifyResult::Simplified(Expr::Literal(
195+
ScalarValue::Time64Nanosecond(Some(n)),
196+
_,
197+
)) => n,
198+
_ => panic!("Expected Time64Nanosecond literal"),
199+
};
200+
201+
let nanos_minus_5 = match result_minus_5 {
202+
ExprSimplifyResult::Simplified(Expr::Literal(
203+
ScalarValue::Time64Nanosecond(Some(n)),
204+
_,
205+
)) => n,
206+
_ => panic!("Expected Time64Nanosecond literal"),
207+
};
208+
209+
// Calculate the difference: UTC+05:00 should be 10 hours ahead of UTC-05:00
210+
let difference = nanos_plus_5 - nanos_minus_5;
211+
212+
// 10 hours in nanoseconds
213+
let expected_offset = 10i64 * 3600 * 1_000_000_000;
214+
215+
assert_eq!(difference, expected_offset, "Expected 10-hour offset difference in nanoseconds between UTC+05:00 and UTC-05:00");
216+
}
217+
}

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,7 @@ impl HashJoinStream {
637637
let (left_side, right_side) = get_final_indices_from_shared_bitmap(
638638
build_side.left_data.visited_indices_bitmap(),
639639
self.join_type,
640+
true,
640641
);
641642
let empty_right_batch = RecordBatch::new_empty(self.right.schema());
642643
// use the left and right indices to produce the batch result

0 commit comments

Comments
 (0)