Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
waralexrom committed Nov 13, 2024
1 parent b566c6e commit f62b46b
Show file tree
Hide file tree
Showing 17 changed files with 417 additions and 153 deletions.
2 changes: 0 additions & 2 deletions rust/cubesqlplanner/cubesqlplanner/src/plan/select.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use itertools::Itertools;

use super::{Cte, Expr, Filter, From, OrderBy, Schema, SchemaColumn};
use crate::planner::sql_templates::PlanSqlTemplates;
use crate::planner::sql_templates::{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod collectors;
pub mod compiler;
mod dependecy;
pub mod evaluation_node;
pub mod sql_node_transformers;
pub mod sql_nodes;
pub mod sql_visitor;
pub mod symbols;
Expand All @@ -16,4 +17,4 @@ pub use symbols::{
DimensionSymbol, DimensionSymbolFactory, MeasureSymbol, MeasureSymbolFactory, MemberSymbol,
MemberSymbolType, SimpleSqlSymbol, SimpleSqlSymbolFactory, SymbolFactory,
};
pub use visitor::{EvaluatorVisitor, TraversalVisitor};
pub use visitor::TraversalVisitor;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod set_schema;

pub use set_schema::set_schema;
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use crate::plan::schema::Schema;
use crate::planner::sql_evaluator::sql_nodes::final_measure::FinalMeasureSqlNode;
use crate::planner::sql_evaluator::sql_nodes::{
AutoPrefixSqlNode, EvaluateSqlNode, MeasureFilterSqlNode, MultiStageRankNode,
MultiStageWindowNode, RenderReferencesSqlNode, RootSqlNode, SqlNode, TimeShiftSqlNode,
};
use std::rc::Rc;

pub fn set_schema(node_processors: Rc<dyn SqlNode>, schema: Rc<Schema>) -> Rc<dyn SqlNode> {
set_schema_impl(node_processors, schema)
}

pub fn set_schema_impl(sql_node: Rc<dyn SqlNode>, schema: Rc<Schema>) -> Rc<dyn SqlNode> {
if let Some(auto_prefix) = sql_node
.clone()
.as_any()
.downcast_ref::<AutoPrefixSqlNode>()
{
let input = set_schema_impl(auto_prefix.input().clone(), schema.clone());
AutoPrefixSqlNode::new_with_schema(input, schema)
} else if let Some(_) = sql_node.clone().as_any().downcast_ref::<EvaluateSqlNode>() {
sql_node
} else if let Some(final_measure) = sql_node
.clone()
.as_any()
.downcast_ref::<FinalMeasureSqlNode>()
{
let input = set_schema_impl(final_measure.input().clone(), schema.clone());
FinalMeasureSqlNode::new(input)
} else if let Some(measure_filter) = sql_node
.clone()
.as_any()
.downcast_ref::<MeasureFilterSqlNode>()
{
let input = set_schema_impl(measure_filter.input().clone(), schema.clone());
MeasureFilterSqlNode::new(input)
} else if let Some(multi_stage_rank) = sql_node
.clone()
.as_any()
.downcast_ref::<MultiStageRankNode>()
{
let else_processor =
set_schema_impl(multi_stage_rank.else_processor().clone(), schema.clone());
MultiStageRankNode::new(else_processor, multi_stage_rank.partition().clone())
} else if let Some(multi_stage_window) = sql_node
.clone()
.as_any()
.downcast_ref::<MultiStageWindowNode>()
{
let input = set_schema_impl(multi_stage_window.input().clone(), schema.clone());
let else_processor =
set_schema_impl(multi_stage_window.else_processor().clone(), schema.clone());
MultiStageWindowNode::new(
input,
else_processor,
multi_stage_window.partition().clone(),
)
} else if let Some(render_references) = sql_node
.clone()
.as_any()
.downcast_ref::<RenderReferencesSqlNode>()
{
let input = set_schema_impl(render_references.input().clone(), schema.clone());
RenderReferencesSqlNode::new_with_schema(input, schema)
} else if let Some(root_node) = sql_node.clone().as_any().downcast_ref::<RootSqlNode>() {
let dimension_processor =
set_schema_impl(root_node.dimension_processor().clone(), schema.clone());
let measure_processor =
set_schema_impl(root_node.measure_processor().clone(), schema.clone());
let cube_name_processor =
set_schema_impl(root_node.cube_name_processor().clone(), schema.clone());
let default_processor =
set_schema_impl(root_node.default_processor().clone(), schema.clone());
RootSqlNode::new(
dimension_processor,
measure_processor,
cube_name_processor,
default_processor,
)
} else if let Some(time_shift) = sql_node.clone().as_any().downcast_ref::<TimeShiftSqlNode>() {
let input = set_schema_impl(time_shift.input().clone(), schema.clone());
TimeShiftSqlNode::new(time_shift.shifts().clone(), input)
} else {
unreachable!("Not all nodes are implemented");
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,35 @@
use super::SqlNode;
use crate::plan::Schema;
use crate::planner::query_tools::QueryTools;
use crate::planner::sql_evaluator::SqlEvaluatorVisitor;
use crate::planner::sql_evaluator::{EvaluationNode, MemberSymbol, MemberSymbolType};
use cubenativeutils::CubeError;
use std::any::Any;
use std::rc::Rc;

pub struct AutoPrefixSqlNode {
input: Rc<dyn SqlNode>,
schema: Rc<Schema>,
}

impl AutoPrefixSqlNode {
pub fn new(input: Rc<dyn SqlNode>) -> Rc<Self> {
Rc::new(Self { input })
Rc::new(Self {
input,
schema: Rc::new(Schema::empty()),
})
}

pub fn new_with_schema(input: Rc<dyn SqlNode>, schema: Rc<Schema>) -> Rc<Self> {
Rc::new(Self { input, schema })
}

pub fn input(&self) -> &Rc<dyn SqlNode> {
&self.input
}

pub fn schema(&self) -> &Rc<Schema> {
&self.schema
}
}

Expand All @@ -21,24 +39,33 @@ impl SqlNode for AutoPrefixSqlNode {
visitor: &mut SqlEvaluatorVisitor,
node: &Rc<EvaluationNode>,
query_tools: Rc<QueryTools>,
node_processor: Rc<dyn SqlNode>,
) -> Result<String, CubeError> {
let source_schema = visitor.source_schema().clone();
let input = self.input.to_sql(visitor, node, query_tools.clone())?;
let input =
self.input
.to_sql(visitor, node, query_tools.clone(), node_processor.clone())?;
let res = match node.symbol() {
MemberSymbolType::Dimension(ev) => {
let cube_alias = source_schema.resolve_cube_alias(&ev.cube_name());
let cube_alias = self.schema.resolve_cube_alias(&ev.cube_name());
query_tools.auto_prefix_with_cube_name(&cube_alias, &input)
}
MemberSymbolType::Measure(ev) => {
let cube_alias = source_schema.resolve_cube_alias(&ev.cube_name());
let cube_alias = self.schema.resolve_cube_alias(&ev.cube_name());
query_tools.auto_prefix_with_cube_name(&cube_alias, &input)
}
MemberSymbolType::CubeName(_) => {
let cube_alias = source_schema.resolve_cube_alias(&input);
let cube_alias = self.schema.resolve_cube_alias(&input);
query_tools.escape_column_name(&cube_alias)
}
_ => input,
};
Ok(res)
}
fn as_any(self: Rc<Self>) -> Rc<dyn Any> {
self.clone()
}

fn childs(&self) -> Vec<Rc<dyn SqlNode>> {
vec![self.input.clone()]
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::SqlNode;
use crate::planner::query_tools::QueryTools;
use crate::planner::sql_evaluator::visitor::EvaluatorVisitor;
use crate::planner::sql_evaluator::SqlEvaluatorVisitor;
use crate::planner::sql_evaluator::{EvaluationNode, MemberSymbolType};
use cubenativeutils::CubeError;
use std::any::Any;
use std::rc::Rc;

pub struct EvaluateSqlNode {}
Expand All @@ -20,8 +20,9 @@ impl SqlNode for EvaluateSqlNode {
visitor: &mut SqlEvaluatorVisitor,
node: &Rc<EvaluationNode>,
_query_tools: Rc<QueryTools>,
node_processor: Rc<dyn SqlNode>,
) -> Result<String, CubeError> {
let args = visitor.evaluate_deps(node)?;
let args = visitor.evaluate_deps(node, node_processor.clone())?;
match node.symbol() {
MemberSymbolType::Dimension(ev) => ev.evaluate_sql(args),
MemberSymbolType::Measure(ev) => ev.evaluate_sql(args),
Expand All @@ -30,4 +31,12 @@ impl SqlNode for EvaluateSqlNode {
MemberSymbolType::SimpleSql(ev) => ev.evaluate_sql(args),
}
}

fn as_any(self: Rc<Self>) -> Rc<dyn Any> {
self.clone()
}

fn childs(&self) -> Vec<Rc<dyn SqlNode>> {
vec![]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::planner::query_tools::QueryTools;
use crate::planner::sql_evaluator::SqlEvaluatorVisitor;
use crate::planner::sql_evaluator::{EvaluationNode, MemberSymbolType};
use cubenativeutils::CubeError;
use std::any::Any;
use std::rc::Rc;

pub struct FinalMeasureSqlNode {
Expand All @@ -13,6 +14,10 @@ impl FinalMeasureSqlNode {
pub fn new(input: Rc<dyn SqlNode>) -> Rc<Self> {
Rc::new(Self { input })
}

pub fn input(&self) -> &Rc<dyn SqlNode> {
&self.input
}
}

impl SqlNode for FinalMeasureSqlNode {
Expand All @@ -21,10 +26,16 @@ impl SqlNode for FinalMeasureSqlNode {
visitor: &mut SqlEvaluatorVisitor,
node: &Rc<EvaluationNode>,
query_tools: Rc<QueryTools>,
node_processor: Rc<dyn SqlNode>,
) -> Result<String, CubeError> {
let res = match node.symbol() {
MemberSymbolType::Measure(ev) => {
let input = self.input.to_sql(visitor, node, query_tools.clone())?;
let input = self.input.to_sql(
visitor,
node,
query_tools.clone(),
node_processor.clone(),
)?;

if ev.is_calculated() {
input
Expand All @@ -41,4 +52,12 @@ impl SqlNode for FinalMeasureSqlNode {
};
Ok(res)
}

fn as_any(self: Rc<Self>) -> Rc<dyn Any> {
self.clone()
}

fn childs(&self) -> Vec<Rc<dyn SqlNode>> {
vec![self.input.clone()]
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::SqlNode;
use crate::planner::query_tools::QueryTools;
use crate::planner::sql_evaluator::visitor::EvaluatorVisitor;
use crate::planner::sql_evaluator::SqlEvaluatorVisitor;
use crate::planner::sql_evaluator::{EvaluationNode, MemberSymbolType};
use cubenativeutils::CubeError;
use std::any::Any;
use std::rc::Rc;

pub struct MeasureFilterSqlNode {
Expand All @@ -14,6 +14,10 @@ impl MeasureFilterSqlNode {
pub fn new(input: Rc<dyn SqlNode>) -> Rc<Self> {
Rc::new(Self { input })
}

pub fn input(&self) -> &Rc<dyn SqlNode> {
&self.input
}
}

impl SqlNode for MeasureFilterSqlNode {
Expand All @@ -22,16 +26,22 @@ impl SqlNode for MeasureFilterSqlNode {
visitor: &mut SqlEvaluatorVisitor,
node: &Rc<EvaluationNode>,
query_tools: Rc<QueryTools>,
node_processor: Rc<dyn SqlNode>,
) -> Result<String, CubeError> {
let input = self.input.to_sql(visitor, node, query_tools.clone())?;
let input =
self.input
.to_sql(visitor, node, query_tools.clone(), node_processor.clone())?;
let res = match node.symbol() {
MemberSymbolType::Measure(ev) => {
let measure_filters = ev.measure_filters();
if !measure_filters.is_empty() {
let filters = measure_filters
.iter()
.map(|filter| -> Result<String, CubeError> {
Ok(format!("({})", visitor.apply(filter)?))
Ok(format!(
"({})",
visitor.apply(filter, node_processor.clone())?
))
})
.collect::<Result<Vec<_>, _>>()?
.join(" AND ");
Expand All @@ -53,4 +63,12 @@ impl SqlNode for MeasureFilterSqlNode {
};
Ok(res)
}

fn as_any(self: Rc<Self>) -> Rc<dyn Any> {
self.clone()
}

fn childs(&self) -> Vec<Rc<dyn SqlNode>> {
vec![self.input.clone()]
}
}
Loading

0 comments on commit f62b46b

Please sign in to comment.