diff --git a/Cargo.lock b/Cargo.lock index d77a2da3efac..834d88349a9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3584,6 +3584,20 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" +[[package]] +name = "duration-str" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "709d653e7c92498eb29fb86a2a6f0f3502b97530f33aedb32ef848d4d28b31a3" +dependencies = [ + "chrono", + "rust_decimal", + "serde", + "thiserror", + "time", + "winnow 0.6.13", +] + [[package]] name = "dyn-clone" version = "1.0.17" @@ -6216,6 +6230,15 @@ dependencies = [ "unicase", ] +[[package]] +name = "minijinja" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55e877d961d4f96ce13615862322df7c0b6d169d40cab71a7ef3f9b9e594451e" +dependencies = [ + "serde", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -10576,14 +10599,17 @@ dependencies = [ [[package]] name = "sqlness" -version = "0.5.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0860f149718809371602b42573693e1ed2b1d0aed35fe69e04e4e4e9918d81f7" +checksum = "308a7338f2211813d6e9da117e9b9b7aee5d072872d11a934002fd2bd4ab5276" dependencies = [ "async-trait", "derive_builder 0.11.2", + "duration-str", + "minijinja", "prettydiff", "regex", + "serde_json", "thiserror", "toml 0.5.11", "walkdir", diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index fd07ff1dc2ff..0fd6af199101 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -321,7 +321,7 @@ impl FlownodeManager { schema .get_name(*i) .clone() - .unwrap_or_else(|| format!("Col_{i}")) + .unwrap_or_else(|| format!("col_{i}")) }) .collect_vec() }) @@ -344,7 +344,7 @@ impl FlownodeManager { .get(idx) .cloned() .flatten() - .unwrap_or(format!("Col_{}", idx)); + .unwrap_or(format!("col_{}", idx)); let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable); if schema.typ().time_index == Some(idx) { ret.with_time_index(true) diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index 821f9601985f..24cf05c4b649 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -148,6 +148,8 @@ impl TableSource { column_types, keys, time_index, + // by default table schema's column are all non-auto + auto_columns: vec![], }, names: col_names, }, diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index 076e681bed10..4d9ad2f52447 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -206,10 +206,15 @@ impl WorkerHandle { impl Drop for WorkerHandle { fn drop(&mut self) { - if let Err(err) = self.shutdown_blocking() { - common_telemetry::error!("Fail to shutdown worker: {:?}", err) + let ret = futures::executor::block_on(async { self.shutdown().await }); + if let Err(ret) = ret { + common_telemetry::error!( + ret; + "While dropping Worker Handle, failed to shutdown worker, worker might be in inconsistent state." 
+ ); + } else { + info!("Flow worker shut down because its WorkerHandle was dropped.") } - info!("Flow Worker shutdown due to Worker Handle dropped.") } } @@ -496,6 +501,19 @@ mod test { use crate::plan::Plan; use crate::repr::{RelationType, Row}; + #[test] + fn drop_handle() { + let (tx, rx) = oneshot::channel(); + let worker_thread_handle = std::thread::spawn(move || { + let (handle, mut worker) = create_worker(); + tx.send(handle).unwrap(); + worker.run(); + }); + let handle = rx.blocking_recv().unwrap(); + drop(handle); + worker_thread_handle.join().unwrap(); + } + #[tokio::test] pub async fn test_simple_get_with_worker_and_handle() { let (tx, rx) = oneshot::channel(); @@ -532,7 +550,7 @@ mod test { tx.send((Row::empty(), 0, 0)).unwrap(); handle.run_available(0).await.unwrap(); assert_eq!(sink_rx.recv().await.unwrap().0, Row::empty()); - handle.shutdown().await.unwrap(); + drop(handle); worker_thread_handle.join().unwrap(); } } diff --git a/src/flow/src/expr/scalar.rs b/src/flow/src/expr/scalar.rs index 8103089e67da..7335511be0f0 100644 --- a/src/flow/src/expr/scalar.rs +++ b/src/flow/src/expr/scalar.rs @@ -21,10 +21,10 @@ use bytes::BytesMut; use common_error::ext::BoxedError; use common_recordbatch::DfRecordBatch; use datafusion_physical_expr::PhysicalExpr; -use datatypes::arrow_array; use datatypes::data_type::DataType; use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; +use datatypes::{arrow_array, value}; use prost::Message; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -155,8 +155,10 @@ pub enum ScalarExpr { exprs: Vec<ScalarExpr>, }, CallDf { - // TODO(discord9): support shuffle + /// Invariant: the input args fed into this [`DfScalarFunction`] are + /// always col(0) to col(n-1), where n is the length of `exprs` df_scalar_fn: DfScalarFunction, + exprs: Vec<ScalarExpr>, }, /// Conditionally evaluated expressions.
/// @@ -189,8 +191,27 @@ impl DfScalarFunction { }) } + pub fn try_from_raw_fn(raw_fn: RawDfScalarFn) -> Result<Self, Error> { + Ok(Self { + fn_impl: raw_fn.get_fn_impl()?, + df_schema: Arc::new(raw_fn.input_schema.to_df_schema()?), + raw_fn, + }) + } + + /// Evaluate a list of expressions against the given input values + fn eval_args(values: &[Value], exprs: &[ScalarExpr]) -> Result<Vec<Value>, EvalError> { + exprs + .iter() + .map(|expr| expr.eval(values)) + .collect::<Result<_, _>>() + } + // TODO(discord9): add RecordBatch support - pub fn eval(&self, values: &[Value]) -> Result<Value, EvalError> { + pub fn eval(&self, values: &[Value], exprs: &[ScalarExpr]) -> Result<Value, EvalError> { + // first evaluate `exprs` to construct the values fed to datafusion + let values: Vec<_> = Self::eval_args(values, exprs)?; + if values.is_empty() { return InvalidArgumentSnafu { reason: "values is empty".to_string(), @@ -259,16 +280,18 @@ impl<'de> serde::de::Deserialize<'de> for DfScalarFunction { D: serde::de::Deserializer<'de>, { let raw_fn = RawDfScalarFn::deserialize(deserializer)?; - let fn_impl = raw_fn.get_fn_impl().map_err(serde::de::Error::custom)?; - DfScalarFunction::new(raw_fn, fn_impl).map_err(serde::de::Error::custom) + DfScalarFunction::try_from_raw_fn(raw_fn).map_err(serde::de::Error::custom) } } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct RawDfScalarFn { - f: bytes::BytesMut, - input_schema: RelationDesc, - extensions: FunctionExtensions, + /// The raw bytes of the encoded datafusion scalar function + pub(crate) f: bytes::BytesMut, + /// The input schema of the function + pub(crate) input_schema: RelationDesc, + /// Extensions containing the mapping from function references to function names + pub(crate) extensions: FunctionExtensions, } impl RawDfScalarFn { @@ -354,7 +377,7 @@ impl ScalarExpr { Ok(ColumnType::new_nullable(func.signature().output)) } ScalarExpr::If { then, .. } => then.typ(context), - ScalarExpr::CallDf { df_scalar_fn } => { + ScalarExpr::CallDf { df_scalar_fn, .. } => { let arrow_typ = df_scalar_fn .fn_impl // TODO(discord9): get scheme from args instead?
@@ -445,7 +468,10 @@ impl ScalarExpr { } .fail(), }, - ScalarExpr::CallDf { df_scalar_fn } => df_scalar_fn.eval(values), + ScalarExpr::CallDf { + df_scalar_fn, + exprs, + } => df_scalar_fn.eval(values, exprs), } } @@ -614,7 +640,15 @@ impl ScalarExpr { f(then)?; f(els) } - _ => Ok(()), + ScalarExpr::CallDf { + df_scalar_fn: _, + exprs, + } => { + for expr in exprs { + f(expr)?; + } + Ok(()) + } } } @@ -650,7 +684,15 @@ impl ScalarExpr { f(then)?; f(els) } - _ => Ok(()), + ScalarExpr::CallDf { + df_scalar_fn: _, + exprs, + } => { + for expr in exprs { + f(expr)?; + } + Ok(()) + } } } } @@ -852,11 +894,15 @@ mod test { .unwrap(); let extensions = FunctionExtensions::from_iter(vec![(0, "abs")]); let raw_fn = RawDfScalarFn::from_proto(&raw_scalar_func, input_schema, extensions).unwrap(); - let fn_impl = raw_fn.get_fn_impl().unwrap(); - let df_func = DfScalarFunction::new(raw_fn, fn_impl).unwrap(); + let df_func = DfScalarFunction::try_from_raw_fn(raw_fn).unwrap(); let as_str = serde_json::to_string(&df_func).unwrap(); let from_str: DfScalarFunction = serde_json::from_str(&as_str).unwrap(); assert_eq!(df_func, from_str); - assert_eq!(df_func.eval(&[Value::Null]).unwrap(), Value::Int64(1)); + assert_eq!( + df_func + .eval(&[Value::Null], &[ScalarExpr::Column(0)]) + .unwrap(), + Value::Int64(1) + ); } } diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index ae5c6b46ff2e..43947dc47236 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -21,7 +21,9 @@ use itertools::Itertools; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; -use crate::adapter::error::{DatafusionSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu}; +use crate::adapter::error::{ + DatafusionSnafu, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu, +}; use crate::expr::{MapFilterProject, SafeMfpPlan, ScalarExpr}; /// a set of column indices that are "keys" for the collection. @@ -93,13 +95,19 @@ pub struct RelationType { /// /// A collection can contain multiple sets of keys, although it is common to /// have either zero or one sets of key indices. 
- #[serde(default)] pub keys: Vec<Key>, /// optionally indicate the column that is TIME INDEX pub time_index: Option<usize>, + /// marks all columns that are added automatically by flow but are not present in the original sql + pub auto_columns: Vec<usize>, } impl RelationType { + pub fn with_autos(mut self, auto_cols: &[usize]) -> Self { + self.auto_columns = auto_cols.to_vec(); + self + } + /// Trying to apply a mpf on current types, will return a new RelationType /// with the new types, will also try to preserve keys&time index information /// if the old key&time index columns are preserve in given mfp @@ -155,10 +163,16 @@ impl RelationType { let time_index = self .time_index .and_then(|old| old_to_new_col.get(&old).cloned()); + let auto_columns = self + .auto_columns + .iter() + .filter_map(|old| old_to_new_col.get(old).cloned()) + .collect_vec(); Ok(Self { column_types: mfp_out_types, keys, time_index, + auto_columns, }) } /// Constructs a `RelationType` representing the relation with no columns and @@ -175,6 +189,7 @@ impl RelationType { column_types, keys: Vec::new(), time_index: None, + auto_columns: vec![], } } @@ -340,6 +355,16 @@ pub struct RelationDesc { } impl RelationDesc { + pub fn len(&self) -> Result<usize> { + ensure!( + self.typ.column_types.len() == self.names.len(), + InternalSnafu { + reason: "Expect typ and names field to be of same length" + } + ); + Ok(self.names.len()) + } + pub fn to_df_schema(&self) -> Result<DFSchema> { let fields: Vec<_> = self .iter() diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index f6aad8c4bbae..3cc1512692d5 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -348,8 +348,10 @@ impl TypedPlan { let mut output_types = Vec::new(); // give best effort to get column name let mut output_names = Vec::new(); + // mark all auto-added cols + let mut auto_cols = vec![]; // first append group_expr as key, then aggr_expr as value - for expr in &group_exprs { + for (idx, expr) in group_exprs.iter().enumerate() { output_types.push(expr.typ.clone()); let col_name = match &expr.expr { ScalarExpr::CallUnary { @@ -359,7 +361,10 @@ ScalarExpr::CallUnary { func: UnaryFunc::TumbleWindowCeiling { .. }, ..
- } => Some("window_end".to_string()), + } => { + auto_cols.push(idx); + Some("window_end".to_string()) + } ScalarExpr::Column(col) => input.schema.get_name(*col).clone(), _ => None, }; @@ -380,6 +385,7 @@ impl TypedPlan { RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec()) } .with_time_index(time_index) + .with_autos(&auto_cols) .into_named(output_names) }; @@ -455,17 +461,19 @@ impl TypedPlan { #[cfg(test)] mod test { + use bytes::BytesMut; use common_time::{DateTime, Interval}; use datatypes::prelude::ConcreteDataType; use pretty_assertions::{assert_eq, assert_ne}; use super::*; + use crate::expr::{DfScalarFunction, RawDfScalarFn}; use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; /// TODO(discord9): add more illegal sql tests #[tokio::test] - async fn tes_missing_key_check() { + async fn test_missing_key_check() { let engine = create_test_query_engine(); let sql = "SELECT avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number"; let plan = sql_to_substrait(engine.clone(), sql).await; @@ -473,6 +481,282 @@ mod test { let mut ctx = create_test_ctx(); assert!(TypedPlan::from_substrait_plan(&mut ctx, &plan).is_err()); } + + #[tokio::test] + async fn test_df_func_basic() { + let engine = create_test_query_engine(); + let sql = "SELECT sum(abs(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');"; + let plan = sql_to_substrait(engine.clone(), sql).await; + + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap(); + + let aggr_expr = AggregateExpr { + func: AggregateFunc::SumUInt32, + expr: ScalarExpr::Column(0), + distinct: false, + }; + let expected = TypedPlan { + schema: RelationType::new(vec![ + ColumnType::new(CDT::uint64_datatype(), true), // sum(number) + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ]) + .with_key(vec![2]) + .with_time_index(Some(1)) + .with_autos(&[2]) + .into_named(vec![ + None, + Some("window_start".to_string()), + Some("window_end".to_string()), + ]), + plan: Plan::Mfp { + input: Box::new( + Plan::Reduce { + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(1)), + } + .with_types( + RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ColumnType::new(ConcreteDataType::datetime_datatype(), false), + ]) + .into_named(vec![ + Some("number".to_string()), + Some("ts".to_string()), + ]), + ), + ), + key_val_plan: KeyValPlan { + key_plan: MapFilterProject::new(2) + .map(vec![ + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowFloor { + window_size: Interval::from_month_day_nano( + 0, + 0, + 1_000_000_000, + ), + start_time: Some(DateTime::new(1625097600000)), + }, + ), + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowCeiling { + window_size: Interval::from_month_day_nano( + 0, + 0, + 1_000_000_000, + ), + start_time: Some(DateTime::new(1625097600000)), + }, + ), + ]) + .unwrap() + .project(vec![2, 3]) + .unwrap() + .into_safe(), + val_plan: MapFilterProject::new(2) + .map(vec![ScalarExpr::CallDf { + df_scalar_fn: DfScalarFunction::try_from_raw_fn( + RawDfScalarFn { + f: BytesMut::from( + b"\x08\x01\"\x08\x1a\x06\x12\x04\n\x02\x12\0" + .as_ref(), + ), + input_schema: RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint32_datatype(), + 
false, + )]) + .into_unnamed(), + extensions: FunctionExtensions { + anchor_to_name: BTreeMap::from([ + (0, "tumble".to_string()), + (1, "abs".to_string()), + (2, "sum".to_string()), + ]), + }, + }, + ) + .unwrap(), + exprs: vec![ScalarExpr::Column(0)], + }]) + .unwrap() + .project(vec![2]) + .unwrap() + .into_safe(), + }, + reduce_plan: ReducePlan::Accumulable(AccumulablePlan { + full_aggrs: vec![aggr_expr.clone()], + simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], + distinct_aggrs: vec![], + }), + } + .with_types( + RelationType::new(vec![ + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ColumnType::new(CDT::uint64_datatype(), true), //sum(number) + ]) + .with_key(vec![1]) + .with_time_index(Some(0)) + .with_autos(&[1]) + .into_named(vec![ + Some("window_start".to_string()), + Some("window_end".to_string()), + None, + ]), + ), + ), + mfp: MapFilterProject::new(3) + .map(vec![ + ScalarExpr::Column(2), + ScalarExpr::Column(3), + ScalarExpr::Column(0), + ScalarExpr::Column(1), + ]) + .unwrap() + .project(vec![4, 5, 6]) + .unwrap(), + }, + }; + assert_eq!(expected, flow_plan); + } + + #[tokio::test] + async fn test_df_func_expr_tree() { + let engine = create_test_query_engine(); + let sql = "SELECT abs(sum(number)) FROM numbers_with_ts GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00');"; + let plan = sql_to_substrait(engine.clone(), sql).await; + + let mut ctx = create_test_ctx(); + let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap(); + + let aggr_expr = AggregateExpr { + func: AggregateFunc::SumUInt32, + expr: ScalarExpr::Column(0), + distinct: false, + }; + let expected = TypedPlan { + schema: RelationType::new(vec![ + ColumnType::new(CDT::uint64_datatype(), true), // sum(number) + ColumnType::new(CDT::datetime_datatype(), false), // window start + ColumnType::new(CDT::datetime_datatype(), false), // window end + ]) + .with_key(vec![2]) + .with_time_index(Some(1)) + .with_autos(&[2]) + .into_named(vec![ + None, + Some("window_start".to_string()), + Some("window_end".to_string()), + ]), + plan: Plan::Mfp { + input: Box::new( + Plan::Reduce { + input: Box::new( + Plan::Get { + id: crate::expr::Id::Global(GlobalId::User(1)), + } + .with_types( + RelationType::new(vec![ + ColumnType::new(ConcreteDataType::uint32_datatype(), false), + ColumnType::new(ConcreteDataType::datetime_datatype(), false), + ]) + .into_named(vec![ + Some("number".to_string()), + Some("ts".to_string()), + ]), + ), + ), + key_val_plan: KeyValPlan { + key_plan: MapFilterProject::new(2) + .map(vec![ + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowFloor { + window_size: Interval::from_month_day_nano( + 0, + 0, + 1_000_000_000, + ), + start_time: Some(DateTime::new(1625097600000)), + }, + ), + ScalarExpr::Column(1).call_unary( + UnaryFunc::TumbleWindowCeiling { + window_size: Interval::from_month_day_nano( + 0, + 0, + 1_000_000_000, + ), + start_time: Some(DateTime::new(1625097600000)), + }, + ), + ]) + .unwrap() + .project(vec![2, 3]) + .unwrap() + .into_safe(), + val_plan: MapFilterProject::new(2) + .project(vec![0, 1]) + .unwrap() + .into_safe(), + }, + reduce_plan: ReducePlan::Accumulable(AccumulablePlan { + full_aggrs: vec![aggr_expr.clone()], + simple_aggrs: vec![AggrWithIndex::new(aggr_expr.clone(), 0, 0)], + distinct_aggrs: vec![], + }), + } + .with_types( + RelationType::new(vec![ + ColumnType::new(CDT::datetime_datatype(), false), // window start + 
ColumnType::new(CDT::datetime_datatype(), false), // window end + ColumnType::new(CDT::uint64_datatype(), true), //sum(number) + ]) + .with_key(vec![1]) + .with_time_index(Some(0)) + .with_autos(&[1]) + .into_named(vec![ + Some("window_start".to_string()), + Some("window_end".to_string()), + None, + ]), + ), + ), + mfp: MapFilterProject::new(3) + .map(vec![ + ScalarExpr::Column(2), + ScalarExpr::CallDf { + df_scalar_fn: DfScalarFunction::try_from_raw_fn(RawDfScalarFn { + f: BytesMut::from(b"\"\x08\x1a\x06\x12\x04\n\x02\x12\0".as_ref()), + input_schema: RelationType::new(vec![ColumnType::new( + ConcreteDataType::uint64_datatype(), + true, + )]) + .into_unnamed(), + extensions: FunctionExtensions { + anchor_to_name: BTreeMap::from([ + (0, "abs".to_string()), + (1, "tumble".to_string()), + (2, "sum".to_string()), + ]), + }, + }) + .unwrap(), + exprs: vec![ScalarExpr::Column(3)], + }, + ScalarExpr::Column(0), + ScalarExpr::Column(1), + ]) + .unwrap() + .project(vec![4, 5, 6]) + .unwrap(), + }, + }; + assert_eq!(expected, flow_plan); + } + /// TODO(discord9): add more illegal sql tests #[tokio::test] async fn test_tumble_composite() { @@ -581,6 +865,7 @@ mod test { ]) .with_key(vec![1, 2]) .with_time_index(Some(0)) + .with_autos(&[1]) .into_named(vec![ Some("window_start".to_string()), Some("window_end".to_string()), @@ -610,6 +895,7 @@ mod test { ]) .with_key(vec![0, 3]) .with_time_index(Some(2)) + .with_autos(&[3]) .into_named(vec![ Some("number".to_string()), None, @@ -642,15 +928,12 @@ mod test { ]) .with_key(vec![2]) .with_time_index(Some(1)) + .with_autos(&[2]) .into_named(vec![ None, Some("window_start".to_string()), Some("window_end".to_string()), ]), - // TODO(discord9): mfp indirectly ref to key columns - /* - .with_key(vec![1]) - .with_time_index(Some(0)),*/ plan: Plan::Mfp { input: Box::new( Plan::Reduce { @@ -716,6 +999,7 @@ mod test { ]) .with_key(vec![1]) .with_time_index(Some(0)) + .with_autos(&[1]) .into_named(vec![ Some("window_start".to_string()), Some("window_end".to_string()), @@ -760,6 +1044,7 @@ mod test { ]) .with_key(vec![2]) .with_time_index(Some(1)) + .with_autos(&[2]) .into_named(vec![ None, Some("window_start".to_string()), @@ -830,6 +1115,7 @@ mod test { ]) .with_key(vec![1]) .with_time_index(Some(0)) + .with_autos(&[1]) .into_named(vec![ Some("window_start".to_string()), Some("window_end".to_string()), diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 5434ea237b45..a6b312504d28 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -34,7 +34,7 @@ use crate::expr::{ BinaryFunc, DfScalarFunction, RawDfScalarFn, ScalarExpr, TypedExpr, UnaryFunc, UnmaterializableFunc, VariadicFunc, }; -use crate::repr::{ColumnType, RelationDesc}; +use crate::repr::{ColumnType, RelationDesc, RelationType}; use crate::transform::literal::{from_substrait_literal, from_substrait_type}; use crate::transform::{substrait_proto, FunctionExtensions}; // TODO(discord9): found proper place for this @@ -99,20 +99,69 @@ pub(crate) fn from_scalar_fn_to_df_fn_impl( Ok(phy_expr) } +/// Return an [`Expression`](wrapped in a [`FunctionArgument`]) that references the i-th column of the input relation +pub(crate) fn proto_col(i: usize) -> substrait_proto::proto::FunctionArgument { + use substrait_proto::proto::expression; + let expr = Expression { + rex_type: Some(expression::RexType::Selection(Box::new( + expression::FieldReference { + reference_type: Some(expression::field_reference::ReferenceType::DirectReference( + 
expression::ReferenceSegment { + reference_type: Some( + expression::reference_segment::ReferenceType::StructField(Box::new( + expression::reference_segment::StructField { + field: i as i32, + child: None, + }, + )), + ), + }, + )), + root_type: None, + }, + ))), + }; + substrait_proto::proto::FunctionArgument { + arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value( + expr, + )), + } +} + +/// Rewrite the ScalarFunction's arguments to columns 0..n so that nested exprs are still handled by flow itself instead of datafusion +fn rewrite_scalar_function(f: &ScalarFunction) -> ScalarFunction { + let mut f_rewrite = f.clone(); + for (idx, raw_expr) in f_rewrite.arguments.iter_mut().enumerate() { + *raw_expr = proto_col(idx); + } + f_rewrite +} + impl TypedExpr { pub fn from_substrait_to_datafusion_scalar_func( f: &ScalarFunction, - input_schema: &RelationDesc, + arg_exprs_typed: Vec<TypedExpr>, extensions: &FunctionExtensions, ) -> Result<TypedExpr, Error> { - let phy_expr = from_scalar_fn_to_df_fn_impl(f, input_schema, extensions)?; - let raw_fn = RawDfScalarFn::from_proto(f, input_schema.clone(), extensions.clone())?; - let expr = DfScalarFunction::new(raw_fn, phy_expr)?; - let expr = ScalarExpr::CallDf { df_scalar_fn: expr }; + let (arg_exprs, arg_types): (Vec<_>, Vec<_>) = + arg_exprs_typed.into_iter().map(|e| (e.expr, e.typ)).unzip(); + + let f_rewrite = rewrite_scalar_function(f); + + let input_schema = RelationType::new(arg_types).into_unnamed(); + let raw_fn = + RawDfScalarFn::from_proto(&f_rewrite, input_schema.clone(), extensions.clone())?; + + let df_func = DfScalarFunction::try_from_raw_fn(raw_fn)?; + let expr = ScalarExpr::CallDf { + df_scalar_fn: df_func, + exprs: arg_exprs, + }; // df already know it's own schema, so not providing here let ret_type = expr.typ(&[])?; Ok(TypedExpr::new(expr, ret_type)) } + /// Convert ScalarFunction into Flow's ScalarExpr pub fn from_substrait_scalar_func( f: &ScalarFunction, @@ -242,7 +291,7 @@ impl TypedExpr { } else { let try_as_df = Self::from_substrait_to_datafusion_scalar_func( f, - input_schema, + arg_typed_exprs, extensions, )?; Ok(try_as_df) @@ -395,6 +444,7 @@ mod test { use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait}; + /// test if `WHERE` condition can be converted to Flow's ScalarExpr in mfp's filter #[tokio::test] async fn test_where_and() { @@ -608,39 +658,10 @@ mod test { )), } } - fn col(i: usize) -> substrait_proto::proto::FunctionArgument { - use substrait_proto::proto::expression; - let expr = Expression { - rex_type: Some(expression::RexType::Selection(Box::new( - expression::FieldReference { - reference_type: Some( - expression::field_reference::ReferenceType::DirectReference( - expression::ReferenceSegment { - reference_type: Some( - expression::reference_segment::ReferenceType::StructField( - Box::new(expression::reference_segment::StructField { - field: i as i32, - child: None, - }), - ), - ), - }, - ), - ), - root_type: None, - }, - ))), - }; - substrait_proto::proto::FunctionArgument { - arg_type: Some(substrait_proto::proto::function_argument::ArgType::Value( - expr, - )), - } - } let f = substrait_proto::proto::expression::ScalarFunction { function_reference: 0, - arguments: vec![col(0)], + arguments: vec![proto_col(0)], options: vec![], output_type: None, ..Default::default() }; @@ -663,7 +684,7 @@ let f = substrait_proto::proto::expression::ScalarFunction { function_reference: 0, - arguments: vec![col(0),
col(1)], + arguments: vec![proto_col(0), proto_col(1)], options: vec![], output_type: None, ..Default::default() }; @@ -690,7 +711,7 @@ let f = substrait_proto::proto::expression::ScalarFunction { function_reference: 0, - arguments: vec![col(0), lit("1 second"), lit("2021-07-01 00:00:00")], + arguments: vec![proto_col(0), lit("1 second"), lit("2021-07-01 00:00:00")], options: vec![], output_type: None, ..Default::default() }; @@ -718,7 +739,7 @@ let f = substrait_proto::proto::expression::ScalarFunction { function_reference: 0, - arguments: vec![col(0), lit("1 second")], + arguments: vec![proto_col(0), lit("1 second")], options: vec![], output_type: None, ..Default::default() }; diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index 339fe80586cd..a9d9e29310e9 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -12,14 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use itertools::Itertools; use snafu::OptionExt; +use substrait::substrait_proto_df::proto::{FilterRel, ReadRel}; use substrait_proto::proto::expression::MaskExpression; use substrait_proto::proto::read_rel::ReadType; use substrait_proto::proto::rel::RelType; -use substrait_proto::proto::{plan_rel, Plan as SubPlan, Rel}; +use substrait_proto::proto::{plan_rel, Plan as SubPlan, ProjectRel, Rel}; use crate::adapter::error::{ Error, InternalSnafu, InvalidQuerySnafu, NotImplementedSnafu, PlanSnafu, UnexpectedSnafu, }; @@ -63,141 +64,164 @@ impl TypedPlan { } } - /// Convert Substrait Rel into Flow's TypedPlan - /// TODO(discord9): SELECT DISTINCT(does it get compile with something else?) - pub fn from_substrait_rel( + pub fn from_substrait_project( ctx: &mut FlownodeContext, - rel: &Rel, + p: &ProjectRel, extensions: &FunctionExtensions, ) -> Result<TypedPlan, Error> { - match &rel.rel_type { - Some(RelType::Project(p)) => { - let input = if let Some(input) = p.input.as_ref() { - TypedPlan::from_substrait_rel(ctx, input, extensions)? - } else { - return not_impl_err!("Projection without an input is not supported"); - }; + let input = if let Some(input) = p.input.as_ref() { + TypedPlan::from_substrait_rel(ctx, input, extensions)? + } else { + return not_impl_err!("Projection without an input is not supported"); + }; + + // because `input.schema` is incorrect for the pre-expanded substrait plan, we have to use the schema from before the multi-value + // function expansion to transform it correctly, and rewrite it later + let schema_before_expand = { + let input_schema = input.schema.clone(); + let auto_columns: HashSet<usize> = + HashSet::from_iter(input_schema.typ().auto_columns.clone()); + let not_auto_added_columns = (0..input_schema.len()?) + .filter(|i| !auto_columns.contains(i)) + .collect_vec(); + let mfp = MapFilterProject::new(input_schema.len()?) + .project(not_auto_added_columns)? + .into_safe(); - let mut exprs: Vec<TypedExpr> = vec![]; - for e in &p.expressions { - let expr = TypedExpr::from_substrait_rex(e, &input.schema, extensions)?; - exprs.push(expr); + input_schema.apply_mfp(&mfp)?
+ }; + + let mut exprs: Vec = Vec::with_capacity(p.expressions.len()); + for e in &p.expressions { + let expr = TypedExpr::from_substrait_rex(e, &schema_before_expand, extensions)?; + exprs.push(expr); + } + let is_literal = exprs.iter().all(|expr| expr.expr.is_literal()); + if is_literal { + let (literals, lit_types): (Vec<_>, Vec<_>) = exprs + .into_iter() + .map(|TypedExpr { expr, typ }| (expr, typ)) + .unzip(); + let typ = RelationType::new(lit_types); + let row = literals + .into_iter() + .map(|lit| lit.as_literal().expect("A literal")) + .collect_vec(); + let row = repr::Row::new(row); + let plan = Plan::Constant { + rows: vec![(row, repr::Timestamp::MIN, 1)], + }; + Ok(TypedPlan { + schema: typ.into_unnamed(), + plan, + }) + } else { + match input.plan.clone() { + Plan::Reduce { key_val_plan, .. } => { + rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?; } - let is_literal = exprs.iter().all(|expr| expr.expr.is_literal()); - if is_literal { - let (literals, lit_types): (Vec<_>, Vec<_>) = exprs - .into_iter() - .map(|TypedExpr { expr, typ }| (expr, typ)) - .unzip(); - let typ = RelationType::new(lit_types); - let row = literals - .into_iter() - .map(|lit| lit.as_literal().expect("A literal")) - .collect_vec(); - let row = repr::Row::new(row); - let plan = Plan::Constant { - rows: vec![(row, repr::Timestamp::MIN, 1)], - }; - Ok(TypedPlan { - schema: typ.into_unnamed(), - plan, - }) - } else { - match input.plan.clone() { - Plan::Reduce { key_val_plan, .. } => { - rewrite_projection_after_reduce( - key_val_plan, - &input.schema, - &mut exprs, - )?; - } - Plan::Mfp { input, mfp: _ } => { - if let Plan::Reduce { key_val_plan, .. } = input.plan { - rewrite_projection_after_reduce( - key_val_plan, - &input.schema, - &mut exprs, - )?; - } - } - _ => (), + Plan::Mfp { input, mfp: _ } => { + if let Plan::Reduce { key_val_plan, .. } = input.plan { + rewrite_projection_after_reduce(key_val_plan, &input.schema, &mut exprs)?; } - input.projection(exprs) } + _ => (), } - Some(RelType::Filter(filter)) => { - let input = if let Some(input) = filter.input.as_ref() { - TypedPlan::from_substrait_rel(ctx, input, extensions)? - } else { - return not_impl_err!("Filter without an input is not supported"); - }; + input.projection(exprs) + } + } - let expr = if let Some(condition) = filter.condition.as_ref() { - TypedExpr::from_substrait_rex(condition, &input.schema, extensions)? 
- } else { - return not_impl_err!("Filter without an condition is not valid"); - }; - input.filter(expr) - } - Some(RelType::Read(read)) => { - if let Some(ReadType::NamedTable(nt)) = &read.as_ref().read_type { - let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu { - reason: "Query context not found", - })?; - let table_reference = match nt.names.len() { - 1 => [ - query_ctx.current_catalog().to_string(), - query_ctx.current_schema().to_string(), - nt.names[0].clone(), - ], - 2 => [ - query_ctx.current_catalog().to_string(), - nt.names[0].clone(), - nt.names[1].clone(), - ], - 3 => [ - nt.names[0].clone(), - nt.names[1].clone(), - nt.names[2].clone(), - ], - _ => InvalidQuerySnafu { - reason: "Expect table to have name", - } - .fail()?, - }; - let table = ctx.table(&table_reference)?; - let get_table = Plan::Get { - id: crate::expr::Id::Global(table.0), - }; - let get_table = TypedPlan { - schema: table.1, - plan: get_table, - }; + pub fn from_substrait_filter( + ctx: &mut FlownodeContext, + filter: &FilterRel, + extensions: &FunctionExtensions, + ) -> Result { + let input = if let Some(input) = filter.input.as_ref() { + TypedPlan::from_substrait_rel(ctx, input, extensions)? + } else { + return not_impl_err!("Filter without an input is not supported"); + }; - if let Some(MaskExpression { - select: Some(projection), - .. - }) = &read.projection - { - let column_indices: Vec = projection - .struct_items - .iter() - .map(|item| item.field as usize) - .collect(); - let input_arity = get_table.schema.typ().column_types.len(); - let mfp = - MapFilterProject::new(input_arity).project(column_indices.clone())?; - get_table.mfp(mfp.into_safe()) - } else { - Ok(get_table) - } - } else { - not_impl_err!("Only NamedTable reads are supported") + let expr = if let Some(condition) = filter.condition.as_ref() { + TypedExpr::from_substrait_rex(condition, &input.schema, extensions)? + } else { + return not_impl_err!("Filter without an condition is not valid"); + }; + input.filter(expr) + } + + pub fn from_substrait_read( + ctx: &mut FlownodeContext, + read: &ReadRel, + _extensions: &FunctionExtensions, + ) -> Result { + if let Some(ReadType::NamedTable(nt)) = &read.read_type { + let query_ctx = ctx.query_context.clone().context(UnexpectedSnafu { + reason: "Query context not found", + })?; + let table_reference = match nt.names.len() { + 1 => [ + query_ctx.current_catalog().to_string(), + query_ctx.current_schema().to_string(), + nt.names[0].clone(), + ], + 2 => [ + query_ctx.current_catalog().to_string(), + nt.names[0].clone(), + nt.names[1].clone(), + ], + 3 => [ + nt.names[0].clone(), + nt.names[1].clone(), + nt.names[2].clone(), + ], + _ => InvalidQuerySnafu { + reason: "Expect table to have name", } + .fail()?, + }; + let table = ctx.table(&table_reference)?; + let get_table = Plan::Get { + id: crate::expr::Id::Global(table.0), + }; + let get_table = TypedPlan { + schema: table.1, + plan: get_table, + }; + + if let Some(MaskExpression { + select: Some(projection), + .. 
+ }) = &read.projection + { + let column_indices: Vec = projection + .struct_items + .iter() + .map(|item| item.field as usize) + .collect(); + let input_arity = get_table.schema.typ().column_types.len(); + let mfp = MapFilterProject::new(input_arity).project(column_indices.clone())?; + get_table.mfp(mfp.into_safe()) + } else { + Ok(get_table) } - Some(RelType::Aggregate(agg)) => { - TypedPlan::from_substrait_agg_rel(ctx, agg, extensions) - } + } else { + not_impl_err!("Only NamedTable reads are supported") + } + } + + /// Convert Substrait Rel into Flow's TypedPlan + /// TODO(discord9): SELECT DISTINCT(does it get compile with something else?) + pub fn from_substrait_rel( + ctx: &mut FlownodeContext, + rel: &Rel, + extensions: &FunctionExtensions, + ) -> Result { + match &rel.rel_type { + Some(RelType::Project(p)) => Self::from_substrait_project(ctx, p.as_ref(), extensions), + Some(RelType::Filter(filter)) => Self::from_substrait_filter(ctx, filter, extensions), + Some(RelType::Read(read)) => Self::from_substrait_read(ctx, read, extensions), + Some(RelType::Aggregate(agg)) => Self::from_substrait_agg_rel(ctx, agg, extensions), _ => not_impl_err!("Unsupported relation type: {:?}", rel.rel_type), } } diff --git a/tests/cases/standalone/flow/basic.result b/tests/cases/standalone/flow/basic.result new file mode 100644 index 000000000000..c9a3c7714abc --- /dev/null +++ b/tests/cases/standalone/flow/basic.result @@ -0,0 +1,61 @@ +CREATE TABLE numbers_input ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_numbers +SINK TO out_num_cnt +AS +SELECT sum(number) FROM numbers_input GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +Affected Rows: 0 + +INSERT INTO numbers_input +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- SQLNESS SLEEP 3s +SELECT col_0, window_start, window_end FROM out_num_cnt; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | ++-------+---------------------+---------------------+ + +INSERT INTO numbers_input +VALUES + (23,"2021-07-01 00:00:01.000"), + (24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | ++-------+---------------------+---------------------+ + +DROP FLOW test_numbers; + +Affected Rows: 0 + +DROP TABLE numbers_input; + +Affected Rows: 0 + +DROP TABLE out_num_cnt; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/flow/basic.sql b/tests/cases/standalone/flow/basic.sql new file mode 100644 index 000000000000..9043875dd13a --- /dev/null +++ b/tests/cases/standalone/flow/basic.sql @@ -0,0 +1,31 @@ +CREATE TABLE numbers_input ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_numbers +SINK TO out_num_cnt +AS +SELECT sum(number) FROM numbers_input GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +INSERT INTO numbers_input +VALUES + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- SQLNESS SLEEP 3s +SELECT col_0, window_start, 
window_end FROM out_num_cnt; + +INSERT INTO numbers_input +VALUES + (23,"2021-07-01 00:00:01.000"), + (24,"2021-07-01 00:00:01.500"); + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt; + +DROP FLOW test_numbers; +DROP TABLE numbers_input; +DROP TABLE out_num_cnt; diff --git a/tests/cases/standalone/flow/df_func.result b/tests/cases/standalone/flow/df_func.result new file mode 100644 index 000000000000..7ab393eeb10e --- /dev/null +++ b/tests/cases/standalone/flow/df_func.result @@ -0,0 +1,126 @@ +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +-- call `sum(abs(number))` where `abs` is DataFusion Function and `sum` is flow function +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +Affected Rows: 0 + +INSERT INTO numbers_input_df_func +VALUES + (-20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- sleep a little bit longer to make sure that table is created and data is inserted +-- SQLNESS SLEEP 3s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | ++-------+---------------------+---------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (-24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 42 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +| 47 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | ++-------+---------------------+---------------------+ + +DROP FLOW test_numbers_df_func; + +Affected Rows: 0 + +DROP TABLE numbers_input_df_func; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_df_func; + +Affected Rows: 0 + +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +-- call `abs(sum(number))`to make sure that calling `abs` function(impl by datafusion) on `sum` function(impl by flow) is working +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +Affected Rows: 0 + +INSERT INTO numbers_input_df_func +VALUES + (-20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 2 + +-- sleep a little bit longer to make sure that table is created and data is inserted +-- SQLNESS SLEEP 3s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + ++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | ++-------+---------------------+---------------------+ + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (-24,"2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + 
++-------+---------------------+---------------------+ +| col_0 | window_start | window_end | ++-------+---------------------+---------------------+ +| 2 | 2021-07-01T00:00:00 | 2021-07-01T00:00:01 | +| 1 | 2021-07-01T00:00:01 | 2021-07-01T00:00:02 | ++-------+---------------------+---------------------+ + +DROP FLOW test_numbers_df_func; + +Affected Rows: 0 + +DROP TABLE numbers_input_df_func; + +Affected Rows: 0 + +DROP TABLE out_num_cnt_df_func; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/flow/df_func.sql b/tests/cases/standalone/flow/df_func.sql new file mode 100644 index 000000000000..b9a22cb9da6d --- /dev/null +++ b/tests/cases/standalone/flow/df_func.sql @@ -0,0 +1,67 @@ +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +-- call `sum(abs(number))` where `abs` is DataFusion Function and `sum` is flow function +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT sum(abs(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +INSERT INTO numbers_input_df_func +VALUES + (-20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- sleep a little bit longer to make sure that table is created and data is inserted +-- SQLNESS SLEEP 3s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (-24,"2021-07-01 00:00:01.500"); + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + +DROP FLOW test_numbers_df_func; +DROP TABLE numbers_input_df_func; +DROP TABLE out_num_cnt_df_func; + +CREATE TABLE numbers_input_df_func ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +-- call `abs(sum(number))`to make sure that calling `abs` function(impl by datafusion) on `sum` function(impl by flow) is working +CREATE FLOW test_numbers_df_func +SINK TO out_num_cnt_df_func +AS +SELECT abs(sum(number)) FROM numbers_input_df_func GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); + +INSERT INTO numbers_input_df_func +VALUES + (-20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- sleep a little bit longer to make sure that table is created and data is inserted +-- SQLNESS SLEEP 3s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + +INSERT INTO numbers_input_df_func +VALUES + (23,"2021-07-01 00:00:01.000"), + (-24,"2021-07-01 00:00:01.500"); + +-- SQLNESS SLEEP 2s +SELECT col_0, window_start, window_end FROM out_num_cnt_df_func; + +DROP FLOW test_numbers_df_func; +DROP TABLE numbers_input_df_func; +DROP TABLE out_num_cnt_df_func; diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index 6118c863fb15..1bac0c593337 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -17,7 +17,8 @@ common-recordbatch.workspace = true common-time.workspace = true serde.workspace = true serde_json.workspace = true -sqlness = { version = "0.5" } +# sqlness 0.6.0 have a bug causing `cargo sqlness` to fail(see https://github.com/CeresDB/sqlness/issues/68) which is fixed in 0.6.1 +sqlness = "0.6.1" tempfile.workspace = true tinytemplate = "1.2" tokio.workspace = true