Skip to content

Commit

Permalink
Fix task_instance_mutation_hook when importing airflow.models.dagrun (
Browse files Browse the repository at this point in the history
#15851)

If a dag imported `airflow.models.dagrun` it would cause task_instance_mutation_hook from the site local settings to not be picked up.
  • Loading branch information
junnplus authored May 15, 2021
1 parent 6b46af1 commit 3919ee6
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
3 changes: 2 additions & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
from airflow.exceptions import AirflowException, TaskNotFound
from airflow.models.base import ID_LEN, Base
from airflow.models.taskinstance import TaskInstance as TI
from airflow.settings import task_instance_mutation_hook
from airflow.stats import Stats
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_states import SCHEDULEABLE_STATES
Expand Down Expand Up @@ -633,6 +632,8 @@ def verify_integrity(self, session: Session = None):
:param session: Sqlalchemy ORM Session
:type session: Session
"""
from airflow.settings import task_instance_mutation_hook

dag = self.get_dag()
tis = self.get_task_instances(session=session)

Expand Down
2 changes: 1 addition & 1 deletion tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ def test_already_added_task_instances_can_be_ignored(self):
assert State.NONE == first_ti.state

@parameterized.expand([(state,) for state in State.task_states])
@mock.patch('airflow.models.dagrun.task_instance_mutation_hook')
@mock.patch('airflow.settings.task_instance_mutation_hook')
def test_task_instance_mutation_hook(self, state, mock_hook):
def mutate_task_instance(task_instance):
if task_instance.queue == 'queue1':
Expand Down

0 comments on commit 3919ee6

Please sign in to comment.