Skip to content

Commit

Permalink
multi stage and running total pass tests
Browse files Browse the repository at this point in the history
  • Loading branch information
waralexrom committed Dec 3, 2024
1 parent 8503379 commit 4edaa86
Show file tree
Hide file tree
Showing 27 changed files with 711 additions and 160 deletions.
1 change: 0 additions & 1 deletion packages/cubejs-backend-shared/src/time.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ export const timeSeries = (granularity: string, dateRange: QueryDateRange, optio
// moment.range works with strings
const range = moment.range(<any>dateRange[0], <any>dateRange[1]);

console.log("!!!! timeSeries:", TIME_SERIES[granularity](range, options.timestampPrecision));
return TIME_SERIES[granularity](range, options.timestampPrecision);
};

Expand Down
4 changes: 2 additions & 2 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,6 @@ export class BaseQuery {
offset = offset || 'end';
return this.timeDimensions.map(
d => [d, (dateFrom, dateTo, dateField, dimensionDateFrom, dimensionDateTo, isFromStartToEnd) => {
console.log("!!!!!! IsFromStartToEnd: ", isFromStartToEnd, " --");
// dateFrom based window
const conditions = [];
if (trailingInterval !== 'unbounded') {
Expand Down Expand Up @@ -1420,7 +1419,6 @@ export class BaseQuery {
)
).join(' AND ');

console.log("!!! date join contdition sql: ", dateJoinConditionSql);

Check failure on line 1422 in packages/cubejs-schema-compiler/src/adapter/BaseQuery.js

View workflow job for this annotation

GitHub Actions / lint

More than 1 blank line not allowed
return this.overTimeSeriesSelect(
cumulativeMeasures,
Expand Down Expand Up @@ -3292,6 +3290,8 @@ export class BaseQuery {
cube: 'CUBE({{ exprs_concat }})',
negative: '-({{ expr }})',
not: 'NOT ({{ expr }})',
add_interval: '{{ date }} + interval \'{{ interval }}\'',
sub_interval: '{{ date }} - interval \'{{ interval }}\'',
true: 'TRUE',
false: 'FALSE',
like: '{{ expr }} {% if negated %}NOT {% endif %}LIKE {{ pattern }}',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ describe('SQL Generation', () => {
offset: 'start'
}
},
revenueRolling3day: {
revenueRollingThreeDay: {
type: 'sum',
sql: 'amount',
rollingWindow: {
Expand Down Expand Up @@ -666,7 +666,7 @@ describe('SQL Generation', () => {
console.log(query.buildSqlAndParams());

// TODO ordering doesn't work for running total
return dbRunner.testQuery(query.buildSqlAndParams()).then(res => {
return dbRunner.testQuery(query.buildSqlAndParamsTest()).then(res => {
console.log(JSON.stringify(res));
expect(res).toEqual(
[{
Expand Down Expand Up @@ -704,7 +704,7 @@ describe('SQL Generation', () => {
});
});

it('rolling 1', async () => runQueryTest({
it('rolling', async () => runQueryTest({
measures: [
'visitors.revenueRolling'
],
Expand All @@ -730,7 +730,7 @@ describe('SQL Generation', () => {
{ visitors__created_at_day: '2017-01-10T00:00:00.000Z', visitors__revenue_rolling: null }
]));

it('rolling multiplied 1', async () => runQueryTest({
it('rolling multiplied', async () => runQueryTest({
measures: [
'visitors.revenueRolling',
'visitor_checkins.visitor_checkins_count'
Expand Down Expand Up @@ -769,7 +769,7 @@ describe('SQL Generation', () => {

it('rolling month', async () => runQueryTest({
measures: [
'visitors.revenueRolling3day'
'visitors.revenueRollingThreeDay'
],
timeDimensions: [{
dimension: 'visitors.created_at',
Expand All @@ -781,7 +781,7 @@ describe('SQL Generation', () => {
}],
timezone: 'America/Los_Angeles'
}, [
{ visitors__created_at_week: '2017-01-09T00:00:00.000Z', visitors__revenue_rolling3day: '900' }
{ visitors__created_at_week: '2017-01-09T00:00:00.000Z', visitors__revenue_rolling_three_day: '900' }
]));

it('rolling count', async () => runQueryTest({
Expand Down
10 changes: 8 additions & 2 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use cubenativeutils::CubeError;
use std::fmt;
use std::rc::Rc;

#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub enum FilterGroupOperator {
Or,
And,
Expand All @@ -18,13 +18,19 @@ pub struct FilterGroup {
pub items: Vec<FilterItem>,
}

/// Structural equality for filter groups: two groups are equal when their
/// boolean operator matches and their member items compare equal pairwise
/// (in order).
impl PartialEq for FilterGroup {
    fn eq(&self, other: &Self) -> bool {
        let same_operator = self.operator == other.operator;
        same_operator && self.items == other.items
    }
}

impl FilterGroup {
pub fn new(operator: FilterGroupOperator, items: Vec<FilterItem>) -> Self {
Self { operator, items }
}
}

#[derive(Clone)]
#[derive(Clone, PartialEq)]
pub enum FilterItem {
Group(Rc<FilterGroup>),
Item(Rc<BaseFilter>),
Expand Down
93 changes: 32 additions & 61 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::{time_series, Schema, SingleAliasedSource};
use crate::planner::sql_templates::PlanSqlTemplates;
use crate::planner::{BaseJoinCondition, BaseMember, VisitorContext};
use cubenativeutils::CubeError;
use lazy_static::lazy_static;

use std::rc::Rc;

Expand Down Expand Up @@ -33,85 +34,55 @@ impl RollingWindowJoinCondition {
}
}

/*
*
offset = offset || 'end';
return this.timeDimensions.map(
d => [d, (dateFrom, dateTo, dateField, dimensionDateFrom, dimensionDateTo, isFromStartToEnd) => {
// dateFrom based window
const conditions = [];
if (trailingInterval !== 'unbounded') {
const startDate = isFromStartToEnd || offset === 'start' ? dateFrom : dateTo;
const trailingStart = trailingInterval ? this.subtractInterval(startDate, trailingInterval) : startDate;
const sign = offset === 'start' ? '>=' : '>';
conditions.push(`${dateField} ${sign} ${trailingStart}`);
}
if (leadingInterval !== 'unbounded') {
const endDate = isFromStartToEnd || offset === 'end' ? dateTo : dateFrom;
const leadingEnd = leadingInterval ? this.addInterval(endDate, leadingInterval) : endDate;
const sign = offset === 'end' ? '<=' : '<';
conditions.push(`${dateField} ${sign} ${leadingEnd}`);
}
return conditions.length ? conditions.join(' AND ') : '1 = 1';
}]
);
*/
pub fn to_sql(
&self,
templates: &PlanSqlTemplates,
context: Rc<VisitorContext>,
schema: Rc<Schema>,
) -> Result<String, CubeError> {
let mut conditions = vec![];
/* let date_column_alias = if let Some(column) = schema.find_column_for_member(&self.time_dimension.full_name(), &None) {
templates.column_reference(&source, &column.alias.clone())
} else {
dimension.to_sql(context.clone(), schema.clone())
} */
let date_column_alias =
self.resolve_time_column_alias(templates, context.clone(), schema.clone())?;
if let Some(trailing_interval) = &self.trailing_interval {
if trailing_interval != "unbounded" {
let start_date = if self.offset == "start" {
templates
.column_reference(&Some(self.time_series_source.clone()), "date_from")?
} else {
templates.column_reference(&Some(self.time_series_source.clone()), "date_to")?
};

let trailing_start = if let Some(trailing_interval) = &self.trailing_interval {
format!("{start_date} - interval '{trailing_interval}'")
} else {
start_date
};
lazy_static! {
static ref UNBOUNDED: Option<String> = Some("unbounded".to_string());
}

let sign = if self.offset == "start" { ">=" } else { ">" };
if self.trailing_interval != *UNBOUNDED {
let start_date = if self.offset == "start" {
templates.column_reference(&Some(self.time_series_source.clone()), "date_from")?
} else {
templates.column_reference(&Some(self.time_series_source.clone()), "date_to")?
};

conditions.push(format!("{date_column_alias} {sign} {trailing_start}"));
}
let trailing_start = if let Some(trailing_interval) = &self.trailing_interval {
format!("{start_date} - interval '{trailing_interval}'")
} else {
start_date
};

let sign = if self.offset == "start" { ">=" } else { ">" };

conditions.push(format!("{date_column_alias} {sign} {trailing_start}"));
}

if let Some(leading_interval) = &self.trailing_interval {
if leading_interval != "unbounded" {
let end_date = if self.offset == "end" {
templates.column_reference(&Some(self.time_series_source.clone()), "date_to")?
} else {
templates
.column_reference(&Some(self.time_series_source.clone()), "date_from")?
};
if self.leading_interval != *UNBOUNDED {
let end_date = if self.offset == "end" {
templates.column_reference(&Some(self.time_series_source.clone()), "date_to")?
} else {
templates.column_reference(&Some(self.time_series_source.clone()), "date_from")?
};

let leading_end = if let Some(leading_interval) = &self.leading_interval {
format!("{end_date} + interval '{leading_interval}'")
} else {
end_date
};
let leading_end = if let Some(leading_interval) = &self.leading_interval {
format!("{end_date} + interval '{leading_interval}'")
} else {
end_date
};

let sign = if self.offset == "end" { "<=" } else { "<" };
let sign = if self.offset == "end" { "<=" } else { "<" };

conditions.push(format!("{date_column_alias} {sign} {leading_end}"));
}
conditions.push(format!("{date_column_alias} {sign} {leading_end}"));
}

let result = if conditions.is_empty() {
templates.always_true()?
} else {
Expand Down
7 changes: 3 additions & 4 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ pub struct TimeSeries {

impl TimeSeries {
pub fn make_schema(&self, self_alias: Option<String>) -> Schema {
/* let column = SchemaColumn::new(
let column = SchemaColumn::new(
self_alias,
format!("from_date"),
format!("date_from"),
self.time_dimension_name.clone(),
);
Schema::new(vec![column], vec![]) */
Schema::empty()
Schema::new(vec![column], vec![])
}

pub fn to_sql(&self, templates: &PlanSqlTemplates) -> Result<String, CubeError> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,14 @@ impl BaseMeasure {
self.rolling_window().is_some()
}

/// Whether this measure is declared with the `runningTotal` measure type.
pub fn is_running_total(&self) -> bool {
    self.measure_type().as_str() == "runningTotal"
}

/// A measure is cumulative when it is either a running total or carries a
/// rolling-window definition; both require over-time-series processing.
pub fn is_cumulative(&self) -> bool {
    self.is_running_total() || self.is_rolling_window()
}

//FIXME duplicate with symbol
pub fn measure_type(&self) -> &String {
&self.definition.static_data().measure_type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct BaseTimeDimension {
query_tools: Rc<QueryTools>,
granularity: Option<String>,
date_range: Option<Vec<String>>,
alias_suffix: String,
}

impl BaseMember for BaseTimeDimension {
Expand Down Expand Up @@ -55,12 +56,7 @@ impl BaseMember for BaseTimeDimension {
}

fn alias_suffix(&self) -> Option<String> {
let granularity = if let Some(granularity) = &self.granularity {
granularity
} else {
"day"
};
Some(granularity.to_string())
Some(self.alias_suffix.clone())
}
}

Expand All @@ -71,14 +67,30 @@ impl BaseTimeDimension {
granularity: Option<String>,
date_range: Option<Vec<String>>,
) -> Result<Rc<Self>, CubeError> {
let alias_suffix = if let Some(granularity) = &granularity {
granularity.clone()
} else {
"day".to_string()
};
Ok(Rc::new(Self {
dimension: BaseDimension::try_new_required(member_evaluator, query_tools.clone())?,
query_tools,
granularity,
date_range,
alias_suffix,
}))
}

/// Produces a new `BaseTimeDimension` identical to this one except for the
/// granularity, which is replaced by `new_granularity`.
///
/// NOTE(review): `alias_suffix` is carried over unchanged rather than being
/// recomputed from `new_granularity` — presumably to keep column aliases
/// stable across granularity rewrites; confirm this is intentional.
pub fn change_granularity(&self, new_granularity: Option<String>) -> Rc<Self> {
    let copy = Self {
        dimension: self.dimension.clone(),
        query_tools: self.query_tools.clone(),
        granularity: new_granularity,
        date_range: self.date_range.clone(),
        alias_suffix: self.alias_suffix.clone(),
    };
    Rc::new(copy)
}

/// Returns a copy of the configured granularity, if any.
pub fn get_granularity(&self) -> Option<String> {
    self.granularity.as_ref().cloned()
}
Expand Down
Loading

0 comments on commit 4edaa86

Please sign in to comment.