Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

SQL planner rolling window #8982

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import cronParser from 'cron-parser';

import moment from 'moment-timezone';
import inflection from 'inflection';
import { FROM_PARTITION_RANGE, inDbTimeZone, MAX_SOURCE_ROW_LIMIT, QueryAlias, getEnv } from '@cubejs-backend/shared';
import { FROM_PARTITION_RANGE, inDbTimeZone, MAX_SOURCE_ROW_LIMIT, QueryAlias, getEnv, timeSeries as timeSeriesBase } from '@cubejs-backend/shared';

import {
buildSqlAndParams as nativeBuildSqlAndParams,
Expand Down Expand Up @@ -628,6 +628,11 @@ export class BaseQuery {
return res;
}

// FIXME: helper for the native SQL generator — maybe should be moved entirely to Rust.
// Builds the rolling-window time series for `dateRange` at the given `granularity`
// by delegating to `timeSeries` from @cubejs-backend/shared (imported as timeSeriesBase).
generateTimeSeries(granularity, dateRange) {
return timeSeriesBase(granularity, dateRange);
}

get shouldReuseParams() {
return false;
}
Expand Down Expand Up @@ -3232,7 +3237,16 @@ export class BaseQuery {
'{% if offset is not none %}\nOFFSET {{ offset }}{% endif %}',
group_by_exprs: '{{ group_by | map(attribute=\'index\') | join(\', \') }}',
join: '{{ join_type }} JOIN {{ source }} ON {{ condition }}',
cte: '{{ alias }} AS ({{ query | indent(2, true) }})'
cte: '{{ alias }} AS ({{ query | indent(2, true) }})',
time_series_select: 'SELECT date_from::timestamp AS "date_from",\n' +
'date_to::timestamp AS "date_to" \n' +
'FROM(\n' +
' VALUES ' +
'{% for time_item in seria %}' +
'(\'{{ time_item | join(\'\\\', \\\'\') }}\')' +
'{% if not loop.last %}, {% endif %}' +
'{% endfor %}' +
') AS dates (date_from, date_to)'
},
expressions: {
column_reference: '{% if table_name %}{{ table_name }}.{% endif %}{{ name }}',
Expand All @@ -3253,6 +3267,8 @@ export class BaseQuery {
cube: 'CUBE({{ exprs_concat }})',
negative: '-({{ expr }})',
not: 'NOT ({{ expr }})',
add_interval: '{{ date }} + interval \'{{ interval }}\'',
sub_interval: '{{ date }} - interval \'{{ interval }}\'',
true: 'TRUE',
false: 'FALSE',
like: '{{ expr }} {% if negated %}NOT {% endif %}LIKE {{ pattern }}',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ describe('SQL Generation', () => {
offset: 'start'
}
},
revenueRolling3day: {
revenueRollingThreeDay: {
type: 'sum',
sql: 'amount',
rollingWindow: {
Expand Down Expand Up @@ -769,7 +769,7 @@ describe('SQL Generation', () => {

it('rolling month', async () => runQueryTest({
measures: [
'visitors.revenueRolling3day'
'visitors.revenueRollingThreeDay'
],
timeDimensions: [{
dimension: 'visitors.created_at',
Expand All @@ -781,7 +781,7 @@ describe('SQL Generation', () => {
}],
timezone: 'America/Los_Angeles'
}, [
{ visitors__created_at_week: '2017-01-09T00:00:00.000Z', visitors__revenue_rolling3day: '900' }
{ visitors__created_at_week: '2017-01-09T00:00:00.000Z', visitors__revenue_rolling_three_day: '900' }
]));

it('rolling count', async () => runQueryTest({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ pub trait BaseTools {
fn filter_group_function(&self) -> Result<Rc<dyn FilterGroup>, CubeError>;
fn timestamp_precision(&self) -> Result<u32, CubeError>;
fn in_db_time_zone(&self, date: String) -> Result<String, CubeError>;
fn generate_time_series(
&self,
granularity: String,
date_range: Vec<String>,
) -> Result<Vec<Vec<String>>, CubeError>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ pub struct TimeShiftReference {
pub time_dimension: String,
}

/// Rolling-window specification attached to a measure definition
/// (deserialized from the cube schema's `rollingWindow` property).
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct RollingWindow {
    // Interval the window extends backwards (e.g. "3 day") or the
    // sentinel "unbounded"; None means no trailing part — TODO confirm.
    pub trailing: Option<String>,
    // Interval the window extends forwards, same convention as `trailing`.
    pub leading: Option<String>,
    // Which edge of the series bucket the window is anchored to;
    // presumably "start" or "end" — verify against the JS schema compiler.
    pub offset: Option<String>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MeasureDefinitionStatic {
#[serde(rename = "type")]
Expand All @@ -37,6 +44,8 @@ pub struct MeasureDefinitionStatic {
pub group_by_references: Option<Vec<String>>,
#[serde(rename = "timeShiftReferences")]
pub time_shift_references: Option<Vec<TimeShiftReference>>,
#[serde(rename = "rollingWindow")]
pub rolling_window: Option<RollingWindow>,
}

#[nativebridge::native_bridge(MeasureDefinitionStatic)]
Expand Down
10 changes: 8 additions & 2 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use cubenativeutils::CubeError;
use std::fmt;
use std::rc::Rc;

#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub enum FilterGroupOperator {
Or,
And,
Expand All @@ -18,13 +18,19 @@ pub struct FilterGroup {
pub items: Vec<FilterItem>,
}

impl PartialEq for FilterGroup {
    /// Two filter groups are equal when their logical operators match and
    /// their item lists are element-wise equal.
    fn eq(&self, other: &Self) -> bool {
        let same_operator = self.operator == other.operator;
        same_operator && self.items == other.items
    }
}

impl FilterGroup {
pub fn new(operator: FilterGroupOperator, items: Vec<FilterItem>) -> Self {
Self { operator, items }
}
}

#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub enum FilterItem {
Group(Rc<FilterGroup>),
Item(Rc<BaseFilter>),
Expand Down
126 changes: 126 additions & 0 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,113 @@ use super::{Schema, SingleAliasedSource};
use crate::planner::sql_templates::PlanSqlTemplates;
use crate::planner::{BaseJoinCondition, BaseMember, VisitorContext};
use cubenativeutils::CubeError;
use lazy_static::lazy_static;

use std::rc::Rc;

/// Join condition matching a fact source's time dimension against rows of a
/// generated time series, widened by optional trailing/leading intervals.
pub struct RollingWindowJoinCondition {
    // Alias of the fact/data source side of the join.
    data_source: String,
    // Alias of the generated time-series source (exposes date_from/date_to).
    time_series_source: String,
    // Backwards window extent: an interval string, "unbounded", or None.
    trailing_interval: Option<String>,
    // Forwards window extent, same convention as `trailing_interval`.
    leading_interval: Option<String>,
    // Anchor edge of the window; compared against "start"/"end" in to_sql.
    offset: String,
    // The time dimension member whose column is constrained by the window.
    time_dimension: Rc<dyn BaseMember>,
}

impl RollingWindowJoinCondition {
    /// Creates a rolling-window join condition.
    ///
    /// `trailing_interval` / `leading_interval` are SQL interval strings
    /// (e.g. "3 day"), the literal "unbounded", or `None` for a zero-width
    /// bound on that side. `offset` selects which time-series edge
    /// ("start" or "end") the window is anchored to.
    pub fn new(
        data_source: String,
        time_series_source: String,
        trailing_interval: Option<String>,
        leading_interval: Option<String>,
        offset: String,
        time_dimension: Rc<dyn BaseMember>,
    ) -> Self {
        Self {
            data_source,
            time_series_source,
            trailing_interval,
            leading_interval,
            offset,
            time_dimension,
        }
    }

    /// Renders the ON-clause SQL restricting the time dimension to the
    /// rolling window around each time-series row. Returns the dialect's
    /// always-true expression when both sides are unbounded.
    pub fn to_sql(
        &self,
        templates: &PlanSqlTemplates,
        context: Rc<VisitorContext>,
        schema: Rc<Schema>,
    ) -> Result<String, CubeError> {
        // Sentinel interval value meaning "no bound on this side".
        // A plain const + as_deref() comparison replaces the previous
        // lazy_static Option<String>, which allocated for no benefit.
        const UNBOUNDED: &str = "unbounded";

        let mut conditions = vec![];
        let date_column_alias =
            self.resolve_time_column_alias(templates, context.clone(), schema.clone())?;

        if self.trailing_interval.as_deref() != Some(UNBOUNDED) {
            // Window anchored at the series start uses an inclusive lower
            // bound (>=); anchored at the end it is exclusive (>).
            let start_date = if self.offset == "start" {
                templates.column_reference(&Some(self.time_series_source.clone()), "date_from")?
            } else {
                templates.column_reference(&Some(self.time_series_source.clone()), "date_to")?
            };

            // TODO(review): interval arithmetic is hardcoded as Postgres-style
            // "x - interval '…'" here; consider routing through the dialect's
            // sub_interval/add_interval templates added in BaseQuery.
            let trailing_start = if let Some(trailing_interval) = &self.trailing_interval {
                format!("{start_date} - interval '{trailing_interval}'")
            } else {
                start_date
            };

            let sign = if self.offset == "start" { ">=" } else { ">" };

            conditions.push(format!("{date_column_alias} {sign} {trailing_start}"));
        }

        if self.leading_interval.as_deref() != Some(UNBOUNDED) {
            // Mirror of the trailing bound: exclusive (<) unless the window
            // is anchored at the series end, which is inclusive (<=).
            let end_date = if self.offset == "end" {
                templates.column_reference(&Some(self.time_series_source.clone()), "date_to")?
            } else {
                templates.column_reference(&Some(self.time_series_source.clone()), "date_from")?
            };

            let leading_end = if let Some(leading_interval) = &self.leading_interval {
                format!("{end_date} + interval '{leading_interval}'")
            } else {
                end_date
            };

            let sign = if self.offset == "end" { "<=" } else { "<" };

            conditions.push(format!("{date_column_alias} {sign} {leading_end}"));
        }

        if conditions.is_empty() {
            templates.always_true()
        } else {
            Ok(conditions.join(" AND "))
        }
    }

    /// Resolves the SQL expression for the time dimension on the fact side:
    /// prefers the already-selected column alias from the extracted source
    /// schema, falling back to rendering the dimension expression itself.
    fn resolve_time_column_alias(
        &self,
        templates: &PlanSqlTemplates,
        context: Rc<VisitorContext>,
        schema: Rc<Schema>,
    ) -> Result<String, CubeError> {
        let schema = schema.extract_source_schema(&self.data_source);
        let source = Some(self.data_source.clone());
        if let Some(column) =
            schema.find_column_for_member(&self.time_dimension.full_name(), &source)
        {
            templates.column_reference(&source, &column.alias.clone())
        } else {
            self.time_dimension.to_sql(context.clone(), schema.clone())
        }
    }
}

pub struct DimensionJoinCondition {
left_source: String,
right_source: String,
Expand Down Expand Up @@ -92,6 +196,7 @@ impl DimensionJoinCondition {
/// Condition rendered into a JOIN's ON clause.
pub enum JoinCondition {
    // Condition built from dimensions shared between two sources.
    DimensionJoinCondition(DimensionJoinCondition),
    // Externally supplied condition object rendered via its own to_sql.
    BaseJoinCondition(Rc<dyn BaseJoinCondition>),
    // Time-window condition joining fact rows to a generated time series.
    RollingWindowJoinCondition(RollingWindowJoinCondition),
}

impl JoinCondition {
Expand All @@ -109,6 +214,24 @@ impl JoinCondition {
))
}

/// Constructs a rolling-window join condition between a fact source and a
/// generated time series; see `RollingWindowJoinCondition::new` for the
/// meaning of the interval and offset parameters.
pub fn new_rolling_join(
    data_source: String,
    time_series_source: String,
    trailing_interval: Option<String>,
    leading_interval: Option<String>,
    offset: String,
    time_dimension: Rc<dyn BaseMember>,
) -> Self {
    Self::RollingWindowJoinCondition(RollingWindowJoinCondition::new(
        data_source,
        time_series_source,
        trailing_interval,
        leading_interval,
        offset,
        time_dimension,
    ))
}

/// Wraps an externally provided join-condition implementation.
pub fn new_base_join(base: Rc<dyn BaseJoinCondition>) -> Self {
    Self::BaseJoinCondition(base)
}
Expand All @@ -122,6 +245,9 @@ impl JoinCondition {
match &self {
JoinCondition::DimensionJoinCondition(cond) => cond.to_sql(templates, context, schema),
JoinCondition::BaseJoinCondition(cond) => cond.to_sql(context, schema),
JoinCondition::RollingWindowJoinCondition(cond) => {
cond.to_sql(templates, context, schema)
}
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion rust/cubesqlplanner/cubesqlplanner/src/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ pub mod order;
pub mod query_plan;
pub mod schema;
pub mod select;
pub mod time_series;
pub mod union;

pub use builder::{JoinBuilder, SelectBuilder};
pub use cte::Cte;
pub use expression::{Expr, MemberExpression};
pub use filter::{Filter, FilterGroup, FilterItem};
pub use from::{From, FromSource, SingleAliasedSource, SingleSource};
pub use join::{Join, JoinCondition, JoinItem};
pub use join::{Join, JoinCondition, JoinItem, RollingWindowJoinCondition};
pub use order::OrderBy;
pub use query_plan::QueryPlan;
pub use schema::{Schema, SchemaColumn, SchemaCube};
pub use select::{AliasedExpr, Select};
pub use time_series::TimeSeries;
pub use union::Union;
5 changes: 4 additions & 1 deletion rust/cubesqlplanner/cubesqlplanner/src/plan/query_plan.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
use super::{Schema, Select, Union};
use super::{Schema, Select, TimeSeries, Union};
use crate::planner::sql_templates::PlanSqlTemplates;
use cubenativeutils::CubeError;
use std::rc::Rc;

/// A renderable query-plan node: a plain SELECT, a UNION of selects, or a
/// generated time-series relation used by rolling-window joins.
pub enum QueryPlan {
    Select(Rc<Select>),
    Union(Rc<Union>),
    TimeSeries(Rc<TimeSeries>),
}

impl QueryPlan {
pub fn make_schema(&self, self_alias: Option<String>) -> Schema {
match self {
QueryPlan::Select(select) => select.make_schema(self_alias),
QueryPlan::Union(union) => union.make_schema(self_alias),
QueryPlan::TimeSeries(series) => series.make_schema(self_alias),
}
}
pub fn to_sql(&self, templates: &PlanSqlTemplates) -> Result<String, CubeError> {
match self {
QueryPlan::Select(s) => s.to_sql(templates),
QueryPlan::Union(u) => u.to_sql(templates),
QueryPlan::TimeSeries(series) => series.to_sql(templates),
}
}
}
29 changes: 29 additions & 0 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/time_series.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use super::{Schema, SchemaColumn};
use crate::planner::sql_templates::PlanSqlTemplates;
use cubenativeutils::CubeError;

/// Precomputed time series used as the driving relation for
/// rolling-window joins.
pub struct TimeSeries {
    // Full member name of the time dimension this series was generated for.
    pub time_dimension_name: String,
    // Optional overall range bounds passed through to the dialect template —
    // TODO confirm how time_series_select uses these versus `seria`.
    pub from_date: Option<String>,
    pub to_date: Option<String>,
    // Precomputed [date_from, date_to] string pairs, one per series step.
    // NOTE(review): "seria" looks like a typo for "series", but the name is
    // referenced by the SQL templates, so renaming must be coordinated.
    pub seria: Vec<Vec<String>>,
}

impl TimeSeries {
    /// Builds the output schema of the generated series: a single
    /// `date_from` column attributed to the time dimension member.
    pub fn make_schema(&self, self_alias: Option<String>) -> Schema {
        let column = SchemaColumn::new(
            self_alias,
            // Plain to_string() instead of format!() with no placeholders
            // (clippy::useless_format).
            "date_from".to_string(),
            self.time_dimension_name.clone(),
        );
        Schema::new(vec![column], vec![])
    }

    /// Renders the series as a dialect-specific SELECT over the
    /// precomputed `seria` rows via the time_series_select template.
    pub fn to_sql(&self, templates: &PlanSqlTemplates) -> Result<String, CubeError> {
        templates.time_series_select(
            self.from_date.clone(),
            self.to_date.clone(),
            self.seria.clone(),
        )
    }
}
Loading
Loading