diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py index d62aaa3ff51ce..3120ac5cdecc7 100644 --- a/airflow/jobs/triggerer_job.py +++ b/airflow/jobs/triggerer_job.py @@ -25,6 +25,7 @@ import time import warnings from collections import deque +from contextlib import suppress from copy import copy from queue import SimpleQueue from typing import TYPE_CHECKING, Deque @@ -601,8 +602,9 @@ async def run_trigger(self, trigger_id, trigger): # CancelledError will get injected when we're stopped - which is # fine, the cleanup process will understand that, but we want to # allow triggers a chance to cleanup, either in that case or if - # they exit cleanly. - trigger.cleanup() + # they exit cleanly. Exception from cleanup methods are ignored. + with suppress(Exception): + await trigger.cleanup() if SEND_TRIGGER_END_MARKER: self.mark_trigger_end(trigger) diff --git a/airflow/triggers/base.py b/airflow/triggers/base.py index f616df66a08c7..fa968ebe9e5a1 100644 --- a/airflow/triggers/base.py +++ b/airflow/triggers/base.py @@ -79,12 +79,18 @@ async def run(self) -> AsyncIterator["TriggerEvent"]: raise NotImplementedError("Triggers must implement run()") yield # To convince Mypy this is an async iterator. - def cleanup(self) -> None: + async def cleanup(self) -> None: """ Cleanup the trigger. Called when the trigger is no longer needed, and it's being removed from the active triggerer process. + + This method follows the async/await pattern to allow to run the cleanup + in triggerer main event loop. Exceptions raised by the cleanup method + are ignored, so if you would like to be able to debug them and be notified + that cleanup method failed, you should wrap your code with try/except block + and handle it appropriately (in async-compatible way). """ def __repr__(self) -> str: diff --git a/newsfragments/30152.significant.rst b/newsfragments/30152.significant.rst new file mode 100644 index 0000000000000..5b0325bbe8c07 --- /dev/null +++ b/newsfragments/30152.significant.rst @@ -0,0 +1,6 @@ +The ``cleanup()`` method in BaseTrigger is now defined as asynchronous (following async/await) pattern. + +This is potentially a breaking change for any custom trigger implementations that override the ``cleanup()`` +method and uses synchronous code, however using synchronous operations in cleanup was technically wrong, +because the method was executed in the main loop of the Triggerer and it was introducing unnecessary delays +impacting other triggers. The change is unlikely to affect any existing trigger implementations.