diff --git a/cosmos/converter.py b/cosmos/converter.py index f9511ab82..423461fd2 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -9,6 +9,7 @@ from warnings import warn from airflow.models.dag import DAG +from airflow.utils.dag_parsing_context import get_parsing_context from airflow.utils.task_group import TaskGroup from cosmos.airflow.graph import build_airflow_graph @@ -191,6 +192,8 @@ class DbtToAirflowConverter: or DockerOperator parameters :param on_warning_callback: A callback function called on warnings with additional Context variables "test_names" and "test_results" of type `List`. Each index in "test_names" corresponds to the same index in "test_results". + :param parse_all_dags: When False, only the DAG that matches the provided dag_id will be parsed. When True, all DAGs + will be parsed. Default is False. """ def __init__( @@ -203,9 +206,25 @@ def __init__( task_group: TaskGroup | None = None, operator_args: dict[str, Any] | None = None, on_warning_callback: Callable[..., Any] | None = None, + parse_all_dags: bool = False, *args: Any, **kwargs: Any, ) -> None: + if not dag and task_group: + dag = task_group.dag + + if not dag: + raise CosmosValueError("Either a dag or task_group must be provided.") + + if not parse_all_dags: + current_dag_id = get_parsing_context().dag_id + + if dag.dag_id != current_dag_id and current_dag_id is not None: + return + + # if the dag is not what we're trying to parse, we can skip for performance reasons + # https://airflow.apache.org/docs/apache-airflow/stable/howto/dynamic-dag-generation.html#optimizing-dag-parsing-delays-during-execution + project_config.validate_project() execution_config = execution_config or ExecutionConfig()