Skip to content

Commit

Permalink
Fix reusing config accross TaskGroups/DAGs (#664)
Browse files Browse the repository at this point in the history
If execution_config was reused, Cosmos 1.2.2 would raise:

```
astronomer-cosmos/dags/basic_cosmos_task_group.py
Traceback (most recent call last):
  File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/venv-38/lib/python3.8/site-packages/airflow/models/dagbag.py", line 343, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 848, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/dags/basic_cosmos_task_group.py", line 74, in <module>
    basic_cosmos_task_group()
  File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/venv-38/lib/python3.8/site-packages/airflow/models/dag.py", line 3817, in factory
    f(**f_kwargs)
  File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/dags/basic_cosmos_task_group.py", line 54, in basic_cosmos_task_group
    orders = DbtTaskGroup(
  File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/cosmos/airflow/task_group.py", line 26, in __init__
    DbtToAirflowConverter.__init__(self, *args, **specific_kwargs(**kwargs))
  File "/Users/tati/Code/cosmos-clean/astronomer-cosmos/cosmos/converter.py", line 113, in __init__
    raise CosmosValueError(
cosmos.exceptions.CosmosValueError: ProjectConfig.dbt_project_path is mutually exclusive with RenderConfig.dbt_project_path and ExecutionConfig.dbt_project_path.If using RenderConfig.dbt_project_path or ExecutionConfig.dbt_project_path, ProjectConfig.dbt_project_path should be None
```

This has been raised by an Astro customer and our field engineer, who
tried to run: https://github.com/astronomer/cosmos-demo
  • Loading branch information
tatiana committed Nov 9, 2023
1 parent d063b5e commit 9001c98
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
5 changes: 5 additions & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

import copy
import inspect
from typing import Any, Callable

Expand Down Expand Up @@ -118,6 +119,10 @@ def __init__(
# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
if project_config.dbt_project_path:
# We copy the configuration so the change does not affect other DAGs or TaskGroups
# that may reuse the same original configuration
render_config = copy.deepcopy(render_config)
execution_config = copy.deepcopy(execution_config)
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path

Expand Down
26 changes: 22 additions & 4 deletions dev/dags/basic_cosmos_task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
An example DAG that uses Cosmos to render a dbt project as a TaskGroup.
"""
import os

from datetime import datetime
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, RenderConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
Expand All @@ -23,6 +24,8 @@
),
)

shared_execution_config = ExecutionConfig()


@dag(
schedule_interval="@daily",
Expand All @@ -35,19 +38,34 @@ def basic_cosmos_task_group() -> None:
"""
pre_dbt = EmptyOperator(task_id="pre_dbt")

jaffle_shop = DbtTaskGroup(
group_id="test_123",
customers = DbtTaskGroup(
group_id="customers",
project_config=ProjectConfig(
(DBT_ROOT_PATH / "jaffle_shop").as_posix(),
),
render_config=RenderConfig(select=["path:seeds/raw_customers.csv"]),
execution_config=shared_execution_config,
operator_args={"install_deps": True},
profile_config=profile_config,
default_args={"retries": 2},
)

orders = DbtTaskGroup(
group_id="orders",
project_config=ProjectConfig(
(DBT_ROOT_PATH / "jaffle_shop").as_posix(),
),
render_config=RenderConfig(select=["path:seeds/raw_orders.csv"]),
execution_config=shared_execution_config,
operator_args={"install_deps": True},
profile_config=profile_config,
default_args={"retries": 2},
)

post_dbt = EmptyOperator(task_id="post_dbt")

pre_dbt >> jaffle_shop >> post_dbt
pre_dbt >> customers >> post_dbt
pre_dbt >> orders >> post_dbt


basic_cosmos_task_group()

0 comments on commit 9001c98

Please sign in to comment.