-
Notifications
You must be signed in to change notification settings - Fork 169
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Support Ansi mode in abs function #500
Changes from 24 commits
fce78fa
1071eee
750331d
9f89b57
e6eda86
0b37f8e
d1e2099
73e5513
cff5f29
9b3b4c8
708fffe
f7df357
76914b0
3b55ca2
aa92450
ab28bf6
0dda0b2
1fc4f48
828ab3b
fe2a003
dc3f2a8
3dff4bb
19969d6
6fb873a
bf64a24
b4df447
a72db13
809052d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
use arrow::datatypes::DataType; | ||
use arrow_schema::ArrowError; | ||
use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature}; | ||
use datafusion_common::DataFusionError; | ||
use datafusion_functions::math; | ||
use std::{any::Any, sync::Arc}; | ||
|
||
use crate::execution::operators::ExecutionError; | ||
|
||
use super::{arithmetic_overflow_error, EvalMode}; | ||
|
||
/// Scalar `abs` function that delegates to an inner `ScalarUDFImpl` and
/// post-processes overflow errors according to the configured `EvalMode`.
#[derive(Debug)] | ||
pub struct CometAbsFunc { | ||
// Wrapped `abs` implementation; signature, return type and evaluation are delegated to it.
inner_abs_func: Arc<dyn ScalarUDFImpl>, | ||
// Evaluation mode consulted when the inner function reports an overflow.
eval_mode: EvalMode, | ||
// Name of the input data type, passed to `arithmetic_overflow_error` when building the error.
data_type_name: String, | ||
} | ||
|
||
impl CometAbsFunc { | ||
pub fn new(eval_mode: EvalMode, data_type_name: String) -> Result<Self, ExecutionError> { | ||
if let EvalMode::Legacy | EvalMode::Ansi = eval_mode { | ||
Ok(Self { | ||
inner_abs_func: math::abs().inner(), | ||
eval_mode, | ||
data_type_name, | ||
}) | ||
} else { | ||
Err(ExecutionError::GeneralError(format!( | ||
"Invalid EvalMode: \"{:?}\"", | ||
eval_mode | ||
))) | ||
} | ||
} | ||
} | ||
|
||
impl ScalarUDFImpl for CometAbsFunc { | ||
fn as_any(&self) -> &dyn Any { | ||
self | ||
} | ||
fn name(&self) -> &str { | ||
"abs" | ||
} | ||
|
||
fn signature(&self) -> &Signature { | ||
self.inner_abs_func.signature() | ||
} | ||
|
||
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType, DataFusionError> { | ||
self.inner_abs_func.return_type(arg_types) | ||
} | ||
|
||
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> { | ||
match self.inner_abs_func.invoke(args) { | ||
Err(DataFusionError::ArrowError(ArrowError::ComputeError(msg), trace)) | ||
if msg.contains("overflow") => | ||
{ | ||
if self.eval_mode == EvalMode::Legacy { | ||
Ok(args[0].clone()) | ||
} else { | ||
let msg = arithmetic_overflow_error(&self.data_type_name).to_string(); | ||
Err(DataFusionError::ArrowError( | ||
ArrowError::ComputeError(msg), | ||
trace, | ||
)) | ||
} | ||
} | ||
other => other, | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -24,7 +24,6 @@ use datafusion::{ | |||||
arrow::{compute::SortOptions, datatypes::SchemaRef}, | ||||||
common::DataFusionError, | ||||||
execution::FunctionRegistry, | ||||||
functions::math, | ||||||
logical_expr::{ | ||||||
BuiltinScalarFunction, Operator as DataFusionOperator, ScalarFunctionDefinition, | ||||||
}, | ||||||
|
@@ -52,6 +51,7 @@ use datafusion_common::{ | |||||
tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter}, | ||||||
JoinType as DFJoinType, ScalarValue, | ||||||
}; | ||||||
use datafusion_physical_expr::udf::ScalarUDF; | ||||||
use itertools::Itertools; | ||||||
use jni::objects::GlobalRef; | ||||||
use num::{BigInt, ToPrimitive}; | ||||||
|
@@ -65,7 +65,7 @@ use crate::{ | |||||
avg_decimal::AvgDecimal, | ||||||
bitwise_not::BitwiseNotExpr, | ||||||
bloom_filter_might_contain::BloomFilterMightContain, | ||||||
cast::{Cast, EvalMode}, | ||||||
cast::Cast, | ||||||
checkoverflow::CheckOverflow, | ||||||
correlation::Correlation, | ||||||
covariance::Covariance, | ||||||
|
@@ -97,6 +97,8 @@ use crate::{ | |||||
}, | ||||||
}; | ||||||
|
||||||
use super::expressions::{abs::CometAbsFunc, EvalMode}; | ||||||
|
||||||
// For clippy error on type_complexity. | ||||||
type ExecResult<T> = Result<T, ExecutionError>; | ||||||
type PhyAggResult = Result<Vec<Arc<dyn AggregateExpr>>, ExecutionError>; | ||||||
|
@@ -356,11 +358,7 @@ impl PhysicalPlanner { | |||||
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; | ||||||
let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap()); | ||||||
let timezone = expr.timezone.clone(); | ||||||
let eval_mode = match spark_expression::EvalMode::try_from(expr.eval_mode)? { | ||||||
spark_expression::EvalMode::Legacy => EvalMode::Legacy, | ||||||
spark_expression::EvalMode::Try => EvalMode::Try, | ||||||
spark_expression::EvalMode::Ansi => EvalMode::Ansi, | ||||||
}; | ||||||
let eval_mode = EvalMode::try_from(expr.eval_mode)?; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be more idiomatic to use
Suggested change
|
||||||
|
||||||
Ok(Arc::new(Cast::new(child, datatype, eval_mode, timezone))) | ||||||
} | ||||||
|
@@ -499,7 +497,12 @@ impl PhysicalPlanner { | |||||
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?; | ||||||
let return_type = child.data_type(&input_schema)?; | ||||||
let args = vec![child]; | ||||||
let scalar_def = ScalarFunctionDefinition::UDF(math::abs()); | ||||||
let eval_mode = EvalMode::try_from(expr.eval_mode)?; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be more idiomatic to use
Suggested change
|
||||||
let comet_abs = ScalarUDF::new_from_impl(CometAbsFunc::new( | ||||||
eval_mode, | ||||||
return_type.to_string(), | ||||||
)?); | ||||||
let scalar_def = ScalarFunctionDefinition::UDF(Arc::new(comet_abs)); | ||||||
|
||||||
let expr = | ||||||
ScalarFunctionExpr::new("abs", scalar_def, args, return_type, None, false); | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -480,6 +480,7 @@ message BitwiseNot { | |
|
||
// Absolute-value expression.
message Abs { | ||
// Child expression the function is applied to.
Expr child = 1; | ||
// Evaluation mode; the Scala serializer in this change sends ANSI when
// failOnErr is true and LEGACY otherwise.
EvalMode eval_mode = 2; | ||
} | ||
|
||
message Subquery { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1489,15 +1489,15 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim | |
None | ||
} | ||
|
||
case Abs(child, _) => | ||
case Abs(child, failOnErr) => | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Since we are already using failOnErr, can we simply use this boolean instead of the EvalMode struct? What are your thoughts? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hi, thanks for your review! It is done this way so that the Rust code always treats the ANSI mode the same way, whether the expression supports all three eval modes or only two. Otherwise, we would have to handle the two cases differently. |
||
val childExpr = exprToProtoInternal(child, inputs) | ||
if (childExpr.isDefined) { | ||
val abs = | ||
ExprOuterClass.Abs | ||
.newBuilder() | ||
.setChild(childExpr.get) | ||
.build() | ||
Some(Expr.newBuilder().setAbs(abs).build()) | ||
val evalModeStr = | ||
if (failOnErr) ExprOuterClass.EvalMode.ANSI else ExprOuterClass.EvalMode.LEGACY | ||
val absBuilder = ExprOuterClass.Abs.newBuilder() | ||
absBuilder.setChild(childExpr.get) | ||
absBuilder.setEvalMode(evalModeStr) | ||
Some(Expr.newBuilder().setAbs(absBuilder).build()) | ||
} else { | ||
withInfo(expr, child) | ||
None | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice if Arrow/DataFusion threw a specific overflow error so that we didn't have to look for a string within the error message, but I guess that isn't available.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am going to file a feature request in DataFusion. I will post the link here later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See apache/datafusion#10805 and apache/arrow-rs#5845
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great! I understand that we have to wait for the next version of arrow-rs to be released and integrate it here to be able to make the changes, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can continue with this and do a follow up once the arrow-rs change is available. Or you can wait; the arrow-rs community is very responsive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably better to do it in a follow-up PR, I think. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would help to log an issue to keep track
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apparently my PR in Arrow will not be available for 3 months because it is an API change 😞
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When we upgrade to the version of Arrow with the improved overflow error reporting, the tests in this PR will fail (because we are looking for `ComputeError` but will instead get `ArithmeticOverflow`), so I don't think we need to file an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@parthchandra fyi ☝️