From 39ea387fe041274bf32b1b360f3374a42b617cf9 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Mon, 5 Feb 2024 09:30:42 +0800 Subject: [PATCH 01/14] feat: impl for ScalarExpr --- src/flow/src/expr/func.rs | 23 +++ src/flow/src/expr/scalar.rs | 333 ++++++++++++++++++++++++++++++++++++ 2 files changed, 356 insertions(+) diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index eed43f65a759..9505f51d47d5 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -101,3 +101,26 @@ pub enum VariadicFunc { And, Or, } + +impl UnaryFunc { + pub fn eval(&self, values: &[Value], expr: &ScalarExpr) -> Result { + todo!() + } +} + +impl BinaryFunc { + pub fn eval( + &self, + values: &[Value], + expr1: &ScalarExpr, + expr2: &ScalarExpr, + ) -> Result { + todo!() + } +} + +impl VariadicFunc { + pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result { + todo!() + } +} diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 3c1d745a8616..e5b45df766ec 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -59,3 +59,336 @@ pub enum ScalarExpr { els: Box, }, } + +impl ScalarExpr { + pub fn call_unary(self, func: UnaryFunc) -> Self { + ScalarExpr::CallUnary { + func, + expr: Box::new(self), + } + } + + pub fn call_binary(self, other: Self, func: BinaryFunc) -> Self { + ScalarExpr::CallBinary { + func, + expr1: Box::new(self), + expr2: Box::new(other), + } + } + + pub fn eval(&self, values: &[Value]) -> Result { + match self { + ScalarExpr::Column(index) => Ok(values[*index].clone()), + ScalarExpr::Literal(row_res, _ty) => Ok(row_res.clone()), + ScalarExpr::CallUnmaterializable(f) => OptimizeSnafu { + reason: "Can't eval unmaterializable function".to_string(), + } + .fail(), + ScalarExpr::CallUnary { func, expr } => func.eval(values, expr), + ScalarExpr::CallBinary { func, expr1, expr2 } => func.eval(values, expr1, expr2), + ScalarExpr::CallVariadic { func, exprs } => func.eval(values, exprs), + ScalarExpr::If { cond, then, els } => match cond.eval(values) { + Ok(Value::Boolean(true)) => then.eval(values), + Ok(Value::Boolean(false)) => els.eval(values), + _ => InvalidArgumentSnafu { + reason: "if condition must be boolean".to_string(), + } + .fail(), + }, + } + } + + /// Rewrites column indices with their value in `permutation`. + /// + /// This method is applicable even when `permutation` is not a + /// strict permutation, and it only needs to have entries for + /// each column referenced in `self`. + pub fn permute(&mut self, permutation: &[usize]) { + self.visit_mut_post_nolimit(&mut |e| { + if let ScalarExpr::Column(old_i) = e { + *old_i = permutation[*old_i]; + } + }); + } + + /// Rewrites column indices with their value in `permutation`. + /// + /// This method is applicable even when `permutation` is not a + /// strict permutation, and it only needs to have entries for + /// each column referenced in `self`. + pub fn permute_map(&mut self, permutation: &BTreeMap) { + self.visit_mut_post_nolimit(&mut |e| { + if let ScalarExpr::Column(old_i) = e { + *old_i = permutation[old_i]; + } + }); + } + + pub fn support(&self) -> BTreeSet { + let mut support = BTreeSet::new(); + self.visit_post_nolimit(&mut |e| { + if let ScalarExpr::Column(i) = e { + support.insert(*i); + } + }); + support + } + + pub fn as_literal(&self) -> Option { + if let ScalarExpr::Literal(lit, _column_type) = self { + Some(lit.clone()) + } else { + None + } + } + + pub fn is_literal(&self) -> bool { + matches!(self, ScalarExpr::Literal(..)) + } + + pub fn is_literal_true(&self) -> bool { + Some(Value::Boolean(true)) == self.as_literal() + } + + pub fn is_literal_false(&self) -> bool { + Some(Value::Boolean(false)) == self.as_literal() + } + + pub fn is_literal_null(&self) -> bool { + Some(Value::Null) == self.as_literal() + } + + pub fn literal_null() -> Self { + ScalarExpr::Literal(Value::Null, ConcreteDataType::null_datatype()) + } + + pub fn literal(res: Value, typ: ConcreteDataType) -> Self { + ScalarExpr::Literal(res, typ) + } + + pub fn literal_false() -> Self { + ScalarExpr::Literal(Value::Boolean(false), ConcreteDataType::boolean_datatype()) + } + + pub fn literal_true() -> Self { + ScalarExpr::Literal(Value::Boolean(true), ConcreteDataType::boolean_datatype()) + } +} + +impl ScalarExpr { + /// visit post-order without stack call limit, but may cause stack overflow + fn visit_post_nolimit(&self, f: &mut F) + where + F: FnMut(&Self), + { + self.visit_children(|e| e.visit_post_nolimit(f)); + f(self); + } + + fn visit_children(&self, mut f: F) + where + F: FnMut(&Self), + { + match self { + ScalarExpr::Column(_) + | ScalarExpr::Literal(_, _) + | ScalarExpr::CallUnmaterializable(_) => (), + ScalarExpr::CallUnary { func: _, expr } => f(expr), + ScalarExpr::CallBinary { + func: as_any, + expr1, + expr2, + } => { + f(expr1); + f(expr2); + } + ScalarExpr::CallVariadic { func: _, exprs } => { + for expr in exprs { + f(expr); + } + } + ScalarExpr::If { cond, then, els } => { + f(cond); + f(then); + f(els); + } + } + } + + fn visit_mut_post_nolimit(&mut self, f: &mut F) + where + F: FnMut(&mut Self), + { + self.visit_mut_children(|e: &mut Self| e.visit_mut_post_nolimit(f)); + f(self); + } + + fn visit_mut_children(&mut self, mut f: F) + where + F: FnMut(&mut Self), + { + match self { + ScalarExpr::Column(_) + | ScalarExpr::Literal(_, _) + | ScalarExpr::CallUnmaterializable(_) => (), + ScalarExpr::CallUnary { func, expr } => f(expr), + ScalarExpr::CallBinary { func, expr1, expr2 } => { + f(expr1); + f(expr2); + } + ScalarExpr::CallVariadic { func, exprs } => { + for expr in exprs { + f(expr); + } + } + ScalarExpr::If { cond, then, els } => { + f(cond); + f(then); + f(els); + } + } + } +} + +impl ScalarExpr { + /// if expr contains function `Now` + pub fn contains_temporal(&self) -> bool { + let mut contains = false; + self.visit_post_nolimit(&mut |e| { + if let ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now) = e { + contains = true; + } + }); + contains + } + + /// extract lower or upper bound of `Now` for expr, where `lower bound <= expr < upper bound` + /// + /// returned bool indicates whether the bound is upper bound: + /// + /// false for lower bound, true for upper bound + /// TODO(discord9): allow simple transform like `now() + a < b` to `now() < b - a` + pub fn extract_bound(&self) -> Result<(Option, Option), String> { + let unsupported_err = |msg: &str| { + Err(format!( + "Unsupported temporal predicate: {msg}. NOTE: Use `now()` in direct comparison: {:?}", + self + )) + }; + if let Self::CallBinary { + mut func, + mut expr1, + mut expr2, + } = self.clone() + { + if !(expr1.contains_temporal() ^ expr2.contains_temporal()) { + return unsupported_err("one side of the comparison must be `now()`"); + } + if !expr1.contains_temporal() + && *expr2 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now) + { + std::mem::swap(&mut expr1, &mut expr2); + func = match func { + BinaryFunc::Eq => BinaryFunc::Eq, + BinaryFunc::NotEq => BinaryFunc::NotEq, + BinaryFunc::Lt => BinaryFunc::Gt, + BinaryFunc::Lte => BinaryFunc::Gte, + BinaryFunc::Gt => BinaryFunc::Lt, + BinaryFunc::Gte => BinaryFunc::Lte, + _ => { + return unsupported_err("The top level operator must be comparison"); + } + }; + } + // TODO: support simple transform like `now() + a < b` to `now() < b - a` + if expr2.contains_temporal() + || *expr1 != ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now) + { + return unsupported_err("None of the sides of the comparison is `now()`"); + } + let step = |expr: ScalarExpr| expr.call_unary(UnaryFunc::StepTimestamp); + match func { + BinaryFunc::Eq => Ok((Some(*expr2.clone()), Some(step(*expr2)))), + BinaryFunc::Lt => Ok((None, Some(*expr2))), + BinaryFunc::Lte => Ok((None, Some(step(*expr2)))), + BinaryFunc::Gt => Ok((Some(step(*expr2)), None)), + BinaryFunc::Gte => Ok((Some(*expr2), None)), + _ => unreachable!("Already checked"), + } + } else { + unsupported_err("None of the sides of the comparison is `now()`") + } + } +} + +#[test] +fn test_extract_bound() { + let test_list = [ + // col(0) == now + ( + ScalarExpr::CallBinary { + func: BinaryFunc::Eq, + expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), + expr2: Box::new(ScalarExpr::Column(0)), + }, + Ok(( + Some(ScalarExpr::Column(0)), + Some(ScalarExpr::CallUnary { + func: UnaryFunc::StepTimestamp, + expr: Box::new(ScalarExpr::Column(0)), + }), + )), + ), + // now < col(0) + ( + ScalarExpr::CallBinary { + func: BinaryFunc::Lt, + expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), + expr2: Box::new(ScalarExpr::Column(0)), + }, + Ok((None, Some(ScalarExpr::Column(0)))), + ), + // now <= col(0) + ( + ScalarExpr::CallBinary { + func: BinaryFunc::Lte, + expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), + expr2: Box::new(ScalarExpr::Column(0)), + }, + Ok(( + None, + Some(ScalarExpr::CallUnary { + func: UnaryFunc::StepTimestamp, + expr: Box::new(ScalarExpr::Column(0)), + }), + )), + ), + // now > col(0) -> now >= col(0) + 1 + ( + ScalarExpr::CallBinary { + func: BinaryFunc::Gt, + expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), + expr2: Box::new(ScalarExpr::Column(0)), + }, + Ok(( + Some(ScalarExpr::CallUnary { + func: UnaryFunc::StepTimestamp, + expr: Box::new(ScalarExpr::Column(0)), + }), + None, + )), + ), + // now >= col(0) + ( + ScalarExpr::CallBinary { + func: BinaryFunc::Gte, + expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), + expr2: Box::new(ScalarExpr::Column(0)), + }, + Ok((Some(ScalarExpr::Column(0)), None)), + ), + ]; + for (expr, expected) in test_list.iter() { + assert_eq!(expr.extract_bound(), *expected); + } +} From 955eeba84a734de041fb0cf8fd042d0949ef0d58 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Mon, 5 Feb 2024 09:42:54 +0800 Subject: [PATCH 02/14] feat: plain functions --- src/flow/src/expr/func.rs | 278 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 264 insertions(+), 14 deletions(-) diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 9505f51d47d5..232890c2c22d 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -47,6 +47,74 @@ pub enum UnaryFunc { StepTimestamp, Cast(ConcreteDataType), } + +impl UnaryFunc { + pub fn eval(&self, values: &[Value], expr: &ScalarExpr) -> Result { + let arg = expr.eval(values)?; + match self { + Self::Not => { + let bool = if let Value::Boolean(bool) = arg { + Ok(bool) + } else { + TypeMismatchSnafu { + expected: ConcreteDataType::boolean_datatype(), + actual: arg.data_type(), + } + .fail()? + }?; + Ok(Value::from(!bool)) + } + Self::IsNull => Ok(Value::from(arg.is_null())), + Self::IsTrue => { + let bool = if let Value::Boolean(bool) = arg { + Ok(bool) + } else { + TypeMismatchSnafu { + expected: ConcreteDataType::boolean_datatype(), + actual: arg.data_type(), + } + .fail()? + }?; + Ok(Value::from(bool)) + } + Self::IsFalse => { + let bool = if let Value::Boolean(bool) = arg { + Ok(bool) + } else { + TypeMismatchSnafu { + expected: ConcreteDataType::boolean_datatype(), + actual: arg.data_type(), + } + .fail()? + }?; + Ok(Value::from(!bool)) + } + Self::StepTimestamp => { + if let Value::DateTime(datetime) = arg { + let datetime = DateTime::from(datetime.val() + 1); + Ok(Value::from(datetime)) + } else { + TypeMismatchSnafu { + expected: ConcreteDataType::datetime_datatype(), + actual: arg.data_type(), + } + .fail()? + } + } + Self::Cast(to) => { + let arg_ty = arg.data_type(); + let res = cast(arg, to).context({ + CastValueSnafu { + from: arg_ty, + to: to.clone(), + } + })?; + Ok(res) + } + } + } +} + /// TODO(discord9): support more binary functions for more types #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)] pub enum BinaryFunc { @@ -96,18 +164,6 @@ pub enum BinaryFunc { ModUInt64, } -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)] -pub enum VariadicFunc { - And, - Or, -} - -impl UnaryFunc { - pub fn eval(&self, values: &[Value], expr: &ScalarExpr) -> Result { - todo!() - } -} - impl BinaryFunc { pub fn eval( &self, @@ -115,12 +171,206 @@ impl BinaryFunc { expr1: &ScalarExpr, expr2: &ScalarExpr, ) -> Result { - todo!() + let left = expr1.eval(values)?; + let right = expr2.eval(values)?; + match self { + Self::Eq => Ok(Value::from(left == right)), + Self::NotEq => Ok(Value::from(left != right)), + Self::Lt => Ok(Value::from(left < right)), + Self::Lte => Ok(Value::from(left <= right)), + Self::Gt => Ok(Value::from(left > right)), + Self::Gte => Ok(Value::from(left >= right)), + Self::AddInt16 => Ok(add::(left, right)?), + Self::AddInt32 => Ok(add::(left, right)?), + Self::AddInt64 => Ok(add::(left, right)?), + Self::AddUInt16 => Ok(add::(left, right)?), + Self::AddUInt32 => Ok(add::(left, right)?), + Self::AddUInt64 => Ok(add::(left, right)?), + Self::AddFloat32 => Ok(add::(left, right)?), + Self::AddFloat64 => Ok(add::(left, right)?), + + Self::SubInt16 => Ok(sub::(left, right)?), + Self::SubInt32 => Ok(sub::(left, right)?), + Self::SubInt64 => Ok(sub::(left, right)?), + Self::SubUInt16 => Ok(sub::(left, right)?), + Self::SubUInt32 => Ok(sub::(left, right)?), + Self::SubUInt64 => Ok(sub::(left, right)?), + Self::SubFloat32 => Ok(sub::(left, right)?), + Self::SubFloat64 => Ok(sub::(left, right)?), + + Self::MulInt16 => Ok(mul::(left, right)?), + Self::MulInt32 => Ok(mul::(left, right)?), + Self::MulInt64 => Ok(mul::(left, right)?), + Self::MulUInt16 => Ok(mul::(left, right)?), + Self::MulUInt32 => Ok(mul::(left, right)?), + Self::MulUInt64 => Ok(mul::(left, right)?), + Self::MulFloat32 => Ok(mul::(left, right)?), + Self::MulFloat64 => Ok(mul::(left, right)?), + + Self::DivInt16 => Ok(div::(left, right)?), + Self::DivInt32 => Ok(div::(left, right)?), + Self::DivInt64 => Ok(div::(left, right)?), + Self::DivUInt16 => Ok(div::(left, right)?), + Self::DivUInt32 => Ok(div::(left, right)?), + Self::DivUInt64 => Ok(div::(left, right)?), + Self::DivFloat32 => Ok(div::(left, right)?), + Self::DivFloat64 => Ok(div::(left, right)?), + + Self::ModInt16 => Ok(rem::(left, right)?), + Self::ModInt32 => Ok(rem::(left, right)?), + Self::ModInt64 => Ok(rem::(left, right)?), + Self::ModUInt16 => Ok(rem::(left, right)?), + Self::ModUInt32 => Ok(rem::(left, right)?), + Self::ModUInt64 => Ok(rem::(left, right)?), + } } } +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)] +pub enum VariadicFunc { + And, + Or, +} + impl VariadicFunc { pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result { - todo!() + match self { + VariadicFunc::And => and(values, exprs), + VariadicFunc::Or => or(values, exprs), + } + } +} + +fn and(values: &[Value], exprs: &[ScalarExpr]) -> Result { + // If any is false, then return false. Else, if any is null, then return null. Else, return true. + let mut null = false; + let mut err = None; + for expr in exprs { + match expr.eval(values) { + Ok(Value::Boolean(true)) => {} + Ok(Value::Boolean(false)) => return Ok(Value::Boolean(false)), // short-circuit + Ok(Value::Null) => null = true, + Err(this_err) => { + if err.is_none() { + err = Some(this_err) + } + } // retain first error encountered + Ok(x) => InvalidArgumentSnafu { + reason: format!( + "`and()` only support boolean type, found value {:?} of type {:?}", + x, + x.data_type() + ), + } + .fail()?, + } + } + match (err, null) { + (Some(err), _) => Err(err), + (None, true) => Ok(Value::Null), + (None, false) => Ok(Value::Boolean(true)), + } +} + +fn or(values: &[Value], exprs: &[ScalarExpr]) -> Result { + // If any is false, then return false. Else, if any is null, then return null. Else, return true. + let mut null = false; + let mut err = None; + for expr in exprs { + match expr.eval(values) { + Ok(Value::Boolean(true)) => return Ok(Value::Boolean(true)), // short-circuit + Ok(Value::Boolean(false)) => {} // short-circuit + Ok(Value::Null) => null = true, + Err(this_err) => { + if err.is_none() { + err = Some(this_err) + } + } // retain first error encountered + Ok(x) => InvalidArgumentSnafu { + reason: format!( + "`or()` only support boolean type, found value {:?} of type {:?}", + x, + x.data_type() + ), + } + .fail()?, + } } + match (err, null) { + (Some(err), _) => Err(err), + (None, true) => Ok(Value::Null), + (None, false) => Ok(Value::Boolean(false)), + } +} + +fn add(left: Value, right: Value) -> Result +where + T: TryFrom + std::ops::Add, + Value: From, +{ + let left = T::try_from(left) + .map_err(|e| e.to_string()) + .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + let right = T::try_from(right) + .map_err(|e| e.to_string()) + .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + Ok(Value::from(left + right)) +} + +fn sub(left: Value, right: Value) -> Result +where + T: TryFrom + std::ops::Sub, + Value: From, +{ + let left = T::try_from(left) + .map_err(|e| e.to_string()) + .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + let right = T::try_from(right) + .map_err(|e| e.to_string()) + .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + Ok(Value::from(left - right)) +} + +fn mul(left: Value, right: Value) -> Result +where + T: TryFrom + std::ops::Mul, + Value: From, +{ + let left = T::try_from(left) + .map_err(|e| e.to_string()) + .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + let right = T::try_from(right) + .map_err(|e| e.to_string()) + .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + Ok(Value::from(left * right)) +} + +fn div(left: Value, right: Value) -> Result +where + T: TryFrom + std::ops::Div, + >::Error: std::fmt::Debug, + Value: From, +{ + let left = T::try_from(left) + .map_err(|e| e.to_string()) + .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + let right = T::try_from(right) + .map_err(|e| e.to_string()) + .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + Ok(Value::from(left / right)) +} + +fn rem(left: Value, right: Value) -> Result +where + T: TryFrom + std::ops::Rem, + >::Error: std::fmt::Debug, + Value: From, +{ + let left = T::try_from(left) + .map_err(|e| e.to_string()) + .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + let right = T::try_from(right) + .map_err(|e| e.to_string()) + .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + Ok(Value::from(left % right)) } From af8a19c328add3b8082aa34dab34826fb65d29f0 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Mon, 5 Feb 2024 10:02:36 +0800 Subject: [PATCH 03/14] refactor: simpler trait bound&tests --- Cargo.lock | 1 + src/flow/Cargo.toml | 1 + src/flow/src/expr/func.rs | 40 +++++++++++++++++++++++++++++++++------ 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d2a0ced8bceb..cb8f12e9e188 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3407,6 +3407,7 @@ dependencies = [ "datatypes", "hydroflow", "itertools 0.10.5", + "num-traits", "serde", "servers", "session", diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index f20aa5d07e4c..2b244314cb02 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -17,6 +17,7 @@ common-time.workspace = true datatypes.workspace = true hydroflow = "0.5.0" itertools.workspace = true +num-traits = "0.2" serde.workspace = true servers.workspace = true session.workspace = true diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 232890c2c22d..a027a11b82e7 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::ops::Div; + use common_time::DateTime; use datatypes::data_type::ConcreteDataType; use datatypes::types::cast; @@ -22,7 +24,7 @@ use serde::{Deserialize, Serialize}; use snafu::ResultExt; use super::ScalarExpr; -use crate::expr::error::CastValueSnafu; +use crate::expr::error::{CastValueSnafu, DivisionByZeroSnafu}; use crate::expr::InvalidArgumentSnafu; // TODO(discord9): more function & eval use crate::{ @@ -305,7 +307,7 @@ fn or(values: &[Value], exprs: &[ScalarExpr]) -> Result { fn add(left: Value, right: Value) -> Result where - T: TryFrom + std::ops::Add, + T: TryFrom + num_traits::Num, Value: From, { let left = T::try_from(left) @@ -319,7 +321,7 @@ where fn sub(left: Value, right: Value) -> Result where - T: TryFrom + std::ops::Sub, + T: TryFrom + num_traits::Num, Value: From, { let left = T::try_from(left) @@ -333,7 +335,7 @@ where fn mul(left: Value, right: Value) -> Result where - T: TryFrom + std::ops::Mul, + T: TryFrom + num_traits::Num, Value: From, { let left = T::try_from(left) @@ -347,7 +349,7 @@ where fn div(left: Value, right: Value) -> Result where - T: TryFrom + std::ops::Div, + T: TryFrom + num_traits::Num, >::Error: std::fmt::Debug, Value: From, { @@ -357,12 +359,15 @@ where let right = T::try_from(right) .map_err(|e| e.to_string()) .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + if right.is_zero() { + return Err(DivisionByZeroSnafu {}.build()); + } Ok(Value::from(left / right)) } fn rem(left: Value, right: Value) -> Result where - T: TryFrom + std::ops::Rem, + T: TryFrom + num_traits::Num, >::Error: std::fmt::Debug, Value: From, { @@ -374,3 +379,26 @@ where .map_err(|e| TryFromValueSnafu { msg: e }.build())?; Ok(Value::from(left % right)) } + +#[test] +fn test_num_ops() { + let left = Value::from(10); + let right = Value::from(3); + let res = add::(left.clone(), right.clone()).unwrap(); + assert_eq!(res, Value::from(13)); + let res = sub::(left.clone(), right.clone()).unwrap(); + assert_eq!(res, Value::from(7)); + let res = mul::(left.clone(), right.clone()).unwrap(); + assert_eq!(res, Value::from(30)); + let res = div::(left.clone(), right.clone()).unwrap(); + assert_eq!(res, Value::from(3)); + let res = rem::(left.clone(), right.clone()).unwrap(); + assert_eq!(res, Value::from(1)); + + let values = vec![Value::from(true), Value::from(false)]; + let exprs = vec![ScalarExpr::Column(0), ScalarExpr::Column(1)]; + let res = and(&values, &exprs).unwrap(); + assert_eq!(res, Value::from(false)); + let res = or(&values, &exprs).unwrap(); + assert_eq!(res, Value::from(true)); +} From 4307ab667e6a3fbff66c7b6545fc0b5d6c354f11 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Mon, 5 Feb 2024 16:02:17 +0800 Subject: [PATCH 04/14] chore: remove unused imports --- src/flow/src/expr/func.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index a027a11b82e7..3e638651e36f 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::Div; - use common_time::DateTime; use datatypes::data_type::ConcreteDataType; use datatypes::types::cast; @@ -23,10 +21,8 @@ use hydroflow::bincode::Error; use serde::{Deserialize, Serialize}; use snafu::ResultExt; -use super::ScalarExpr; use crate::expr::error::{CastValueSnafu, DivisionByZeroSnafu}; -use crate::expr::InvalidArgumentSnafu; -// TODO(discord9): more function & eval +use crate::expr::{InvalidArgumentSnafu, ScalarExpr}; use crate::{ expr::error::{EvalError, TryFromValueSnafu, TypeMismatchSnafu}, repr::Row, From 1286c95fffffdaf3cf11669d9062e6fad4ab0756 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Mon, 5 Feb 2024 16:11:46 +0800 Subject: [PATCH 05/14] chore: fmt --- src/flow/src/expr/func.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 3e638651e36f..20da374ea7ea 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -21,12 +21,11 @@ use hydroflow::bincode::Error; use serde::{Deserialize, Serialize}; use snafu::ResultExt; -use crate::expr::error::{CastValueSnafu, DivisionByZeroSnafu}; -use crate::expr::{InvalidArgumentSnafu, ScalarExpr}; -use crate::{ - expr::error::{EvalError, TryFromValueSnafu, TypeMismatchSnafu}, - repr::Row, +use crate::expr::error::{ + CastValueSnafu, DivisionByZeroSnafu, EvalError, TryFromValueSnafu, TypeMismatchSnafu, }; +use crate::expr::{InvalidArgumentSnafu, ScalarExpr}; +use crate::repr::Row; /// UnmaterializableFunc is a function that can't be eval independently, /// and require special handling From a3bacae151eec55885bce0398141ed5e3275d955 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Mon, 5 Feb 2024 16:43:16 +0800 Subject: [PATCH 06/14] refactor: early ret on first error --- src/flow/src/expr/func.rs | 66 +++++++++++---------------------------- 1 file changed, 19 insertions(+), 47 deletions(-) diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 20da374ea7ea..b5a00a1bb89d 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -241,16 +241,13 @@ impl VariadicFunc { fn and(values: &[Value], exprs: &[ScalarExpr]) -> Result { // If any is false, then return false. Else, if any is null, then return null. Else, return true. let mut null = false; - let mut err = None; for expr in exprs { match expr.eval(values) { Ok(Value::Boolean(true)) => {} Ok(Value::Boolean(false)) => return Ok(Value::Boolean(false)), // short-circuit Ok(Value::Null) => null = true, Err(this_err) => { - if err.is_none() { - err = Some(this_err) - } + return Err(this_err); } // retain first error encountered Ok(x) => InvalidArgumentSnafu { reason: format!( @@ -262,26 +259,22 @@ fn and(values: &[Value], exprs: &[ScalarExpr]) -> Result { .fail()?, } } - match (err, null) { - (Some(err), _) => Err(err), - (None, true) => Ok(Value::Null), - (None, false) => Ok(Value::Boolean(true)), + match null { + true => Ok(Value::Null), + false => Ok(Value::Boolean(true)), } } fn or(values: &[Value], exprs: &[ScalarExpr]) -> Result { // If any is false, then return false. Else, if any is null, then return null. Else, return true. let mut null = false; - let mut err = None; for expr in exprs { match expr.eval(values) { Ok(Value::Boolean(true)) => return Ok(Value::Boolean(true)), // short-circuit - Ok(Value::Boolean(false)) => {} // short-circuit + Ok(Value::Boolean(false)) => {} Ok(Value::Null) => null = true, Err(this_err) => { - if err.is_none() { - err = Some(this_err) - } + return Err(this_err); } // retain first error encountered Ok(x) => InvalidArgumentSnafu { reason: format!( @@ -293,10 +286,9 @@ fn or(values: &[Value], exprs: &[ScalarExpr]) -> Result { .fail()?, } } - match (err, null) { - (Some(err), _) => Err(err), - (None, true) => Ok(Value::Null), - (None, false) => Ok(Value::Boolean(false)), + match null { + true => Ok(Value::Null), + false => Ok(Value::Boolean(false)), } } @@ -305,12 +297,8 @@ where T: TryFrom + num_traits::Num, Value: From, { - let left = T::try_from(left) - .map_err(|e| e.to_string()) - .map_err(|e| TryFromValueSnafu { msg: e }.build())?; - let right = T::try_from(right) - .map_err(|e| e.to_string()) - .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?; + let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?; Ok(Value::from(left + right)) } @@ -319,12 +307,8 @@ where T: TryFrom + num_traits::Num, Value: From, { - let left = T::try_from(left) - .map_err(|e| e.to_string()) - .map_err(|e| TryFromValueSnafu { msg: e }.build())?; - let right = T::try_from(right) - .map_err(|e| e.to_string()) - .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?; + let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?; Ok(Value::from(left - right)) } @@ -333,12 +317,8 @@ where T: TryFrom + num_traits::Num, Value: From, { - let left = T::try_from(left) - .map_err(|e| e.to_string()) - .map_err(|e| TryFromValueSnafu { msg: e }.build())?; - let right = T::try_from(right) - .map_err(|e| e.to_string()) - .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?; + let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?; Ok(Value::from(left * right)) } @@ -348,12 +328,8 @@ where >::Error: std::fmt::Debug, Value: From, { - let left = T::try_from(left) - .map_err(|e| e.to_string()) - .map_err(|e| TryFromValueSnafu { msg: e }.build())?; - let right = T::try_from(right) - .map_err(|e| e.to_string()) - .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?; + let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?; if right.is_zero() { return Err(DivisionByZeroSnafu {}.build()); } @@ -366,12 +342,8 @@ where >::Error: std::fmt::Debug, Value: From, { - let left = T::try_from(left) - .map_err(|e| e.to_string()) - .map_err(|e| TryFromValueSnafu { msg: e }.build())?; - let right = T::try_from(right) - .map_err(|e| e.to_string()) - .map_err(|e| TryFromValueSnafu { msg: e }.build())?; + let left = T::try_from(left).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?; + let right = T::try_from(right).map_err(|e| TryFromValueSnafu { msg: e.to_string() }.build())?; Ok(Value::from(left % right)) } From 1f97f4d4e0554030e04165825c1b88a384b9c6ac Mon Sep 17 00:00:00 2001 From: Discord9 Date: Mon, 5 Feb 2024 17:02:54 +0800 Subject: [PATCH 07/14] refactor: remove abunant match arm --- src/flow/src/expr/func.rs | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index b5a00a1bb89d..b1de428bf25e 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -62,19 +62,7 @@ impl UnaryFunc { Ok(Value::from(!bool)) } Self::IsNull => Ok(Value::from(arg.is_null())), - Self::IsTrue => { - let bool = if let Value::Boolean(bool) = arg { - Ok(bool) - } else { - TypeMismatchSnafu { - expected: ConcreteDataType::boolean_datatype(), - actual: arg.data_type(), - } - .fail()? - }?; - Ok(Value::from(bool)) - } - Self::IsFalse => { + Self::IsTrue | Self::IsFalse => { let bool = if let Value::Boolean(bool) = arg { Ok(bool) } else { From 4a6d8ade34494f0b6ea38279712b9a1782f2c4fe Mon Sep 17 00:00:00 2001 From: Discord9 Date: Tue, 20 Feb 2024 18:01:40 +0800 Subject: [PATCH 08/14] chore: per review --- src/flow/src/expr/func.rs | 7 +- src/flow/src/expr/scalar.rs | 142 ++++++++++++++++++------------------ 2 files changed, 79 insertions(+), 70 deletions(-) diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index b1de428bf25e..4def62ec468b 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -72,7 +72,11 @@ impl UnaryFunc { } .fail()? }?; - Ok(Value::from(!bool)) + if matches!(self, Self::IsTrue) { + Ok(Value::from(bool)) + } else { + Ok(Value::from(!bool)) + } } Self::StepTimestamp => { if let Value::DateTime(datetime) = arg { @@ -165,6 +169,7 @@ impl BinaryFunc { Self::Lte => Ok(Value::from(left <= right)), Self::Gt => Ok(Value::from(left > right)), Self::Gte => Ok(Value::from(left >= right)), + Self::AddInt16 => Ok(add::(left, right)?), Self::AddInt32 => Ok(add::(left, right)?), Self::AddInt64 => Ok(add::(left, right)?), diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index e5b45df766ec..dcaf0dbe5644 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -321,74 +321,78 @@ impl ScalarExpr { } } -#[test] -fn test_extract_bound() { - let test_list = [ - // col(0) == now - ( - ScalarExpr::CallBinary { - func: BinaryFunc::Eq, - expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), - expr2: Box::new(ScalarExpr::Column(0)), - }, - Ok(( - Some(ScalarExpr::Column(0)), - Some(ScalarExpr::CallUnary { - func: UnaryFunc::StepTimestamp, - expr: Box::new(ScalarExpr::Column(0)), - }), - )), - ), - // now < col(0) - ( - ScalarExpr::CallBinary { - func: BinaryFunc::Lt, - expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), - expr2: Box::new(ScalarExpr::Column(0)), - }, - Ok((None, Some(ScalarExpr::Column(0)))), - ), - // now <= col(0) - ( - ScalarExpr::CallBinary { - func: BinaryFunc::Lte, - expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), - expr2: Box::new(ScalarExpr::Column(0)), - }, - Ok(( - None, - Some(ScalarExpr::CallUnary { - func: UnaryFunc::StepTimestamp, - expr: Box::new(ScalarExpr::Column(0)), - }), - )), - ), - // now > col(0) -> now >= col(0) + 1 - ( - ScalarExpr::CallBinary { - func: BinaryFunc::Gt, - expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), - expr2: Box::new(ScalarExpr::Column(0)), - }, - Ok(( - Some(ScalarExpr::CallUnary { - func: UnaryFunc::StepTimestamp, - expr: Box::new(ScalarExpr::Column(0)), - }), - None, - )), - ), - // now >= col(0) - ( - ScalarExpr::CallBinary { - func: BinaryFunc::Gte, - expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), - expr2: Box::new(ScalarExpr::Column(0)), - }, - Ok((Some(ScalarExpr::Column(0)), None)), - ), - ]; - for (expr, expected) in test_list.iter() { - assert_eq!(expr.extract_bound(), *expected); +#[cfg(test)] +mod test { + use super::*; + #[test] + fn test_extract_bound() { + let test_list = [ + // col(0) == now + ( + ScalarExpr::CallBinary { + func: BinaryFunc::Eq, + expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), + expr2: Box::new(ScalarExpr::Column(0)), + }, + Ok(( + Some(ScalarExpr::Column(0)), + Some(ScalarExpr::CallUnary { + func: UnaryFunc::StepTimestamp, + expr: Box::new(ScalarExpr::Column(0)), + }), + )), + ), + // now < col(0) + ( + ScalarExpr::CallBinary { + func: BinaryFunc::Lt, + expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), + expr2: Box::new(ScalarExpr::Column(0)), + }, + Ok((None, Some(ScalarExpr::Column(0)))), + ), + // now <= col(0) + ( + ScalarExpr::CallBinary { + func: BinaryFunc::Lte, + expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), + expr2: Box::new(ScalarExpr::Column(0)), + }, + Ok(( + None, + Some(ScalarExpr::CallUnary { + func: UnaryFunc::StepTimestamp, + expr: Box::new(ScalarExpr::Column(0)), + }), + )), + ), + // now > col(0) -> now >= col(0) + 1 + ( + ScalarExpr::CallBinary { + func: BinaryFunc::Gt, + expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), + expr2: Box::new(ScalarExpr::Column(0)), + }, + Ok(( + Some(ScalarExpr::CallUnary { + func: UnaryFunc::StepTimestamp, + expr: Box::new(ScalarExpr::Column(0)), + }), + None, + )), + ), + // now >= col(0) + ( + ScalarExpr::CallBinary { + func: BinaryFunc::Gte, + expr1: Box::new(ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now)), + expr2: Box::new(ScalarExpr::Column(0)), + }, + Ok((Some(ScalarExpr::Column(0)), None)), + ), + ]; + for (expr, expected) in test_list.iter() { + assert_eq!(expr.extract_bound(), *expected); + } } } From 15f155c9fea69dac5ac99738632c0491c66a44de Mon Sep 17 00:00:00 2001 From: Discord9 Date: Tue, 20 Feb 2024 18:08:29 +0800 Subject: [PATCH 09/14] doc: `support` fn --- src/flow/src/expr/scalar.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index dcaf0dbe5644..29be93eb8b94 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -124,6 +124,7 @@ impl ScalarExpr { }); } + /// Returns the set of columns that are referenced by `self`. pub fn support(&self) -> BTreeSet { let mut support = BTreeSet::new(); self.visit_post_nolimit(&mut |e| { From 4a8bc2189ad02f7ecdeebba03e273ece800ef849 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Tue, 20 Feb 2024 18:54:42 +0800 Subject: [PATCH 10/14] chore: per review more --- src/flow/src/expr/error.rs | 3 ++ src/flow/src/expr/func.rs | 23 ++++++++++++- src/flow/src/expr/scalar.rs | 67 +++++++++++++++++++------------------ 3 files changed, 59 insertions(+), 34 deletions(-) diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs index 0fd58ba1cf8f..233538fb6564 100644 --- a/src/flow/src/expr/error.rs +++ b/src/flow/src/expr/error.rs @@ -58,4 +58,7 @@ pub enum EvalError { #[snafu(display("Optimize error: {reason}"))] Optimize { reason: String, location: Location }, + + #[snafu(display("Unsupported temporal filter: {reason}"))] + UnsupportedTemporalFilter { reason: String, location: Location }, } diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 4def62ec468b..85a127f09a4d 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -22,7 +22,8 @@ use serde::{Deserialize, Serialize}; use snafu::ResultExt; use crate::expr::error::{ - CastValueSnafu, DivisionByZeroSnafu, EvalError, TryFromValueSnafu, TypeMismatchSnafu, + CastValueSnafu, DivisionByZeroSnafu, EvalError, InternalSnafu, TryFromValueSnafu, + TypeMismatchSnafu, }; use crate::expr::{InvalidArgumentSnafu, ScalarExpr}; use crate::repr::Row; @@ -214,6 +215,26 @@ impl BinaryFunc { Self::ModUInt64 => Ok(rem::(left, right)?), } } + + /// Reverse the comparison operator, i.e. `a < b` becomes `b > a`, + /// equal and not equal are unchanged. + pub fn reverse_compare(&self) -> Result { + let ret = match &self { + BinaryFunc::Eq => BinaryFunc::Eq, + BinaryFunc::NotEq => BinaryFunc::NotEq, + BinaryFunc::Lt => BinaryFunc::Gt, + BinaryFunc::Lte => BinaryFunc::Gte, + BinaryFunc::Gt => BinaryFunc::Lt, + BinaryFunc::Gte => BinaryFunc::Lte, + _ => { + return InternalSnafu { + reason: format!("Expect a comparison operator, found {:?}", self), + } + .fail(); + } + }; + Ok(ret) + } } #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Hash)] diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 29be93eb8b94..bfbe30a10138 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -18,7 +18,9 @@ use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; use serde::{Deserialize, Serialize}; -use crate::expr::error::{EvalError, InvalidArgumentSnafu, OptimizeSnafu}; +use crate::expr::error::{ + EvalError, InvalidArgumentSnafu, OptimizeSnafu, UnsupportedTemporalFilterSnafu, +}; use crate::expr::func::{BinaryFunc, UnaryFunc, UnmaterializableFunc, VariadicFunc}; /// A scalar expression, which can be evaluated to a value. @@ -194,16 +196,12 @@ impl ScalarExpr { ScalarExpr::Column(_) | ScalarExpr::Literal(_, _) | ScalarExpr::CallUnmaterializable(_) => (), - ScalarExpr::CallUnary { func: _, expr } => f(expr), - ScalarExpr::CallBinary { - func: as_any, - expr1, - expr2, - } => { + ScalarExpr::CallUnary { expr, .. } => f(expr), + ScalarExpr::CallBinary { expr1, expr2, .. } => { f(expr1); f(expr2); } - ScalarExpr::CallVariadic { func: _, exprs } => { + ScalarExpr::CallVariadic { exprs, .. } => { for expr in exprs { f(expr); } @@ -232,12 +230,12 @@ impl ScalarExpr { ScalarExpr::Column(_) | ScalarExpr::Literal(_, _) | ScalarExpr::CallUnmaterializable(_) => (), - ScalarExpr::CallUnary { func, expr } => f(expr), - ScalarExpr::CallBinary { func, expr1, expr2 } => { + ScalarExpr::CallUnary { expr, .. } => f(expr), + ScalarExpr::CallBinary { expr1, expr2, .. } => { f(expr1); f(expr2); } - ScalarExpr::CallVariadic { func, exprs } => { + ScalarExpr::CallVariadic { exprs, .. } => { for expr in exprs { f(expr); } @@ -269,12 +267,12 @@ impl ScalarExpr { /// /// false for lower bound, true for upper bound /// TODO(discord9): allow simple transform like `now() + a < b` to `now() < b - a` - pub fn extract_bound(&self) -> Result<(Option, Option), String> { + pub fn extract_bound(&self) -> Result<(Option, Option), EvalError> { let unsupported_err = |msg: &str| { - Err(format!( - "Unsupported temporal predicate: {msg}. NOTE: Use `now()` in direct comparison: {:?}", - self - )) + UnsupportedTemporalFilterSnafu { + reason: msg.to_string(), + } + .fail() }; if let Self::CallBinary { mut func, @@ -282,37 +280,34 @@ impl ScalarExpr { mut expr2, } = self.clone() { - if !(expr1.contains_temporal() ^ expr2.contains_temporal()) { + let expr_1_contains_now = expr1.contains_temporal(); + let expr_2_contains_now = expr2.contains_temporal(); + if !(expr_1_contains_now ^ expr_2_contains_now) { return unsupported_err("one side of the comparison must be `now()`"); } - if !expr1.contains_temporal() + if !expr_1_contains_now && *expr2 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now) { std::mem::swap(&mut expr1, &mut expr2); - func = match func { - BinaryFunc::Eq => BinaryFunc::Eq, - BinaryFunc::NotEq => BinaryFunc::NotEq, - BinaryFunc::Lt => BinaryFunc::Gt, - BinaryFunc::Lte => BinaryFunc::Gte, - BinaryFunc::Gt => BinaryFunc::Lt, - BinaryFunc::Gte => BinaryFunc::Lte, - _ => { - return unsupported_err("The top level operator must be comparison"); - } - }; + func = BinaryFunc::reverse_compare(&func)?; } // TODO: support simple transform like `now() + a < b` to `now() < b - a` - if expr2.contains_temporal() + if expr_2_contains_now || *expr1 != ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now) { return unsupported_err("None of the sides of the comparison is `now()`"); } let step = |expr: ScalarExpr| expr.call_unary(UnaryFunc::StepTimestamp); match func { + // now == expr2 -> now <= expr2 && now < expr2 + 1 BinaryFunc::Eq => Ok((Some(*expr2.clone()), Some(step(*expr2)))), + // now < expr2 -> now < expr2 BinaryFunc::Lt => Ok((None, Some(*expr2))), + // now <= expr2 -> now < expr2 + 1 BinaryFunc::Lte => Ok((None, Some(step(*expr2)))), + // now > expr2 -> now >= expr2 + 1 BinaryFunc::Gt => Ok((Some(step(*expr2)), None)), + // now >= expr2 -> now >= expr2 BinaryFunc::Gte => Ok((Some(*expr2), None)), _ => unreachable!("Already checked"), } @@ -327,7 +322,7 @@ mod test { use super::*; #[test] fn test_extract_bound() { - let test_list = [ + let test_list: [(ScalarExpr, Result<_, EvalError>); 5] = [ // col(0) == now ( ScalarExpr::CallBinary { @@ -392,8 +387,14 @@ mod test { Ok((Some(ScalarExpr::Column(0)), None)), ), ]; - for (expr, expected) in test_list.iter() { - assert_eq!(expr.extract_bound(), *expected); + for (expr, expected) in test_list.into_iter() { + let actual = expr.extract_bound(); + // EvalError is not Eq, so we need to compare the error message + match (actual, expected) { + (Ok(l), Ok(r)) => assert_eq!(l, r), + (Err(l), Err(r)) => assert!(matches!(l, r)), + (l, r) => panic!("expected: {:?}, actual: {:?}", r, l), + } } } } From 42da3ad6900c7ff83981fa60d27cb84148d82f63 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Wed, 21 Feb 2024 10:02:01 +0800 Subject: [PATCH 11/14] chore: more per review --- src/flow/src/expr/scalar.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index bfbe30a10138..ada441aad296 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -282,21 +282,27 @@ impl ScalarExpr { { let expr_1_contains_now = expr1.contains_temporal(); let expr_2_contains_now = expr2.contains_temporal(); + if !(expr_1_contains_now ^ expr_2_contains_now) { return unsupported_err("one side of the comparison must be `now()`"); } - if !expr_1_contains_now - && *expr2 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now) - { - std::mem::swap(&mut expr1, &mut expr2); - func = BinaryFunc::reverse_compare(&func)?; - } + // TODO: support simple transform like `now() + a < b` to `now() < b - a` - if expr_2_contains_now - || *expr1 != ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now) - { + + let expr1_is_now = + *expr1 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now); + let expr2_is_now = + *expr2 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now); + + if expr1_is_now ^ expr2_is_now { return unsupported_err("None of the sides of the comparison is `now()`"); } + + if expr2_is_now { + std::mem::swap(&mut expr1, &mut expr2); + func = BinaryFunc::reverse_compare(&func)?; + } + let step = |expr: ScalarExpr| expr.call_unary(UnaryFunc::StepTimestamp); match func { // now == expr2 -> now <= expr2 && now < expr2 + 1 From 24d2d501aa1ca89067f9353fb3a900abf1039e0f Mon Sep 17 00:00:00 2001 From: Discord9 Date: Wed, 21 Feb 2024 10:23:46 +0800 Subject: [PATCH 12/14] fix: extract_bound --- src/flow/src/expr/scalar.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index ada441aad296..1d5d4df3bdb2 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -294,7 +294,7 @@ impl ScalarExpr { let expr2_is_now = *expr2 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now); - if expr1_is_now ^ expr2_is_now { + if !(expr1_is_now ^ expr2_is_now) { return unsupported_err("None of the sides of the comparison is `now()`"); } From f106fc885093ea7b326d6437aba398f462f010b7 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Wed, 21 Feb 2024 11:09:10 +0800 Subject: [PATCH 13/14] chore: per review --- src/flow/src/expr/scalar.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 1d5d4df3bdb2..d720e961607e 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -127,7 +127,7 @@ impl ScalarExpr { } /// Returns the set of columns that are referenced by `self`. - pub fn support(&self) -> BTreeSet { + pub fn get_all_ref_columns(&self) -> BTreeSet { let mut support = BTreeSet::new(); self.visit_post_nolimit(&mut |e| { if let ScalarExpr::Column(i) = e { @@ -280,13 +280,6 @@ impl ScalarExpr { mut expr2, } = self.clone() { - let expr_1_contains_now = expr1.contains_temporal(); - let expr_2_contains_now = expr2.contains_temporal(); - - if !(expr_1_contains_now ^ expr_2_contains_now) { - return unsupported_err("one side of the comparison must be `now()`"); - } - // TODO: support simple transform like `now() + a < b` to `now() < b - a` let expr1_is_now = From f721059932a64b704512891996c0a8c436f47fe9 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Wed, 21 Feb 2024 16:37:21 +0800 Subject: [PATCH 14/14] refactor: reduce nest --- src/flow/src/expr/scalar.rs | 59 ++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index d720e961607e..fa03bb9f1912 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -274,44 +274,43 @@ impl ScalarExpr { } .fail() }; - if let Self::CallBinary { + + let Self::CallBinary { mut func, mut expr1, mut expr2, } = self.clone() - { - // TODO: support simple transform like `now() + a < b` to `now() < b - a` + else { + return unsupported_err("Not a binary expression"); + }; - let expr1_is_now = - *expr1 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now); - let expr2_is_now = - *expr2 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now); + // TODO: support simple transform like `now() + a < b` to `now() < b - a` - if !(expr1_is_now ^ expr2_is_now) { - return unsupported_err("None of the sides of the comparison is `now()`"); - } + let expr1_is_now = *expr1 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now); + let expr2_is_now = *expr2 == ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now); - if expr2_is_now { - std::mem::swap(&mut expr1, &mut expr2); - func = BinaryFunc::reverse_compare(&func)?; - } + if !(expr1_is_now ^ expr2_is_now) { + return unsupported_err("None of the sides of the comparison is `now()`"); + } - let step = |expr: ScalarExpr| expr.call_unary(UnaryFunc::StepTimestamp); - match func { - // now == expr2 -> now <= expr2 && now < expr2 + 1 - BinaryFunc::Eq => Ok((Some(*expr2.clone()), Some(step(*expr2)))), - // now < expr2 -> now < expr2 - BinaryFunc::Lt => Ok((None, Some(*expr2))), - // now <= expr2 -> now < expr2 + 1 - BinaryFunc::Lte => Ok((None, Some(step(*expr2)))), - // now > expr2 -> now >= expr2 + 1 - BinaryFunc::Gt => Ok((Some(step(*expr2)), None)), - // now >= expr2 -> now >= expr2 - BinaryFunc::Gte => Ok((Some(*expr2), None)), - _ => unreachable!("Already checked"), - } - } else { - unsupported_err("None of the sides of the comparison is `now()`") + if expr2_is_now { + std::mem::swap(&mut expr1, &mut expr2); + func = BinaryFunc::reverse_compare(&func)?; + } + + let step = |expr: ScalarExpr| expr.call_unary(UnaryFunc::StepTimestamp); + match func { + // now == expr2 -> now <= expr2 && now < expr2 + 1 + BinaryFunc::Eq => Ok((Some(*expr2.clone()), Some(step(*expr2)))), + // now < expr2 -> now < expr2 + BinaryFunc::Lt => Ok((None, Some(*expr2))), + // now <= expr2 -> now < expr2 + 1 + BinaryFunc::Lte => Ok((None, Some(step(*expr2)))), + // now > expr2 -> now >= expr2 + 1 + BinaryFunc::Gt => Ok((Some(step(*expr2)), None)), + // now >= expr2 -> now >= expr2 + BinaryFunc::Gte => Ok((Some(*expr2), None)), + _ => unreachable!("Already checked"), } } }