Skip to content

Commit

Permalink
Row-lock TIs to be removed during mapped task expansion (apache#28689)
Browse files Browse the repository at this point in the history
Instead of query-update, we row lock the TI to apply the update.
This protects against updating a row that has been updated by another process.
  • Loading branch information
ephraimbuddy authored Jan 4, 2023
1 parent 56fb1f1 commit a055d8f
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.mixins import ResolveMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import skip_locked, with_row_locks
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.task_group import MappedTaskGroup
from airflow.utils.trigger_rule import TriggerRule
Expand Down Expand Up @@ -551,13 +552,15 @@ def expand_mapped_task(self, run_id: str, *, session: Session) -> tuple[Sequence

# Any (old) task instances with inapplicable indexes (>= the total
# number we need) are set to "REMOVED".
session.query(TaskInstance).filter(
query = session.query(TaskInstance).filter(
TaskInstance.dag_id == self.dag_id,
TaskInstance.task_id == self.task_id,
TaskInstance.run_id == run_id,
TaskInstance.map_index >= total_expanded_ti_count,
).update({TaskInstance.state: TaskInstanceState.REMOVED})

)
to_update = with_row_locks(query, of=TaskInstance, session=session, **skip_locked(session=session))
for ti in to_update:
ti.state = TaskInstanceState.REMOVED
session.flush()
return all_expanded_tis, total_expanded_ti_count - 1

Expand Down

0 comments on commit a055d8f

Please sign in to comment.