From 4edaa866c8a4a9adec915d509e3a5e1cefd935e5 Mon Sep 17 00:00:00 2001 From: Alexandr Romanenko Date: Tue, 3 Dec 2024 16:55:23 +0100 Subject: [PATCH] multi stage and running total pass tests --- packages/cubejs-backend-shared/src/time.ts | 1 - .../src/adapter/BaseQuery.js | 4 +- .../postgres/sql-generation.test.ts | 12 +- .../cubesqlplanner/src/plan/filter.rs | 10 +- .../cubesqlplanner/src/plan/join.rs | 93 ++++----- .../cubesqlplanner/src/plan/time_series.rs | 7 +- .../src/planner/base_measure.rs | 8 + .../src/planner/base_time_dimension.rs | 24 ++- .../src/planner/filter/base_filter.rs | 75 +++++++- .../src/planner/filter/compiler.rs | 6 +- .../src/planner/filter/filter_operator.rs | 3 + .../cubesqlplanner/src/planner/filter/mod.rs | 1 + .../src/planner/granularity_helper.rs | 149 ++++++++++++++ .../cubesqlplanner/src/planner/mod.rs | 2 + .../planners/multi_stage/applied_state.rs | 181 ++++++++++++++++-- .../planner/planners/multi_stage/member.rs | 6 + .../multi_stage/member_query_planner.rs | 47 ++--- .../planners/multi_stage_query_planner.rs | 73 ++++++- .../collectors/has_multi_stage_members.rs | 4 +- .../sql_node_transformers/set_schema.rs | 12 +- .../sql_evaluator/sql_nodes/evaluate_sql.rs | 18 +- .../sql_evaluator/sql_nodes/factory.rs | 22 ++- .../sql_evaluator/sql_nodes/final_measure.rs | 6 +- .../planner/sql_evaluator/sql_nodes/mod.rs | 2 + .../sql_evaluator/sql_nodes/rolling_window.rs | 76 ++++++++ .../sql_evaluator/symbols/measure_symbol.rs | 8 + .../src/planner/sql_templates/filter.rs | 21 ++ 27 files changed, 711 insertions(+), 160 deletions(-) create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/planner/granularity_helper.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/rolling_window.rs diff --git a/packages/cubejs-backend-shared/src/time.ts b/packages/cubejs-backend-shared/src/time.ts index e823863b21c1d..2d795cf074bb8 100644 --- a/packages/cubejs-backend-shared/src/time.ts +++ b/packages/cubejs-backend-shared/src/time.ts @@ -179,7 +179,6 @@ export const timeSeries = (granularity: string, dateRange: QueryDateRange, optio // moment.range works with strings const range = moment.range(dateRange[0], dateRange[1]); - console.log("!!!! timeSeries:", TIME_SERIES[granularity](range, options.timestampPrecision)); return TIME_SERIES[granularity](range, options.timestampPrecision); }; diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index 96f3e4ee57162..5e3f728dd73ab 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -734,7 +734,6 @@ export class BaseQuery { offset = offset || 'end'; return this.timeDimensions.map( d => [d, (dateFrom, dateTo, dateField, dimensionDateFrom, dimensionDateTo, isFromStartToEnd) => { - console.log("!!!!!! IsFromStartToEnd: ", isFromStartToEnd, " --"); // dateFrom based window const conditions = []; if (trailingInterval !== 'unbounded') { @@ -1420,7 +1419,6 @@ export class BaseQuery { ) ).join(' AND '); - console.log("!!! 
date join contdition sql: ", dateJoinConditionSql); return this.overTimeSeriesSelect( cumulativeMeasures, @@ -3292,6 +3290,8 @@ export class BaseQuery { cube: 'CUBE({{ exprs_concat }})', negative: '-({{ expr }})', not: 'NOT ({{ expr }})', + add_interval: '{{ date }} + interval \'{{ interval }}\'', + sub_interval: '{{ date }} - interval \'{{ interval }}\'', true: 'TRUE', false: 'FALSE', like: '{{ expr }} {% if negated %}NOT {% endif %}LIKE {{ pattern }}', diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts index bc27c47b4f0ba..103a9f6dbee12 100644 --- a/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts +++ b/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts @@ -64,7 +64,7 @@ describe('SQL Generation', () => { offset: 'start' } }, - revenueRolling3day: { + revenueRollingThreeDay: { type: 'sum', sql: 'amount', rollingWindow: { @@ -666,7 +666,7 @@ describe('SQL Generation', () => { console.log(query.buildSqlAndParams()); // TODO ordering doesn't work for running total - return dbRunner.testQuery(query.buildSqlAndParams()).then(res => { + return dbRunner.testQuery(query.buildSqlAndParamsTest()).then(res => { console.log(JSON.stringify(res)); expect(res).toEqual( [{ @@ -704,7 +704,7 @@ describe('SQL Generation', () => { }); }); - it('rolling 1', async () => runQueryTest({ + it('rolling', async () => runQueryTest({ measures: [ 'visitors.revenueRolling' ], @@ -730,7 +730,7 @@ describe('SQL Generation', () => { { visitors__created_at_day: '2017-01-10T00:00:00.000Z', visitors__revenue_rolling: null } ])); - it('rolling multiplied 1', async () => runQueryTest({ + it('rolling multiplied', async () => runQueryTest({ measures: [ 'visitors.revenueRolling', 'visitor_checkins.visitor_checkins_count' @@ -769,7 +769,7 @@ describe('SQL Generation', () => { it('rolling month', async () => runQueryTest({ measures: [ - 'visitors.revenueRolling3day' + 'visitors.revenueRollingThreeDay' ], timeDimensions: [{ dimension: 'visitors.created_at', @@ -781,7 +781,7 @@ describe('SQL Generation', () => { }], timezone: 'America/Los_Angeles' }, [ - { visitors__created_at_week: '2017-01-09T00:00:00.000Z', visitors__revenue_rolling3day: '900' } + { visitors__created_at_week: '2017-01-09T00:00:00.000Z', visitors__revenue_rolling_three_day: '900' } ])); it('rolling count', async () => runQueryTest({ diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs index f118aefe5d8e5..9776ad592a37a 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs @@ -6,7 +6,7 @@ use cubenativeutils::CubeError; use std::fmt; use std::rc::Rc; -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub enum FilterGroupOperator { Or, And, @@ -18,13 +18,19 @@ pub struct FilterGroup { pub items: Vec, } +impl PartialEq for FilterGroup { + fn eq(&self, other: &Self) -> bool { + self.operator == other.operator && self.items == other.items + } +} + impl FilterGroup { pub fn new(operator: FilterGroupOperator, items: Vec) -> Self { Self { operator, items } } } -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub enum FilterItem { Group(Rc), Item(Rc), diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs index 0d06cb2cf33b9..bd32f03c2b825 100644 --- 
a/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs @@ -2,6 +2,7 @@ use super::{time_series, Schema, SingleAliasedSource}; use crate::planner::sql_templates::PlanSqlTemplates; use crate::planner::{BaseJoinCondition, BaseMember, VisitorContext}; use cubenativeutils::CubeError; +use lazy_static::lazy_static; use std::rc::Rc; @@ -33,29 +34,6 @@ impl RollingWindowJoinCondition { } } - /* - * - offset = offset || 'end'; - return this.timeDimensions.map( - d => [d, (dateFrom, dateTo, dateField, dimensionDateFrom, dimensionDateTo, isFromStartToEnd) => { - // dateFrom based window - const conditions = []; - if (trailingInterval !== 'unbounded') { - const startDate = isFromStartToEnd || offset === 'start' ? dateFrom : dateTo; - const trailingStart = trailingInterval ? this.subtractInterval(startDate, trailingInterval) : startDate; - const sign = offset === 'start' ? '>=' : '>'; - conditions.push(`${dateField} ${sign} ${trailingStart}`); - } - if (leadingInterval !== 'unbounded') { - const endDate = isFromStartToEnd || offset === 'end' ? dateTo : dateFrom; - const leadingEnd = leadingInterval ? this.addInterval(endDate, leadingInterval) : endDate; - const sign = offset === 'end' ? '<=' : '<'; - conditions.push(`${dateField} ${sign} ${leadingEnd}`); - } - return conditions.length ? conditions.join(' AND ') : '1 = 1'; - }] - ); - */ pub fn to_sql( &self, templates: &PlanSqlTemplates, @@ -63,55 +41,48 @@ impl RollingWindowJoinCondition { schema: Rc, ) -> Result { let mut conditions = vec![]; - /* let date_column_alias = if let Some(column) = schema.find_column_for_member(&self.time_dimension.full_name(), &None) { - templates.column_reference(&source, &column.alias.clone()) - } else { - dimension.to_sql(context.clone(), schema.clone()) - } */ let date_column_alias = self.resolve_time_column_alias(templates, context.clone(), schema.clone())?; - if let Some(trailing_interval) = &self.trailing_interval { - if trailing_interval != "unbounded" { - let start_date = if self.offset == "start" { - templates - .column_reference(&Some(self.time_series_source.clone()), "date_from")? - } else { - templates.column_reference(&Some(self.time_series_source.clone()), "date_to")? - }; - let trailing_start = if let Some(trailing_interval) = &self.trailing_interval { - format!("{start_date} - interval '{trailing_interval}'") - } else { - start_date - }; + lazy_static! { + static ref UNBOUNDED: Option = Some("unbounded".to_string()); + } - let sign = if self.offset == "start" { ">=" } else { ">" }; + if self.trailing_interval != *UNBOUNDED { + let start_date = if self.offset == "start" { + templates.column_reference(&Some(self.time_series_source.clone()), "date_from")? + } else { + templates.column_reference(&Some(self.time_series_source.clone()), "date_to")? + }; - conditions.push(format!("{date_column_alias} {sign} {trailing_start}")); - } + let trailing_start = if let Some(trailing_interval) = &self.trailing_interval { + format!("{start_date} - interval '{trailing_interval}'") + } else { + start_date + }; + + let sign = if self.offset == "start" { ">=" } else { ">" }; + + conditions.push(format!("{date_column_alias} {sign} {trailing_start}")); } - if let Some(leading_interval) = &self.trailing_interval { - if leading_interval != "unbounded" { - let end_date = if self.offset == "end" { - templates.column_reference(&Some(self.time_series_source.clone()), "date_to")? - } else { - templates - .column_reference(&Some(self.time_series_source.clone()), "date_from")? 
- }; + if self.leading_interval != *UNBOUNDED { + let end_date = if self.offset == "end" { + templates.column_reference(&Some(self.time_series_source.clone()), "date_to")? + } else { + templates.column_reference(&Some(self.time_series_source.clone()), "date_from")? + }; - let leading_end = if let Some(leading_interval) = &self.leading_interval { - format!("{end_date} + interval '{leading_interval}'") - } else { - end_date - }; + let leading_end = if let Some(leading_interval) = &self.leading_interval { + format!("{end_date} + interval '{leading_interval}'") + } else { + end_date + }; - let sign = if self.offset == "end" { "<=" } else { "<" }; + let sign = if self.offset == "end" { "<=" } else { "<" }; - conditions.push(format!("{date_column_alias} {sign} {leading_end}")); - } + conditions.push(format!("{date_column_alias} {sign} {leading_end}")); } - let result = if conditions.is_empty() { templates.always_true()? } else { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/time_series.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/time_series.rs index b2c891bac287a..774fa0e825878 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/time_series.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/time_series.rs @@ -12,13 +12,12 @@ pub struct TimeSeries { impl TimeSeries { pub fn make_schema(&self, self_alias: Option) -> Schema { - /* let column = SchemaColumn::new( + let column = SchemaColumn::new( self_alias, - format!("from_date"), + format!("date_from"), self.time_dimension_name.clone(), ); - Schema::new(vec![column], vec![]) */ - Schema::empty() + Schema::new(vec![column], vec![]) } pub fn to_sql(&self, templates: &PlanSqlTemplates) -> Result { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_measure.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_measure.rs index 66b4f263483f8..5fa5ce929c703 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_measure.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_measure.rs @@ -205,6 +205,14 @@ impl BaseMeasure { self.rolling_window().is_some() } + pub fn is_running_total(&self) -> bool { + self.measure_type() == "runningTotal" + } + + pub fn is_cumulative(&self) -> bool { + self.is_rolling_window() || self.is_running_total() + } + //FIXME dublicate with symbol pub fn measure_type(&self) -> &String { &self.definition.static_data().measure_type diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_time_dimension.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_time_dimension.rs index 38fa14ff5f7e7..f6e297971e249 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_time_dimension.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_time_dimension.rs @@ -11,6 +11,7 @@ pub struct BaseTimeDimension { query_tools: Rc, granularity: Option, date_range: Option>, + alias_suffix: String, } impl BaseMember for BaseTimeDimension { @@ -55,12 +56,7 @@ impl BaseMember for BaseTimeDimension { } fn alias_suffix(&self) -> Option { - let granularity = if let Some(granularity) = &self.granularity { - granularity - } else { - "day" - }; - Some(granularity.to_string()) + Some(self.alias_suffix.clone()) } } @@ -71,14 +67,30 @@ impl BaseTimeDimension { granularity: Option, date_range: Option>, ) -> Result, CubeError> { + let alias_suffix = if let Some(granularity) = &granularity { + granularity.clone() + } else { + "day".to_string() + }; Ok(Rc::new(Self { dimension: BaseDimension::try_new_required(member_evaluator, query_tools.clone())?, query_tools, granularity, date_range, 
+ alias_suffix, })) } + pub fn change_granularity(&self, new_granularity: Option) -> Rc { + Rc::new(Self { + dimension: self.dimension.clone(), + query_tools: self.query_tools.clone(), + granularity: new_granularity, + date_range: self.date_range.clone(), + alias_suffix: self.alias_suffix.clone(), + }) + } + pub fn get_granularity(&self) -> Option { self.granularity.clone() } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/base_filter.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/base_filter.rs index 02247f982135e..fb1de9080e192 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/base_filter.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/base_filter.rs @@ -26,6 +26,14 @@ pub struct BaseFilter { templates: FilterTemplates, } +impl PartialEq for BaseFilter { + fn eq(&self, other: &Self) -> bool { + self.filter_type == other.filter_type + && self.filter_operator == other.filter_operator + && self.values == other.values + } +} + lazy_static! { static ref DATE_TIME_LOCAL_MS_RE: Regex = Regex::new(r"^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d\.\d\d\d$").unwrap(); @@ -39,7 +47,7 @@ impl BaseFilter { query_tools: Rc, member_evaluator: Rc, filter_type: FilterType, - filter_operator: String, + filter_operator: FilterOperator, values: Option>>, ) -> Result, CubeError> { let templates = FilterTemplates::new(query_tools.templates_render()); @@ -52,12 +60,35 @@ impl BaseFilter { query_tools, member_evaluator, filter_type, - filter_operator: FilterOperator::from_str(&filter_operator)?, + filter_operator, values, templates, })) } + pub fn change_operator( + &self, + filter_operator: FilterOperator, + values: Vec>, + ) -> Rc { + Rc::new(Self { + query_tools: self.query_tools.clone(), + member_evaluator: self.member_evaluator.clone(), + filter_type: self.filter_type.clone(), + filter_operator, + values, + templates: self.templates.clone(), + }) + } + + pub fn values(&self) -> &Vec> { + &self.values + } + + pub fn filter_operator(&self) -> &FilterOperator { + &self.filter_operator + } + pub fn member_name(&self) -> String { self.member_evaluator.full_name() } @@ -77,6 +108,7 @@ impl BaseFilter { FilterOperator::Equal => self.equals_where(&member_sql)?, FilterOperator::NotEqual => self.not_equals_where(&member_sql)?, FilterOperator::InDateRange => self.in_date_range(&member_sql)?, + FilterOperator::InDateRangeExtended => self.in_date_range_extended(&member_sql)?, FilterOperator::In => self.in_where(&member_sql)?, FilterOperator::NotIn => self.not_in_where(&member_sql)?, FilterOperator::Set => self.set_where(&member_sql)?, @@ -127,6 +159,45 @@ impl BaseFilter { .time_range_filter(member_sql.to_string(), from, to) } + fn extend_date_range_bound( + &self, + date: String, + interval: &Option, + is_sub: bool, + ) -> Result { + if let Some(interval) = interval { + if interval != "unbounded" { + if is_sub { + self.templates.sub_interval(date, interval.clone()) + } else { + self.templates.add_interval(date, interval.clone()) + } + } else { + Ok(date.to_string()) + } + } else { + Ok(date.to_string()) + } + } + fn in_date_range_extended(&self, member_sql: &str) -> Result { + let (from, to) = self.allocate_date_params()?; + + let from = if self.values.len() >= 3 { + self.extend_date_range_bound(from, &self.values[2], true)? + } else { + from + }; + + let to = if self.values.len() >= 4 { + self.extend_date_range_bound(to, &self.values[3], false)? 
+ } else { + to + }; + + self.templates + .time_range_filter(member_sql.to_string(), from, to) + } + fn in_where(&self, member_sql: &str) -> Result { let need_null_check = self.is_need_null_chek(false); self.templates.in_where( diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/compiler.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/compiler.rs index d103229021eeb..17e8a64a6be20 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/compiler.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/compiler.rs @@ -1,4 +1,5 @@ use super::base_filter::{BaseFilter, FilterType}; +use super::FilterOperator; use crate::cube_bridge::base_query_options::FilterItem as NativeFilterItem; use crate::plan::filter::{FilterGroup, FilterGroupOperator, FilterItem}; use crate::planner::query_tools::QueryTools; @@ -6,6 +7,7 @@ use crate::planner::sql_evaluator::Compiler; use crate::planner::BaseTimeDimension; use cubenativeutils::CubeError; use std::rc::Rc; +use std::str::FromStr; pub struct FilterCompiler<'a> { evaluator_compiler: &'a mut Compiler, @@ -43,7 +45,7 @@ impl<'a> FilterCompiler<'a> { self.query_tools.clone(), item.member_evaluator(), FilterType::Dimension, - "InDateRange".to_string(), + FilterOperator::InDateRange, Some(date_range.into_iter().map(|v| Some(v)).collect()), )?; self.time_dimension_filters.push(FilterItem::Item(filter)); @@ -92,7 +94,7 @@ impl<'a> FilterCompiler<'a> { self.query_tools.clone(), evaluator, item_type.clone(), - operator.clone(), + FilterOperator::from_str(&operator)?, item.values.clone(), )?)) } else { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/filter_operator.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/filter_operator.rs index 5b3eeae125f06..d66caf4368928 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/filter_operator.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/filter_operator.rs @@ -1,9 +1,12 @@ use cubenativeutils::CubeError; use std::str::FromStr; + +#[derive(Clone, PartialEq, Debug)] pub enum FilterOperator { Equal, NotEqual, InDateRange, + InDateRangeExtended, In, NotIn, Set, diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/mod.rs index 02277136c6979..01f95fd0d8ba6 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/mod.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/mod.rs @@ -3,3 +3,4 @@ pub mod compiler; pub mod filter_operator; pub use base_filter::BaseFilter; +pub use filter_operator::FilterOperator; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/granularity_helper.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/granularity_helper.rs new file mode 100644 index 0000000000000..b1e4bf1863b80 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/granularity_helper.rs @@ -0,0 +1,149 @@ +use cubenativeutils::CubeError; +use itertools::Itertools; +use lazy_static::lazy_static; +use std::collections::HashMap; + +pub struct GranularityHelper {} + +impl GranularityHelper { + pub fn min_granularity( + granularity_a: &Option, + granularity_b: &Option, + ) -> Result, CubeError> { + if let Some((granularity_a, granularity_b)) = + granularity_a.as_ref().zip(granularity_b.as_ref()) + { + let a_parents = Self::granularity_parents(granularity_a)?; + let b_parents = Self::granularity_parents(granularity_b)?; + let diff_position = a_parents + .iter() + .zip(b_parents.iter()) + .find_position(|(a, b)| a != b); + if let 
Some((diff_position, _)) = diff_position { + if diff_position == 0 { + Err(CubeError::user(format!( + "Can't find common parent for '{granularity_a}' and '{granularity_b}'" + ))) + } else { + Ok(Some(a_parents[diff_position - 1].clone())) + } + } else { + if a_parents.len() >= b_parents.len() { + Ok(Some(b_parents.last().unwrap().clone())) + } else { + Ok(Some(a_parents.last().unwrap().clone())) + } + } + } else if granularity_a.is_some() { + Ok(granularity_a.clone()) + } else { + Ok(granularity_b.clone()) + } + } + + pub fn granularity_from_interval(interval: &Option) -> Option { + if let Some(interval) = interval { + if interval.find("day").is_some() { + Some("day".to_string()) + } else if interval.find("month").is_some() { + Some("month".to_string()) + } else if interval.find("year").is_some() { + Some("year".to_string()) + } else if interval.find("week").is_some() { + Some("week".to_string()) + } else if interval.find("hour").is_some() { + Some("hour".to_string()) + } else { + None + } + } else { + None + } + } + + pub fn granularity_parents(granularity: &str) -> Result<&Vec, CubeError> { + if let Some(parents) = Self::standard_granularity_parents().get(granularity) { + Ok(parents) + } else { + Err(CubeError::user(format!( + "Granularity {} not found", + granularity + ))) + } + } + + pub fn standard_granularity_parents() -> &'static HashMap> { + lazy_static! { + static ref STANDARD_GRANULARITIES_PARENTS: HashMap> = { + let mut map = HashMap::new(); + map.insert( + "year".to_string(), + vec![ + "second".to_string(), + "minute".to_string(), + "hour".to_string(), + "day".to_string(), + "month".to_string(), + "quarter".to_string(), + "year".to_string(), + ], + ); + map.insert( + "quarter".to_string(), + vec![ + "second".to_string(), + "minute".to_string(), + "hour".to_string(), + "day".to_string(), + "month".to_string(), + "quarter".to_string(), + ], + ); + map.insert( + "month".to_string(), + vec![ + "second".to_string(), + "minute".to_string(), + "hour".to_string(), + "day".to_string(), + "month".to_string(), + ], + ); + map.insert( + "week".to_string(), + vec![ + "second".to_string(), + "minute".to_string(), + "hour".to_string(), + "day".to_string(), + "week".to_string(), + ], + ); + map.insert( + "day".to_string(), + vec![ + "second".to_string(), + "minute".to_string(), + "hour".to_string(), + "day".to_string(), + ], + ); + map.insert( + "hour".to_string(), + vec![ + "second".to_string(), + "minute".to_string(), + "hour".to_string(), + ], + ); + map.insert( + "minute".to_string(), + vec!["second".to_string(), "minute".to_string()], + ); + map.insert("second".to_string(), vec!["second".to_string()]); + map + }; + } + &STANDARD_GRANULARITIES_PARENTS + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/mod.rs index a6def37da64c1..cc457b6318ecc 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/mod.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/mod.rs @@ -6,6 +6,7 @@ pub mod base_member; pub mod base_query; pub mod base_time_dimension; pub mod filter; +pub mod granularity_helper; pub mod params_allocator; pub mod planners; pub mod query_properties; @@ -22,6 +23,7 @@ pub use base_measure::BaseMeasure; pub use base_member::{BaseMember, BaseMemberHelper}; pub use base_query::BaseQuery; pub use base_time_dimension::BaseTimeDimension; +pub use granularity_helper::GranularityHelper; pub use params_allocator::ParamsAllocator; pub use query_properties::{FullKeyAggregateMeasures, OrderByItem, QueryProperties}; 
pub use visitor_context::{evaluate_with_context, VisitorContext};
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/applied_state.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/applied_state.rs
index c47854234ad8d..5f729bb51f8ad 100644
--- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/applied_state.rs
+++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/applied_state.rs
@@ -1,5 +1,7 @@
+use crate::plan::{FilterGroup, FilterItem};
+use crate::planner::filter::FilterOperator;
 use crate::planner::planners::multi_stage::MultiStageTimeShift;
-use crate::planner::BaseDimension;
+use crate::planner::{BaseDimension, BaseTimeDimension};
 use itertools::Itertools;
 use std::cmp::PartialEq;
 use std::collections::{HashMap, HashSet};
@@ -8,27 +10,39 @@ use std::rc::Rc;
 
 #[derive(Clone)]
 pub struct MultiStageAppliedState {
+    time_dimensions: Vec<Rc<BaseTimeDimension>>,
     dimensions: Vec<Rc<BaseDimension>>,
-    allowed_filter_members: HashSet<String>,
+    time_dimensions_filters: Vec<FilterItem>,
+    dimensions_filters: Vec<FilterItem>,
+    measures_filters: Vec<FilterItem>,
     time_shifts: HashMap<String, MultiStageTimeShift>,
 }
 
 impl MultiStageAppliedState {
     pub fn new(
+        time_dimensions: Vec<Rc<BaseTimeDimension>>,
         dimensions: Vec<Rc<BaseDimension>>,
-        allowed_filter_members: HashSet<String>,
+        time_dimensions_filters: Vec<FilterItem>,
+        dimensions_filters: Vec<FilterItem>,
+        measures_filters: Vec<FilterItem>,
     ) -> Rc<Self> {
         Rc::new(Self {
+            time_dimensions,
             dimensions,
-            allowed_filter_members,
+            time_dimensions_filters,
+            dimensions_filters,
+            measures_filters,
             time_shifts: HashMap::new(),
         })
     }
 
     pub fn clone_state(&self) -> Self {
         Self {
+            time_dimensions: self.time_dimensions.clone(),
             dimensions: self.dimensions.clone(),
-            allowed_filter_members: self.allowed_filter_members.clone(),
+            time_dimensions_filters: self.time_dimensions_filters.clone(),
+            dimensions_filters: self.dimensions_filters.clone(),
+            measures_filters: self.measures_filters.clone(),
             time_shifts: self.time_shifts.clone(),
         }
     }
@@ -54,35 +68,165 @@ impl MultiStageAppliedState {
         &self.time_shifts
     }
 
-    pub fn is_filter_allowed(&self, name: &str) -> bool {
-        self.allowed_filter_members.contains(name)
+    pub fn time_dimensions_filters(&self) -> &Vec<FilterItem> {
+        &self.time_dimensions_filters
     }
 
-    pub fn allowed_filter_members(&self) -> &HashSet<String> {
-        &self.allowed_filter_members
+    pub fn dimensions_filters(&self) -> &Vec<FilterItem> {
+        &self.dimensions_filters
     }
 
-    pub fn disallow_filter(&mut self, name: &str) {
-        self.allowed_filter_members.take(name);
+    pub fn measures_filters(&self) -> &Vec<FilterItem> {
+        &self.measures_filters
     }
 
     pub fn dimensions(&self) -> &Vec<Rc<BaseDimension>> {
         &self.dimensions
     }
+
+    pub fn time_dimensions(&self) -> &Vec<Rc<BaseTimeDimension>> {
+        &self.time_dimensions
+    }
+
+    pub fn change_time_dimension_granularity(
+        &mut self,
+        dimension_name: &str,
+        new_granularity: Option<String>,
+    ) {
+        if let Some(time_dimension) = self
+            .time_dimensions
+            .iter_mut()
+            .find(|dim| dim.member_evaluator().full_name() == dimension_name)
+        {
+            *time_dimension = time_dimension.change_granularity(new_granularity);
+        }
+    }
+
+    pub fn remove_filter_for_member(&mut self, member_name: &String) {
+        self.time_dimensions_filters =
+            self.extract_filters_exclude_member(member_name, &self.time_dimensions_filters);
+        self.dimensions_filters =
+            self.extract_filters_exclude_member(member_name, &self.dimensions_filters);
+        self.measures_filters =
+            self.extract_filters_exclude_member(member_name, &self.measures_filters);
+    }
+
+    fn extract_filters_exclude_member(
+        &self,
+        member_name: &String,
+        filters: &Vec<FilterItem>,
+    ) -> Vec<FilterItem> {
+        let mut result = Vec::new();
+        for item in filters.iter() {
+            match item {
+                FilterItem::Group(group) => {
+                    let new_group = FilterItem::Group(Rc::new(FilterGroup::new(
+                        group.operator.clone(),
+                        self.extract_filters_exclude_member(member_name, &group.items),
+                    )));
+                    result.push(new_group);
+                }
+                FilterItem::Item(itm) => {
+                    if &itm.member_name() != member_name {
+                        result.push(FilterItem::Item(itm.clone()));
+                    }
+                }
+            }
+        }
+        result
+    }
+
+    pub fn has_filters_for_member(&self, member_name: &String) -> bool {
+        self.has_filters_for_member_impl(member_name, &self.time_dimensions_filters)
+            || self.has_filters_for_member_impl(member_name, &self.dimensions_filters)
+            || self.has_filters_for_member_impl(member_name, &self.measures_filters)
+    }
+
+    fn has_filters_for_member_impl(&self, member_name: &String, filters: &Vec<FilterItem>) -> bool {
+        for item in filters.iter() {
+            match item {
+                FilterItem::Group(group) => {
+                    if self.has_filters_for_member_impl(member_name, &group.items) {
+                        return true;
+                    }
+                }
+                FilterItem::Item(itm) => {
+                    if &itm.member_name() == member_name {
+                        return true;
+                    }
+                }
+            }
+        }
+        false
+    }
+
+    pub fn expand_date_range_filter(
+        &mut self,
+        member_name: &String,
+        left_interval: Option<String>,
+        right_interval: Option<String>,
+    ) {
+        self.time_dimensions_filters = self.expand_date_range_filter_impl(
+            member_name,
+            &self.time_dimensions_filters,
+            &left_interval,
+            &right_interval,
+        );
+    }
+
+    fn expand_date_range_filter_impl(
+        &self,
+        member_name: &String,
+        filters: &Vec<FilterItem>,
+        left_interval: &Option<String>,
+        right_interval: &Option<String>,
+    ) -> Vec<FilterItem> {
+        let mut result = Vec::new();
+        for item in filters.iter() {
+            match item {
+                FilterItem::Group(group) => {
+                    let new_group = FilterItem::Group(Rc::new(FilterGroup::new(
+                        group.operator.clone(),
+                        self.expand_date_range_filter_impl(
+                            member_name,
+                            &group.items,
+                            left_interval,
+                            right_interval,
+                        ),
+                    )));
+                    result.push(new_group);
+                }
+                FilterItem::Item(itm) => {
+                    let itm = if &itm.member_name() == member_name
+                        && matches!(itm.filter_operator(), FilterOperator::InDateRange)
+                    {
+                        let mut values = itm.values().clone();
+                        values.push(left_interval.clone());
+                        values.push(right_interval.clone());
+                        itm.change_operator(FilterOperator::InDateRangeExtended, values)
+                    } else {
+                        itm.clone()
+                    };
+                    result.push(FilterItem::Item(itm));
+                }
+            }
+        }
+        result
+    }
 }
 
 impl PartialEq for MultiStageAppliedState {
     fn eq(&self, other: &Self) -> bool {
-        let dims_eq = if !self.dimensions.len() == other.dimensions.len() {
-            false
-        } else {
-            self.dimensions
+        let dims_eq = self.dimensions.len() == other.dimensions.len()
+            && self
+                .dimensions
                 .iter()
                 .zip(other.dimensions.iter())
-                .all(|(a, b)| a.member_evaluator().full_name() == b.member_evaluator().full_name())
-        };
+                .all(|(a, b)| a.member_evaluator().full_name() == b.member_evaluator().full_name());
         dims_eq
-            && self.allowed_filter_members == other.allowed_filter_members
+            && self.time_dimensions_filters == other.time_dimensions_filters
+            && self.dimensions_filters == other.dimensions_filters
+            && self.measures_filters == other.measures_filters
             && self.time_shifts == other.time_shifts
     }
 }
@@ -98,7 +242,6 @@ impl Debug for MultiStageAppliedState {
                     .map(|d| d.member_evaluator().full_name())
                     .join(", "),
             )
-            .field("allowed_filter_members", &self.allowed_filter_members)
            .field("time_shifts", &self.time_shifts)
            .finish()
    }
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member.rs
index 6bb827a7c8c2e..34c7b059c4252 100644
--- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member.rs
+++ 
b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member.rs @@ -73,12 +73,18 @@ pub struct RollingWindowDescription { pub offset: String, } +#[derive(Clone)] +pub struct RunningTotalDescription { + pub time_dimension: Rc, +} + #[derive(Clone)] pub enum MultiStageInodeMemberType { Rank, Aggregate, Calculate, RollingWindow(RollingWindowDescription), + RunningTotal(RunningTotalDescription), } #[derive(Clone)] diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs index a2d73ee0bda9e..8d681c3dbc6e8 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage/member_query_planner.rs @@ -109,7 +109,7 @@ impl MultiStageMemberQueryPlanner { rolling_window_desc.time_dimension.clone(), ); let cte_schema = cte_schemas.get(input).unwrap().clone(); - join_builder.inner_join_table_reference( + join_builder.left_join_table_reference( input.clone(), cte_schema, Some(format!("rolling_{}", i + 1)), @@ -125,11 +125,24 @@ impl MultiStageMemberQueryPlanner { .collect_vec(); let context_factory = SqlNodesFactory::new(); - let node_context = context_factory.default_node_processor(); + let node_context = context_factory.rolling_window_node_processor(); let mut select_builder = SelectBuilder::new(from, VisitorContext::new(None, node_context)); for dim in dimensions.iter() { - select_builder.add_projection_member(&dim, None, None); + if dim.full_name() == rolling_window_desc.time_dimension.full_name() { + select_builder.add_projection_member( + &dim, + Some(root_alias.clone()), + Some( + cte_schemas + .get(&inputs[1]) + .unwrap() + .resolve_member_alias(&dim, &Some(inputs[1].clone())), + ), + ); + } else { + select_builder.add_projection_member(&dim, None, None); + } } let query_member = self.query_member_as_base_member()?; @@ -267,29 +280,17 @@ impl MultiStageMemberQueryPlanner { { vec![measure] } else { - println!("!!! 
kkkk {}", self.description.alias()); vec![] }; - let allowed_filter_members = self.description.state().allowed_filter_members().clone(); - let cte_query_properties = QueryProperties::try_new_from_precompiled( self.query_tools.clone(), measures, self.description.state().dimensions().clone(), - self.query_properties.time_dimensions().clone(), - self.extract_filters( - &allowed_filter_members, - self.query_properties.time_dimensions_filters(), - ), - self.extract_filters( - &allowed_filter_members, - self.query_properties.dimensions_filters(), - ), - self.extract_filters( - &allowed_filter_members, - self.query_properties.measures_filters(), - ), + self.description.state().time_dimensions().clone(), + self.description.state().time_dimensions_filters().clone(), + self.description.state().dimensions_filters().clone(), + self.description.state().measures_filters().clone(), vec![], None, None, @@ -364,7 +365,7 @@ impl MultiStageMemberQueryPlanner { fn all_dimensions(&self) -> Vec> { BaseMemberHelper::iter_as_base_member(self.description.state().dimensions()) .chain(BaseMemberHelper::iter_as_base_member( - self.query_properties.time_dimensions(), + self.description.state().time_dimensions(), )) .collect_vec() } @@ -381,7 +382,7 @@ impl MultiStageMemberQueryPlanner { fn all_input_dimensions(&self) -> Vec> { BaseMemberHelper::iter_as_base_member(&self.input_dimensions()) .chain(BaseMemberHelper::iter_as_base_member( - self.query_properties.time_dimensions(), + self.description.state().time_dimensions(), )) .collect_vec() } @@ -488,7 +489,7 @@ impl MultiStageMemberQueryPlanner { fn subquery_order(&self) -> Result, CubeError> { let order_items = QueryProperties::default_order( &self.input_dimensions(), - &self.query_properties.time_dimensions(), + &self.description.state().time_dimensions(), &self.raw_input_measures()?, ); Ok(OrderPlanner::custom_order( @@ -506,7 +507,7 @@ impl MultiStageMemberQueryPlanner { let order_items = QueryProperties::default_order( &self.description.state().dimensions(), - &self.query_properties.time_dimensions(), + &self.description.state().time_dimensions(), &measures, ); let mut all_members = self.all_dimensions().clone(); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage_query_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage_query_planner.rs index d5b8e965d19b7..1237a0ece2c75 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage_query_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multi_stage_query_planner.rs @@ -3,6 +3,7 @@ use super::multi_stage::{ MultiStageLeafMemberType, MultiStageMember, MultiStageMemberQueryPlanner, MultiStageMemberType, MultiStageQueryDescription, MultiStageTimeShift, RollingWindowDescription, }; +use crate::cube_bridge::measure_definition::RollingWindow; use crate::plan::{Cte, From, Schema, Select, SelectBuilder}; use crate::planner::query_tools::QueryTools; use crate::planner::sql_evaluator::collectors::has_multi_stage_members; @@ -10,7 +11,7 @@ use crate::planner::sql_evaluator::collectors::member_childs; use crate::planner::sql_evaluator::sql_nodes::SqlNodesFactory; use crate::planner::sql_evaluator::EvaluationNode; use crate::planner::{BaseDimension, BaseMeasure, VisitorContext}; -use crate::planner::{BaseTimeDimension, QueryProperties}; +use crate::planner::{BaseTimeDimension, GranularityHelper, QueryProperties}; use cubenativeutils::CubeError; use itertools::Itertools; use std::collections::HashMap; @@ -45,10 +46,12 @@ impl 
MultiStageQueryPlanner { return Ok((vec![], vec![])); } let mut descriptions = Vec::new(); - let all_filter_members = self.query_properties.all_filtered_members(); let state = MultiStageAppliedState::new( + self.query_properties.time_dimensions().clone(), self.query_properties.dimensions().clone(), - all_filter_members, + self.query_properties.time_dimensions_filters().clone(), + self.query_properties.dimensions_filters().clone(), + self.query_properties.measures_filters().clone(), ); let top_level_ctes = multi_stage_members @@ -190,7 +193,7 @@ impl MultiStageQueryPlanner { MultiStageMemberType::Leaf(MultiStageLeafMemberType::Measure), member, ), - state.clone(), + state, vec![], alias.clone(), ); @@ -198,6 +201,36 @@ impl MultiStageQueryPlanner { Ok(description) } + fn make_rolling_base_state( + &self, + time_dimension: Rc, + rolling_window: &RollingWindow, + state: Rc, + ) -> Result, CubeError> { + let time_dimension_name = time_dimension.member_evaluator().full_name(); + let mut new_state = state.clone_state(); + let trailing_granularity = + GranularityHelper::granularity_from_interval(&rolling_window.trailing); + let leading_granularity = + GranularityHelper::granularity_from_interval(&rolling_window.leading); + let window_granularity = + GranularityHelper::min_granularity(&trailing_granularity, &leading_granularity)?; + let result_granularity = GranularityHelper::min_granularity( + &window_granularity, + &time_dimension.get_granularity(), + )?; + + new_state.change_time_dimension_granularity(&time_dimension_name, result_granularity); + + new_state.expand_date_range_filter( + &time_dimension_name, + rolling_window.trailing.clone(), + rolling_window.leading.clone(), + ); + + Ok(Rc::new(new_state)) + } + fn try_make_rolling_window( &self, member: Rc, @@ -205,8 +238,22 @@ impl MultiStageQueryPlanner { descriptions: &mut Vec>, ) -> Result>, CubeError> { if let Some(measure) = BaseMeasure::try_new(member.clone(), self.query_tools.clone())? 
{ - if let Some(rolling_window) = measure.rolling_window() { + if measure.is_cumulative() { + let rolling_window = if let Some(rolling_window) = measure.rolling_window() { + rolling_window.clone() + } else { + RollingWindow { + trailing: Some("unbounded".to_string()), + leading: None, + offset: None, + } + }; let time_dimensions = self.query_properties.time_dimensions(); + if time_dimensions.len() == 0 { + let rolling_base = + self.add_rolling_window_base(member.clone(), state.clone(), descriptions)?; + return Ok(Some(rolling_base)); + } if time_dimensions.len() != 1 { return Err(CubeError::internal( "Rolling window requires one time dimension".to_string(), @@ -219,7 +266,15 @@ impl MultiStageQueryPlanner { let input = vec![ self.add_time_series(time_dimension.clone(), state.clone(), descriptions)?, - self.add_rolling_window_base(source.clone(), state.clone(), descriptions)?, + self.add_rolling_window_base( + source.clone(), + self.make_rolling_base_state( + time_dimension.clone(), + &rolling_window, + state.clone(), + )?, + descriptions, + )?, ]; let time_dimension = time_dimensions[0].clone(); @@ -308,7 +363,7 @@ impl MultiStageQueryPlanner { let new_state = if !dimensions_to_add.is_empty() || !multi_stage_member.time_shifts().is_empty() - || state.is_filter_allowed(&member_name) + || state.has_filters_for_member(&member_name) { let mut new_state = state.clone_state(); if !dimensions_to_add.is_empty() { @@ -317,8 +372,8 @@ impl MultiStageQueryPlanner { if !multi_stage_member.time_shifts().is_empty() { new_state.add_time_shifts(multi_stage_member.time_shifts().clone()); } - if state.is_filter_allowed(&member_name) { - new_state.disallow_filter(&member_name); + if state.has_filters_for_member(&member_name) { + new_state.remove_filter_for_member(&member_name); } Rc::new(new_state) } else { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/collectors/has_multi_stage_members.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/collectors/has_multi_stage_members.rs index 30ed48ab13893..f7b8b6234be21 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/collectors/has_multi_stage_members.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/collectors/has_multi_stage_members.rs @@ -26,7 +26,9 @@ impl TraversalVisitor for HasMultiStageMembersCollector { MemberSymbolType::Measure(s) => { if s.is_multi_stage() { self.has_multi_stage = true; - } else if !self.ignore_cumulative && s.is_rolling_window() { + } else if !self.ignore_cumulative + && (s.is_rolling_window() || s.measure_type() == "runningTotal") + { self.has_multi_stage = true; } else { for filter_node in s.measure_filters() { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_node_transformers/set_schema.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_node_transformers/set_schema.rs index d7e580edcac19..a8a63d4f21b55 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_node_transformers/set_schema.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_node_transformers/set_schema.rs @@ -2,7 +2,8 @@ use crate::plan::schema::Schema; use crate::planner::sql_evaluator::sql_nodes::final_measure::FinalMeasureSqlNode; use crate::planner::sql_evaluator::sql_nodes::{ AutoPrefixSqlNode, EvaluateSqlNode, MeasureFilterSqlNode, MultiStageRankNode, - MultiStageWindowNode, RenderReferencesSqlNode, RootSqlNode, SqlNode, TimeShiftSqlNode, + MultiStageWindowNode, RenderReferencesSqlNode, 
RollingWindowNode, RootSqlNode, SqlNode, + TimeShiftSqlNode, }; use std::rc::Rc; @@ -55,6 +56,13 @@ pub fn set_schema_impl(sql_node: Rc, schema: Rc) -> Rc() + { + let input = set_schema_impl(rolling_window.input().clone(), schema.clone()); + RollingWindowNode::new(input) } else if let Some(render_references) = sql_node .clone() .as_any() @@ -81,6 +89,6 @@ pub fn set_schema_impl(sql_node: Rc, schema: Rc) -> Rc ev.evaluate_sql(args), - MemberSymbolType::Measure(ev) => { - let res = if ev.is_splitted_source() { - //FIXME hack for working with - //measures like rolling window - if !args.is_empty() { - match &args[0] { - MemberSqlArg::String(s) => s.clone(), - _ => "".to_string(), - } - } else { - "".to_string() - } - } else { - ev.evaluate_sql(args)? - }; - Ok(res) - } + MemberSymbolType::Measure(ev) => ev.evaluate_sql(args), MemberSymbolType::CubeTable(ev) => ev.evaluate_sql(args), MemberSymbolType::CubeName(ev) => ev.evaluate_sql(args), MemberSymbolType::SimpleSql(ev) => ev.evaluate_sql(args), diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/factory.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/factory.rs index 9d8729896e84d..3368a23781deb 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/factory.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/factory.rs @@ -1,7 +1,7 @@ use super::{ AutoPrefixSqlNode, EvaluateSqlNode, FinalMeasureSqlNode, MeasureFilterSqlNode, - MultiStageRankNode, MultiStageWindowNode, RenderReferencesSqlNode, RootSqlNode, SqlNode, - TimeShiftSqlNode, + MultiStageRankNode, MultiStageWindowNode, RenderReferencesSqlNode, RollingWindowNode, + RootSqlNode, SqlNode, TimeShiftSqlNode, }; use std::collections::HashMap; use std::rc::Rc; @@ -73,6 +73,24 @@ impl SqlNodesFactory { references_processor } + pub fn rolling_window_node_processor(&self) -> Rc { + let evaluate_sql_processor = EvaluateSqlNode::new(); + let auto_prefix_processor = AutoPrefixSqlNode::new(evaluate_sql_processor.clone()); + let measure_filter_processor = MeasureFilterSqlNode::new(auto_prefix_processor.clone()); + let final_measure_processor = FinalMeasureSqlNode::new(measure_filter_processor.clone()); + + let rolling_window_processor = RollingWindowNode::new(final_measure_processor.clone()); + + let root_processor = RootSqlNode::new( + self.dimension_processor(auto_prefix_processor.clone()), + rolling_window_processor.clone(), + auto_prefix_processor.clone(), + evaluate_sql_processor.clone(), + ); + let references_processor = RenderReferencesSqlNode::new(root_processor); + references_processor + } + fn dimension_processor(&self, input: Rc) -> Rc { if let Some(time_shifts) = &self.time_shifts { TimeShiftSqlNode::new(time_shifts.clone(), input) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/final_measure.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/final_measure.rs index 2fd786a986f9e..c5469d5206242 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/final_measure.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/final_measure.rs @@ -40,7 +40,11 @@ impl SqlNode for FinalMeasureSqlNode { if ev.is_calculated() { input } else { - let measure_type = ev.measure_type(); + let measure_type = if ev.measure_type() == "runningTotal" { + "sum" + } else { + &ev.measure_type() + }; format!("{}({})", measure_type, input) } } diff --git 
a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/mod.rs index 9cb6190581a9d..f316c5d8facb3 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/mod.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/mod.rs @@ -6,6 +6,7 @@ pub mod measure_filter; pub mod multi_stage_rank; pub mod multi_stage_window; pub mod render_references; +pub mod rolling_window; pub mod root_processor; pub mod sql_node; pub mod time_shift; @@ -18,6 +19,7 @@ pub use measure_filter::MeasureFilterSqlNode; pub use multi_stage_rank::MultiStageRankNode; pub use multi_stage_window::MultiStageWindowNode; pub use render_references::RenderReferencesSqlNode; +pub use rolling_window::RollingWindowNode; pub use root_processor::RootSqlNode; pub use sql_node::SqlNode; pub use time_shift::TimeShiftSqlNode; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/rolling_window.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/rolling_window.rs new file mode 100644 index 0000000000000..f97ac113e2fc4 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/sql_nodes/rolling_window.rs @@ -0,0 +1,76 @@ +use super::SqlNode; +use crate::cube_bridge::memeber_sql::MemberSqlArg; +use crate::planner::query_tools::QueryTools; +use crate::planner::sql_evaluator::{EvaluationNode, MemberSymbolType, SqlEvaluatorVisitor}; +use cubenativeutils::CubeError; +use std::any::Any; +use std::rc::Rc; + +pub struct RollingWindowNode { + input: Rc, +} + +impl RollingWindowNode { + pub fn new(input: Rc) -> Rc { + Rc::new(Self { input }) + } + + pub fn input(&self) -> &Rc { + &self.input + } +} + +impl SqlNode for RollingWindowNode { + fn to_sql( + &self, + visitor: &mut SqlEvaluatorVisitor, + node: &Rc, + query_tools: Rc, + node_processor: Rc, + ) -> Result { + let res = match node.symbol() { + MemberSymbolType::Measure(m) => { + if m.is_cumulative() && m.is_splitted_source() { + let args = visitor.evaluate_deps(node, node_processor.clone())?; + //FIXME hack for working with + //measures like rolling window + let input = if !args.is_empty() { + match &args[0] { + MemberSqlArg::String(s) => s.clone(), + _ => "".to_string(), + } + } else { + "".to_string() + }; + let aggregate_function = if m.measure_type() == "sum" + || m.measure_type() == "count" + || m.measure_type() == "runningTotal" + { + "sum" + } else { + m.measure_type() + }; + + format!("{}({})", aggregate_function, input) + } else { + self.input + .to_sql(visitor, node, query_tools.clone(), node_processor.clone())? 
+ } + } + _ => { + return Err(CubeError::internal(format!( + "Unexpected evaluation node type for RollingWindowNode" + ))); + } + }; + Ok(res) + } + + fn as_any(self: Rc) -> Rc { + self.clone() + } + + fn childs(&self) -> Vec> { + vec![self.input.clone()] + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/measure_symbol.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/measure_symbol.rs index be6d0b60603c6..261ec2c5f7e8b 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/measure_symbol.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/measure_symbol.rs @@ -115,6 +115,14 @@ impl MeasureSymbol { self.rolling_window().is_some() } + pub fn is_running_total(&self) -> bool { + self.measure_type() == "runningTotal" + } + + pub fn is_cumulative(&self) -> bool { + self.is_rolling_window() || self.is_running_total() + } + pub fn measure_filters(&self) -> &Vec> { &self.measure_filters } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_templates/filter.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_templates/filter.rs index f0d23de97e232..c7eaaf9a8ed11 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_templates/filter.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_templates/filter.rs @@ -3,6 +3,7 @@ use cubenativeutils::CubeError; use minijinja::context; use std::rc::Rc; +#[derive(Clone)] pub struct FilterTemplates { render: Rc, } @@ -103,6 +104,26 @@ impl FilterTemplates { ) } + pub fn add_interval(&self, date: String, interval: String) -> Result { + self.render.render_template( + &"expressions/add_interval", + context! { + date => date, + interval => interval + }, + ) + } + + pub fn sub_interval(&self, date: String, interval: String) -> Result { + self.render.render_template( + &"expressions/sub_interval", + context! { + date => date, + interval => interval + }, + ) + } + pub fn set_where(&self, column: String) -> Result { self.render.render_template( &"filters/set_where",