Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add magic loop check to speed up task execution #918

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from warnings import warn

from airflow.models.dag import DAG
from airflow.utils.dag_parsing_context import get_parsing_context
from airflow.utils.task_group import TaskGroup

from cosmos.airflow.graph import build_airflow_graph
Expand Down Expand Up @@ -191,6 +192,8 @@ class DbtToAirflowConverter:
or DockerOperator parameters
:param on_warning_callback: A callback function called on warnings with additional Context variables "test_names"
and "test_results" of type `List`. Each index in "test_names" corresponds to the same index in "test_results".
:param parse_all_dags: When False, only the DAG that matches the provided dag_id will be parsed. When True, all DAGs
will be parsed. Default is False.
"""

def __init__(
Expand All @@ -203,9 +206,25 @@ def __init__(
task_group: TaskGroup | None = None,
operator_args: dict[str, Any] | None = None,
on_warning_callback: Callable[..., Any] | None = None,
parse_all_dags: bool = False,
*args: Any,
**kwargs: Any,
) -> None:
if not dag and task_group:
dag = task_group.dag

if not dag:
raise CosmosValueError("Either a dag or task_group must be provided.")

if not parse_all_dags:
current_dag_id = get_parsing_context().dag_id

if dag.dag_id != current_dag_id and current_dag_id is not None:
return

# if the dag is not what we're trying to parse, we can skip for performance reasons
# https://airflow.apache.org/docs/apache-airflow/stable/howto/dynamic-dag-generation.html#optimizing-dag-parsing-delays-during-execution

project_config.validate_project()

execution_config = execution_config or ExecutionConfig()
Expand Down
Loading