From 46927e3fb42e26ecd2fb0e713229f74e95904fc6 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Tue, 6 Feb 2024 10:20:49 +0800 Subject: [PATCH 01/14] feat: mfp impls --- src/flow/src/expr/linear.rs | 429 ++++++++++++++++++++++++++++++++++++ 1 file changed, 429 insertions(+) diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index eddb98c49e1b..a65fa463f24b 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -62,12 +62,270 @@ pub struct MapFilterProject { pub input_arity: usize, } +impl MapFilterProject { + /// Create a no-op operator for an input of a supplied arity. + pub fn new(input_arity: usize) -> Self { + Self { + expressions: Vec::new(), + predicates: Vec::new(), + projection: (0..input_arity).collect(), + input_arity, + } + } + + /// Given two mfps, return an mfp that applies one + /// followed by the other. + /// Note that the arguments are in the opposite order + /// from how function composition is usually written in mathematics. + pub fn compose(before: Self, after: Self) -> Self { + let (m, f, p) = after.into_map_filter_project(); + before.map(m).filter(f).project(p) + } + + /// True if the operator describes the identity transformation. + pub fn is_identity(&self) -> bool { + self.expressions.is_empty() + && self.predicates.is_empty() + && self.projection.len() == self.input_arity + && self.projection.iter().enumerate().all(|(i, p)| i == *p) + } + + /// Retain only the indicated columns in the presented order. + pub fn project(mut self, columns: I) -> Self + where + I: IntoIterator + std::fmt::Debug, + { + self.projection = columns.into_iter().map(|c| self.projection[c]).collect(); + self + } + + /// Retain only rows satisfying these predicates. + /// + /// This method introduces predicates as eagerly as they can be evaluated, + /// which may not be desired for predicates that may cause exceptions. + /// If fine manipulation is required, the predicates can be added manually. 
+ pub fn filter(mut self, predicates: I) -> Self + where + I: IntoIterator, + { + for mut predicate in predicates { + // Correct column references. + predicate.permute(&self.projection[..]); + + // Validate column references. + assert!(predicate + .support() + .into_iter() + .all(|c| c < self.input_arity + self.expressions.len())); + + // Insert predicate as eagerly as it can be evaluated: + // just after the largest column in its support is formed. + let max_support = predicate + .support() + .into_iter() + .max() + .map(|c| c + 1) + .unwrap_or(0); + self.predicates.push((max_support, predicate)) + } + // Stable sort predicates by position at which they take effect. + self.predicates + .sort_by_key(|(position, predicate)| *position); + self + } + + /// Append the result of evaluating expressions to each row. + pub fn map(mut self, expressions: I) -> Self + where + I: IntoIterator, + { + for mut expression in expressions { + // Correct column references. + expression.permute(&self.projection[..]); + + // Validate column references. + assert!(expression + .support() + .into_iter() + .all(|c| c < self.input_arity + self.expressions.len())); + + // Introduce expression and produce as output. + self.expressions.push(expression); + self.projection + .push(self.input_arity + self.expressions.len() - 1); + } + + self + } + + /// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning. + pub fn into_map_filter_project(self) -> (Vec, Vec, Vec) { + let predicates = self + .predicates + .into_iter() + .map(|(_pos, predicate)| predicate) + .collect(); + (self.expressions, predicates, self.projection) + } + + /// As the arguments to `Map`, `Filter`, and `Project` operators. + /// + /// In principle, this operator can be implemented as a sequence of + /// more elemental operators, likely less efficiently. 
+ pub fn as_map_filter_project(&self) -> (Vec, Vec, Vec) { + self.clone().into_map_filter_project() + } +} + +impl MapFilterProject { + pub fn optimize(&mut self) { + // TODO(discord9): optimize + } + + /// Convert the `MapFilterProject` into a staged evaluation plan. + /// + /// The main behavior is extract temporal predicates, which cannot be evaluated + /// using the standard machinery. + pub fn into_plan(self) -> Result { + MfpPlan::create_from(self) + } + + /// Lists input columns whose values are used in outputs. + /// + /// It is entirely appropriate to determine the demand of an instance + /// and then both apply a projection to the subject of the instance and + /// `self.permute` this instance. + pub fn demand(&self) -> BTreeSet { + let mut demanded = BTreeSet::new(); + for (_index, pred) in self.predicates.iter() { + demanded.extend(pred.support()); + } + demanded.extend(self.projection.iter().cloned()); + for index in (0..self.expressions.len()).rev() { + if demanded.contains(&(self.input_arity + index)) { + demanded.extend(self.expressions[index].support()); + } + } + demanded.retain(|col| col < &self.input_arity); + demanded + } + + /// Update input column references, due to an input projection or permutation. + /// + /// The `shuffle` argument remaps expected column identifiers to new locations, + /// with the expectation that `shuffle` describes all input columns, and so the + /// intermediate results will be able to start at position `shuffle.len()`. + /// + /// The supplied `shuffle` may not list columns that are not "demanded" by the + /// instance, and so we should ensure that `self` is optimized to not reference + /// columns that are not demanded. + pub fn permute(&mut self, mut shuffle: BTreeMap, new_input_arity: usize) { + let (mut map, mut filter, mut project) = self.as_map_filter_project(); + for index in 0..map.len() { + // Intermediate columns are just shifted. 
+ shuffle.insert(self.input_arity + index, new_input_arity + index); + } + for expr in map.iter_mut() { + expr.permute_map(&shuffle); + } + for pred in filter.iter_mut() { + pred.permute_map(&shuffle); + } + for proj in project.iter_mut() { + assert!(shuffle[proj] < new_input_arity + map.len()); + *proj = shuffle[proj]; + } + *self = Self::new(new_input_arity) + .map(map) + .filter(filter) + .project(project) + } +} + /// A wrapper type which indicates it is safe to simply evaluate all expressions. #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] pub struct SafeMfpPlan { pub(crate) mfp: MapFilterProject, } +impl SafeMfpPlan { + pub fn permute(&mut self, map: BTreeMap, new_arity: usize) { + self.mfp.permute(map, new_arity); + } + + /// Evaluates the linear operator on a supplied list of datums. + /// + /// The arguments are the initial datums associated with the row, + /// and an appropriately lifetimed arena for temporary allocations + /// needed by scalar evaluation. + /// + /// An `Ok` result will either be `None` if any predicate did not + /// evaluate to `Value::Boolean(true)`, or the values of the columns listed + /// by `self.projection` if all predicates passed. If an error + /// occurs in the evaluation it is returned as an `Err` variant. + /// As the evaluation exits early with failed predicates, it may + /// miss some errors that would occur later in evaluation. + /// + /// The `row` is not cleared first, but emptied if the function + /// returns `Ok(Some(row)). + #[inline(always)] + pub fn evaluate_into( + &self, + values: &mut Vec, + row_buf: &mut Row, + ) -> Result, EvalError> { + let passed_predicates = self.evaluate_inner(values)?; + if !passed_predicates { + Ok(None) + } else { + row_buf.clear(); + row_buf.extend(self.mfp.projection.iter().map(|c| values[*c].clone())); + Ok(Some(row_buf.clone())) + } + } + + /// A version of `evaluate` which produces an iterator over `Datum` + /// as output. 
+ /// + /// This version can be useful when one wants to capture the resulting + /// datums without packing and then unpacking a row. + #[inline(always)] + pub fn evaluate_iter<'a>( + &'a self, + datums: &'a mut Vec, + ) -> Result + 'a>, EvalError> { + let passed_predicates = self.evaluate_inner(datums)?; + if !passed_predicates { + Ok(None) + } else { + Ok(Some( + self.mfp.projection.iter().map(move |i| datums[*i].clone()), + )) + } + } + + /// Populates `values` with `self.expressions` and tests `self.predicates`. + /// + /// This does not apply `self.projection`, which is up to the calling method. + pub fn evaluate_inner(&self, values: &mut Vec) -> Result { + let mut expression = 0; + for (support, predicate) in self.mfp.predicates.iter() { + while self.mfp.input_arity + expression < *support { + values.push(self.mfp.expressions[expression].eval(&values[..])?); + expression += 1; + } + if predicate.eval(&values[..])? != Value::Boolean(true) { + return Ok(false); + } + } + while expression < self.mfp.expressions.len() { + values.push(self.mfp.expressions[expression].eval(&values[..])?); + expression += 1; + } + Ok(true) + } +} + impl std::ops::Deref for SafeMfpPlan { type Target = MapFilterProject; fn deref(&self) -> &Self::Target { @@ -94,3 +352,174 @@ pub struct MfpPlan { /// Expressions that when evaluated upper-bound `MzNow`. pub(crate) upper_bounds: Vec, } + +impl MfpPlan { + /// find `now` in `predicates` and put them into lower/upper temporal bounds for temporal filter to use + pub fn create_from(mut mfp: MapFilterProject) -> Result { + let mut lower_bounds = Vec::new(); + let mut upper_bounds = Vec::new(); + + let mut temporal = Vec::new(); + + // Optimize, to ensure that temporal predicates are move in to `mfp.predicates`. 
+ mfp.optimize(); + + mfp.predicates.retain(|(_position, predicate)| { + if predicate.contains_temporal() { + temporal.push(predicate.clone()); + false + } else { + true + } + }); + for predicate in temporal { + let (lower, upper) = predicate.extract_bound()?; + lower_bounds.extend(lower); + upper_bounds.extend(upper); + } + Ok(Self { + mfp: SafeMfpPlan { mfp }, + lower_bounds, + upper_bounds, + }) + } + + /// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs. + pub fn is_identity(&self) -> bool { + self.mfp.mfp.is_identity() && self.lower_bounds.is_empty() && self.upper_bounds.is_empty() + } + + /// if `lower_bound <= sys_time < upper_bound`, return `[(data, sys_time, +1), (data, min_upper_bound, -1)]` + /// + /// else if `sys_time < lower_bound`, return `[(data, lower_bound, +1), (data, min_upper_bound, -1)]` + /// + /// else if `sys_time >= upper_bound`, return `[None, None]` + /// + /// if eval error appeal in any of those process, corresponding result will be `Err` + pub fn evaluate>( + &self, + values: &mut Vec, + sys_time: repr::Timestamp, + diff: Diff, + ) -> impl Iterator> + { + match self.mfp.evaluate_inner(values) { + Err(e) => { + return Some(Err((e.into(), sys_time, diff))) + .into_iter() + .chain(None); + } + Ok(true) => {} + Ok(false) => { + return None.into_iter().chain(None); + } + } + + let mut lower_bound = sys_time; + let mut upper_bound = None; + + // Track whether we have seen a null in either bound, as this should + // prevent the record from being produced at any time. 
+ let mut null_eval = false; + let ret_err = |e: EvalError| { + Some(Err((e.into(), sys_time, diff))) + .into_iter() + .chain(None) + }; + for l in self.lower_bounds.iter() { + match l.eval(values) { + Ok(v) => { + if v.is_null() { + null_eval = true; + continue; + } + match value_to_internal_ts(v) { + Ok(ts) => lower_bound = lower_bound.max(ts), + Err(e) => return ret_err(e), + } + } + Err(e) => return ret_err(e), + }; + } + + for u in self.upper_bounds.iter() { + if upper_bound != Some(lower_bound) { + match u.eval(values) { + Err(e) => return ret_err(e), + Ok(val) => { + if val.is_null() { + null_eval = true; + continue; + } + let ts = match value_to_internal_ts(val) { + Ok(ts) => ts, + Err(e) => return ret_err(e), + }; + if let Some(upper) = upper_bound { + upper_bound = Some(upper.min(ts)); + } else { + upper_bound = Some(ts); + } + // Force the upper bound to be at least the lower + // bound. + if upper_bound.is_some() && upper_bound < Some(lower_bound) { + upper_bound = Some(lower_bound); + } + } + } + } + } + + if Some(lower_bound) != upper_bound && !null_eval { + let res_row = Row::pack(self.mfp.mfp.projection.iter().map(|c| values[*c].clone())); + let upper_opt = + upper_bound.map(|upper_bound| Ok((res_row.clone(), upper_bound, -diff))); + // if diff==-1, the `upper_opt` will cancel the future `-1` inserted before by previous diff==1 row + let lower = Some(Ok((res_row, lower_bound, diff))); + + lower.into_iter().chain(upper_opt) + } else { + None.into_iter().chain(None) + } + } +} + +#[test] +fn test_mfp() { + use crate::expr::func::BinaryFunc; + let mfp = MapFilterProject::new(3) + .map(vec![ + ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt), + ScalarExpr::Column(1).call_binary(ScalarExpr::Column(2), BinaryFunc::Lt), + ]) + .project(vec![3, 4]); + assert!(!mfp.is_identity()); + let mfp = MapFilterProject::compose(mfp, MapFilterProject::new(2)); + { + let mfp_0 = mfp.as_map_filter_project(); + let same = MapFilterProject::new(3) + 
.map(mfp_0.0) + .filter(mfp_0.1) + .project(mfp_0.2); + assert_eq!(mfp, same); + } + assert_eq!(mfp.demand().len(), 3); + let mut mfp = mfp; + mfp.permute(BTreeMap::from([(0, 2), (2, 0), (1, 1)]), 3); + assert_eq!( + mfp, + MapFilterProject::new(3) + .map(vec![ + ScalarExpr::Column(2).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt), + ScalarExpr::Column(1).call_binary(ScalarExpr::Column(0), BinaryFunc::Lt), + ]) + .project(vec![3, 4]) + ); + let safe_mfp = SafeMfpPlan { mfp }; + let mut values = vec![Value::from(4), Value::from(2), Value::from(3)]; + let ret = safe_mfp + .evaluate_into(&mut values, &mut Row::empty()) + .unwrap() + .unwrap(); + assert_eq!(ret, Row::pack(vec![Value::from(false), Value::from(true)])); +} From 68a299d876ffe438c433ffc0550ffec0cfc1c744 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Thu, 22 Feb 2024 16:18:20 +0800 Subject: [PATCH 02/14] fix: after rebase --- src/flow/src/expr/linear.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index a65fa463f24b..449aac0d4a4f 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -114,14 +114,14 @@ impl MapFilterProject { // Validate column references. assert!(predicate - .support() + .get_all_ref_columns() .into_iter() .all(|c| c < self.input_arity + self.expressions.len())); // Insert predicate as eagerly as it can be evaluated: // just after the largest column in its support is formed. let max_support = predicate - .support() + .get_all_ref_columns() .into_iter() .max() .map(|c| c + 1) @@ -145,7 +145,7 @@ impl MapFilterProject { // Validate column references. assert!(expression - .support() + .get_all_ref_columns() .into_iter() .all(|c| c < self.input_arity + self.expressions.len())); @@ -186,7 +186,7 @@ impl MapFilterProject { /// /// The main behavior is extract temporal predicates, which cannot be evaluated /// using the standard machinery. 
- pub fn into_plan(self) -> Result { + pub fn into_plan(self) -> Result { MfpPlan::create_from(self) } @@ -198,12 +198,12 @@ impl MapFilterProject { pub fn demand(&self) -> BTreeSet { let mut demanded = BTreeSet::new(); for (_index, pred) in self.predicates.iter() { - demanded.extend(pred.support()); + demanded.extend(pred.get_all_ref_columns()); } demanded.extend(self.projection.iter().cloned()); for index in (0..self.expressions.len()).rev() { if demanded.contains(&(self.input_arity + index)) { - demanded.extend(self.expressions[index].support()); + demanded.extend(self.expressions[index].get_all_ref_columns()); } } demanded.retain(|col| col < &self.input_arity); @@ -355,7 +355,7 @@ pub struct MfpPlan { impl MfpPlan { /// find `now` in `predicates` and put them into lower/upper temporal bounds for temporal filter to use - pub fn create_from(mut mfp: MapFilterProject) -> Result { + pub fn create_from(mut mfp: MapFilterProject) -> Result { let mut lower_bounds = Vec::new(); let mut upper_bounds = Vec::new(); From 8d12739c5bbe2a337a546ad2b11f793be78a2bc4 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Fri, 23 Feb 2024 11:37:24 +0800 Subject: [PATCH 03/14] test: temporal filter mfp --- src/flow/src/expr/linear.rs | 140 ++++++++++++++++++++++++++---------- 1 file changed, 104 insertions(+), 36 deletions(-) diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index 449aac0d4a4f..19c8cd9a3b29 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -48,8 +48,10 @@ pub struct MapFilterProject { /// Each entry is prepended with a column identifier indicating /// the column *before* which the predicate should first be applied. /// Most commonly this would be one plus the largest column identifier - /// in the predicate's support, but it could be larger to implement + /// in the predicate's referred columns, but it could be larger to implement /// guarded evaluation of predicates. 
+ /// Put it in another word, the first element of the tuple means + /// the predicates can't be evaluated until that number of columns is formed. /// /// This list should be sorted by the first field. pub predicates: Vec<(usize, ScalarExpr)>, @@ -372,6 +374,7 @@ impl MfpPlan { true } }); + for predicate in temporal { let (lower, upper) = predicate.extract_bound()?; lower_bounds.extend(lower); @@ -484,42 +487,107 @@ impl MfpPlan { } } -#[test] -fn test_mfp() { - use crate::expr::func::BinaryFunc; - let mfp = MapFilterProject::new(3) - .map(vec![ - ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt), - ScalarExpr::Column(1).call_binary(ScalarExpr::Column(2), BinaryFunc::Lt), - ]) - .project(vec![3, 4]); - assert!(!mfp.is_identity()); - let mfp = MapFilterProject::compose(mfp, MapFilterProject::new(2)); - { - let mfp_0 = mfp.as_map_filter_project(); - let same = MapFilterProject::new(3) - .map(mfp_0.0) - .filter(mfp_0.1) - .project(mfp_0.2); - assert_eq!(mfp, same); +#[cfg(test)] +mod test { + use datatypes::data_type::ConcreteDataType; + use itertools::Itertools; + + use super::*; + use crate::expr::UnmaterializableFunc; + #[test] + fn test_mfp_with_time() { + use crate::expr::func::BinaryFunc; + let lte_now = ScalarExpr::Column(0).call_binary( + ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now), + BinaryFunc::Lte, + ); + assert!(lte_now.contains_temporal()); + + let gt_now_minus_two = ScalarExpr::Column(0) + .call_binary( + ScalarExpr::Literal(Value::from(2i64), ConcreteDataType::int64_datatype()), + BinaryFunc::AddInt64, + ) + .call_binary( + ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now), + BinaryFunc::Gt, + ); + assert!(gt_now_minus_two.contains_temporal()); + + let mfp = MapFilterProject::new(3) + .filter(vec![ + // col(0) <= now() + lte_now, + // col(0) + 2 > now() + gt_now_minus_two, + ]) + .project(vec![0]); + + let mfp = MfpPlan::create_from(mfp).unwrap(); + let expected = vec![ + ( + 0, + vec![ + 
(Row::new(vec![Value::from(4i64)]), 4, 1), + (Row::new(vec![Value::from(4i64)]), 6, -1), + ], + ), + ( + 5, + vec![ + (Row::new(vec![Value::from(4i64)]), 5, 1), + (Row::new(vec![Value::from(4i64)]), 6, -1), + ], + ), + (10, vec![]), + ]; + for (sys_time, expected) in expected { + let mut values = vec![Value::from(4i64), Value::from(2i64), Value::from(3i64)]; + let ret = mfp + .evaluate::(&mut values, sys_time, 1) + .collect::, _>>() + .unwrap(); + assert_eq!(ret, expected); + } } - assert_eq!(mfp.demand().len(), 3); - let mut mfp = mfp; - mfp.permute(BTreeMap::from([(0, 2), (2, 0), (1, 1)]), 3); - assert_eq!( - mfp, - MapFilterProject::new(3) + + #[test] + fn test_mfp() { + use crate::expr::func::BinaryFunc; + let mfp = MapFilterProject::new(3) .map(vec![ - ScalarExpr::Column(2).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt), - ScalarExpr::Column(1).call_binary(ScalarExpr::Column(0), BinaryFunc::Lt), + ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt), + ScalarExpr::Column(1).call_binary(ScalarExpr::Column(2), BinaryFunc::Lt), ]) - .project(vec![3, 4]) - ); - let safe_mfp = SafeMfpPlan { mfp }; - let mut values = vec![Value::from(4), Value::from(2), Value::from(3)]; - let ret = safe_mfp - .evaluate_into(&mut values, &mut Row::empty()) - .unwrap() - .unwrap(); - assert_eq!(ret, Row::pack(vec![Value::from(false), Value::from(true)])); + .project(vec![3, 4]); + assert!(!mfp.is_identity()); + let mfp = MapFilterProject::compose(mfp, MapFilterProject::new(2)); + { + let mfp_0 = mfp.as_map_filter_project(); + let same = MapFilterProject::new(3) + .map(mfp_0.0) + .filter(mfp_0.1) + .project(mfp_0.2); + assert_eq!(mfp, same); + } + assert_eq!(mfp.demand().len(), 3); + let mut mfp = mfp; + mfp.permute(BTreeMap::from([(0, 2), (2, 0), (1, 1)]), 3); + assert_eq!( + mfp, + MapFilterProject::new(3) + .map(vec![ + ScalarExpr::Column(2).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt), + ScalarExpr::Column(1).call_binary(ScalarExpr::Column(0), 
BinaryFunc::Lt), + ]) + .project(vec![3, 4]) + ); + let safe_mfp = SafeMfpPlan { mfp }; + let mut values = vec![Value::from(4), Value::from(2), Value::from(3)]; + let ret = safe_mfp + .evaluate_into(&mut values, &mut Row::empty()) + .unwrap() + .unwrap(); + assert_eq!(ret, Row::pack(vec![Value::from(false), Value::from(true)])); + } } From 1779e139609c7698b4753522c05d51bee3ed61a6 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Mon, 26 Feb 2024 12:57:47 +0800 Subject: [PATCH 04/14] refactor: more comments&test --- src/flow/src/expr/linear.rs | 173 +++++++++++++++++++++++++++++------- 1 file changed, 141 insertions(+), 32 deletions(-) diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index 19c8cd9a3b29..17aa6dbd446a 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -16,9 +16,10 @@ use std::collections::{BTreeMap, BTreeSet}; use datatypes::value::Value; use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt}; use crate::expr::error::EvalError; -use crate::expr::{Id, LocalId, ScalarExpr}; +use crate::expr::{Id, InvalidArgumentSnafu, LocalId, ScalarExpr}; use crate::repr::{self, value_to_internal_ts, Diff, Row}; /// A compound operator that can be applied row-by-row. @@ -79,26 +80,41 @@ impl MapFilterProject { /// followed by the other. /// Note that the arguments are in the opposite order /// from how function composition is usually written in mathematics. - pub fn compose(before: Self, after: Self) -> Self { + pub fn compose(before: Self, after: Self) -> Result { let (m, f, p) = after.into_map_filter_project(); - before.map(m).filter(f).project(p) + before.map(m)?.filter(f)?.project(p) } /// True if the operator describes the identity transformation. 
pub fn is_identity(&self) -> bool { self.expressions.is_empty() && self.predicates.is_empty() + // identity if projection is the identity permutation && self.projection.len() == self.input_arity && self.projection.iter().enumerate().all(|(i, p)| i == *p) } /// Retain only the indicated columns in the presented order. - pub fn project(mut self, columns: I) -> Self + pub fn project(mut self, columns: I) -> Result where I: IntoIterator + std::fmt::Debug, { - self.projection = columns.into_iter().map(|c| self.projection[c]).collect(); - self + // + self.projection = columns + .into_iter() + .map(|c| self.projection.get(c).cloned().ok_or(c)) + .collect::, _>>() + .map_err(|c| { + InvalidArgumentSnafu { + reason: format!( + "column index {} out of range, expected at most {} columns", + c, + self.projection.len() + ), + } + .build() + })?; + Ok(self) } /// Retain only rows satisfying these predicates. @@ -106,7 +122,7 @@ impl MapFilterProject { /// This method introduces predicates as eagerly as they can be evaluated, /// which may not be desired for predicates that may cause exceptions. /// If fine manipulation is required, the predicates can be added manually. - pub fn filter(mut self, predicates: I) -> Self + pub fn filter(mut self, predicates: I) -> Result where I: IntoIterator, { @@ -115,15 +131,24 @@ impl MapFilterProject { predicate.permute(&self.projection[..]); // Validate column references. 
- assert!(predicate - .get_all_ref_columns() - .into_iter() - .all(|c| c < self.input_arity + self.expressions.len())); + let referred_columns = predicate.get_all_ref_columns(); + for c in referred_columns.iter() { + // current row len include input columns and previous number of expressions + let cur_row_len = self.input_arity + self.expressions.len(); + ensure!( + *c < cur_row_len, + InvalidArgumentSnafu { + reason: format!( + "column index {} out of range, expected at most {} columns", + c, cur_row_len + ) + } + ); + } // Insert predicate as eagerly as it can be evaluated: // just after the largest column in its support is formed. - let max_support = predicate - .get_all_ref_columns() + let max_support = referred_columns .into_iter() .max() .map(|c| c + 1) @@ -133,11 +158,11 @@ impl MapFilterProject { // Stable sort predicates by position at which they take effect. self.predicates .sort_by_key(|(position, predicate)| *position); - self + Ok(self) } /// Append the result of evaluating expressions to each row. - pub fn map(mut self, expressions: I) -> Self + pub fn map(mut self, expressions: I) -> Result where I: IntoIterator, { @@ -146,18 +171,28 @@ impl MapFilterProject { expression.permute(&self.projection[..]); // Validate column references. - assert!(expression - .get_all_ref_columns() - .into_iter() - .all(|c| c < self.input_arity + self.expressions.len())); + for c in expression.get_all_ref_columns().into_iter() { + // current row len include input columns and previous number of expressions + let current_row_len = self.input_arity + self.expressions.len(); + ensure!( + c < current_row_len, + InvalidArgumentSnafu { + reason: format!( + "column index {} out of range, expected at most {} columns", + c, current_row_len + ) + } + ); + } // Introduce expression and produce as output. self.expressions.push(expression); - self.projection - .push(self.input_arity + self.expressions.len() - 1); + // Expression by default is projected to output. 
+ let cur_expr_col_num = self.input_arity + self.expressions.len() - 1; + self.projection.push(cur_expr_col_num); } - self + Ok(self) } /// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning. @@ -199,15 +234,21 @@ impl MapFilterProject { /// `self.permute` this instance. pub fn demand(&self) -> BTreeSet { let mut demanded = BTreeSet::new(); + // first, get all columns referenced by predicates for (_index, pred) in self.predicates.iter() { demanded.extend(pred.get_all_ref_columns()); } + // then, get columns referenced by projection which is direct output demanded.extend(self.projection.iter().cloned()); + + // check every expressions, if a expression is contained in demanded, then all columns it referenced should be added to demanded for index in (0..self.expressions.len()).rev() { if demanded.contains(&(self.input_arity + index)) { demanded.extend(self.expressions[index].get_all_ref_columns()); } } + + // only keep demanded columns that are in input demanded.retain(|col| col < &self.input_arity); demanded } @@ -221,7 +262,12 @@ impl MapFilterProject { /// The supplied `shuffle` may not list columns that are not "demanded" by the /// instance, and so we should ensure that `self` is optimized to not reference /// columns that are not demanded. - pub fn permute(&mut self, mut shuffle: BTreeMap, new_input_arity: usize) { + pub fn permute( + &mut self, + mut shuffle: BTreeMap, + new_input_arity: usize, + ) -> Result<(), EvalError> { + // decompose self into map, filter, project for ease of manipulation let (mut map, mut filter, mut project) = self.as_map_filter_project(); for index in 0..map.len() { // Intermediate columns are just shifted. 
@@ -233,14 +279,24 @@ impl MapFilterProject { for pred in filter.iter_mut() { pred.permute_map(&shuffle); } + let new_row_len = new_input_arity + map.len(); for proj in project.iter_mut() { - assert!(shuffle[proj] < new_input_arity + map.len()); + ensure!( + shuffle[proj] < new_row_len, + InvalidArgumentSnafu { + reason: format!( + "shuffled column index {} out of range, expected at most {} columns", + shuffle[proj], new_row_len + ) + } + ); *proj = shuffle[proj]; } *self = Self::new(new_input_arity) - .map(map) - .filter(filter) - .project(project) + .map(map)? + .filter(filter)? + .project(project)?; + Ok(()) } } @@ -251,6 +307,7 @@ pub struct SafeMfpPlan { } impl SafeMfpPlan { + /// See [`MapFilterProject::permute`]. pub fn permute(&mut self, map: BTreeMap, new_arity: usize) { self.mfp.permute(map, new_arity); } @@ -320,6 +377,7 @@ impl SafeMfpPlan { return Ok(false); } } + // while evaluated expressions are less than total expressions, keep evaluating while expression < self.mfp.expressions.len() { values.push(self.mfp.expressions[expression].eval(&values[..])?); expression += 1; @@ -493,7 +551,7 @@ mod test { use itertools::Itertools; use super::*; - use crate::expr::UnmaterializableFunc; + use crate::expr::{BinaryFunc, UnmaterializableFunc}; #[test] fn test_mfp_with_time() { use crate::expr::func::BinaryFunc; @@ -521,7 +579,9 @@ mod test { // col(0) + 2 > now() gt_now_minus_two, ]) - .project(vec![0]); + .unwrap() + .project(vec![0]) + .unwrap(); let mfp = MfpPlan::create_from(mfp).unwrap(); let expected = vec![ @@ -559,15 +619,20 @@ mod test { ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt), ScalarExpr::Column(1).call_binary(ScalarExpr::Column(2), BinaryFunc::Lt), ]) - .project(vec![3, 4]); + .unwrap() + .project(vec![3, 4]) + .unwrap(); assert!(!mfp.is_identity()); - let mfp = MapFilterProject::compose(mfp, MapFilterProject::new(2)); + let mfp = MapFilterProject::compose(mfp, MapFilterProject::new(2)).unwrap(); { let mfp_0 = 
mfp.as_map_filter_project(); let same = MapFilterProject::new(3) .map(mfp_0.0) + .unwrap() .filter(mfp_0.1) - .project(mfp_0.2); + .unwrap() + .project(mfp_0.2) + .unwrap(); assert_eq!(mfp, same); } assert_eq!(mfp.demand().len(), 3); @@ -580,7 +645,9 @@ mod test { ScalarExpr::Column(2).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt), ScalarExpr::Column(1).call_binary(ScalarExpr::Column(0), BinaryFunc::Lt), ]) + .unwrap() .project(vec![3, 4]) + .unwrap() ); let safe_mfp = SafeMfpPlan { mfp }; let mut values = vec![Value::from(4), Value::from(2), Value::from(3)]; @@ -590,4 +657,46 @@ mod test { .unwrap(); assert_eq!(ret, Row::pack(vec![Value::from(false), Value::from(true)])); } + + #[test] + fn manipulation_mfp() { + // give a input of 4 columns + let mfp = MapFilterProject::new(4); + // append a expression to the mfp'input row that get the sum of the first 3 columns + let mfp = mfp + .map(vec![ScalarExpr::Column(0) + .call_binary(ScalarExpr::Column(1), BinaryFunc::AddInt32) + .call_binary(ScalarExpr::Column(2), BinaryFunc::AddInt32)]) + .unwrap(); + // only retain sum result + let mfp = mfp.project(vec![4]).unwrap(); + // accept only if if the sum is greater than 10 + let mfp = mfp + .filter(vec![ScalarExpr::Column(0).call_binary( + ScalarExpr::Literal(Value::from(10i32), ConcreteDataType::int32_datatype()), + BinaryFunc::Gt, + )]) + .unwrap(); + let mut input1 = vec![ + Value::from(4), + Value::from(2), + Value::from(3), + Value::from("abc"), + ]; + let safe_mfp = SafeMfpPlan { mfp }; + let ret = safe_mfp + .evaluate_into(&mut input1, &mut Row::empty()) + .unwrap(); + assert_eq!(ret, None); + let mut input2 = vec![ + Value::from(5), + Value::from(2), + Value::from(4), + Value::from("abc"), + ]; + let ret = safe_mfp + .evaluate_into(&mut input2, &mut Row::empty()) + .unwrap(); + assert_eq!(ret, Some(Row::pack(vec![Value::from(11)]))); + } } From 6f427c44bf3f0ce57d3b2af87286c436773d3176 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Mon, 26 Feb 2024 16:19:24 +0800 
Subject: [PATCH 05/14] test: permute --- src/flow/src/expr/linear.rs | 72 ++++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index 17aa6dbd446a..a7b5402066f0 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -267,16 +267,63 @@ impl MapFilterProject { mut shuffle: BTreeMap, new_input_arity: usize, ) -> Result<(), EvalError> { + // check shuffle is valid + let demand = self.demand(); + for d in demand { + ensure!( + shuffle.contains_key(&d), + InvalidArgumentSnafu { + reason: format!( + "Demanded column {} is not in shuffle's keys: {:?}", + d, + shuffle.keys() + ) + } + ); + } + if shuffle.len() > new_input_arity { + return InvalidArgumentSnafu { + reason: format!( + "shuffle's length {} is greater than new_input_arity {}", + shuffle.len(), + new_input_arity + ), + } + .fail(); + } + // decompose self into map, filter, project for ease of manipulation let (mut map, mut filter, mut project) = self.as_map_filter_project(); for index in 0..map.len() { // Intermediate columns are just shifted. 
shuffle.insert(self.input_arity + index, new_input_arity + index); } + + let shuffle_keys = shuffle.keys().cloned().collect::>(); for expr in map.iter_mut() { + if !expr.get_all_ref_columns().is_subset(&shuffle_keys) { + return InvalidArgumentSnafu { + reason: format!( + "Expression's referred columns {:?} is not a subset of shuffle's keys {:?}", + expr.get_all_ref_columns(), + shuffle.keys() + ), + } + .fail(); + } expr.permute_map(&shuffle); } for pred in filter.iter_mut() { + if !pred.get_all_ref_columns().is_subset(&shuffle_keys) { + return InvalidArgumentSnafu { + reason: format!( + "Predicate's referred columns {:?} is not a subset of shuffle's keys {:?}", + pred.get_all_ref_columns(), + shuffle.keys() + ), + } + .fail(); + } pred.permute_map(&shuffle); } let new_row_len = new_input_arity + map.len(); @@ -637,7 +684,8 @@ mod test { } assert_eq!(mfp.demand().len(), 3); let mut mfp = mfp; - mfp.permute(BTreeMap::from([(0, 2), (2, 0), (1, 1)]), 3); + mfp.permute(BTreeMap::from([(0, 2), (2, 0), (1, 1)]), 3) + .unwrap(); assert_eq!( mfp, MapFilterProject::new(3) @@ -699,4 +747,26 @@ mod test { .unwrap(); assert_eq!(ret, Some(Row::pack(vec![Value::from(11)]))); } + + #[test] + fn test_permute() { + let mfp = MapFilterProject::new(3) + .map(vec![ + ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt) + ]) + .unwrap() + .filter(vec![ + ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Gt) + ]) + .unwrap() + .project(vec![0, 1]) + .unwrap(); + assert_eq!(mfp.demand(), BTreeSet::from([0, 1])); + let mut less = mfp.clone(); + less.permute(BTreeMap::from([(1, 0), (0, 1)]), 2).unwrap(); + + let mut more = mfp.clone(); + more.permute(BTreeMap::from([(0, 1), (1, 2), (2, 0)]), 4) + .unwrap(); + } } From 409b8273694b86436f7b5a2c3e494fda2cbb07fe Mon Sep 17 00:00:00 2001 From: Discord9 Date: Mon, 26 Feb 2024 17:06:33 +0800 Subject: [PATCH 06/14] fix: check input len when eval --- src/flow/src/expr/linear.rs | 49 
++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index a7b5402066f0..154405807b4f 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -380,7 +380,18 @@ impl SafeMfpPlan { values: &mut Vec, row_buf: &mut Row, ) -> Result, EvalError> { + if values.len() != self.mfp.input_arity { + return InvalidArgumentSnafu { + reason: format!( + "values length {} is not equal to input_arity {}", + values.len(), + self.mfp.input_arity + ), + } + .fail(); + } let passed_predicates = self.evaluate_inner(values)?; + if !passed_predicates { Ok(None) } else { @@ -598,7 +609,7 @@ mod test { use itertools::Itertools; use super::*; - use crate::expr::{BinaryFunc, UnmaterializableFunc}; + use crate::expr::{BinaryFunc, UnaryFunc, UnmaterializableFunc}; #[test] fn test_mfp_with_time() { use crate::expr::func::BinaryFunc; @@ -769,4 +780,40 @@ mod test { more.permute(BTreeMap::from([(0, 1), (1, 2), (2, 0)]), 4) .unwrap(); } + + #[test] + fn mfp_test_cast_and_filter() { + let mfp = MapFilterProject::new(3) + .map(vec![ScalarExpr::Column(0).call_unary(UnaryFunc::Cast( + ConcreteDataType::int32_datatype(), + ))]) + .unwrap() + .filter(vec![ + ScalarExpr::Column(3).call_binary(ScalarExpr::Column(1), BinaryFunc::Gt) + ]) + .unwrap() + .project([0, 1, 2]) + .unwrap(); + let mut input1 = vec![ + Value::from(4i64), + Value::from(2), + Value::from(3), + Value::from(53), + ]; + let safe_mfp = SafeMfpPlan { mfp }; + let ret = safe_mfp.evaluate_into(&mut input1, &mut Row::empty()); + assert!(matches!(ret, Err(EvalError::InvalidArgument { .. 
}))); + + let mut input2 = vec![Value::from(4i64), Value::from(2), Value::from(3)]; + let ret = safe_mfp + .evaluate_into(&mut input2.clone(), &mut Row::empty()) + .unwrap(); + assert_eq!(ret, Some(Row::new(input2))); + + let mut input3 = vec![Value::from(4i64), Value::from(5), Value::from(2)]; + let ret = safe_mfp + .evaluate_into(&mut input3, &mut Row::empty()) + .unwrap(); + assert_eq!(ret, None); + } } From cb70e7ae08b654d36951128febe2ca8ca47116ca Mon Sep 17 00:00:00 2001 From: Discord9 Date: Mon, 26 Feb 2024 18:52:05 +0800 Subject: [PATCH 07/14] refactor: err handle&docs: more explain graph --- src/flow/src/expr/linear.rs | 121 +++++++++++++++++++++++++++++++++--- src/flow/src/expr/scalar.rs | 31 ++++++++- 2 files changed, 142 insertions(+), 10 deletions(-) diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index 154405807b4f..c453064bb6e6 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -28,7 +28,7 @@ use crate::repr::{self, value_to_internal_ts, Diff, Row}; /// It applies a sequences of map expressions, which are allowed to /// refer to previous expressions, interleaved with predicates which /// must be satisfied for an output to be produced. If all predicates -/// evaluate to `Datum::True` the data at the identified columns are +/// evaluate to `Value::Boolean(True)` the data at the identified columns are /// collected and produced as output in a packed `Row`. /// /// This operator is a "builder" and its contents may contain expressions @@ -95,11 +95,32 @@ impl MapFilterProject { } /// Retain only the indicated columns in the presented order. + /// + /// i.e. 
before: `self.projection = [2, 1, 0], columns = [0, 1]` + /// ```mermaid + /// flowchart TD + /// col-0 + /// col-1 + /// col-2 + /// projection --> |2|col-0 + /// projection --> |1|col-1 + /// projection --> |0|col-2 + /// ``` + /// + /// after: `self.projection = [2, 1]` + /// ```mermaid + /// flowchart TD + /// col-0 + /// col-1 + /// col-2 + /// projection --> |2|col-0 + /// projection --> |1|col-1 + /// projection -.-> col-2 + /// ``` pub fn project(mut self, columns: I) -> Result where I: IntoIterator + std::fmt::Debug, { - // self.projection = columns .into_iter() .map(|c| self.projection.get(c).cloned().ok_or(c)) @@ -122,13 +143,42 @@ impl MapFilterProject { /// This method introduces predicates as eagerly as they can be evaluated, /// which may not be desired for predicates that may cause exceptions. /// If fine manipulation is required, the predicates can be added manually. + /// + /// simply added to the end of the predicates list + /// + /// while paying attention to column references maintained by `self.projection` + /// + /// so, before `self.projection = [2, 1, 0], filter = [0]+[1]`: + /// ```mermaid + /// flowchart TD + /// col-0 + /// col-1 + /// col-2 + /// projection --> |2|col-0 + /// projection --> |1|col-1 + /// projection --> |0| col-2 + /// filter("filter:[0]+[1]") + /// ``` + /// becomes: + /// ```mermaid + /// flowchart TD + /// col-0 + /// col-1 + /// col-2 + /// projection --> |2|col-0 + /// projection --> |1|col-1 + /// projection --> |0| col-2 + /// filter("filter:[0]+[1]") + /// filter -->|0|col-2 + /// filter -->|1|col-1 + /// ``` pub fn filter(mut self, predicates: I) -> Result where I: IntoIterator, { for mut predicate in predicates { // Correct column references. - predicate.permute(&self.projection[..]); + predicate.permute(&self.projection[..])?; // Validate column references. 
let referred_columns = predicate.get_all_ref_columns(); @@ -162,13 +212,43 @@ impl MapFilterProject { } /// Append the result of evaluating expressions to each row. + /// + /// simply append `expressions` to `self.expressions` + /// + /// while paying attention to column references maintained by `self.projection` + /// + /// hence, before apply map with a previously non-trivial projection would be like: + /// before: + /// ```mermaid + /// flowchart TD + /// col-0 + /// col-1 + /// col-2 + /// projection --> |2|col-0 + /// projection --> |1|col-1 + /// projection --> |0| col-2 + /// map("map:[0]+[1]") + /// ``` + /// after apply map: + /// ```mermaid + /// flowchart TD + /// col-0 + /// col-1 + /// col-2 + /// projection --> |2|col-0 + /// projection --> |1|col-1 + /// projection --> |0| col-2 + /// map("map:[0]+[1]") + /// map -->|0|col-2 + /// map -->|1|col-1 + /// ``` pub fn map(mut self, expressions: I) -> Result where I: IntoIterator, { for mut expression in expressions { // Correct column references. - expression.permute(&self.projection[..]); + expression.permute(&self.projection[..])?; // Validate column references. for c in expression.get_all_ref_columns().into_iter() { @@ -311,7 +391,7 @@ impl MapFilterProject { } .fail(); } - expr.permute_map(&shuffle); + expr.permute_map(&shuffle)?; } for pred in filter.iter_mut() { if !pred.get_all_ref_columns().is_subset(&shuffle_keys) { @@ -324,7 +404,7 @@ impl MapFilterProject { } .fail(); } - pred.permute_map(&shuffle); + pred.permute_map(&shuffle)?; } let new_row_len = new_input_arity + map.len(); for proj in project.iter_mut() { @@ -355,8 +435,12 @@ pub struct SafeMfpPlan { impl SafeMfpPlan { /// See [`MapFilterProject::permute`]. 
- pub fn permute(&mut self, map: BTreeMap, new_arity: usize) { - self.mfp.permute(map, new_arity); + pub fn permute( + &mut self, + map: BTreeMap, + new_arity: usize, + ) -> Result<(), EvalError> { + self.mfp.permute(map, new_arity) } /// Evaluates the linear operator on a supplied list of datums. @@ -816,4 +900,25 @@ mod test { .unwrap(); assert_eq!(ret, None); } + + #[test] + fn test_mfp_out_of_order() { + let mfp = MapFilterProject::new(3) + .project(vec![2, 1, 0]) + .unwrap() + .filter(vec![ + ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Gt) + ]) + .unwrap() + .map(vec![ + ScalarExpr::Column(0).call_binary(ScalarExpr::Column(1), BinaryFunc::Lt) + ]) + .unwrap() + .project(vec![3]) + .unwrap(); + let mut input1 = vec![Value::from(2), Value::from(3), Value::from(4)]; + let safe_mfp = SafeMfpPlan { mfp }; + let ret = safe_mfp.evaluate_into(&mut input1, &mut Row::empty()); + assert_eq!(ret.unwrap(), Some(Row::new(vec![Value::from(false)]))); + } } diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index fa03bb9f1912..71dfedd6aa32 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -105,12 +105,26 @@ impl ScalarExpr { /// This method is applicable even when `permutation` is not a /// strict permutation, and it only needs to have entries for /// each column referenced in `self`. - pub fn permute(&mut self, permutation: &[usize]) { + pub fn permute(&mut self, permutation: &[usize]) -> Result<(), EvalError> { + if self + .get_all_ref_columns() + .into_iter() + .any(|i| i >= permutation.len()) + { + return InvalidArgumentSnafu { + reason: format!( + "permutation {:?} is not a valid permutation for expression {:?}", + permutation, self + ), + } + .fail(); + } self.visit_mut_post_nolimit(&mut |e| { if let ScalarExpr::Column(old_i) = e { *old_i = permutation[*old_i]; } }); + Ok(()) } /// Rewrites column indices with their value in `permutation`. 
@@ -118,12 +132,25 @@ impl ScalarExpr { /// This method is applicable even when `permutation` is not a /// strict permutation, and it only needs to have entries for /// each column referenced in `self`. - pub fn permute_map(&mut self, permutation: &BTreeMap) { + pub fn permute_map(&mut self, permutation: &BTreeMap) -> Result<(), EvalError> { + if !self + .get_all_ref_columns() + .is_subset(&permutation.keys().cloned().collect()) + { + return InvalidArgumentSnafu { + reason: format!( + "permutation {:?} is not a valid permutation for expression {:?}", + permutation, self + ), + } + .fail(); + } self.visit_mut_post_nolimit(&mut |e| { if let ScalarExpr::Column(old_i) = e { *old_i = permutation[old_i]; } }); + Ok(()) } /// Returns the set of columns that are referenced by `self`. From 3f2336fc4818a8bcb627e2db3d19f27d8bc3bded Mon Sep 17 00:00:00 2001 From: Discord9 Date: Tue, 27 Feb 2024 10:32:59 +0800 Subject: [PATCH 08/14] docs: better flowchart map,filter,project --- src/flow/src/expr/linear.rs | 63 ++++++++++++++++++------------------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index c453064bb6e6..977ca785db9b 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -96,15 +96,15 @@ impl MapFilterProject { /// Retain only the indicated columns in the presented order. /// - /// i.e. before: `self.projection = [2, 1, 0], columns = [0, 1]` + /// i.e. 
before: `self.projection = [1, 2, 0], columns = [1, 0]` /// ```mermaid /// flowchart TD /// col-0 /// col-1 /// col-2 + /// projection --> |0|col-1 + /// projection --> |1|col-2 /// projection --> |2|col-0 - /// projection --> |1|col-1 - /// projection --> |0|col-2 /// ``` /// /// after: `self.projection = [2, 1]` @@ -113,9 +113,14 @@ impl MapFilterProject { /// col-0 /// col-1 /// col-2 - /// projection --> |2|col-0 - /// projection --> |1|col-1 - /// projection -.-> col-2 + /// project("project:[1,2,0]") + /// project + /// project -->|0| col-1 + /// project -->|1| col-2 + /// project -->|2| col-0 + /// new_project("apply new project:[1,0]") + /// new_project -->|0| col-2 + /// new_project -->|1| col-1 /// ``` pub fn project(mut self, columns: I) -> Result where @@ -148,29 +153,21 @@ impl MapFilterProject { /// /// while paying attention to column references maintained by `self.projection` /// - /// so, before `self.projection = [2, 1, 0], filter = [0]+[1]`: - /// ```mermaid - /// flowchart TD - /// col-0 - /// col-1 - /// col-2 - /// projection --> |2|col-0 - /// projection --> |1|col-1 - /// projection --> |0| col-2 - /// filter("filter:[0]+[1]") - /// ``` + /// so `self.projection = [1, 2, 0], filter = [0]+[1]>0`: /// becomes: /// ```mermaid /// flowchart TD /// col-0 /// col-1 /// col-2 - /// projection --> |2|col-0 - /// projection --> |1|col-1 - /// projection --> |0| col-2 - /// filter("filter:[0]+[1]") - /// filter -->|0|col-2 - /// filter -->|1|col-1 + /// project("first project:[1,2,0]") + /// project + /// project -->|0| col-1 + /// project -->|1| col-2 + /// project -->|2| col-0 + /// filter("then filter:[0]+[1]>0") + /// filter -->|0| col-1 + /// filter --> |1| col-2 /// ``` pub fn filter(mut self, predicates: I) -> Result where @@ -224,10 +221,9 @@ impl MapFilterProject { /// col-0 /// col-1 /// col-2 + /// projection --> |0|col-1 + /// projection --> |1|col-2 /// projection --> |2|col-0 - /// projection --> |1|col-1 - /// projection --> |0| col-2 - 
/// map("map:[0]+[1]") /// ``` /// after apply map: /// ```mermaid @@ -235,12 +231,15 @@ impl MapFilterProject { /// col-0 /// col-1 /// col-2 - /// projection --> |2|col-0 - /// projection --> |1|col-1 - /// projection --> |0| col-2 - /// map("map:[0]+[1]") - /// map -->|0|col-2 - /// map -->|1|col-1 + /// project("project:[1,2,0]") + /// project + /// project -->|0| col-1 + /// project -->|1| col-2 + /// project -->|2| col-0 + /// map("map:[0]/[1]/[2]") + /// map -->|0|col-1 + /// map -->|1|col-2 + /// map -->|2|col-0 /// ``` pub fn map(mut self, expressions: I) -> Result where From 7f86d4ac2eb50be80251edbfcf408ee2e54dd9a8 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Tue, 27 Feb 2024 10:54:32 +0800 Subject: [PATCH 09/14] refactor: visit_* falliable --- src/flow/src/expr/scalar.rs | 123 ++++++++++++++++++------------------ 1 file changed, 63 insertions(+), 60 deletions(-) diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 71dfedd6aa32..9013e997668a 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -82,7 +82,7 @@ impl ScalarExpr { match self { ScalarExpr::Column(index) => Ok(values[*index].clone()), ScalarExpr::Literal(row_res, _ty) => Ok(row_res.clone()), - ScalarExpr::CallUnmaterializable(f) => OptimizeSnafu { + ScalarExpr::CallUnmaterializable(_) => OptimizeSnafu { reason: "Can't eval unmaterializable function".to_string(), } .fail(), @@ -106,24 +106,23 @@ impl ScalarExpr { /// strict permutation, and it only needs to have entries for /// each column referenced in `self`. 
pub fn permute(&mut self, permutation: &[usize]) -> Result<(), EvalError> { - if self - .get_all_ref_columns() - .into_iter() - .any(|i| i >= permutation.len()) - { - return InvalidArgumentSnafu { - reason: format!( - "permutation {:?} is not a valid permutation for expression {:?}", - permutation, self - ), - } - .fail(); - } self.visit_mut_post_nolimit(&mut |e| { if let ScalarExpr::Column(old_i) = e { - *old_i = permutation[*old_i]; + permutation + .get(*old_i) + .map(|new_i| *old_i = *new_i) + .ok_or_else(|| { + InvalidArgumentSnafu { + reason: format!( + "permutation {:?} is not a valid permutation for expression {:?}", + permutation, e + ), + } + .build() + })?; } - }); + Ok(()) + })?; Ok(()) } @@ -133,24 +132,23 @@ impl ScalarExpr { /// strict permutation, and it only needs to have entries for /// each column referenced in `self`. pub fn permute_map(&mut self, permutation: &BTreeMap) -> Result<(), EvalError> { - if !self - .get_all_ref_columns() - .is_subset(&permutation.keys().cloned().collect()) - { - return InvalidArgumentSnafu { - reason: format!( - "permutation {:?} is not a valid permutation for expression {:?}", - permutation, self - ), - } - .fail(); - } self.visit_mut_post_nolimit(&mut |e| { if let ScalarExpr::Column(old_i) = e { - *old_i = permutation[old_i]; + permutation + .get(old_i) + .map(|new_i| *old_i = *new_i) + .ok_or_else(|| { + InvalidArgumentSnafu { + reason: format!( + "permutation {:?} is not a valid permutation for expression {:?}", + permutation, e + ), + } + .build() + })?; } - }); - Ok(()) + Ok(()) + }) } /// Returns the set of columns that are referenced by `self`. 
@@ -160,7 +158,9 @@ impl ScalarExpr { if let ScalarExpr::Column(i) = e { support.insert(*i); } - }); + Ok(()) + }) + .unwrap(); support } @@ -207,70 +207,72 @@ impl ScalarExpr { impl ScalarExpr { /// visit post-order without stack call limit, but may cause stack overflow - fn visit_post_nolimit(&self, f: &mut F) + fn visit_post_nolimit(&self, f: &mut F) -> Result<(), EvalError> where - F: FnMut(&Self), + F: FnMut(&Self) -> Result<(), EvalError>, { - self.visit_children(|e| e.visit_post_nolimit(f)); - f(self); + self.visit_children(|e| e.visit_post_nolimit(f))?; + f(self) } - fn visit_children(&self, mut f: F) + fn visit_children(&self, mut f: F) -> Result<(), EvalError> where - F: FnMut(&Self), + F: FnMut(&Self) -> Result<(), EvalError>, { match self { ScalarExpr::Column(_) | ScalarExpr::Literal(_, _) - | ScalarExpr::CallUnmaterializable(_) => (), + | ScalarExpr::CallUnmaterializable(_) => Ok(()), ScalarExpr::CallUnary { expr, .. } => f(expr), ScalarExpr::CallBinary { expr1, expr2, .. } => { - f(expr1); - f(expr2); + f(expr1)?; + f(expr2) } ScalarExpr::CallVariadic { exprs, .. 
} => { for expr in exprs { - f(expr); + f(expr)?; } + Ok(()) } ScalarExpr::If { cond, then, els } => { - f(cond); - f(then); - f(els); + f(cond)?; + f(then)?; + f(els) } } } - fn visit_mut_post_nolimit(&mut self, f: &mut F) + fn visit_mut_post_nolimit(&mut self, f: &mut F) -> Result<(), EvalError> where - F: FnMut(&mut Self), + F: FnMut(&mut Self) -> Result<(), EvalError>, { - self.visit_mut_children(|e: &mut Self| e.visit_mut_post_nolimit(f)); - f(self); + self.visit_mut_children(|e: &mut Self| e.visit_mut_post_nolimit(f))?; + f(self) } - fn visit_mut_children(&mut self, mut f: F) + fn visit_mut_children(&mut self, mut f: F) -> Result<(), EvalError> where - F: FnMut(&mut Self), + F: FnMut(&mut Self) -> Result<(), EvalError>, { match self { ScalarExpr::Column(_) | ScalarExpr::Literal(_, _) - | ScalarExpr::CallUnmaterializable(_) => (), + | ScalarExpr::CallUnmaterializable(_) => Ok(()), ScalarExpr::CallUnary { expr, .. } => f(expr), ScalarExpr::CallBinary { expr1, expr2, .. } => { - f(expr1); - f(expr2); + f(expr1)?; + f(expr2) } ScalarExpr::CallVariadic { exprs, .. 
} => { for expr in exprs { - f(expr); + f(expr)?; } + Ok(()) } ScalarExpr::If { cond, then, els } => { - f(cond); - f(then); - f(els); + f(cond)?; + f(then)?; + f(els) } } } @@ -284,7 +286,9 @@ impl ScalarExpr { if let ScalarExpr::CallUnmaterializable(UnmaterializableFunc::Now) = e { contains = true; } - }); + Ok(()) + }) + .unwrap(); contains } @@ -417,7 +421,6 @@ mod test { // EvalError is not Eq, so we need to compare the error message match (actual, expected) { (Ok(l), Ok(r)) => assert_eq!(l, r), - (Err(l), Err(r)) => assert!(matches!(l, r)), (l, r) => panic!("expected: {:?}, actual: {:?}", r, l), } } From 729d0857e712e065ff3bbd85d7b583aad1e659cf Mon Sep 17 00:00:00 2001 From: Discord9 Date: Tue, 27 Feb 2024 10:54:59 +0800 Subject: [PATCH 10/14] chore: better temp lint allow --- src/flow/src/expr/linear.rs | 4 ++-- src/flow/src/lib.rs | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index 977ca785db9b..fb171371738b 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -204,7 +204,7 @@ impl MapFilterProject { } // Stable sort predicates by position at which they take effect. self.predicates - .sort_by_key(|(position, predicate)| *position); + .sort_by_key(|(position, _predicate)| *position); Ok(self) } @@ -887,7 +887,7 @@ mod test { let ret = safe_mfp.evaluate_into(&mut input1, &mut Row::empty()); assert!(matches!(ret, Err(EvalError::InvalidArgument { .. }))); - let mut input2 = vec![Value::from(4i64), Value::from(2), Value::from(3)]; + let input2 = vec![Value::from(4i64), Value::from(2), Value::from(3)]; let ret = safe_mfp .evaluate_into(&mut input2.clone(), &mut Row::empty()) .unwrap(); diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index a60310504764..c144f8ab50be 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-#![allow(unused)] +#![allow(dead_code)] +#![allow(unused_imports)] // allow unused for now because it should be use later mod adapter; mod expr; From 8bc7f95ae0accce7a8ce6b0d5bb17e16c0551487 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Tue, 27 Feb 2024 11:47:44 +0800 Subject: [PATCH 11/14] fix: permute partially --- src/flow/src/expr/scalar.rs | 68 ++++++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 9013e997668a..1f369c5314f8 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -106,20 +106,23 @@ impl ScalarExpr { /// strict permutation, and it only needs to have entries for /// each column referenced in `self`. pub fn permute(&mut self, permutation: &[usize]) -> Result<(), EvalError> { + // check first so that we don't end up with a partially permuted expression + if self + .get_all_ref_columns() + .into_iter() + .any(|i| i >= permutation.len()) + { + return InvalidArgumentSnafu { + reason: format!( + "permutation {:?} is not a valid permutation for expression {:?}", + permutation, self + ), + } + .fail(); + } self.visit_mut_post_nolimit(&mut |e| { if let ScalarExpr::Column(old_i) = e { - permutation - .get(*old_i) - .map(|new_i| *old_i = *new_i) - .ok_or_else(|| { - InvalidArgumentSnafu { - reason: format!( - "permutation {:?} is not a valid permutation for expression {:?}", - permutation, e - ), - } - .build() - })?; + *old_i = permutation[*old_i]; } Ok(()) })?; @@ -132,20 +135,22 @@ impl ScalarExpr { /// strict permutation, and it only needs to have entries for /// each column referenced in `self`. 
pub fn permute_map(&mut self, permutation: &BTreeMap) -> Result<(), EvalError> { + // check first so that we don't end up with a partially permuted expression + if !self + .get_all_ref_columns() + .is_subset(&permutation.keys().cloned().collect()) + { + return InvalidArgumentSnafu { + reason: format!( + "permutation {:?} is not a valid permutation for expression {:?}", + permutation, self + ), + } + .fail(); + } self.visit_mut_post_nolimit(&mut |e| { if let ScalarExpr::Column(old_i) = e { - permutation - .get(old_i) - .map(|new_i| *old_i = *new_i) - .ok_or_else(|| { - InvalidArgumentSnafu { - reason: format!( - "permutation {:?} is not a valid permutation for expression {:?}", - permutation, e - ), - } - .build() - })?; + *old_i = permutation[old_i]; } Ok(()) }) @@ -348,6 +353,8 @@ impl ScalarExpr { #[cfg(test)] mod test { + use datatypes::arrow::array::Scalar; + use super::*; #[test] fn test_extract_bound() { @@ -425,4 +432,17 @@ mod test { } } } + + #[test] + fn test_bad_permute() { + let mut expr = ScalarExpr::Column(4); + let permutation = vec![1, 2, 3]; + let res = expr.permute(&permutation); + assert!(matches!(res, Err(EvalError::InvalidArgument { .. }))); + + let mut expr = ScalarExpr::Column(0); + let permute_map = BTreeMap::from([(1, 2), (3, 4)]); + let res = expr.permute_map(&permute_map); + assert!(matches!(res, Err(EvalError::InvalidArgument { .. 
}))); + } } From cf47ddab1e5c2626edc4ca8e6eb03e249692eb50 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Tue, 27 Feb 2024 14:26:15 +0800 Subject: [PATCH 12/14] chore: remove duplicated checks --- src/flow/src/expr/linear.rs | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index fb171371738b..f2cc0a7f9820 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -378,31 +378,10 @@ impl MapFilterProject { shuffle.insert(self.input_arity + index, new_input_arity + index); } - let shuffle_keys = shuffle.keys().cloned().collect::>(); for expr in map.iter_mut() { - if !expr.get_all_ref_columns().is_subset(&shuffle_keys) { - return InvalidArgumentSnafu { - reason: format!( - "Expression's referred columns {:?} is not a subset of shuffle's keys {:?}", - expr.get_all_ref_columns(), - shuffle.keys() - ), - } - .fail(); - } expr.permute_map(&shuffle)?; } for pred in filter.iter_mut() { - if !pred.get_all_ref_columns().is_subset(&shuffle_keys) { - return InvalidArgumentSnafu { - reason: format!( - "Predicate's referred columns {:?} is not a subset of shuffle's keys {:?}", - pred.get_all_ref_columns(), - shuffle.keys() - ), - } - .fail(); - } pred.permute_map(&shuffle)?; } let new_row_len = new_input_arity + map.len(); From 46adf02e2d9c903946e632d9f61f8b6754c54ee5 Mon Sep 17 00:00:00 2001 From: Discord9 Date: Tue, 27 Feb 2024 15:36:04 +0800 Subject: [PATCH 13/14] docs: more explain&tests for clarity --- src/flow/src/expr/linear.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index f2cc0a7f9820..31492ab812c6 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -24,6 +24,15 @@ use crate::repr::{self, value_to_internal_ts, Diff, Row}; /// A compound operator that can be applied row-by-row. 
+/// In practice, this operator is a sequence of map, filter, and project in arbitrary order,
+/// which can be, and is, stored by reordering the sequence's
+/// apply order into a `map` first, `filter` second, and `project` third order.
+///
+/// The input is a row (a sequence of values), which is also used to store intermediate results:
+/// the `map` operator can append new columns to the row according to its expressions,
+/// the `filter` operator decides whether the entire row can be output at all by checking whether the row satisfies the predicates,
+/// and the `project` operator decides which columns of the row should be output.
instead --- src/flow/src/expr/linear.rs | 20 ++++++++++---------- src/flow/src/expr/scalar.rs | 29 ++++++++++++++--------------- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs index 31492ab812c6..830195f5b2ec 100644 --- a/src/flow/src/expr/linear.rs +++ b/src/flow/src/expr/linear.rs @@ -369,16 +369,16 @@ impl MapFilterProject { } ); } - if shuffle.len() > new_input_arity { - return InvalidArgumentSnafu { + ensure!( + shuffle.len() <= new_input_arity, + InvalidArgumentSnafu { reason: format!( "shuffle's length {} is greater than new_input_arity {}", shuffle.len(), - new_input_arity - ), + self.input_arity + ) } - .fail(); - } + ); // decompose self into map, filter, project for ease of manipulation let (mut map, mut filter, mut project) = self.as_map_filter_project(); @@ -451,16 +451,16 @@ impl SafeMfpPlan { values: &mut Vec, row_buf: &mut Row, ) -> Result, EvalError> { - if values.len() != self.mfp.input_arity { - return InvalidArgumentSnafu { + ensure!( + values.len() == self.mfp.input_arity, + InvalidArgumentSnafu { reason: format!( "values length {} is not equal to input_arity {}", values.len(), self.mfp.input_arity ), } - .fail(); - } + ); let passed_predicates = self.evaluate_inner(values)?; if !passed_predicates { diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 1f369c5314f8..1bffdebd71f2 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -17,6 +17,7 @@ use std::collections::{BTreeMap, BTreeSet}; use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; use serde::{Deserialize, Serialize}; +use snafu::ensure; use crate::expr::error::{ EvalError, InvalidArgumentSnafu, OptimizeSnafu, UnsupportedTemporalFilterSnafu, @@ -107,19 +108,18 @@ impl ScalarExpr { /// each column referenced in `self`. 
pub fn permute(&mut self, permutation: &[usize]) -> Result<(), EvalError> { // check first so that we don't end up with a partially permuted expression - if self - .get_all_ref_columns() - .into_iter() - .any(|i| i >= permutation.len()) - { - return InvalidArgumentSnafu { + ensure!( + self.get_all_ref_columns() + .into_iter() + .all(|i| i < permutation.len()), + InvalidArgumentSnafu { reason: format!( "permutation {:?} is not a valid permutation for expression {:?}", permutation, self ), } - .fail(); - } + ); + self.visit_mut_post_nolimit(&mut |e| { if let ScalarExpr::Column(old_i) = e { *old_i = permutation[*old_i]; @@ -136,18 +136,17 @@ impl ScalarExpr { /// each column referenced in `self`. pub fn permute_map(&mut self, permutation: &BTreeMap) -> Result<(), EvalError> { // check first so that we don't end up with a partially permuted expression - if !self - .get_all_ref_columns() - .is_subset(&permutation.keys().cloned().collect()) - { - return InvalidArgumentSnafu { + ensure!( + self.get_all_ref_columns() + .is_subset(&permutation.keys().cloned().collect()), + InvalidArgumentSnafu { reason: format!( "permutation {:?} is not a valid permutation for expression {:?}", permutation, self ), } - .fail(); - } + ); + self.visit_mut_post_nolimit(&mut |e| { if let ScalarExpr::Column(old_i) = e { *old_i = permutation[old_i];