Skip to content

Commit

Permalink
Add retries for SQL generation using lower optimization levels (#1569)
Browse files Browse the repository at this point in the history
In case there are issues with the SQL optimizers, this PR adds a
mechanism to retry generation using a lower optimization level to
potentially reduce user-facing errors.
  • Loading branch information
plypaul authored Dec 14, 2024
1 parent 2b819ee commit bc416f4
Showing 1 changed file with 78 additions and 17 deletions.
95 changes: 78 additions & 17 deletions metricflow/plan_conversion/dataflow_to_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,26 +187,85 @@ def convert_to_sql_query_plan(
sql_query_plan_id: Optional[DagId] = None,
) -> ConvertToSqlPlanResult:
"""Create an SQL query plan that represents the computation up to the given dataflow plan node."""
# TODO: Make this a more generally accessible attribute instead of checking against the
# BigQuery-ness of the engine
use_column_alias_in_group_by = sql_engine_type is SqlEngine.BIGQUERY

option_set = SqlGenerationOptionSet.options_for_level(
optimization_level, use_column_alias_in_group_by=use_column_alias_in_group_by
# In case there are bugs that raise exceptions at higher optimization levels, retry generation at a lower
# optimization level. Generally skip O0 (unless requested) as that level does not include the column pruner.
# Without that, the generated SQL can be enormous.
optimization_levels_to_attempt: Sequence[SqlQueryOptimizationLevel] = sorted(
# Union handles case if O0 was specifically requested.
set(
possible_level
for possible_level in SqlQueryOptimizationLevel
if SqlQueryOptimizationLevel.O1 <= possible_level <= optimization_level
).union({optimization_level}),
reverse=True,
)
retried_at_lower_optimization_level = False
logger.debug(
LazyFormat(
"Attempting to convert to a SQL plan with optimization levels:",
optimization_levels_to_attempt=optimization_levels_to_attempt,
)
)
for attempted_optimization_level in optimization_levels_to_attempt:
try:
# TODO: Make this a more generally accessible attribute instead of checking against the
# BigQuery-ness of the engine
use_column_alias_in_group_by = sql_engine_type is SqlEngine.BIGQUERY

logger.info(LazyFormat("Using option set:", option_set=option_set))
option_set = SqlGenerationOptionSet.options_for_level(
attempted_optimization_level, use_column_alias_in_group_by=use_column_alias_in_group_by
)

nodes_to_convert_to_cte: FrozenSet[DataflowPlanNode] = frozenset()
if option_set.allow_cte:
nodes_to_convert_to_cte = self._get_nodes_to_convert_to_cte(dataflow_plan_node)
logger.info(
LazyFormat(
"Using option set for SQL generation:",
optimization_level=optimization_level,
option_set=option_set,
)
)

return self.convert_using_specifics(
dataflow_plan_node=dataflow_plan_node,
sql_query_plan_id=sql_query_plan_id,
nodes_to_convert_to_cte=nodes_to_convert_to_cte,
optimizers=option_set.optimizers,
)
nodes_to_convert_to_cte: FrozenSet[DataflowPlanNode] = frozenset()
if option_set.allow_cte:
nodes_to_convert_to_cte = self._get_nodes_to_convert_to_cte(dataflow_plan_node)

result = self.convert_using_specifics(
dataflow_plan_node=dataflow_plan_node,
sql_query_plan_id=sql_query_plan_id,
nodes_to_convert_to_cte=nodes_to_convert_to_cte,
optimizers=option_set.optimizers,
)

if retried_at_lower_optimization_level:
logger.error(
LazyFormat(
"Successfully generated the SQL plan using an optimization level lower than the"
" requested one. A lower one was used due to an exception using the requested one. Please "
"investigate the cause for the exception.",
requested_optimization_level=optimization_level,
successful_optimization_level=attempted_optimization_level,
)
)

return result

except Exception as e:
if optimization_level is optimization_levels_to_attempt[-1]:
logger.error(
"Exhausted attempts to generate the SQL without exceptions."
" Propagating the most recent exception."
)
raise e
retried_at_lower_optimization_level = True
logger.exception(
LazyFormat(
"Got an exception while generating the SQL plan. This indicates a bug that should be"
" investigated, but retrying at a different optimization level to potentially avoid a"
" user-facing error.",
attempted_optimization_level=optimization_level,
)
)

raise RuntimeError("Should have returned a result or raised an exception in the loop.")

def convert_using_specifics(
self,
Expand All @@ -216,7 +275,9 @@ def convert_using_specifics(
optimizers: Sequence[SqlQueryPlanOptimizer],
) -> ConvertToSqlPlanResult:
"""Helper method to convert using specific options. Main use case are tests."""
logger.debug(LazyFormat("Converting to SQL", nodes_to_convert_to_cte=nodes_to_convert_to_cte))
logger.debug(
LazyFormat("Converting to SQL", nodes_to_convert_to_cte=[node.node_id for node in nodes_to_convert_to_cte])
)

if len(nodes_to_convert_to_cte) == 0:
# Avoid `DataflowNodeToSqlCteVisitor` code path for better isolation during rollout.
Expand Down

0 comments on commit bc416f4

Please sign in to comment.