diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index 3ac1a2de9e600..5d6e3fa86cee5 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -11,7 +11,7 @@ import cronParser from 'cron-parser'; import moment from 'moment-timezone'; import inflection from 'inflection'; -import { FROM_PARTITION_RANGE, inDbTimeZone, MAX_SOURCE_ROW_LIMIT, QueryAlias, getEnv } from '@cubejs-backend/shared'; +import { FROM_PARTITION_RANGE, inDbTimeZone, MAX_SOURCE_ROW_LIMIT, QueryAlias, getEnv, timeSeries as timeSeriesBase } from '@cubejs-backend/shared'; import { buildSqlAndParams as nativeBuildSqlAndParams, @@ -628,6 +628,11 @@ export class BaseQuery { return res; } + // FIXME helper for native generator, maybe should be moved entire to rust + generateTimeSeries(granularity, dateRange) { + return timeSeriesBase(granularity, dateRange); + } + get shouldReuseParams() { return false; } @@ -3232,7 +3237,16 @@ export class BaseQuery { '{% if offset is not none %}\nOFFSET {{ offset }}{% endif %}', group_by_exprs: '{{ group_by | map(attribute=\'index\') | join(\', \') }}', join: '{{ join_type }} JOIN {{ source }} ON {{ condition }}', - cte: '{{ alias }} AS ({{ query | indent(2, true) }})' + cte: '{{ alias }} AS ({{ query | indent(2, true) }})', + time_series_select: 'SELECT date_from::timestamp AS "date_from",\n' + + 'date_to::timestamp AS "date_to" \n' + + 'FROM(\n' + + ' VALUES ' + + '{% for time_item in seria %}' + + '(\'{{ time_item | join(\'\\\', \\\'\') }}\')' + + '{% if not loop.last %}, {% endif %}' + + '{% endfor %}' + + ') AS dates (date_from, date_to)' }, expressions: { column_reference: '{% if table_name %}{{ table_name }}.{% endif %}{{ name }}', @@ -3253,6 +3267,8 @@ export class BaseQuery { cube: 'CUBE({{ exprs_concat }})', negative: '-({{ expr }})', not: 'NOT ({{ expr }})', + add_interval: '{{ date }} + interval \'{{ interval }}\'', + sub_interval: '{{ date }} - interval \'{{ interval }}\'', true: 'TRUE', false: 'FALSE', like: '{{ expr }} {% if negated %}NOT {% endif %}LIKE {{ pattern }}', diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts index f9493faba34a0..a595e0523dc17 100644 --- a/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts +++ b/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts @@ -64,7 +64,7 @@ describe('SQL Generation', () => { offset: 'start' } }, - revenueRolling3day: { + revenueRollingThreeDay: { type: 'sum', sql: 'amount', rollingWindow: { @@ -769,7 +769,7 @@ describe('SQL Generation', () => { it('rolling month', async () => runQueryTest({ measures: [ - 'visitors.revenueRolling3day' + 'visitors.revenueRollingThreeDay' ], timeDimensions: [{ dimension: 'visitors.created_at', @@ -781,7 +781,7 @@ describe('SQL Generation', () => { }], timezone: 'America/Los_Angeles' }, [ - { visitors__created_at_week: '2017-01-09T00:00:00.000Z', visitors__revenue_rolling3day: '900' } + { visitors__created_at_week: '2017-01-09T00:00:00.000Z', visitors__revenue_rolling_three_day: '900' } ])); it('rolling count', async () => runQueryTest({ diff --git a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_tools.rs b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_tools.rs index d5aef3c6d812d..5ade3438edc5c 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_tools.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_tools.rs @@ -38,4 +38,9 @@ pub trait BaseTools { fn filter_group_function(&self) -> Result, CubeError>; fn timestamp_precision(&self) -> Result; fn in_db_time_zone(&self, date: String) -> Result; + fn generate_time_series( + &self, + granularity: String, + date_range: Vec, + ) -> Result>, CubeError>; } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/measure_definition.rs b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/measure_definition.rs index 5b23f4e971e4e..4c1cdb21f2596 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/measure_definition.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/measure_definition.rs @@ -21,6 +21,13 @@ pub struct TimeShiftReference { pub time_dimension: String, } +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +pub struct RollingWindow { + pub trailing: Option, + pub leading: Option, + pub offset: Option, +} + #[derive(Serialize, Deserialize, Debug)] pub struct MeasureDefinitionStatic { #[serde(rename = "type")] @@ -37,6 +44,8 @@ pub struct MeasureDefinitionStatic { pub group_by_references: Option>, #[serde(rename = "timeShiftReferences")] pub time_shift_references: Option>, + #[serde(rename = "rollingWindow")] + pub rolling_window: Option, } #[nativebridge::native_bridge(MeasureDefinitionStatic)] diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs index f118aefe5d8e5..9776ad592a37a 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs @@ -6,7 +6,7 @@ use cubenativeutils::CubeError; use std::fmt; use std::rc::Rc; -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub enum FilterGroupOperator { Or, And, @@ -18,13 +18,19 @@ pub struct FilterGroup { pub items: Vec, } +impl PartialEq for FilterGroup { + fn eq(&self, other: &Self) -> bool { + self.operator == other.operator && self.items == other.items + } +} + impl FilterGroup { pub fn new(operator: FilterGroupOperator, items: Vec) -> Self { Self { operator, items } } } -#[derive(Clone)] +#[derive(Clone, PartialEq)] pub enum FilterItem { Group(Rc), Item(Rc), diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs index 19e8283bc5e68..eb3a7347bd103 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs @@ -2,9 +2,113 @@ use super::{Schema, SingleAliasedSource}; use crate::planner::sql_templates::PlanSqlTemplates; use crate::planner::{BaseJoinCondition, BaseMember, VisitorContext}; use cubenativeutils::CubeError; +use lazy_static::lazy_static; use std::rc::Rc; +pub struct RollingWindowJoinCondition { + data_source: String, + time_series_source: String, + trailing_interval: Option, + leading_interval: Option, + offset: String, + time_dimension: Rc, +} + +impl RollingWindowJoinCondition { + pub fn new( + data_source: String, + time_series_source: String, + trailing_interval: Option, + leading_interval: Option, + offset: String, + time_dimension: Rc, + ) -> Self { + Self { + data_source, + time_series_source, + trailing_interval, + leading_interval, + offset, + time_dimension, + } + } + + pub fn to_sql( + &self, + templates: &PlanSqlTemplates, + context: Rc, + schema: Rc, + ) -> Result { + let mut conditions = vec![]; + let date_column_alias = + self.resolve_time_column_alias(templates, context.clone(), schema.clone())?; + + lazy_static! { + static ref UNBOUNDED: Option = Some("unbounded".to_string()); + } + + if self.trailing_interval != *UNBOUNDED { + let start_date = if self.offset == "start" { + templates.column_reference(&Some(self.time_series_source.clone()), "date_from")? + } else { + templates.column_reference(&Some(self.time_series_source.clone()), "date_to")? + }; + + let trailing_start = if let Some(trailing_interval) = &self.trailing_interval { + format!("{start_date} - interval '{trailing_interval}'") + } else { + start_date + }; + + let sign = if self.offset == "start" { ">=" } else { ">" }; + + conditions.push(format!("{date_column_alias} {sign} {trailing_start}")); + } + + if self.leading_interval != *UNBOUNDED { + let end_date = if self.offset == "end" { + templates.column_reference(&Some(self.time_series_source.clone()), "date_to")? + } else { + templates.column_reference(&Some(self.time_series_source.clone()), "date_from")? + }; + + let leading_end = if let Some(leading_interval) = &self.leading_interval { + format!("{end_date} + interval '{leading_interval}'") + } else { + end_date + }; + + let sign = if self.offset == "end" { "<=" } else { "<" }; + + conditions.push(format!("{date_column_alias} {sign} {leading_end}")); + } + let result = if conditions.is_empty() { + templates.always_true()? + } else { + conditions.join(" AND ") + }; + Ok(result) + } + + fn resolve_time_column_alias( + &self, + templates: &PlanSqlTemplates, + context: Rc, + schema: Rc, + ) -> Result { + let schema = schema.extract_source_schema(&self.data_source); + let source = Some(self.data_source.clone()); + if let Some(column) = + schema.find_column_for_member(&self.time_dimension.full_name(), &source) + { + templates.column_reference(&source, &column.alias.clone()) + } else { + self.time_dimension.to_sql(context.clone(), schema.clone()) + } + } +} + pub struct DimensionJoinCondition { left_source: String, right_source: String, @@ -92,6 +196,7 @@ impl DimensionJoinCondition { pub enum JoinCondition { DimensionJoinCondition(DimensionJoinCondition), BaseJoinCondition(Rc), + RollingWindowJoinCondition(RollingWindowJoinCondition), } impl JoinCondition { @@ -109,6 +214,24 @@ impl JoinCondition { )) } + pub fn new_rolling_join( + data_source: String, + time_series_source: String, + trailing_interval: Option, + leading_interval: Option, + offset: String, + time_dimension: Rc, + ) -> Self { + Self::RollingWindowJoinCondition(RollingWindowJoinCondition::new( + data_source, + time_series_source, + trailing_interval, + leading_interval, + offset, + time_dimension, + )) + } + pub fn new_base_join(base: Rc) -> Self { Self::BaseJoinCondition(base) } @@ -122,6 +245,9 @@ impl JoinCondition { match &self { JoinCondition::DimensionJoinCondition(cond) => cond.to_sql(templates, context, schema), JoinCondition::BaseJoinCondition(cond) => cond.to_sql(context, schema), + JoinCondition::RollingWindowJoinCondition(cond) => { + cond.to_sql(templates, context, schema) + } } } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/mod.rs index 5d98909183278..7dc4a188a3fbf 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/mod.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/mod.rs @@ -8,6 +8,7 @@ pub mod order; pub mod query_plan; pub mod schema; pub mod select; +pub mod time_series; pub mod union; pub use builder::{JoinBuilder, SelectBuilder}; @@ -15,9 +16,10 @@ pub use cte::Cte; pub use expression::{Expr, MemberExpression}; pub use filter::{Filter, FilterGroup, FilterItem}; pub use from::{From, FromSource, SingleAliasedSource, SingleSource}; -pub use join::{Join, JoinCondition, JoinItem}; +pub use join::{Join, JoinCondition, JoinItem, RollingWindowJoinCondition}; pub use order::OrderBy; pub use query_plan::QueryPlan; pub use schema::{Schema, SchemaColumn, SchemaCube}; pub use select::{AliasedExpr, Select}; +pub use time_series::TimeSeries; pub use union::Union; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/plan/query_plan.rs b/rust/cubesqlplanner/cubesqlplanner/src/plan/query_plan.rs index e8224fb74aebf..8fd8b9144f214 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/plan/query_plan.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/plan/query_plan.rs @@ -1,4 +1,4 @@ -use super::{Schema, Select, Union}; +use super::{Schema, Select, TimeSeries, Union}; use crate::planner::sql_templates::PlanSqlTemplates; use cubenativeutils::CubeError; use std::rc::Rc; @@ -6,6 +6,7 @@ use std::rc::Rc; pub enum QueryPlan { Select(Rc