Skip to content

Commit a3e7367

Browse files
test cases for setting dependencies correctly between task group and other if task group is empty #47947
1 parent 470b3cd commit a3e7367

File tree

2 files changed

+70
-3
lines changed

2 files changed

+70
-3
lines changed

airflow-core/tests/unit/utils/test_task_group.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1737,3 +1737,69 @@ def test_task_group_display_name_used_as_label():
17371737
}
17381738

17391739
assert extract_node_id(task_group_to_dict(dag.task_group), include_label=True) == expected_node_id
1740+
1741+
1742+
def test_task_group_dependencies_between_tasks_if_task_group_is_empty_1():
1743+
"""
1744+
Test that if a task group is empty, the dependencies between tasks are still maintained.
1745+
"""
1746+
with DAG(dag_id="test_dag", schedule=None, start_date=pendulum.parse("20200101")):
1747+
task1 = EmptyOperator(task_id="task1")
1748+
with TaskGroup("group1") as tg1:
1749+
pass
1750+
with TaskGroup("group2") as tg2:
1751+
task2 = EmptyOperator(task_id="task2")
1752+
task3 = EmptyOperator(task_id="task3")
1753+
task2 >> task3
1754+
1755+
task1 >> tg1 >> tg2
1756+
1757+
assert task1.downstream_task_ids == {"group2.task2"}
1758+
1759+
1760+
def test_task_group_dependencies_between_tasks_if_task_group_is_empty_2():
1761+
"""
1762+
Test that if a task group is empty, the dependencies between tasks are still maintained.
1763+
"""
1764+
with DAG(dag_id="test_dag", schedule=None, start_date=pendulum.parse("20200101")):
1765+
task1 = EmptyOperator(task_id="task1")
1766+
with TaskGroup("group1") as tg1:
1767+
pass
1768+
with TaskGroup("group2") as tg2:
1769+
pass
1770+
with TaskGroup("group3") as tg3:
1771+
pass
1772+
with TaskGroup("group4") as tg4:
1773+
pass
1774+
with TaskGroup("group5") as tg5:
1775+
task2 = EmptyOperator(task_id="task2")
1776+
task3 = EmptyOperator(task_id="task3")
1777+
task2 >> task3
1778+
task1 >> tg1 >> tg2 >> tg3 >> tg4 >> tg5
1779+
1780+
assert task1.downstream_task_ids == {"group5.task2"}
1781+
1782+
1783+
def test_task_group_dependencies_between_tasks_if_task_group_is_empty_3():
1784+
"""
1785+
Test that if a task group is empty, the dependencies between tasks are still maintained.
1786+
"""
1787+
with DAG(dag_id="test_dag", schedule=None, start_date=pendulum.parse("20200101")):
1788+
task1 = EmptyOperator(task_id="task1")
1789+
with TaskGroup("group1") as tg1:
1790+
pass
1791+
with TaskGroup("group2") as tg2:
1792+
pass
1793+
task2 = EmptyOperator(task_id="task2")
1794+
with TaskGroup("group3") as tg3:
1795+
pass
1796+
with TaskGroup("group4") as tg4:
1797+
pass
1798+
with TaskGroup("group5") as tg5:
1799+
task3 = EmptyOperator(task_id="task3")
1800+
task4 = EmptyOperator(task_id="task4")
1801+
task3 >> task4
1802+
task1 >> tg1 >> tg2 >> task2 >> tg3 >> tg4 >> tg5
1803+
1804+
assert task1.downstream_task_ids == {"task2"}
1805+
assert task2.downstream_task_ids == {"group5.task3"}

task-sdk/src/airflow/sdk/definitions/taskgroup.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,11 @@ def _set_relatives(
344344
task_or_task_list = [task_or_task_list]
345345

346346
# Helper function to find leaves from a task list or task group
347-
def find_leaves(group_or_task) -> list:
347+
def find_leaves(group_or_task) -> list[Any]:
348348
while group_or_task:
349-
if list(group_or_task.get_leaves()):
350-
return list(group_or_task.get_leaves())
349+
group_or_task_leaves = list(group_or_task.get_leaves())
350+
if group_or_task_leaves:
351+
return group_or_task_leaves
351352
if group_or_task.upstream_task_ids:
352353
upstream_task_ids_list = list(group_or_task.upstream_task_ids)
353354
return [self.dag.get_task(task_id) for task_id in upstream_task_ids_list]

0 commit comments

Comments
 (0)