From b65dbaaf3f21ea5396da121bbfa7f895d0ab8516 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Fri, 17 Mar 2023 08:11:51 +0100 Subject: [PATCH] Make cleanup method in trigger an async one (#30152) The cleanup method is called in the async run_trigger and it's easy to imagine that users might want to do some async operations in it (for example, make an HTTP call). Therefore the cleanup method should be asynchronous. Related https://github.com/apache/airflow/discussions/30141 --- airflow/jobs/triggerer_job.py | 6 ++++-- airflow/triggers/base.py | 8 +++++++- newsfragments/30152.significant.rst | 6 ++++++ 3 files changed, 17 insertions(+), 3 deletions(-) create mode 100644 newsfragments/30152.significant.rst diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py index d62aaa3ff51ce..3120ac5cdecc7 100644 --- a/airflow/jobs/triggerer_job.py +++ b/airflow/jobs/triggerer_job.py @@ -25,6 +25,7 @@ import time import warnings from collections import deque +from contextlib import suppress from copy import copy from queue import SimpleQueue from typing import TYPE_CHECKING, Deque @@ -601,8 +602,9 @@ async def run_trigger(self, trigger_id, trigger): # CancelledError will get injected when we're stopped - which is # fine, the cleanup process will understand that, but we want to # allow triggers a chance to cleanup, either in that case or if - # they exit cleanly. - trigger.cleanup() + # they exit cleanly. Exceptions from cleanup methods are ignored. + with suppress(Exception): + await trigger.cleanup() if SEND_TRIGGER_END_MARKER: self.mark_trigger_end(trigger) diff --git a/airflow/triggers/base.py b/airflow/triggers/base.py index f616df66a08c7..fa968ebe9e5a1 100644 --- a/airflow/triggers/base.py +++ b/airflow/triggers/base.py @@ -79,12 +79,18 @@ async def run(self) -> AsyncIterator["TriggerEvent"]: raise NotImplementedError("Triggers must implement run()") yield # To convince Mypy this is an async iterator. 
- def cleanup(self) -> None: + async def cleanup(self) -> None: """ Cleanup the trigger. Called when the trigger is no longer needed, and it's being removed from the active triggerer process. + + This method follows the async/await pattern so that the cleanup can run + in the triggerer's main event loop. Exceptions raised by the cleanup method + are ignored, so if you would like to be able to debug them and be notified + that the cleanup method failed, you should wrap your code in a try/except block + and handle it appropriately (in an async-compatible way). """ def __repr__(self) -> str: diff --git a/newsfragments/30152.significant.rst b/newsfragments/30152.significant.rst new file mode 100644 index 0000000000000..5b0325bbe8c07 --- /dev/null +++ b/newsfragments/30152.significant.rst @@ -0,0 +1,6 @@ +The ``cleanup()`` method in BaseTrigger is now defined as asynchronous (following the async/await pattern). + +This is potentially a breaking change for any custom trigger implementations that override the ``cleanup()`` +method and use synchronous code; however, using synchronous operations in cleanup was technically wrong, +because the method was executed in the main loop of the Triggerer and it was introducing unnecessary delays +impacting other triggers. The change is unlikely to affect any existing trigger implementations.