From 1224b54a04800514261e519161a726e4e23252a9 Mon Sep 17 00:00:00 2001 From: Julian LaNeve Date: Sat, 20 Apr 2024 16:04:57 -0400 Subject: [PATCH 1/3] add magic loop check to speed up task execution --- cosmos/converter.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/cosmos/converter.py b/cosmos/converter.py index f9511ab82..cd6eabf83 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -10,6 +10,7 @@ from airflow.models.dag import DAG from airflow.utils.task_group import TaskGroup +from airflow.utils.dag_parsing_context import get_parsing_context from cosmos.airflow.graph import build_airflow_graph from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig @@ -191,6 +192,8 @@ class DbtToAirflowConverter: or DockerOperator parameters :param on_warning_callback: A callback function called on warnings with additional Context variables "test_names" and "test_results" of type `List`. Each index in "test_names" corresponds to the same index in "test_results". + :param parse_all_dags: When False, only the DAG that matches the provided dag_id will be parsed. When True, all DAGs + will be parsed. Default is False. """ def __init__( @@ -203,9 +206,23 @@ def __init__( task_group: TaskGroup | None = None, operator_args: dict[str, Any] | None = None, on_warning_callback: Callable[..., Any] | None = None, + parse_all_dags: bool = False, *args: Any, **kwargs: Any, ) -> None: + if not dag and task_group: + dag = task_group.dag + + if not dag: + raise CosmosValueError("Either a dag or task_group must be provided.") + + if not parse_all_dags: + if dag.dag_id != get_parsing_context().dag_id: + return + + # if the dag is not what we're trying to parse, we can skip for performance reasons + # https://airflow.apache.org/docs/apache-airflow/stable/howto/dynamic-dag-generation.html#optimizing-dag-parsing-delays-during-execution + project_config.validate_project() execution_config = execution_config or ExecutionConfig() From 998c9ead6d36c126e1768d708d5cae9f99e04fd0 Mon Sep 17 00:00:00 2001 From: Julian LaNeve Date: Sat, 20 Apr 2024 16:06:40 -0400 Subject: [PATCH 2/3] check for none --- cosmos/converter.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index cd6eabf83..e90e8e591 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -217,7 +217,9 @@ def __init__( raise CosmosValueError("Either a dag or task_group must be provided.") if not parse_all_dags: - if dag.dag_id != get_parsing_context().dag_id: + current_dag_id = get_parsing_context().dag_id + + if dag.dag_id != current_dag_id and current_dag_id is not None: return # if the dag is not what we're trying to parse, we can skip for performance reasons From 41ab9b62c42d43e32404317a0217f596e62fb780 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 20 Apr 2024 20:07:08 +0000 Subject: [PATCH 3/3] =?UTF-8?q?=F0=9F=8E=A8=20[pre-commit.ci]=20Auto=20for?= =?UTF-8?q?mat=20from=20pre-commit.com=20hooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cosmos/converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cosmos/converter.py b/cosmos/converter.py index e90e8e591..423461fd2 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -9,8 +9,8 @@ from warnings import warn from airflow.models.dag import DAG -from airflow.utils.task_group import TaskGroup from airflow.utils.dag_parsing_context import get_parsing_context +from airflow.utils.task_group import TaskGroup from cosmos.airflow.graph import build_airflow_graph from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig