Skip to content

Commit

Permalink
Work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
waralexrom committed Nov 22, 2024
1 parent f62b46b commit 698a5f8
Show file tree
Hide file tree
Showing 22 changed files with 765 additions and 145 deletions.
1 change: 1 addition & 0 deletions packages/cubejs-backend-shared/src/time.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ export const timeSeries = (granularity: string, dateRange: QueryDateRange, optio
// moment.range works with strings
const range = moment.range(<any>dateRange[0], <any>dateRange[1]);

console.log("!!!! timeSeries:", TIME_SERIES[granularity](range, options.timestampPrecision));
return TIME_SERIES[granularity](range, options.timestampPrecision);
};

Expand Down
43 changes: 41 additions & 2 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import cronParser from 'cron-parser';

import moment from 'moment-timezone';
import inflection from 'inflection';
import { FROM_PARTITION_RANGE, inDbTimeZone, MAX_SOURCE_ROW_LIMIT, QueryAlias, getEnv } from '@cubejs-backend/shared';
import { FROM_PARTITION_RANGE, inDbTimeZone, MAX_SOURCE_ROW_LIMIT, QueryAlias, getEnv, timeSeries } from '@cubejs-backend/shared';

import {
buildSqlAndParams as nativeBuildSqlAndParams,
Expand Down Expand Up @@ -576,6 +576,29 @@ export class BaseQuery {
return false;
}

buildSqlAndParamsTest(exportAnnotatedSql) {
if (!this.options.preAggregationQuery && !this.options.disableExternalPreAggregations && this.externalQueryClass) {
if (this.externalPreAggregationQuery()) { // TODO performance
return this.externalQuery().buildSqlAndParams(exportAnnotatedSql);
}
}
const js_res = this.compilers.compiler.withQuery(
this,
() => this.cacheValue(
['buildSqlAndParams', exportAnnotatedSql],
() => this.paramAllocator.buildSqlAndParams(
this.buildParamAnnotatedSql(),
exportAnnotatedSql,
this.shouldReuseParams
),
{ cache: this.queryCache }
)
);
const rust = this.buildSqlAndParamsRust(exportAnnotatedSql);
console.log('js result: ', js_res[0]);
console.log('rust result: ', rust[0]);
return js_res;
}
/**
* Returns an array of SQL query strings for the query.
* @param {boolean} [exportAnnotatedSql] - returns annotated sql with not rendered params if true
Expand Down Expand Up @@ -628,6 +651,10 @@ export class BaseQuery {
return res;
}

/**
 * Generates a time series for the given granularity over the date range by
 * delegating to the shared `timeSeries` helper.
 *
 * FIXME: helper for the native (Rust) generator — maybe should be moved
 * entirely to Rust.
 *
 * NOTE(review): `timeSeries` (see packages/cubejs-backend-shared/src/time.ts)
 * also reads an `options` argument (`options.timestampPrecision`) that is not
 * passed here — confirm the helper tolerates it being undefined.
 *
 * @param {string} granularity
 * @param {Array<string>} dateRange - `[from, to]` pair of date strings
 */
generateTimeSeries(granularity, dateRange) {
  return timeSeries(granularity, dateRange);
}
/**
 * Whether already-allocated SQL params may be reused when building SQL
 * (passed through to `paramAllocator.buildSqlAndParams`). Disabled by
 * default; dialect-specific subclasses may override.
 *
 * @returns {boolean}
 */
get shouldReuseParams() {
  return false;
}
Expand Down Expand Up @@ -1381,6 +1408,7 @@ export class BaseQuery {
) || undefined
);
const baseQueryAlias = this.cubeAlias('base');
console.log("!!! date join contdition: ", dateJoinCondition);
const dateJoinConditionSql =
dateJoinCondition.map(
([d, f]) => f(
Expand All @@ -1392,6 +1420,8 @@ export class BaseQuery {
)
).join(' AND ');

console.log("!!! date join contdition sql: ", dateJoinConditionSql);

return this.overTimeSeriesSelect(
cumulativeMeasures,
dateSeriesSql,
Expand Down Expand Up @@ -3231,7 +3261,16 @@ export class BaseQuery {
'{% if offset is not none %}\nOFFSET {{ offset }}{% endif %}',
group_by_exprs: '{{ group_by | map(attribute=\'index\') | join(\', \') }}',
join: '{{ join_type }} JOIN {{ source }} ON {{ condition }}',
cte: '{{ alias }} AS ({{ query | indent(2, true) }})'
cte: '{{ alias }} AS ({{ query | indent(2, true) }})',
time_seria_select: 'SELECT date_from::timestamp AS "date_from",\n' +
'date_to::timestamp AS "date_to" \n' +
'FROM(\n' +
' VALUES ' +
'{% for time_item in seria %}' +
'(\'{{ time_item | join(\'\\\', \\\'\') }}\')' +
'{% if not loop.last %}, {% endif %}' +
'{% endfor %}' +
') AS dates (date_from, date_to)'
},
expressions: {
column_reference: '{% if table_name %}{{ table_name }}.{% endif %}{{ name }}',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ describe('SQL Generation', () => {
});
`);

it('simple join 1', async () => {
it('simple join', async () => {
await compiler.compile();

console.log(joinGraph.buildJoin(['visitor_checkins', 'visitors']));
Expand Down Expand Up @@ -704,7 +704,7 @@ describe('SQL Generation', () => {
});
});

it('rolling', async () => runQueryTest({
it('rolling 1', async () => runQueryTest({
measures: [
'visitors.revenueRolling'
],
Expand All @@ -730,7 +730,7 @@ describe('SQL Generation', () => {
{ visitors__created_at_day: '2017-01-10T00:00:00.000Z', visitors__revenue_rolling: null }
]));

it('rolling multiplied', async () => runQueryTest({
it('rolling multiplied 1', async () => runQueryTest({
measures: [
'visitors.revenueRolling',
'visitor_checkins.visitor_checkins_count'
Expand Down Expand Up @@ -916,6 +916,7 @@ describe('SQL Generation', () => {
{ visitors__created_at_day: '2017-01-10T00:00:00.000Z', visitors__running_revenue_per_count: '300' }
]));


it('hll rolling (BigQuery)', async () => {
await compiler.compile();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,9 @@ pub trait BaseTools {
fn filter_group_function(&self) -> Result<Rc<dyn FilterGroup>, CubeError>;
fn timestamp_precision(&self) -> Result<u32, CubeError>;
fn in_db_time_zone(&self, date: String) -> Result<String, CubeError>;
/// Generates a time series for `granularity` over `date_range`
/// (`[from, to]`), bridged to the JS side — presumably backed by
/// `BaseQuery.generateTimeSeries` / the shared `timeSeries` helper.
///
/// NOTE(review): the return value appears to be one `[date_from, date_to]`
/// pair of timestamp strings per interval (inferred from the
/// `time_seria_select` template usage) — confirm.
fn generate_time_series(
    &self,
    granularity: String,
    date_range: Vec<String>,
) -> Result<Vec<Vec<String>>, CubeError>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ pub struct TimeShiftReference {
pub time_dimension: String,
}

/// Rolling window configuration of a measure (the `rollingWindow` entry of a
/// cube measure definition, deserialized as part of
/// `MeasureDefinitionStatic`).
///
/// Fields are `pub` for consistency with the sibling `TimeShiftReference`
/// struct and because `BaseMeasure::rolling_window()` hands a reference to
/// callers, who otherwise could not read the window bounds.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct RollingWindow {
    /// Trailing interval of the window, e.g. `"7 day"` or `"unbounded"`.
    /// NOTE(review): `None` semantics (window starts at the series start?)
    /// are not visible here — confirm against the JS implementation.
    pub trailing: Option<String>,
    /// Leading interval of the window, e.g. `"1 month"` or `"unbounded"`.
    pub leading: Option<String>,
    /// Window offset anchor — presumably `"start"` or `"end"`; confirm.
    pub offset: Option<String>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct MeasureDefinitionStatic {
#[serde(rename = "type")]
Expand All @@ -37,6 +44,8 @@ pub struct MeasureDefinitionStatic {
pub group_by_references: Option<Vec<String>>,
#[serde(rename = "timeShiftReferences")]
pub time_shift_references: Option<Vec<TimeShiftReference>>,
#[serde(rename = "rollingWindow")]
pub rolling_window: Option<RollingWindow>,
}

#[nativebridge::native_bridge(MeasureDefinitionStatic)]
Expand Down
72 changes: 71 additions & 1 deletion rust/cubesqlplanner/cubesqlplanner/src/plan/join.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,80 @@
use super::{Schema, SingleAliasedSource};
use super::{time_seria, Schema, SingleAliasedSource};
use crate::planner::sql_templates::PlanSqlTemplates;
use crate::planner::{BaseJoinCondition, BaseMember, VisitorContext};
use cubenativeutils::CubeError;

use std::rc::Rc;

pub struct RollingWindowJoinCondition {
tailing_interval: Option<String>,
leading_interval: Option<String>,
offset: String,
is_from_start_to_end: bool,
time_dimension: Vec<Rc<BaseMember>>,
}

impl RollingWindowJoinCondition {
pub fn new(
trailing_interval: Option<String>,
leading_interval: Option<String>,
offset: String,
is_from_start_to_end: bool,
dimensions: Vec<Rc<BaseMember>>,
) -> Self {
Self {
tailing_interval,
leading_interval,
offset,
is_from_start_to_end,
time_dimension,
}
}

pub fn to_sql(
&self,
templates: &PlanSqlTemplates,
context: Rc<VisitorContext>,
schema: Rc<Schema>,
) -> Result<String, CubeError> {
let result = if self.dimensions.is_empty() {
format!("1 = 1")
} else {
let conditions = vec![];
self.dimensions
.iter()
.map(|dim| -> Result<String, CubeError> {
if let Some(trailing_interval) = self.trailing_interval {
if tailing_interval == "unbounded" {
let seria_column = "date_from",
}
}


})
.collect::<Result<Vec<_>, _>>()?
.join(" AND ")
};
Ok(result)
}

fn resolve_member_alias(
&self,
templates: &PlanSqlTemplates,
context: Rc<VisitorContext>,
source: &String,
dimension: &Rc<dyn BaseMember>,
schema: Rc<Schema>,
) -> Result<String, CubeError> {
let schema = schema.extract_source_schema(source);
let source = Some(source.clone());
if let Some(column) = schema.find_column_for_member(&dimension.full_name(), &source) {
templates.column_reference(&source, &column.alias.clone())
} else {
dimension.to_sql(context.clone(), schema.clone())
}
}
}

pub struct DimensionJoinCondition {
left_source: String,
right_source: String,
Expand Down
3 changes: 3 additions & 0 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod order;
pub mod query_plan;
pub mod schema;
pub mod select;
pub mod time_seria;
pub mod union;

pub use builder::{JoinBuilder, SelectBuilder};
Expand All @@ -20,4 +21,6 @@ pub use order::OrderBy;
pub use query_plan::QueryPlan;
pub use schema::{Schema, SchemaColumn, SchemaCube};
pub use select::{AliasedExpr, Select};
pub use time_seria::TimeSeria;
pub use union::Union;

5 changes: 4 additions & 1 deletion rust/cubesqlplanner/cubesqlplanner/src/plan/query_plan.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
use super::{Schema, Select, Union};
use super::{Schema, Select, TimeSeria, Union};
use crate::planner::sql_templates::PlanSqlTemplates;
use cubenativeutils::CubeError;
use std::rc::Rc;

/// A node of a physical query plan that can be rendered to SQL and expose an
/// output schema.
pub enum QueryPlan {
    Select(Rc<Select>),
    Union(Rc<Union>),
    /// Generated time-dimension series relation (see `plan::time_seria`),
    /// used as the date-range source for rolling-window measures.
    TimeSeria(Rc<TimeSeria>),
}

impl QueryPlan {
pub fn make_schema(&self, self_alias: Option<String>) -> Schema {
match self {
QueryPlan::Select(select) => select.make_schema(self_alias),
QueryPlan::Union(union) => union.make_schema(self_alias),
QueryPlan::TimeSeria(seria) => seria.make_schema(self_alias),
}
}
pub fn to_sql(&self, templates: &PlanSqlTemplates) -> Result<String, CubeError> {
match self {
QueryPlan::Select(s) => s.to_sql(templates),
QueryPlan::Union(u) => u.to_sql(templates),
QueryPlan::TimeSeria(seria) => seria.to_sql(templates),
}
}
}
30 changes: 30 additions & 0 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/time_seria.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use super::{Schema, SchemaColumn, Select, Union};
use crate::planner::sql_templates::PlanSqlTemplates;
use cubenativeutils::CubeError;
use std::rc::Rc;

/// A generated time-dimension series: a VALUES relation of interval bounds
/// rendered via the `time_seria_select` SQL template, used as the date-range
/// join target for rolling-window measures.
pub struct TimeSeria {
    // Full name of the time dimension this series was generated for.
    pub time_dimension_name: String,
    // Optional overall lower/upper bounds. NOTE(review): currently only
    // forwarded to the SQL template — confirm whether the template consumes
    // them or they are reserved for unbounded-window handling.
    pub from_date: Option<String>,
    pub to_date: Option<String>,
    // One `[date_from, date_to]` pair of timestamp strings per interval
    // (matches the `(date_from, date_to)` tuple shape of the template).
    pub seria: Vec<Vec<String>>,
}

impl TimeSeria {
    /// Builds the schema exposed by the series relation.
    ///
    /// The generated SQL (the `time_seria_select` template) aliases the
    /// series bounds as `date_from`/`date_to`, so the schema column must be
    /// named `date_from` — the previous `"from_date"` spelling could never
    /// match the rendered column and would break member resolution in joins.
    pub fn make_schema(&self, self_alias: Option<String>) -> Schema {
        let column = SchemaColumn::new(
            self_alias,
            "date_from".to_string(),
            self.time_dimension_name.clone(),
        );
        Schema::new(vec![column], vec![])
    }

    /// Renders the series as a SQL VALUES relation via the dialect template.
    pub fn to_sql(&self, templates: &PlanSqlTemplates) -> Result<String, CubeError> {
        templates.time_seria_select(
            self.from_date.clone(),
            self.to_date.clone(),
            self.seria.clone(),
        )
    }
}
12 changes: 11 additions & 1 deletion rust/cubesqlplanner/cubesqlplanner/src/planner/base_measure.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use super::query_tools::QueryTools;
use super::sql_evaluator::{EvaluationNode, MemberSymbol, MemberSymbolType};
use super::{evaluate_with_context, BaseMember, VisitorContext};
use crate::cube_bridge::measure_definition::{MeasureDefinition, TimeShiftReference};
use crate::cube_bridge::measure_definition::{
MeasureDefinition, RollingWindow, TimeShiftReference,
};
use crate::plan::Schema;
use cubenativeutils::CubeError;
use lazy_static::lazy_static;
Expand Down Expand Up @@ -195,6 +197,14 @@ impl BaseMeasure {
self.definition.static_data().multi_stage.unwrap_or(false)
}

/// Rolling window configuration from the measure definition
/// (`rollingWindow` in the cube schema), if the measure declares one.
pub fn rolling_window(&self) -> &Option<RollingWindow> {
    &self.definition.static_data().rolling_window
}

/// True when the measure declares a `rollingWindow` configuration.
pub fn is_rolling_window(&self) -> bool {
    self.rolling_window().is_some()
}

//FIXME dublicate with symbol
pub fn measure_type(&self) -> &String {
&self.definition.static_data().measure_type
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::planner::base_measure::MeasureTimeShift;
use crate::planner::planners::multi_stage::MultiStageTimeShift;
use crate::planner::BaseDimension;
use itertools::Itertools;
use std::cmp::PartialEq;
Expand Down Expand Up @@ -43,7 +43,7 @@ impl MultiStageAppliedState {
.collect_vec();
}

pub fn add_time_shifts(&mut self, time_shifts: Vec<MeasureTimeShift>) {
pub fn add_time_shifts(&mut self, time_shifts: Vec<MultiStageTimeShift>) {
for ts in time_shifts.into_iter() {
self.time_shifts
.insert(ts.time_dimension.clone(), ts.interval.clone());
Expand Down
Loading

0 comments on commit 698a5f8

Please sign in to comment.