Skip to content

Commit

Permalink
feat(mql): Support single type formula queries (#5376)
Browse files Browse the repository at this point in the history
Support single type formula queries using the -If suffix. This embeds all the
filters for a formula parameter into the SELECT clause, by amending the
aggregate with `-If` and collapsing all the filters into the new expression.
  • Loading branch information
evanh authored Jan 15, 2024
1 parent 1e7ca98 commit c32980a
Show file tree
Hide file tree
Showing 4 changed files with 1,092 additions and 43 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ sentry-redis-tools==0.1.7
sentry-relay==0.8.39
sentry-sdk==1.28.0
simplejson==3.17.6
snuba-sdk==2.0.19
snuba-sdk==2.0.20
structlog==22.3.0
structlog-sentry==2.0.0
sql-metadata==2.6.0
Expand Down
252 changes: 211 additions & 41 deletions snuba/query/mql/parser.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from __future__ import annotations

import logging
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from dataclasses import dataclass, replace
from typing import Any, Callable, Dict, Optional, Sequence, Tuple, Union

import sentry_sdk
from parsimonious.nodes import Node, NodeVisitor
Expand Down Expand Up @@ -49,18 +49,27 @@

# The parser returns a bunch of different types, so create a single aggregate type to
# capture everything.
MQLSTUFF = Dict[str, Union[str, List[SelectedExpression], List[Expression]]]
MQLSTUFF = Dict[str, Union[str, list[SelectedExpression], list[Expression]]]
logger = logging.getLogger("snuba.mql.parser")


@dataclass
class InitialParseResult:
expression: SelectedExpression | None = None
groupby: List[SelectedExpression] | None = None
conditions: List[Expression] | None = None
formula: str | None = None
parameters: list[InitialParseResult] | None = None
groupby: list[SelectedExpression] | None = None
conditions: list[Expression] | None = None
mri: str | None = None
public_name: str | None = None
metric_id: int | None = None


ARITHMETIC_OPERATORS_MAPPING = {
"+": "plus",
"-": "minus",
"*": "multiply",
"/": "divide",
}


class MQLVisitor(NodeVisitor): # type: ignore
Expand All @@ -76,12 +85,106 @@ def visit_expression(
Any,
],
) -> InitialParseResult:
# zero_or_more_others is used for formulas, which aren't supported yet
args, zero_or_more_others = children
return args
term, zero_or_more_others = children
if zero_or_more_others:
_, term_operator, _, coefficient, *_ = zero_or_more_others[0]
return self._visit_formula(term_operator, term, coefficient)
return term

def visit_expr_op(self, node: Node, children: Sequence[Any]) -> Any:
raise InvalidQueryException("Arithmetic function not supported yet")
return ARITHMETIC_OPERATORS_MAPPING[node.text]

def visit_term_op(self, node: Node, children: Sequence[Any]) -> Any:
return ARITHMETIC_OPERATORS_MAPPING[node.text]

def _build_timeseries_formula_param(
self, param: InitialParseResult
) -> SelectedExpression:
"""
Timeseries inside a formula need to have three things done to them:
1. Add the -If suffix to their aggregate function
2. Put all the filters for the timeseries inside the aggregate expression
3. Add a metric_id condition to the conditions in the aggregate
Given an input parse result from `sum(mri){x:y}`, this should output
an expression like `sumIf(value, x = y AND metric_id = mri)`.
"""
assert param.expression is not None
exp = param.expression.expression
assert isinstance(exp, (FunctionCall, CurriedFunctionCall))

conditions = param.conditions or []
metric_id_condition = binary_condition(
ConditionFunctions.EQ,
Column(None, None, "metric_id"),
Literal(None, param.mri or param.public_name),
)
conditions.append(metric_id_condition)
value_column = exp.parameters[0]
if isinstance(exp, FunctionCall):
return SelectedExpression(
None,
FunctionCall(
None,
f"{exp.function_name}If",
parameters=(
value_column,
combine_and_conditions(conditions),
),
),
)
else:
return SelectedExpression(
None,
CurriedFunctionCall(
None,
FunctionCall(
exp.internal_function.alias,
f"{exp.internal_function.function_name}If",
exp.internal_function.parameters,
),
(
value_column,
combine_and_conditions(conditions),
),
),
)

def _visit_formula(
self,
term_operator: str,
term: InitialParseResult,
coefficient: InitialParseResult,
) -> InitialParseResult:
# TODO: If the formula has filters/group by, where do those appear?

# If the parameters of the query are timeseries, extract the expressions from the result
if isinstance(term, InitialParseResult) and term.expression is not None:
term = replace(term, expression=self._build_timeseries_formula_param(term))
if (
isinstance(coefficient, InitialParseResult)
and coefficient.expression is not None
):
coefficient = replace(
coefficient,
expression=self._build_timeseries_formula_param(coefficient),
)

if (
isinstance(term, InitialParseResult)
and isinstance(coefficient, InitialParseResult)
and term.groupby != coefficient.groupby
):
raise InvalidQueryException(
"All terms in a formula must have the same groupby"
)

return InitialParseResult(
expression=None,
formula=term_operator,
parameters=[term, coefficient],
groupby=term.groupby,
)

def visit_term(
self,
Expand All @@ -90,11 +193,10 @@ def visit_term(
) -> InitialParseResult:
term, zero_or_more_others = children
if zero_or_more_others:
raise InvalidQueryException("Arithmetic function not supported yet")
return term
_, term_operator, _, coefficient, *_ = zero_or_more_others[0]
return self._visit_formula(term_operator, term, coefficient)

def visit_term_op(self, node: Node, children: Sequence[Any]) -> str:
raise InvalidQueryException("Arithmetic function not supported yet")
return term

def visit_coefficient(
self,
Expand All @@ -120,10 +222,37 @@ def visit_filter(
if packed_filters:
assert isinstance(packed_filters, list)
_, _, filter_expr, *_ = packed_filters[0]
if target.conditions is not None:
target.conditions = target.conditions + [filter_expr]
if target.formula is not None:

def pushdown_filter(param: InitialParseResult) -> InitialParseResult:
if param.formula is not None:
parameters = param.parameters or []
for p in parameters:
pushdown_filter(p)
elif param.expression is not None:
exp = param.expression.expression
assert isinstance(exp, (FunctionCall, CurriedFunctionCall))
exp = replace(
exp,
parameters=(
exp.parameters[0],
binary_condition("and", filter_expr, exp.parameters[1]),
),
)
param.expression = replace(param.expression, expression=exp)
else:
raise InvalidQueryException("Could not parse formula")

return param

if target.parameters is not None:
for param in target.parameters:
pushdown_filter(param)
else:
target.conditions = [filter_expr]
if target.conditions is not None:
target.conditions = target.conditions + [filter_expr]
else:
target.conditions = [filter_expr]

if packed_groupbys:
assert isinstance(packed_groupbys, list)
Expand Down Expand Up @@ -158,7 +287,7 @@ def visit_filter_term(self, node: Node, children: Sequence[Any]) -> Any:
def visit_filter_factor(
self,
node: Node,
children: Tuple[Sequence[Union[str, Sequence[str]]], Any],
children: Tuple[Sequence[Union[str, Sequence[str]]] | FunctionCall, Any],
) -> FunctionCall:
factor, *_ = children
if isinstance(factor, FunctionCall):
Expand Down Expand Up @@ -484,33 +613,74 @@ def parse_mql_query_body(
"""
exp_tree = MQL_GRAMMAR.parse(body)
parsed: InitialParseResult = MQLVisitor().visit(exp_tree)
if not parsed.expression:
raise ParsingException("No expression specified in MQL query")
if not parsed.expression and not parsed.formula:
raise ParsingException(
"No aggregate/expression or formula specified in MQL query"
)

selected_columns = [parsed.expression]
if parsed.groupby:
selected_columns.extend(parsed.groupby)
groupby = [g.expression for g in parsed.groupby] if parsed.groupby else None
if parsed.formula:

id_value = parsed.metric_id or parsed.mri or parsed.public_name
metric_id_condition = binary_condition(
ConditionFunctions.EQ,
Column(None, None, "metric_id"),
Literal(None, id_value),
)
if parsed.conditions:
conditions = combine_and_conditions(
[metric_id_condition, *parsed.conditions]
def extract_expression(param: InitialParseResult | Any) -> Expression:
if not isinstance(param, InitialParseResult):
return Literal(None, param)
elif param.expression is not None:
return param.expression.expression
elif param.formula:
parameters = param.parameters or []
return FunctionCall(
None,
param.formula,
tuple(extract_expression(p) for p in parameters),
)
else:
raise InvalidQueryException("Could not parse formula")

parameters = parsed.parameters or []
selected_columns = [
SelectedExpression(
name=AGGREGATE_ALIAS,
expression=FunctionCall(
alias=AGGREGATE_ALIAS,
function_name=parsed.formula,
parameters=tuple(extract_expression(p) for p in parameters),
),
)
]
if parsed.groupby:
selected_columns.extend(parsed.groupby)
groupby = [g.expression for g in parsed.groupby] if parsed.groupby else None
query = LogicalQuery(
from_clause=None,
selected_columns=selected_columns,
groupby=groupby,
)
if parsed.expression:
selected_columns = [parsed.expression]
if parsed.groupby:
selected_columns.extend(parsed.groupby)
groupby = [g.expression for g in parsed.groupby] if parsed.groupby else None

metric_value = parsed.mri or parsed.public_name
conditions: list[Expression] = [
binary_condition(
ConditionFunctions.EQ,
Column(None, None, "metric_id"),
Literal(None, metric_value),
)
]
if parsed.conditions:
conditions.extend(parsed.conditions)

final_conditions = (
combine_and_conditions(conditions) if conditions else None
)
else:
conditions = metric_id_condition

query = LogicalQuery(
from_clause=None,
selected_columns=selected_columns,
condition=conditions,
groupby=groupby,
)
query = LogicalQuery(
from_clause=None,
selected_columns=selected_columns,
condition=final_conditions,
groupby=groupby,
)
except Exception as e:
raise e

Expand Down
Loading

0 comments on commit c32980a

Please sign in to comment.