Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana authored Sep 19, 2024
2 parents 1aa59b0 + c5e8544 commit 1c25ab0
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 14 deletions.
20 changes: 13 additions & 7 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ jobs:
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-cov
- name: Upload coverage to Github
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: coverage-unit-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
path: .coverage
include-hidden-files: true

Run-Integration-Tests:
needs: Authorize
Expand Down Expand Up @@ -177,10 +178,11 @@ jobs:
POSTGRES_PORT: 5432

- name: Upload coverage to Github
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
path: .coverage
include-hidden-files: true

Run-Integration-Tests-Expensive:
needs: Authorize
Expand Down Expand Up @@ -248,10 +250,11 @@ jobs:
POSTGRES_PORT: 5432

- name: Upload coverage to Github
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: coverage-integration-expensive-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
path: .coverage
include-hidden-files: true

env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
Expand Down Expand Up @@ -315,10 +318,11 @@ jobs:
POSTGRES_PORT: 5432

- name: Upload coverage to Github
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: coverage-integration-sqlite-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
path: .coverage
include-hidden-files: true

env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
Expand Down Expand Up @@ -391,10 +395,11 @@ jobs:
POSTGRES_PORT: 5432

- name: Upload coverage to Github
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: coverage-integration-dbt-1-5-4-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
path: .coverage
include-hidden-files: true

env:
AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
Expand Down Expand Up @@ -534,10 +539,11 @@ jobs:
POSTGRES_PORT: 5432

- name: Upload coverage to Github
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: coverage-integration-kubernetes-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
path: .coverage
include-hidden-files: true

Code-Coverage:
if: github.event.action != 'labeled'
Expand All @@ -559,7 +565,7 @@ jobs:
run: |
pip3 install coverage
- name: Download all coverage artifacts
uses: actions/download-artifact@v2
uses: actions/download-artifact@v4
with:
path: ./coverage
- name: Combine coverage
Expand Down
7 changes: 3 additions & 4 deletions cosmos/core/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@ def get_airflow_task(task: Task, dag: DAG, task_group: "TaskGroup | None" = None
module = importlib.import_module(module_name)
Operator = getattr(module, class_name)

task_kwargs = {}
if task.owner != "":
task_owner = task.owner
else:
task_owner = dag.owner
task_kwargs["owner"] = task.owner

airflow_task = Operator(
task_id=task.id,
dag=dag,
task_group=task_group,
owner=task_owner,
**task_kwargs,
**({} if class_name == "EmptyOperator" else {"extra_context": task.extra_context}),
**task.arguments,
)
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies = [
"importlib-metadata; python_version < '3.8'",
"Jinja2>=3.0.0",
"msgpack",
"packaging",
"packaging>=22.0",
"pydantic>=1.10.0",
"typing-extensions; python_version < '3.8'",
"virtualenv",
Expand Down Expand Up @@ -88,7 +88,7 @@ docs = [
"apache-airflow-providers-cncf-kubernetes>=5.1.1",
]
tests = [
"packaging",
"packaging>=22.0",
"pytest>=6.0",
"pytest-split",
"pytest-dotenv",
Expand Down
50 changes: 49 additions & 1 deletion tests/airflow/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest
from airflow import __version__ as airflow_version
from airflow.models import DAG
from airflow.models.abstractoperator import DEFAULT_OWNER
from airflow.utils.task_group import TaskGroup
from packaging import version

Expand Down Expand Up @@ -130,8 +131,9 @@ def test_build_airflow_graph_with_after_each():
task_seed_parent_seed = dag.tasks[0]
task_parent_run = dag.tasks[1]

assert task_seed_parent_seed.owner == ""
assert task_seed_parent_seed.owner == DEFAULT_OWNER
assert task_parent_run.owner == "parent_node"
assert {d for d in dag.owner.split(", ")} == {DEFAULT_OWNER, "parent_node"}


@pytest.mark.parametrize(
Expand Down Expand Up @@ -604,3 +606,49 @@ def test_airflow_kwargs_generation():
result = airflow_kwargs(**task_args)

assert "dag" in result


@pytest.mark.parametrize(
"dbt_extra_config,expected_owner",
[
({}, DEFAULT_OWNER),
({"meta": {}}, DEFAULT_OWNER),
({"meta": {"owner": ""}}, DEFAULT_OWNER),
({"meta": {"owner": "dbt-owner"}}, "dbt-owner"),
],
)
def test_owner(dbt_extra_config, expected_owner):
with DAG("test-task-group-after-each", start_date=datetime(2022, 1, 1)) as dag:
node = DbtNode(
unique_id=f"{DbtResourceType.MODEL.value}.my_folder.my_model",
resource_type=DbtResourceType.MODEL,
file_path=SAMPLE_PROJ_PATH / "gen2/models/parent.sql",
tags=["has_child"],
config={"materialized": "view", **dbt_extra_config},
depends_on=[],
)

output: TaskGroup = generate_task_or_group(
dag=dag,
task_group=None,
node=node,
execution_mode=ExecutionMode.LOCAL,
test_indirect_selection=TestIndirectSelection.EAGER,
task_args={
"project_dir": SAMPLE_PROJ_PATH,
"profile_config": ProfileConfig(
profile_name="default",
target_name="default",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="fake_conn",
profile_args={"schema": "public"},
),
),
},
test_behavior=TestBehavior.AFTER_EACH,
on_warning_callback=None,
source_rendering_behavior=SOURCE_RENDERING_BEHAVIOR,
)

assert len(output.leaves) == 1
assert output.leaves[0].owner == expected_owner

0 comments on commit 1c25ab0

Please sign in to comment.