From 49a82911509e7be1281588f206809abfc2d106e4 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 18 Jun 2024 20:21:01 +0800 Subject: [PATCH 01/16] tests: flow sqlness tests --- Cargo.lock | 33 ++++++++++++++-- Cargo.toml | 4 ++ src/flow/src/adapter.rs | 4 +- tests/cases/standalone/flow/basic.result | 49 ++++++++++++++++++++++++ tests/cases/standalone/flow/basic.sql | 27 +++++++++++++ tests/runner/Cargo.toml | 2 +- 6 files changed, 112 insertions(+), 7 deletions(-) create mode 100644 tests/cases/standalone/flow/basic.result create mode 100644 tests/cases/standalone/flow/basic.sql diff --git a/Cargo.lock b/Cargo.lock index d77a2da3efac..377974a7f161 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3584,6 +3584,20 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" +[[package]] +name = "duration-str" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "709d653e7c92498eb29fb86a2a6f0f3502b97530f33aedb32ef848d4d28b31a3" +dependencies = [ + "chrono", + "rust_decimal", + "serde", + "thiserror", + "time", + "winnow 0.6.8", +] + [[package]] name = "dyn-clone" version = "1.0.17" @@ -5696,7 +5710,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if", - "windows-targets 0.52.5", + "windows-targets 0.48.5", ] [[package]] @@ -6216,6 +6230,15 @@ dependencies = [ "unicase", ] +[[package]] +name = "minijinja" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55e877d961d4f96ce13615862322df7c0b6d169d40cab71a7ef3f9b9e594451e" +dependencies = [ + "serde", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -10576,14 +10599,16 @@ dependencies = [ [[package]] name = "sqlness" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0860f149718809371602b42573693e1ed2b1d0aed35fe69e04e4e4e9918d81f7" +version = "0.6.0" +source = "git+https://github.com/CeresDB/sqlness.git?rev=5469532cdaf87791b3af5d7e2918e92725f8dfc5#5469532cdaf87791b3af5d7e2918e92725f8dfc5" dependencies = [ "async-trait", "derive_builder 0.11.2", + "duration-str", + "minijinja", "prettydiff", "regex", + "serde_json", "thiserror", "toml 0.5.11", "walkdir", diff --git a/Cargo.toml b/Cargo.toml index 7f9608ceb5c1..7fcbbf1eaa0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -262,3 +262,7 @@ strip = true [profile.dev.package.tests-fuzz] debug = false strip = true + +# use sqlness with the latest changes which support SLEEP, will remove this after sqlness with the changes is released +[patch.crates-io] +sqlness = { git = "https://github.com/CeresDB/sqlness.git", rev = "5469532cdaf87791b3af5d7e2918e92725f8dfc5" } diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index fd07ff1dc2ff..0fd6af199101 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -321,7 +321,7 @@ impl FlownodeManager { schema .get_name(*i) .clone() - .unwrap_or_else(|| format!("Col_{i}")) + .unwrap_or_else(|| format!("col_{i}")) }) .collect_vec() }) @@ -344,7 +344,7 @@ impl FlownodeManager { .get(idx) .cloned() .flatten() - .unwrap_or(format!("Col_{}", idx)); + .unwrap_or(format!("col_{}", idx)); let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable); if schema.typ().time_index == Some(idx) { ret.with_time_index(true) diff --git a/tests/cases/standalone/flow/basic.result 
b/tests/cases/standalone/flow/basic.result new file mode 100644 index 000000000000..ee5d7c8317a9 --- /dev/null +++ b/tests/cases/standalone/flow/basic.result @@ -0,0 +1,49 @@ +CREATE TABLE numbers_input ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers +SINK TO out_num_cnt +AS +SELECT sum(number) FROM numbers_input GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +Affected Rows: 0 + +INSERT INTO numbers_input +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | ++-------+---------------------+---------------------+ + +INSERT INTO numbers_input +VALUES + (23,"2021-07-01 00:00:01.000"), + (24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | ++-------+---------------------+---------------------+ + diff --git a/tests/cases/standalone/flow/basic.sql b/tests/cases/standalone/flow/basic.sql new file mode 100644 index 000000000000..0ffa81985616 --- /dev/null +++ b/tests/cases/standalone/flow/basic.sql @@ -0,0 +1,27 @@ +CREATE TABLE numbers_input ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers +SINK TO out_num_cnt +AS +SELECT sum(number) FROM numbers_input GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +INSERT INTO numbers_input +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt; + +INSERT INTO numbers_input +VALUES + (23,"2021-07-01 00:00:01.000"), + (24,"2021-07-01 00:00:01.500"); + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt; \ No newline at end of file diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index 6118c863fb15..875a8193a7c6 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -17,7 +17,7 @@ common-recordbatch.workspace = true common-time.workspace = true serde.workspace = true serde_json.workspace = true -sqlness = { version = "0.5" } +sqlness = { version = "0.6" } tempfile.workspace = true tinytemplate = "1.2" tokio.workspace = true From d61cae621c5f9447fb890c1dab766b22eaa7a248 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 18 Jun 2024 20:32:25 +0800 Subject: [PATCH 02/16] tests: WIP df func test --- tests/cases/standalone/flow/df_func.sql | 27 +++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 tests/cases/standalone/flow/df_func.sql diff --git a/tests/cases/standalone/flow/df_func.sql b/tests/cases/standalone/flow/df_func.sql new file mode 100644 index 000000000000..8e00a0f14a68 --- /dev/null +++ b/tests/cases/standalone/flow/df_func.sql @@ -0,0 +1,27 @@ +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers_df_func +SINK TO 
out_num_cnt_df_func +AS +SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +INSERT INTO test_numbers_df_func +VALUES + (-20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + +INSERT INTO test_numbers_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (-24,"2021-07-01 00:00:01.500"); + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; \ No newline at end of file From 20114e00b8adc7dfc3cbb0ee29b868203ecd3c26 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 19 Jun 2024 17:07:02 +0800 Subject: [PATCH 03/16] fix: use schema before expand for transform expr --- src/flow/src/adapter/table_source.rs | 2 + src/flow/src/expr/scalar.rs | 46 +++++++++-- src/flow/src/repr/relation.rs | 28 ++++++- src/flow/src/transform/aggr.rs | 16 +++- src/flow/src/transform/expr.rs | 118 ++++++++++++++++++--------- src/flow/src/transform/plan.rs | 19 ++++- 6 files changed, 176 insertions(+), 53 deletions(-) diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 821f9601985f..24cf05c4b649 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -148,6 +148,8 @@ impl TableSource { column_types, keys, time_index, + // by default table schema's column are all non-auto + auto_columns: vec![], }, names: col_names, }, diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 8103089e67da..4bd8e365c64f 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -155,8 +155,10 @@ pub enum ScalarExpr { exprs: Vec, }, CallDf { - // TODO(discord9): support shuffle + /// invariant: the input args set insidet this [`DfScalarFunction`] is + /// always col(0) to col(n-1) where n is the length of `expr` df_scalar_fn: DfScalarFunction, + exprs: Vec, }, /// Conditionally evaluated expressions. /// @@ -190,7 +192,13 @@ impl DfScalarFunction { } // TODO(discord9): add RecordBatch support - pub fn eval(&self, values: &[Value]) -> Result { + pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result { + // first eval exprs to construct values to feed to datafusion + let values: Vec<_> = exprs + .iter() + .map(|expr| expr.eval(values)) + .collect::>()?; + if values.is_empty() { return InvalidArgumentSnafu { reason: "values is empty".to_string(), @@ -354,7 +362,7 @@ impl ScalarExpr { Ok(ColumnType::new_nullable(func.signature().output)) } ScalarExpr::If { then, .. } => then.typ(context), - ScalarExpr::CallDf { df_scalar_fn } => { + ScalarExpr::CallDf { df_scalar_fn, .. } => { let arrow_typ = df_scalar_fn .fn_impl // TODO(discord9): get scheme from args instead? 
@@ -445,7 +453,10 @@ impl ScalarExpr { } .fail(), }, - ScalarExpr::CallDf { df_scalar_fn } => df_scalar_fn.eval(values), + ScalarExpr::CallDf { + df_scalar_fn, + exprs, + } => df_scalar_fn.eval(values, exprs), } } @@ -614,7 +625,15 @@ impl ScalarExpr { f(then)?; f(els) } - _ => Ok(()), + ScalarExpr::CallDf { + df_scalar_fn: _, + exprs, + } => { + for expr in exprs { + f(expr)?; + } + Ok(()) + } } } @@ -650,7 +669,15 @@ impl ScalarExpr { f(then)?; f(els) } - _ => Ok(()), + ScalarExpr::CallDf { + df_scalar_fn: _, + exprs, + } => { + for expr in exprs { + f(expr)?; + } + Ok(()) + } } } } @@ -857,6 +884,11 @@ mod test { let as_str = serde_json::to_string(&df_func).unwrap(); let from_str: DfScalarFunction = serde_json::from_str(&as_str).unwrap(); assert_eq!(df_func, from_str); - assert_eq!(df_func.eval(&[Value::Null]).unwrap(), Value::Int64(1)); + assert_eq!( + df_func + .eval(&[Value::Null], &[ScalarExpr::Column(0)]) + .unwrap(), + Value::Int64(1) + ); } } diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index ae5c6b46ff2e..295cc206840f 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -21,7 +21,9 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; -use crate::adapter::error::{DatafusionSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu}; +use crate::adapter::error::{ + DatafusionSnafu, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu, +}; use crate::expr::{MapFilterProject, SafeMfpPlan, ScalarExpr}; /// a set of column indices that are "keys" for the collection. @@ -93,13 +95,19 @@ pub struct RelationType { /// /// A collection can contain multiple sets of keys, although it is common to /// have either zero or one sets of key indices. 
- #[serde(default)] pub keys: Vec, /// optionally indicate the column that is TIME INDEX pub time_index: Option, + /// mark all the columns that are added automately by flow, but not present in original sql + pub auto_columns: Vec, } impl RelationType { + pub fn with_autos(mut self, auto_cols: &[usize]) -> Self { + self.auto_columns = auto_cols.to_vec(); + self + } + /// Trying to apply a mpf on current types, will return a new RelationType /// with the new types, will also try to preserve keys&time index information /// if the old key&time index columns are preserve in given mfp @@ -155,10 +163,16 @@ impl RelationType { let time_index = self .time_index .and_then(|old| old_to_new_col.get(&old).cloned()); + let auto_columns = self + .auto_columns + .iter() + .filter_map(|old| old_to_new_col.get(old).cloned()) + .collect_vec(); Ok(Self { column_types: mfp_out_types, keys, time_index, + auto_columns, }) } /// Constructs a `RelationType` representing the relation with no columns and @@ -175,6 +189,7 @@ impl RelationType { column_types, keys: Vec::new(), time_index: None, + auto_columns: vec![], } } @@ -340,6 +355,15 @@ pub struct RelationDesc { } impl RelationDesc { + pub fn len(&self) -> Result { + ensure!( + self.typ.column_types.len() == self.names.len(), + InternalSnafu { + reason: "Expect typ and names field to be of same length" + } + ); + Ok(self.names.len()) + } pub fn to_df_schema(&self) -> Result { let fields: Vec<_> = self .iter() diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index f6aad8c4bbae..e182abdccf13 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -348,8 +348,10 @@ impl TypedPlan { let mut output_types = Vec::new(); // give best effort to get column name let mut output_names = Vec::new(); + // mark all auto added cols + let mut auto_cols = vec![]; // first append group_expr as key, then aggr_expr as value - for expr in &group_exprs { + for (idx, expr) in group_exprs.iter().enumerate() { output_types.push(expr.typ.clone()); let col_name = match &expr.expr { ScalarExpr::CallUnary { @@ -359,7 +361,10 @@ impl TypedPlan { ScalarExpr::CallUnary { func: UnaryFunc::TumbleWindowCeiling { .. }, .. 
- } => Some("window_end".to_string()), + } => { + auto_cols.push(idx); + Some("window_end".to_string()) + } ScalarExpr::Column(col) => input.schema.get_name(*col).clone(), _ => None, }; @@ -380,6 +385,7 @@ impl TypedPlan { RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec()) } .with_time_index(time_index) + .with_autos(&auto_cols) .into_named(output_names) }; @@ -581,6 +587,7 @@ mod test { ]) .with_key(vec![1, 2]) .with_time_index(Some(0)) + .with_autos(&[1]) .into_named(vec![ Some("window_start".to_string()), Some("window_end".to_string()), @@ -610,6 +617,7 @@ mod test { ]) .with_key(vec![0, 3]) .with_time_index(Some(2)) + .with_autos(&[3]) .into_named(vec![ Some("number".to_string()), None, @@ -642,6 +650,7 @@ mod test { ]) .with_key(vec![2]) .with_time_index(Some(1)) + .with_autos(&[2]) .into_named(vec![ None, Some("window_start".to_string()), @@ -716,6 +725,7 @@ mod test { ]) .with_key(vec![1]) .with_time_index(Some(0)) + .with_autos(&[1]) .into_named(vec![ Some("window_start".to_string()), Some("window_end".to_string()), @@ -760,6 +770,7 @@ mod test { ]) .with_key(vec![2]) .with_time_index(Some(1)) + .with_autos(&[2]) .into_named(vec![ None, Some("window_start".to_string()), @@ -830,6 +841,7 @@ mod test { ]) .with_key(vec![1]) .with_time_index(Some(0)) + .with_autos(&[1]) .into_named(vec![ Some("window_start".to_string()), Some("window_end".to_string()), diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 5434ea237b45..0d072ff493df 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -34,7 +34,7 @@ use crate::expr::{ BinaryFunc, DfScalarFunction, RawDfScalarFn, ScalarExpr, TypedExpr, UnaryFunc, UnmaterializableFunc, VariadicFunc, }; -use crate::repr::{ColumnType, RelationDesc}; +use crate::repr::{ColumnType, RelationDesc, RelationType}; use crate::transform::literal::{from_substrait_literal, from_substrait_type}; use crate::transform::{substrait_proto, FunctionExtensions}; // TODO(discord9): found proper place for this @@ -99,16 +99,58 @@ pub(crate) fn from_scalar_fn_to_df_fn_impl( Ok(phy_expr) } +/// Return a [`Expression`](wrapped in a [`FunctionArgument`]) that references the i-th column of the input relation +pub(crate) fn proto_col(i: usize) -> substrait_proto::proto::FunctionArgument { + use substrait_proto::proto::expression; + let expr = Expression { + rex_type: Some(expression::RexType::Selection(Box::new( + expression::FieldReference { + reference_type: Some(expression::field_reference::ReferenceType::DirectReference( + expression::ReferenceSegment { + reference_type: Some( + expression::reference_segment::ReferenceType::StructField(Box::new( + expression::reference_segment::StructField { + field: i as i32, + child: None, + }, + )), + ), + }, + )), + root_type: None, + }, + ))), + }; + substrait_proto::proto::FunctionArgument { + arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value( + expr, + )), + } +} + impl TypedExpr { pub fn from_substrait_to_datafusion_scalar_func( f: &ScalarFunction, - input_schema: &RelationDesc, + arg_exprs_typed: Vec, extensions: &FunctionExtensions, ) -> Result { - let phy_expr = from_scalar_fn_to_df_fn_impl(f, input_schema, extensions)?; - let raw_fn = RawDfScalarFn::from_proto(f, input_schema.clone(), extensions.clone())?; - let expr = DfScalarFunction::new(raw_fn, phy_expr)?; - let expr = ScalarExpr::CallDf { df_scalar_fn: expr }; + let (arg_exprs, arg_types): (Vec<_>, Vec<_>) = + arg_exprs_typed.into_iter().map(|e| (e.expr, 
e.typ)).unzip(); + // rewrite ScalarFunction's arguments to Columns 0..n so nested exprs are still handled by us instead of datafusion + let mut f_rewrite = f.clone(); + for (idx, raw_expr) in f_rewrite.arguments.iter_mut().enumerate() { + *raw_expr = proto_col(idx); + } + let input_schema = RelationType::new(arg_types).into_unnamed(); + dbg!(&input_schema, &f_rewrite, &extensions); + let phy_expr = from_scalar_fn_to_df_fn_impl(&f_rewrite, &input_schema, extensions)?; + let raw_fn = + RawDfScalarFn::from_proto(&f_rewrite, input_schema.clone(), extensions.clone())?; + let df_func = DfScalarFunction::new(raw_fn, phy_expr)?; + let expr = ScalarExpr::CallDf { + df_scalar_fn: df_func, + exprs: arg_exprs, + }; // df already know it's own schema, so not providing here let ret_type = expr.typ(&[])?; Ok(TypedExpr::new(expr, ret_type)) @@ -240,9 +282,10 @@ impl TypedExpr { ret_type, )) } else { + dbg!(&fn_name, &arg_typed_exprs, &input_schema); let try_as_df = Self::from_substrait_to_datafusion_scalar_func( f, - input_schema, + arg_typed_exprs, extensions, )?; Ok(try_as_df) @@ -395,6 +438,30 @@ mod test { use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + + #[tokio::test] + async fn test_df_func_basic() { + let engine = create_test_query_engine(); + let sql = "SELECT sum(abs(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');"; + let plan = sql_to_substrait(engine.clone(), sql).await; + + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap(); + } + + #[tokio::test] + async fn test_df_func_expr_tree() { + let engine = create_test_query_engine(); + let sql = "SELECT abs(sum(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');"; + let plan = sql_to_substrait(engine.clone(), sql).await; + + println!("{:#?}", plan); + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap(); + + println!("{:#?}", flow_plan); + } + /// test if `WHERE` condition can be converted to Flow's ScalarExpr in mfp's filter #[tokio::test] async fn test_where_and() { @@ -608,39 +675,10 @@ mod test { )), } } - fn col(i: usize) -> substrait_proto::proto::FunctionArgument { - use substrait_proto::proto::expression; - let expr = Expression { - rex_type: Some(expression::RexType::Selection(Box::new( - expression::FieldReference { - reference_type: Some( - expression::field_reference::ReferenceType::DirectReference( - expression::ReferenceSegment { - reference_type: Some( - expression::reference_segment::ReferenceType::StructField( - Box::new(expression::reference_segment::StructField { - field: i as i32, - child: None, - }), - ), - ), - }, - ), - ), - root_type: None, - }, - ))), - }; - substrait_proto::proto::FunctionArgument { - arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value( - expr, - )), - } - } let f = substrait_proto::proto::expression::ScalarFunction { function_reference: 0, - arguments: vec![col(0)], + arguments: vec![proto_col(0)], options: vec![], output_type: None, ..Default::default() @@ -663,7 +701,7 @@ mod test { let f = substrait_proto::proto::expression::ScalarFunction { function_reference: 0, - arguments: vec![col(0), col(1)], + arguments: vec![proto_col(0), proto_col(1)], options: vec![], output_type: None, ..Default::default() @@ -690,7 +728,7 @@ mod test { let f = 
substrait_proto::proto::expression::ScalarFunction { function_reference: 0, - arguments: vec![col(0), lit("1 second"), lit("2021-07-01 00:00:00")], + arguments: vec![proto_col(0), lit("1 second"), lit("2021-07-01 00:00:00")], options: vec![], output_type: None, ..Default::default() @@ -718,7 +756,7 @@ mod test { let f = substrait_proto::proto::expression::ScalarFunction { function_reference: 0, - arguments: vec![col(0), lit("1 second")], + arguments: vec![proto_col(0), lit("1 second")], options: vec![], output_type: None, ..Default::default() diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index 339fe80586cd..ff4fb35c289a 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use itertools::Itertools; use snafu::OptionExt; @@ -79,8 +79,23 @@ impl TypedPlan { }; let mut exprs: Vec = vec![]; + // because this `input.schema` is incorrect for pre-expand substrait plan, so we have to use schema before expand multi-value + // function to correctly transform it, and late rewrite it + let schema_before_expand = { + let input_schema = input.schema.clone(); + let auto_columns: HashSet = + HashSet::from_iter(input_schema.typ().auto_columns.clone()); + let not_auto_added_columns = (0..input_schema.len()?) + .filter(|i| !auto_columns.contains(i)) + .collect_vec(); + let mfp = MapFilterProject::new(input_schema.len()?) + .project(not_auto_added_columns)? + .into_safe(); + + input_schema.apply_mfp(&mfp)? + }; for e in &p.expressions { - let expr = TypedExpr::from_substrait_rex(e, &input.schema, extensions)?; + let expr = TypedExpr::from_substrait_rex(e, &schema_before_expand, extensions)?; exprs.push(expr); } let is_literal = exprs.iter().all(|expr| expr.expr.is_literal()); From a66e17261c84dd18bd9d6233c7ac9fe8a7a0fd32 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 19 Jun 2024 17:07:16 +0800 Subject: [PATCH 04/16] tests: some basic flow tests --- tests/cases/standalone/flow/basic.result | 12 ++ tests/cases/standalone/flow/basic.sql | 6 +- tests/cases/standalone/flow/df_func.result | 122 +++++++++++++++++++++ tests/cases/standalone/flow/df_func.sql | 42 ++++++- 4 files changed, 178 insertions(+), 4 deletions(-) create mode 100644 tests/cases/standalone/flow/df_func.result diff --git a/tests/cases/standalone/flow/basic.result b/tests/cases/standalone/flow/basic.result index ee5d7c8317a9..ea5d0180551a 100644 --- a/tests/cases/standalone/flow/basic.result +++ b/tests/cases/standalone/flow/basic.result @@ -47,3 +47,15 @@ SELECT col_0, window_start, window_end FROM out_num_cnt; | 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | +-------+---------------------+---------------------+ +DROP FLOW test_numbers; + +Affected Rows: 0 + +DROP TABLE numbers_input; + +Affected Rows: 0 + +DROP TABLE out_num_cnt; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/flow/basic.sql b/tests/cases/standalone/flow/basic.sql index 0ffa81985616..1ecceeae3f2c 100644 --- a/tests/cases/standalone/flow/basic.sql +++ b/tests/cases/standalone/flow/basic.sql @@ -24,4 +24,8 @@ VALUES (24,"2021-07-01 00:00:01.500"); -- SQLNESS SLEEP 2s -SELECT col_0, window_start, window_end FROM out_num_cnt; \ No newline at end of file +SELECT col_0, window_start, window_end FROM out_num_cnt; + +DROP FLOW test_numbers; +DROP TABLE numbers_input; +DROP TABLE out_num_cnt; \ No 
newline at end of file diff --git a/tests/cases/standalone/flow/df_func.result b/tests/cases/standalone/flow/df_func.result new file mode 100644 index 000000000000..52eceda45ccd --- /dev/null +++ b/tests/cases/standalone/flow/df_func.result @@ -0,0 +1,122 @@ +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +Affected Rows: 0 + +INSERT INTO numbers_input_df_func +VALUES + (-20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | ++-------+---------------------+---------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (-24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | ++-------+---------------------+---------------------+ + +DROP FLOW test_numbers_df_func; + +Affected Rows: 0 + +DROP TABLE numbers_input_df_func; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_df_func; + +Affected Rows: 0 + +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +Affected Rows: 0 + +INSERT INTO numbers_input_df_func +VALUES + (-20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | ++-------+---------------------+---------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (-24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +| 1 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | ++-------+---------------------+---------------------+ + +DROP FLOW test_numbers_df_func; + +Affected Rows: 0 + +DROP TABLE numbers_input_df_func; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_df_func; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/flow/df_func.sql b/tests/cases/standalone/flow/df_func.sql index 8e00a0f14a68..ae5a4196759a 100644 --- a/tests/cases/standalone/flow/df_func.sql +++ 
b/tests/cases/standalone/flow/df_func.sql @@ -10,7 +10,7 @@ SINK TO out_num_cnt_df_func AS SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); -INSERT INTO test_numbers_df_func +INSERT INTO numbers_input_df_func VALUES (-20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); @@ -18,10 +18,46 @@ VALUES -- SQLNESS SLEEP 2s SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; -INSERT INTO test_numbers_df_func +INSERT INTO numbers_input_df_func VALUES (23,"2021-07-01 00:00:01.000"), (-24,"2021-07-01 00:00:01.500"); -- SQLNESS SLEEP 2s -SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; \ No newline at end of file +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + +DROP FLOW test_numbers_df_func; +DROP TABLE numbers_input_df_func; +DROP TABLE out_num_cnt_df_func; + +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +INSERT INTO numbers_input_df_func +VALUES + (-20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (-24,"2021-07-01 00:00:01.500"); + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + +DROP FLOW test_numbers_df_func; +DROP TABLE numbers_input_df_func; +DROP TABLE out_num_cnt_df_func; \ No newline at end of file From 1951bc85859f3a8b61878ff20a03bd11e956c46e Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 19 Jun 2024 18:38:39 +0800 Subject: [PATCH 05/16] tests: unit test --- src/flow/src/expr/scalar.rs | 23 ++- src/flow/src/transform/aggr.rs | 284 ++++++++++++++++++++++++++++++++- src/flow/src/transform/expr.rs | 28 +--- 3 files changed, 296 insertions(+), 39 deletions(-) diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 4bd8e365c64f..acb006a07564 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -191,6 +191,14 @@ impl DfScalarFunction { }) } + pub fn try_from_raw_fn(raw_fn: RawDfScalarFn) -> Result { + Ok(Self { + fn_impl: raw_fn.get_fn_impl()?, + df_schema: Arc::new(raw_fn.input_schema.to_df_schema()?), + raw_fn, + }) + } + // TODO(discord9): add RecordBatch support pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result { // first eval exprs to construct values to feed to datafusion @@ -267,16 +275,18 @@ impl<'de> serde::de::Deserialize<'de> for DfScalarFunction { D: serde::de::Deserializer<'de>, { let raw_fn = RawDfScalarFn::deserialize(deserializer)?; - let fn_impl = raw_fn.get_fn_impl().map_err(serde::de::Error::custom)?; - DfScalarFunction::new(raw_fn, fn_impl).map_err(serde::de::Error::custom) + DfScalarFunction::try_from_raw_fn(raw_fn).map_err(serde::de::Error::custom) } } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct RawDfScalarFn { - f: bytes::BytesMut, - input_schema: RelationDesc, - extensions: FunctionExtensions, + /// The raw bytes encoded datafusion scalar function + pub(crate) f: bytes::BytesMut, + /// The input schema of the function + pub(crate) input_schema: RelationDesc, + /// Extension contains mapping from function reference to function name + pub(crate) 
extensions: FunctionExtensions, } impl RawDfScalarFn { @@ -879,8 +889,7 @@ mod test { .unwrap(); let extensions = FunctionExtensions::from_iter(vec![(0, "abs")]); let raw_fn = RawDfScalarFn::from_proto(&raw_scalar_func, input_schema, extensions).unwrap(); - let fn_impl = raw_fn.get_fn_impl().unwrap(); - let df_func = DfScalarFunction::new(raw_fn, fn_impl).unwrap(); + let df_func = DfScalarFunction::try_from_raw_fn(raw_fn).unwrap(); let as_str = serde_json::to_string(&df_func).unwrap(); let from_str: DfScalarFunction = serde_json::from_str(&as_str).unwrap(); assert_eq!(df_func, from_str); diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index e182abdccf13..3cc1512692d5 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -461,17 +461,19 @@ impl TypedPlan { #[cfg(test)] mod test { + use bytes::BytesMut; use common_time::{DateTime, Interval}; use datatypes::prelude::ConcreteDataType; use pretty_assertions::{assert_eq, assert_ne}; use super::*; + use crate::expr::{DfScalarFunction, RawDfScalarFn}; use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; /// TODO(discord9): add more illegal sql tests #[tokio::test] - async fn tes_missing_key_check() { + async fn test_missing_key_check() { let engine = create_test_query_engine(); let sql = "SELECT avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number"; let plan = sql_to_substrait(engine.clone(), sql).await; @@ -479,6 +481,282 @@ mod test { let mut ctx = create_test_ctx(); assert!(TypedPlan::from_substrait_plan(&mut ctx, &plan).is_err()); } + + #[tokio::test] + async fn test_df_func_basic() { + let engine = create_test_query_engine(); + let sql = "SELECT sum(abs(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');"; + let plan = sql_to_substrait(engine.clone(), sql).await; + + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap(); + + let aggr_expr = AggregateExpr { + func: AggregateFunc::SumUInt32, + expr: ScalarExpr::Column(0), + distinct: false, + }; + let expected = TypedPlan { + schema: RelationType::new(vec![ + ColumnType::new(CDT::uint64_datatype(), true), // sum(number) + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ]) + .with_key(vec![2]) + .with_time_index(Some(1)) + .with_autos(&[2]) + .into_named(vec![ + None, + Some("window_start".to_string()), + Some("window_end".to_string()), + ]), + plan: Plan::Mfp { + input: Box::new( + Plan::Reduce { + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(1)), + } + .with_types( + RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ColumnType::new(ConcreteDataType::datetime_datatype(), false), + ]) + .into_named(vec![ + Some("number".to_string()), + Some("ts".to_string()), + ]), + ), + ), + key_val_plan: KeyValPlan { + key_plan: MapFilterProject::new(2) + .map(vec![ + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowFloor { + window_size: Interval::from_month_day_nano( + 0, + 0, + 1_000_000_000, + ), + start_time: Some(DateTime::new(1625097600000)), + }, + ), + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowCeiling { + window_size: Interval::from_month_day_nano( + 0, + 0, + 1_000_000_000, + ), + start_time: Some(DateTime::new(1625097600000)), + }, + 
), + ]) + .unwrap() + .project(vec![2, 3]) + .unwrap() + .into_safe(), + val_plan: MapFilterProject::new(2) + .map(vec![ScalarExpr::CallDf { + df_scalar_fn: DfScalarFunction::try_from_raw_fn( + RawDfScalarFn { + f: BytesMut::from( + b"\x08\x01\"\x08\x1a\x06\x12\x04\n\x02\x12\0" + .as_ref(), + ), + input_schema: RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + false, + )]) + .into_unnamed(), + extensions: FunctionExtensions { + anchor_to_name: BTreeMap::from([ + (0, "tumble".to_string()), + (1, "abs".to_string()), + (2, "sum".to_string()), + ]), + }, + }, + ) + .unwrap(), + exprs: vec![ScalarExpr::Column(0)], + }]) + .unwrap() + .project(vec![2]) + .unwrap() + .into_safe(), + }, + reduce_plan: ReducePlan::Accumulable(AccumulablePlan { + full_aggrs: vec![aggr_expr.clone()], + simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], + distinct_aggrs: vec![], + }), + } + .with_types( + RelationType::new(vec![ + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ColumnType::new(CDT::uint64_datatype(), true), //sum(number) + ]) + .with_key(vec![1]) + .with_time_index(Some(0)) + .with_autos(&[1]) + .into_named(vec![ + Some("window_start".to_string()), + Some("window_end".to_string()), + None, + ]), + ), + ), + mfp: MapFilterProject::new(3) + .map(vec![ + ScalarExpr::Column(2), + ScalarExpr::Column(3), + ScalarExpr::Column(0), + ScalarExpr::Column(1), + ]) + .unwrap() + .project(vec![4, 5, 6]) + .unwrap(), + }, + }; + assert_eq!(expected, flow_plan); + } + + #[tokio::test] + async fn test_df_func_expr_tree() { + let engine = create_test_query_engine(); + let sql = "SELECT abs(sum(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');"; + let plan = sql_to_substrait(engine.clone(), sql).await; + + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap(); + + let aggr_expr = AggregateExpr { + func: AggregateFunc::SumUInt32, + expr: ScalarExpr::Column(0), + distinct: false, + }; + let expected = TypedPlan { + schema: RelationType::new(vec![ + ColumnType::new(CDT::uint64_datatype(), true), // sum(number) + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ]) + .with_key(vec![2]) + .with_time_index(Some(1)) + .with_autos(&[2]) + .into_named(vec![ + None, + Some("window_start".to_string()), + Some("window_end".to_string()), + ]), + plan: Plan::Mfp { + input: Box::new( + Plan::Reduce { + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(1)), + } + .with_types( + RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ColumnType::new(ConcreteDataType::datetime_datatype(), false), + ]) + .into_named(vec![ + Some("number".to_string()), + Some("ts".to_string()), + ]), + ), + ), + key_val_plan: KeyValPlan { + key_plan: MapFilterProject::new(2) + .map(vec![ + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowFloor { + window_size: Interval::from_month_day_nano( + 0, + 0, + 1_000_000_000, + ), + start_time: Some(DateTime::new(1625097600000)), + }, + ), + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowCeiling { + window_size: Interval::from_month_day_nano( + 0, + 0, + 1_000_000_000, + ), + start_time: Some(DateTime::new(1625097600000)), + }, + ), + ]) + .unwrap() + .project(vec![2, 3]) + .unwrap() + .into_safe(), + val_plan: MapFilterProject::new(2) 
+ .project(vec![0, 1]) + .unwrap() + .into_safe(), + }, + reduce_plan: ReducePlan::Accumulable(AccumulablePlan { + full_aggrs: vec![aggr_expr.clone()], + simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], + distinct_aggrs: vec![], + }), + } + .with_types( + RelationType::new(vec![ + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ColumnType::new(CDT::uint64_datatype(), true), //sum(number) + ]) + .with_key(vec![1]) + .with_time_index(Some(0)) + .with_autos(&[1]) + .into_named(vec![ + Some("window_start".to_string()), + Some("window_end".to_string()), + None, + ]), + ), + ), + mfp: MapFilterProject::new(3) + .map(vec![ + ScalarExpr::Column(2), + ScalarExpr::CallDf { + df_scalar_fn: DfScalarFunction::try_from_raw_fn(RawDfScalarFn { + f: BytesMut::from(b"\"\x08\x1a\x06\x12\x04\n\x02\x12\0".as_ref()), + input_schema: RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint64_datatype(), + true, + )]) + .into_unnamed(), + extensions: FunctionExtensions { + anchor_to_name: BTreeMap::from([ + (0, "abs".to_string()), + (1, "tumble".to_string()), + (2, "sum".to_string()), + ]), + }, + }) + .unwrap(), + exprs: vec![ScalarExpr::Column(3)], + }, + ScalarExpr::Column(0), + ScalarExpr::Column(1), + ]) + .unwrap() + .project(vec![4, 5, 6]) + .unwrap(), + }, + }; + assert_eq!(expected, flow_plan); + } + /// TODO(discord9): add more illegal sql tests #[tokio::test] async fn test_tumble_composite() { @@ -656,10 +934,6 @@ mod test { Some("window_start".to_string()), Some("window_end".to_string()), ]), - // TODO(discord9): mfp indirectly ref to key columns - /* - .with_key(vec![1]) - .with_time_index(Some(0)),*/ plan: Plan::Mfp { input: Box::new( Plan::Reduce { diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 0d072ff493df..6b7b41930d2c 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -142,11 +142,9 @@ impl TypedExpr { *raw_expr = proto_col(idx); } let input_schema = RelationType::new(arg_types).into_unnamed(); - dbg!(&input_schema, &f_rewrite, &extensions); - let phy_expr = from_scalar_fn_to_df_fn_impl(&f_rewrite, &input_schema, extensions)?; let raw_fn = RawDfScalarFn::from_proto(&f_rewrite, input_schema.clone(), extensions.clone())?; - let df_func = DfScalarFunction::new(raw_fn, phy_expr)?; + let df_func = DfScalarFunction::try_from_raw_fn(raw_fn)?; let expr = ScalarExpr::CallDf { df_scalar_fn: df_func, exprs: arg_exprs, @@ -282,7 +280,6 @@ impl TypedExpr { ret_type, )) } else { - dbg!(&fn_name, &arg_typed_exprs, &input_schema); let try_as_df = Self::from_substrait_to_datafusion_scalar_func( f, arg_typed_exprs, @@ -439,29 +436,6 @@ mod test { use crate::repr::{self, ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; - #[tokio::test] - async fn test_df_func_basic() { - let engine = create_test_query_engine(); - let sql = "SELECT sum(abs(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');"; - let plan = sql_to_substrait(engine.clone(), sql).await; - - let mut ctx = create_test_ctx(); - let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap(); - } - - #[tokio::test] - async fn test_df_func_expr_tree() { - let engine = create_test_query_engine(); - let sql = "SELECT abs(sum(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');"; - let plan = sql_to_substrait(engine.clone(), sql).await; - - 
println!("{:#?}", plan); - let mut ctx = create_test_ctx(); - let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap(); - - println!("{:#?}", flow_plan); - } - /// test if `WHERE` condition can be converted to Flow's ScalarExpr in mfp's filter #[tokio::test] async fn test_where_and() { From 19224d0e31ec5e1dbfa0d53153ef60c074984987 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 20 Jun 2024 11:09:14 +0800 Subject: [PATCH 06/16] chore: dep use rev not patch --- Cargo.lock | 2 +- Cargo.toml | 4 ---- tests/runner/Cargo.toml | 3 ++- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 377974a7f161..8538df00b288 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3595,7 +3595,7 @@ dependencies = [ "serde", "thiserror", "time", - "winnow 0.6.8", + "winnow 0.6.13", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7fcbbf1eaa0e..7f9608ceb5c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -262,7 +262,3 @@ strip = true [profile.dev.package.tests-fuzz] debug = false strip = true - -# use sqlness with the latest changes which support SLEEP, will remove this after sqlness with the changes is released -[patch.crates-io] -sqlness = { git = "https://github.com/CeresDB/sqlness.git", rev = "5469532cdaf87791b3af5d7e2918e92725f8dfc5" } diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index 875a8193a7c6..bcf9eaade421 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -17,7 +17,8 @@ common-recordbatch.workspace = true common-time.workspace = true serde.workspace = true serde_json.workspace = true -sqlness = { version = "0.6" } +# use sqlness with the latest changes which support SLEEP, will remove this after sqlness with the changes is released +sqlness = { git = "https://github.com/CeresDB/sqlness.git", rev = "5469532cdaf87791b3af5d7e2918e92725f8dfc5" } tempfile.workspace = true tinytemplate = "1.2" tokio.workspace = true From 12ac61733c3a7d50eb36d8f3955ddc10c425edd2 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 20 Jun 2024 12:31:26 +0800 Subject: [PATCH 07/16] fix: wired sqlness error? 
--- .../common/types/timestamp/incorrect_timestamp.result | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.result b/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.result index 4b8895bb7ac4..7c730da5580f 100644 --- a/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.result +++ b/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.result @@ -44,7 +44,11 @@ Error: 2000(InvalidSyntax), Failed to parse value: Failed to parse 1900a01a01 00 INSERT INTO timestamp VALUES ('1900-1-1 00;00;00'); -Error: 2000(InvalidSyntax), Failed to parse value: Failed to parse 1900-1-1 00;00;00 to Timestamp value +Error: 2000(InvalidSyntax), sql parser error: Unterminated string literal at Line: 1, Column 31 + +Error: 1001(Unsupported), SQL statement is not supported: 00;, keyword: 00 + +Error: 2000(InvalidSyntax), sql parser error: Unterminated string literal at Line: 1, Column 3 INSERT INTO timestamp VALUES ('1900-1-1 00a00a00'); From e3fefd6611eb3bb71618a1eaa172a03d739d02f1 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 21 Jun 2024 15:18:47 +0800 Subject: [PATCH 08/16] refactor: per review --- src/flow/src/expr/scalar.rs | 2 +- src/flow/src/repr/relation.rs | 2 +- src/flow/src/transform/expr.rs | 2 +- src/flow/src/transform/plan.rs | 285 ++++++++++++------------ tests/cases/standalone/flow/basic.sql | 2 +- tests/cases/standalone/flow/df_func.sql | 2 +- 6 files changed, 152 insertions(+), 143 deletions(-) diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index acb006a07564..3c34f8111efe 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -155,7 +155,7 @@ pub enum ScalarExpr { exprs: Vec, }, CallDf { - /// invariant: the input args set insidet this [`DfScalarFunction`] is + /// invariant: the input args set inside this [`DfScalarFunction`] is /// always col(0) to col(n-1) where n is the length of `expr` df_scalar_fn: DfScalarFunction, exprs: Vec, diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index 295cc206840f..63c214363225 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -98,7 +98,7 @@ pub struct RelationType { pub keys: Vec, /// optionally indicate the column that is TIME INDEX pub time_index: Option, - /// mark all the columns that are added automately by flow, but not present in original sql + /// mark all the columns that are added automatically by flow, but are not present in original sql pub auto_columns: Vec, } diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 6b7b41930d2c..13f503a0f008 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -99,7 +99,7 @@ pub(crate) fn from_scalar_fn_to_df_fn_impl( Ok(phy_expr) } -/// Return a [`Expression`](wrapped in a [`FunctionArgument`]) that references the i-th column of the input relation +/// Return an [`Expression`](wrapped in a [`FunctionArgument`]) that references the i-th column of the input relation pub(crate) fn proto_col(i: usize) -> substrait_proto::proto::FunctionArgument { use substrait_proto::proto::expression; let expr = Expression { diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index ff4fb35c289a..a9d9e29310e9 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -16,10 +16,11 @@ use std::collections::{BTreeMap, BTreeSet, HashSet}; use itertools::Itertools; use snafu::OptionExt; +use 
substrait::substrait_proto_df::proto::{FilterRel, ReadRel}; use substrait_proto::proto::expression::MaskExpression; use substrait_proto::proto::read_rel::ReadType; use substrait_proto::proto::rel::RelType; -use substrait_proto::proto::{plan_rel, Plan as SubPlan, Rel}; +use substrait_proto::proto::{plan_rel, Plan as SubPlan, ProjectRel, Rel}; use crate::adapter::error::{ Error, InternalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu, @@ -63,156 +64,164 @@ impl TypedPlan { } } - /// Convert Substrait Rel into Flow's TypedPlan - /// TODO(discord9): SELECT DISTINCT(does it get compile with something else?) - pub fn from_substrait_rel( + pub fn from_substrait_project( ctx: &mut FlownodeContext, - rel: &Rel, + p: &ProjectRel, extensions: &FunctionExtensions, ) -> Result { - match &rel.rel_type { - Some(RelType::Project(p)) => { - let input = if let Some(input) = p.input.as_ref() { - TypedPlan::from_substrait_rel(ctx, input, extensions)? - } else { - return not_impl_err!("Projection without an input is not supported"); - }; + let input = if let Some(input) = p.input.as_ref() { + TypedPlan::from_substrait_rel(ctx, input, extensions)? + } else { + return not_impl_err!("Projection without an input is not supported"); + }; + + // because this `input.schema` is incorrect for pre-expand substrait plan, so we have to use schema before expand multi-value + // function to correctly transform it, and late rewrite it + let schema_before_expand = { + let input_schema = input.schema.clone(); + let auto_columns: HashSet = + HashSet::from_iter(input_schema.typ().auto_columns.clone()); + let not_auto_added_columns = (0..input_schema.len()?) + .filter(|i| !auto_columns.contains(i)) + .collect_vec(); + let mfp = MapFilterProject::new(input_schema.len()?) + .project(not_auto_added_columns)? + .into_safe(); - let mut exprs: Vec = vec![]; - // because this `input.schema` is incorrect for pre-expand substrait plan, so we have to use schema before expand multi-value - // function to correctly transform it, and late rewrite it - let schema_before_expand = { - let input_schema = input.schema.clone(); - let auto_columns: HashSet = - HashSet::from_iter(input_schema.typ().auto_columns.clone()); - let not_auto_added_columns = (0..input_schema.len()?) - .filter(|i| !auto_columns.contains(i)) - .collect_vec(); - let mfp = MapFilterProject::new(input_schema.len()?) - .project(not_auto_added_columns)? - .into_safe(); + input_schema.apply_mfp(&mfp)? + }; - input_schema.apply_mfp(&mfp)? - }; - for e in &p.expressions { - let expr = TypedExpr::from_substrait_rex(e, &schema_before_expand, extensions)?; - exprs.push(expr); + let mut exprs: Vec = Vec::with_capacity(p.expressions.len()); + for e in &p.expressions { + let expr = TypedExpr::from_substrait_rex(e, &schema_before_expand, extensions)?; + exprs.push(expr); + } + let is_literal = exprs.iter().all(|expr| expr.expr.is_literal()); + if is_literal { + let (literals, lit_types): (Vec<_>, Vec<_>) = exprs + .into_iter() + .map(|TypedExpr { expr, typ }| (expr, typ)) + .unzip(); + let typ = RelationType::new(lit_types); + let row = literals + .into_iter() + .map(|lit| lit.as_literal().expect("A literal")) + .collect_vec(); + let row = repr::Row::new(row); + let plan = Plan::Constant { + rows: vec![(row, repr::Timestamp::MIN, 1)], + }; + Ok(TypedPlan { + schema: typ.into_unnamed(), + plan, + }) + } else { + match input.plan.clone() { + Plan::Reduce { key_val_plan, .. 
} => { + rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?; } - let is_literal = exprs.iter().all(|expr| expr.expr.is_literal()); - if is_literal { - let (literals, lit_types): (Vec<_>, Vec<_>) = exprs - .into_iter() - .map(|TypedExpr { expr, typ }| (expr, typ)) - .unzip(); - let typ = RelationType::new(lit_types); - let row = literals - .into_iter() - .map(|lit| lit.as_literal().expect("A literal")) - .collect_vec(); - let row = repr::Row::new(row); - let plan = Plan::Constant { - rows: vec![(row, repr::Timestamp::MIN, 1)], - }; - Ok(TypedPlan { - schema: typ.into_unnamed(), - plan, - }) - } else { - match input.plan.clone() { - Plan::Reduce { key_val_plan, .. } => { - rewrite_projection_after_reduce( - key_val_plan, - &input.schema, - &mut exprs, - )?; - } - Plan::Mfp { input, mfp: _ } => { - if let Plan::Reduce { key_val_plan, .. } = input.plan { - rewrite_projection_after_reduce( - key_val_plan, - &input.schema, - &mut exprs, - )?; - } - } - _ => (), + Plan::Mfp { input, mfp: _ } => { + if let Plan::Reduce { key_val_plan, .. } = input.plan { + rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?; } - input.projection(exprs) } + _ => (), } - Some(RelType::Filter(filter)) => { - let input = if let Some(input) = filter.input.as_ref() { - TypedPlan::from_substrait_rel(ctx, input, extensions)? - } else { - return not_impl_err!("Filter without an input is not supported"); - }; + input.projection(exprs) + } + } - let expr = if let Some(condition) = filter.condition.as_ref() { - TypedExpr::from_substrait_rex(condition, &input.schema, extensions)? - } else { - return not_impl_err!("Filter without an condition is not valid"); - }; - input.filter(expr) - } - Some(RelType::Read(read)) => { - if let Some(ReadType::NamedTable(nt)) = &read.as_ref().read_type { - let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu { - reason: "Query context not found", - })?; - let table_reference = match nt.names.len() { - 1 => [ - query_ctx.current_catalog().to_string(), - query_ctx.current_schema().to_string(), - nt.names[0].clone(), - ], - 2 => [ - query_ctx.current_catalog().to_string(), - nt.names[0].clone(), - nt.names[1].clone(), - ], - 3 => [ - nt.names[0].clone(), - nt.names[1].clone(), - nt.names[2].clone(), - ], - _ => InvalidQuerySnafu { - reason: "Expect table to have name", - } - .fail()?, - }; - let table = ctx.table(&table_reference)?; - let get_table = Plan::Get { - id: crate::expr::Id::Global(table.0), - }; - let get_table = TypedPlan { - schema: table.1, - plan: get_table, - }; + pub fn from_substrait_filter( + ctx: &mut FlownodeContext, + filter: &FilterRel, + extensions: &FunctionExtensions, + ) -> Result { + let input = if let Some(input) = filter.input.as_ref() { + TypedPlan::from_substrait_rel(ctx, input, extensions)? + } else { + return not_impl_err!("Filter without an input is not supported"); + }; - if let Some(MaskExpression { - select: Some(projection), - .. - }) = &read.projection - { - let column_indices: Vec = projection - .struct_items - .iter() - .map(|item| item.field as usize) - .collect(); - let input_arity = get_table.schema.typ().column_types.len(); - let mfp = - MapFilterProject::new(input_arity).project(column_indices.clone())?; - get_table.mfp(mfp.into_safe()) - } else { - Ok(get_table) - } - } else { - not_impl_err!("Only NamedTable reads are supported") + let expr = if let Some(condition) = filter.condition.as_ref() { + TypedExpr::from_substrait_rex(condition, &input.schema, extensions)? 
+ } else { + return not_impl_err!("Filter without an condition is not valid"); + }; + input.filter(expr) + } + + pub fn from_substrait_read( + ctx: &mut FlownodeContext, + read: &ReadRel, + _extensions: &FunctionExtensions, + ) -> Result { + if let Some(ReadType::NamedTable(nt)) = &read.read_type { + let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu { + reason: "Query context not found", + })?; + let table_reference = match nt.names.len() { + 1 => [ + query_ctx.current_catalog().to_string(), + query_ctx.current_schema().to_string(), + nt.names[0].clone(), + ], + 2 => [ + query_ctx.current_catalog().to_string(), + nt.names[0].clone(), + nt.names[1].clone(), + ], + 3 => [ + nt.names[0].clone(), + nt.names[1].clone(), + nt.names[2].clone(), + ], + _ => InvalidQuerySnafu { + reason: "Expect table to have name", } + .fail()?, + }; + let table = ctx.table(&table_reference)?; + let get_table = Plan::Get { + id: crate::expr::Id::Global(table.0), + }; + let get_table = TypedPlan { + schema: table.1, + plan: get_table, + }; + + if let Some(MaskExpression { + select: Some(projection), + .. + }) = &read.projection + { + let column_indices: Vec = projection + .struct_items + .iter() + .map(|item| item.field as usize) + .collect(); + let input_arity = get_table.schema.typ().column_types.len(); + let mfp = MapFilterProject::new(input_arity).project(column_indices.clone())?; + get_table.mfp(mfp.into_safe()) + } else { + Ok(get_table) } - Some(RelType::Aggregate(agg)) => { - TypedPlan::from_substrait_agg_rel(ctx, agg, extensions) - } + } else { + not_impl_err!("Only NamedTable reads are supported") + } + } + + /// Convert Substrait Rel into Flow's TypedPlan + /// TODO(discord9): SELECT DISTINCT(does it get compile with something else?) + pub fn from_substrait_rel( + ctx: &mut FlownodeContext, + rel: &Rel, + extensions: &FunctionExtensions, + ) -> Result { + match &rel.rel_type { + Some(RelType::Project(p)) => Self::from_substrait_project(ctx, p.as_ref(), extensions), + Some(RelType::Filter(filter)) => Self::from_substrait_filter(ctx, filter, extensions), + Some(RelType::Read(read)) => Self::from_substrait_read(ctx, read, extensions), + Some(RelType::Aggregate(agg)) => Self::from_substrait_agg_rel(ctx, agg, extensions), _ => not_impl_err!("Unsupported relation type: {:?}", rel.rel_type), } } diff --git a/tests/cases/standalone/flow/basic.sql b/tests/cases/standalone/flow/basic.sql index 1ecceeae3f2c..c09100ce9313 100644 --- a/tests/cases/standalone/flow/basic.sql +++ b/tests/cases/standalone/flow/basic.sql @@ -28,4 +28,4 @@ SELECT col_0, window_start, window_end FROM out_num_cnt; DROP FLOW test_numbers; DROP TABLE numbers_input; -DROP TABLE out_num_cnt; \ No newline at end of file +DROP TABLE out_num_cnt; diff --git a/tests/cases/standalone/flow/df_func.sql b/tests/cases/standalone/flow/df_func.sql index ae5a4196759a..64869f6d7f6b 100644 --- a/tests/cases/standalone/flow/df_func.sql +++ b/tests/cases/standalone/flow/df_func.sql @@ -60,4 +60,4 @@ SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; DROP FLOW test_numbers_df_func; DROP TABLE numbers_input_df_func; -DROP TABLE out_num_cnt_df_func; \ No newline at end of file +DROP TABLE out_num_cnt_df_func; From 06d42d22d9596735e602d713da6c1c6ce140010d Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 21 Jun 2024 16:41:16 +0800 Subject: [PATCH 09/16] fix: temp sqlness bug --- .../common/types/timestamp/incorrect_timestamp.result | 10 ++-------- .../common/types/timestamp/incorrect_timestamp.sql | 3 ++- 2 files changed, 4 
insertions(+), 9 deletions(-) diff --git a/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.result b/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.result index 7c730da5580f..440e60ca525d 100644 --- a/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.result +++ b/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.result @@ -42,14 +42,8 @@ INSERT INTO timestamp VALUES ('1900a01a01 00:00:00'); Error: 2000(InvalidSyntax), Failed to parse value: Failed to parse 1900a01a01 00:00:00 to Timestamp value -INSERT INTO timestamp VALUES ('1900-1-1 00;00;00'); - -Error: 2000(InvalidSyntax), sql parser error: Unterminated string literal at Line: 1, Column 31 - -Error: 1001(Unsupported), SQL statement is not supported: 00;, keyword: 00 - -Error: 2000(InvalidSyntax), sql parser error: Unterminated string literal at Line: 1, Column 3 - +-- FIXME(discord9): a sqlness bug is causing this sql to fail to parse, see issue: https://github.com/CeresDB/sqlness/issues/68 +-- INSERT INTO timestamp VALUES ('1900-1-1 00;00;00'); INSERT INTO timestamp VALUES ('1900-1-1 00a00a00'); Error: 2000(InvalidSyntax), Failed to parse value: Failed to parse 1900-1-1 00a00a00 to Timestamp value diff --git a/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.sql b/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.sql index 1afa7c7e8aca..8f80f8f2be2b 100644 --- a/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.sql +++ b/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.sql @@ -20,7 +20,8 @@ INSERT INTO timestamp VALUES ('1900-1-1 59:59:23'); INSERT INTO timestamp VALUES ('1900a01a01 00:00:00'); -INSERT INTO timestamp VALUES ('1900-1-1 00;00;00'); +-- FIXME(discord9): a sqlness bug is causing this sql to fail to parse, see issue: https://github.com/CeresDB/sqlness/issues/68 +-- INSERT INTO timestamp VALUES ('1900-1-1 00;00;00'); INSERT INTO timestamp VALUES ('1900-1-1 00a00a00'); From decda3790724fe7e35c55e51f763072bb74057a1 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 24 Jun 2024 11:21:39 +0800 Subject: [PATCH 10/16] fix: use fixed sqlness --- Cargo.lock | 7 ++++--- .../common/types/timestamp/incorrect_timestamp.result | 6 ++++-- .../common/types/timestamp/incorrect_timestamp.sql | 3 +-- tests/runner/Cargo.toml | 3 +-- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8538df00b288..834d88349a9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5710,7 +5710,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.5", ] [[package]] @@ -10599,8 +10599,9 @@ dependencies = [ [[package]] name = "sqlness" -version = "0.6.0" -source = "git+https://github.com/CeresDB/sqlness.git?rev=5469532cdaf87791b3af5d7e2918e92725f8dfc5#5469532cdaf87791b3af5d7e2918e92725f8dfc5" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "308a7338f2211813d6e9da117e9b9b7aee5d072872d11a934002fd2bd4ab5276" dependencies = [ "async-trait", "derive_builder 0.11.2", diff --git a/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.result b/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.result index 440e60ca525d..4b8895bb7ac4 100644 --- a/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.result +++ 
b/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.result @@ -42,8 +42,10 @@ INSERT INTO timestamp VALUES ('1900a01a01 00:00:00'); Error: 2000(InvalidSyntax), Failed to parse value: Failed to parse 1900a01a01 00:00:00 to Timestamp value --- FIXME(discord9): a sqlness bug is causing this sql to fail to parse, see issue: https://github.com/CeresDB/sqlness/issues/68 --- INSERT INTO timestamp VALUES ('1900-1-1 00;00;00'); +INSERT INTO timestamp VALUES ('1900-1-1 00;00;00'); + +Error: 2000(InvalidSyntax), Failed to parse value: Failed to parse 1900-1-1 00;00;00 to Timestamp value + INSERT INTO timestamp VALUES ('1900-1-1 00a00a00'); Error: 2000(InvalidSyntax), Failed to parse value: Failed to parse 1900-1-1 00a00a00 to Timestamp value diff --git a/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.sql b/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.sql index 8f80f8f2be2b..1afa7c7e8aca 100644 --- a/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.sql +++ b/tests/cases/standalone/common/types/timestamp/incorrect_timestamp.sql @@ -20,8 +20,7 @@ INSERT INTO timestamp VALUES ('1900-1-1 59:59:23'); INSERT INTO timestamp VALUES ('1900a01a01 00:00:00'); --- FIXME(discord9): a sqlness bug is causing this sql to fail to parse, see issue: https://github.com/CeresDB/sqlness/issues/68 --- INSERT INTO timestamp VALUES ('1900-1-1 00;00;00'); +INSERT INTO timestamp VALUES ('1900-1-1 00;00;00'); INSERT INTO timestamp VALUES ('1900-1-1 00a00a00'); diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index bcf9eaade421..7bf1c4ad2e04 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -17,8 +17,7 @@ common-recordbatch.workspace = true common-time.workspace = true serde.workspace = true serde_json.workspace = true -# use sqlness with the latest changes which support SLEEP, will remove this after sqlness with the changes is released -sqlness = { git = "https://github.com/CeresDB/sqlness.git", rev = "5469532cdaf87791b3af5d7e2918e92725f8dfc5" } +sqlness = "0.6.1" tempfile.workspace = true tinytemplate = "1.2" tokio.workspace = true From e5e6698e133a06d21da1fc51763c4fd604f2728c Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 24 Jun 2024 11:50:33 +0800 Subject: [PATCH 11/16] fix: impl drop as async shutdown --- src/flow/src/adapter/worker.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index 076e681bed10..4599690ab587 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -206,10 +206,15 @@ impl WorkerHandle { impl Drop for WorkerHandle { fn drop(&mut self) { - if let Err(err) = self.shutdown_blocking() { - common_telemetry::error!("Fail to shutdown worker: {:?}", err) + let ret = futures::executor::block_on(async { self.shutdown().await }); + if let Err(ret) = ret { + common_telemetry::error!( + ret; + "While dropping Worker Handle, failed to shutdown worker, worker might be in inconsistent state." 
+ ); + } else { + info!("Flow Worker shutdown due to Worker Handle dropped.") } - info!("Flow Worker shutdown due to Worker Handle dropped.") } } @@ -532,7 +537,7 @@ mod test { tx.send((Row::empty(), 0, 0)).unwrap(); handle.run_available(0).await.unwrap(); assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty()); - handle.shutdown().await.unwrap(); + drop(handle); worker_thread_handle.join().unwrap(); } } From 9996b1b8c1e4edfc7335d5058f283bb8342b7cc9 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 24 Jun 2024 11:51:02 +0800 Subject: [PATCH 12/16] refactor: per bot's review --- src/flow/src/expr/scalar.rs | 15 ++++++++++----- src/flow/src/transform/expr.rs | 19 ++++++++++++++----- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 3c34f8111efe..7335511be0f0 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -21,10 +21,10 @@ use bytes::BytesMut; use common_error::ext::BoxedError; use common_recordbatch::DfRecordBatch; use datafusion_physical_expr::PhysicalExpr; -use datatypes::arrow_array; use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; +use datatypes::{arrow_array, value}; use prost::Message; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -199,13 +199,18 @@ impl DfScalarFunction { }) } + /// eval a list of expressions using input values + fn eval_args(values: &[Value], exprs: &[ScalarExpr]) -> Result, EvalError> { + exprs + .iter() + .map(|expr| expr.eval(values)) + .collect::>() + } + // TODO(discord9): add RecordBatch support pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result { // first eval exprs to construct values to feed to datafusion - let values: Vec<_> = exprs - .iter() - .map(|expr| expr.eval(values)) - .collect::>()?; + let values: Vec<_> = Self::eval_args(values, exprs)?; if values.is_empty() { return InvalidArgumentSnafu { diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 13f503a0f008..a6b312504d28 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -128,6 +128,15 @@ pub(crate) fn proto_col(i: usize) -> substrait_proto::proto::FunctionArgument { } } +/// rewrite ScalarFunction's arguments to Columns 0..n so nested exprs are still handled by us instead of datafusion +fn rewrite_scalar_function(f: &ScalarFunction) -> ScalarFunction { + let mut f_rewrite = f.clone(); + for (idx, raw_expr) in f_rewrite.arguments.iter_mut().enumerate() { + *raw_expr = proto_col(idx); + } + f_rewrite +} + impl TypedExpr { pub fn from_substrait_to_datafusion_scalar_func( f: &ScalarFunction, @@ -136,14 +145,13 @@ impl TypedExpr { ) -> Result { let (arg_exprs, arg_types): (Vec<_>, Vec<_>) = arg_exprs_typed.into_iter().map(|e| (e.expr, e.typ)).unzip(); - // rewrite ScalarFunction's arguments to Columns 0..n so nested exprs are still handled by us instead of datafusion - let mut f_rewrite = f.clone(); - for (idx, raw_expr) in f_rewrite.arguments.iter_mut().enumerate() { - *raw_expr = proto_col(idx); - } + + let f_rewrite = rewrite_scalar_function(f); + let input_schema = RelationType::new(arg_types).into_unnamed(); let raw_fn = RawDfScalarFn::from_proto(&f_rewrite, input_schema.clone(), extensions.clone())?; + let df_func = DfScalarFunction::try_from_raw_fn(raw_fn)?; let expr = ScalarExpr::CallDf { df_scalar_fn: df_func, @@ -153,6 +161,7 @@ impl TypedExpr { let ret_type = expr.typ(&[])?; Ok(TypedExpr::new(expr, ret_type)) } + /// 
Convert ScalarFunction into Flow's ScalarExpr pub fn from_substrait_scalar_func( f: &ScalarFunction, From 768ffcf053d552e8b0bece8676a72ebbfbb407a1 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 24 Jun 2024 14:24:39 +0800 Subject: [PATCH 13/16] tests: drop worker handler both sync/async --- src/flow/src/adapter/worker.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index 4599690ab587..4d9ad2f52447 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -501,6 +501,19 @@ mod test { use crate::plan::Plan; use crate::repr::{RelationType, Row}; + #[test] + fn drop_handle() { + let (tx, rx) = oneshot::channel(); + let worker_thread_handle = std::thread::spawn(move || { + let (handle, mut worker) = create_worker(); + tx.send(handle).unwrap(); + worker.run(); + }); + let handle = rx.blocking_recv().unwrap(); + drop(handle); + worker_thread_handle.join().unwrap(); + } + #[tokio::test] pub async fn test_simple_get_with_worker_and_handle() { let (tx, rx) = oneshot::channel(); From 84b958c2feea7c1052bfec354b9961069a82d5be Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 24 Jun 2024 14:31:13 +0800 Subject: [PATCH 14/16] docs: add rationale for test --- tests/cases/standalone/flow/df_func.result | 2 ++ tests/cases/standalone/flow/df_func.sql | 2 ++ 2 files changed, 4 insertions(+) diff --git a/tests/cases/standalone/flow/df_func.result b/tests/cases/standalone/flow/df_func.result index 52eceda45ccd..72b57d63daf5 100644 --- a/tests/cases/standalone/flow/df_func.result +++ b/tests/cases/standalone/flow/df_func.result @@ -7,6 +7,7 @@ CREATE TABLE numbers_input_df_func ( Affected Rows: 0 +-- call `sum(abs(number))` where `abs` is DataFusion Function and `sum` is flow function CREATE FLOW test_numbers_df_func SINK TO out_num_cnt_df_func AS @@ -68,6 +69,7 @@ CREATE TABLE numbers_input_df_func ( Affected Rows: 0 +-- call `abs(sum(number))`to make sure that calling `abs` function(impl by datafusion) on `sum` function(impl by flow) is working CREATE FLOW test_numbers_df_func SINK TO out_num_cnt_df_func AS diff --git a/tests/cases/standalone/flow/df_func.sql b/tests/cases/standalone/flow/df_func.sql index 64869f6d7f6b..a9f9f4cc0437 100644 --- a/tests/cases/standalone/flow/df_func.sql +++ b/tests/cases/standalone/flow/df_func.sql @@ -5,6 +5,7 @@ CREATE TABLE numbers_input_df_func ( TIME INDEX(ts) ); +-- call `sum(abs(number))` where `abs` is DataFusion Function and `sum` is flow function CREATE FLOW test_numbers_df_func SINK TO out_num_cnt_df_func AS @@ -37,6 +38,7 @@ CREATE TABLE numbers_input_df_func ( TIME INDEX(ts) ); +-- call `abs(sum(number))`to make sure that calling `abs` function(impl by datafusion) on `sum` function(impl by flow) is working CREATE FLOW test_numbers_df_func SINK TO out_num_cnt_df_func AS From 561f99bf7c4b5648bfc415069ae703580b59dc1c Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 24 Jun 2024 14:54:07 +0800 Subject: [PATCH 15/16] refactor: per review --- src/flow/src/repr/relation.rs | 1 + tests/cases/standalone/flow/basic.result | 2 +- tests/cases/standalone/flow/basic.sql | 2 +- tests/cases/standalone/flow/df_func.result | 6 ++++-- tests/cases/standalone/flow/df_func.sql | 6 ++++-- tests/runner/Cargo.toml | 1 + 6 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index 63c214363225..43947dc47236 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -364,6 +364,7 @@ impl 
RelationDesc { ); Ok(self.names.len()) } + pub fn to_df_schema(&self) -> Result { let fields: Vec<_> = self .iter() diff --git a/tests/cases/standalone/flow/basic.result b/tests/cases/standalone/flow/basic.result index ea5d0180551a..c9a3c7714abc 100644 --- a/tests/cases/standalone/flow/basic.result +++ b/tests/cases/standalone/flow/basic.result @@ -21,7 +21,7 @@ VALUES Affected Rows: 2 --- SQLNESS SLEEP 2s +-- SQLNESS SLEEP 3s SELECT col_0, window_start, window_end FROM out_num_cnt; +-------+---------------------+---------------------+ diff --git a/tests/cases/standalone/flow/basic.sql b/tests/cases/standalone/flow/basic.sql index c09100ce9313..9043875dd13a 100644 --- a/tests/cases/standalone/flow/basic.sql +++ b/tests/cases/standalone/flow/basic.sql @@ -15,7 +15,7 @@ VALUES (20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); --- SQLNESS SLEEP 2s +-- SQLNESS SLEEP 3s SELECT col_0, window_start, window_end FROM out_num_cnt; INSERT INTO numbers_input diff --git a/tests/cases/standalone/flow/df_func.result b/tests/cases/standalone/flow/df_func.result index 72b57d63daf5..7ab393eeb10e 100644 --- a/tests/cases/standalone/flow/df_func.result +++ b/tests/cases/standalone/flow/df_func.result @@ -22,7 +22,8 @@ VALUES Affected Rows: 2 --- SQLNESS SLEEP 2s +-- sleep a little bit longer to make sure that table is created and data is inserted +-- SQLNESS SLEEP 3s SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; +-------+---------------------+---------------------+ @@ -84,7 +85,8 @@ VALUES Affected Rows: 2 --- SQLNESS SLEEP 2s +-- sleep a little bit longer to make sure that table is created and data is inserted +-- SQLNESS SLEEP 3s SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; +-------+---------------------+---------------------+ diff --git a/tests/cases/standalone/flow/df_func.sql b/tests/cases/standalone/flow/df_func.sql index a9f9f4cc0437..b9a22cb9da6d 100644 --- a/tests/cases/standalone/flow/df_func.sql +++ b/tests/cases/standalone/flow/df_func.sql @@ -16,7 +16,8 @@ VALUES (-20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); --- SQLNESS SLEEP 2s +-- sleep a little bit longer to make sure that table is created and data is inserted +-- SQLNESS SLEEP 3s SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; INSERT INTO numbers_input_df_func @@ -49,7 +50,8 @@ VALUES (-20, "2021-07-01 00:00:00.200"), (22, "2021-07-01 00:00:00.600"); --- SQLNESS SLEEP 2s +-- sleep a little bit longer to make sure that table is created and data is inserted +-- SQLNESS SLEEP 3s SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; INSERT INTO numbers_input_df_func diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index 7bf1c4ad2e04..d492d7931979 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -17,6 +17,7 @@ common-recordbatch.workspace = true common-time.workspace = true serde.workspace = true serde_json.workspace = true +# sqlness 0.6.0 have a bug causing `cargo sqlness` to fail(see https://github.com/CeresDB/sqlness/issues/68) which is fixed in 0.6.1 sqlness = "0.6.1" tempfile.workspace = true tinytemplate = "1.2" From fa89509b7966b5f9a86f4440568f44eadeec777e Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 24 Jun 2024 15:00:34 +0800 Subject: [PATCH 16/16] chore: fmt --- tests/runner/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index d492d7931979..1bac0c593337 100644 --- a/tests/runner/Cargo.toml +++ 
b/tests/runner/Cargo.toml @@ -17,7 +17,7 @@ common-recordbatch.workspace = true common-time.workspace = true serde.workspace = true serde_json.workspace = true -# sqlness 0.6.0 have a bug causing `cargo sqlness` to fail(see https://github.com/CeresDB/sqlness/issues/68) which is fixed in 0.6.1 +# sqlness 0.6.0 have a bug causing `cargo sqlness` to fail(see https://github.com/CeresDB/sqlness/issues/68) which is fixed in 0.6.1 sqlness = "0.6.1" tempfile.workspace = true tinytemplate = "1.2"
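Note on the shutdown change in patch 11 above: `Drop` is synchronous, so the async `shutdown()` is driven to completion with `futures::executor::block_on` instead of a separate blocking method. The sketch below is a minimal, self-contained illustration of that pattern only — the `Handle` struct, its `shutdown_tx` field, the error string, and the `main` harness are hypothetical stand-ins, not the actual `WorkerHandle` API, and it assumes the `tokio` (sync feature) and `futures` crates that the series itself already uses.

```rust
// Sketch of "async shutdown driven from Drop" under the assumptions stated above.
use tokio::sync::oneshot;

struct Handle {
    // Present until shutdown has been requested once.
    shutdown_tx: Option<oneshot::Sender<()>>,
}

impl Handle {
    async fn shutdown(&mut self) -> Result<(), &'static str> {
        // Idempotent: the sender is taken on the first call; later calls are no-ops.
        match self.shutdown_tx.take() {
            Some(tx) => tx.send(()).map_err(|_| "worker already stopped"),
            None => Ok(()),
        }
    }
}

impl Drop for Handle {
    fn drop(&mut self) {
        // Drop is synchronous, so drive the async shutdown to completion here.
        if let Err(err) = futures::executor::block_on(self.shutdown()) {
            eprintln!("failed to shut down worker on drop: {err}");
        }
    }
}

fn main() {
    let (tx, rx) = oneshot::channel();
    // Stand-in for the worker thread: it just waits for the shutdown signal.
    let worker = std::thread::spawn(move || {
        let _ = futures::executor::block_on(rx);
    });
    let handle = Handle {
        shutdown_tx: Some(tx),
    };
    drop(handle); // triggers the blocking shutdown
    worker.join().unwrap();
}
```

The usual caveat with this pattern is that blocking inside `Drop` can stall if the handle is dropped on a thread the worker needs in order to reply; the `drop_handle` test added in patch 13 exercises the drop path from a plain (non-async) test thread, alongside the existing `tokio::test` that now ends with `drop(handle)`.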