Skip to content

Commit 127c072

Browse files
authored
Revert "Remove remaining Airflow 2.5 backcompat code from GCS Task Handler (#36443)" (#36453)
This reverts commit 75faf11.
1 parent d73bef2 commit 127c072

File tree

2 files changed

+45
-5
lines changed

2 files changed

+45
-5
lines changed

airflow/providers/google/cloud/log/gcs_task_handler.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
# not sure why but mypy complains on missing `storage` but it is clearly there and is importable
2828
from google.cloud import storage # type: ignore[attr-defined]
29+
from packaging.version import Version
2930

3031
from airflow.configuration import conf
3132
from airflow.exceptions import AirflowNotFoundException
@@ -47,6 +48,18 @@
4748
logger = logging.getLogger(__name__)
4849

4950

51+
def get_default_delete_local_copy():
52+
"""Load delete_local_logs conf if Airflow version > 2.6 and return False if not.
53+
54+
TODO: delete this function when min airflow version >= 2.6.
55+
"""
56+
from airflow.version import version
57+
58+
if Version(version) < Version("2.6"):
59+
return False
60+
return conf.getboolean("logging", "delete_local_logs")
61+
62+
5063
class GCSTaskHandler(FileTaskHandler, LoggingMixin):
5164
"""
5265
GCSTaskHandler is a python log handler that handles and reads task instance logs.
@@ -95,8 +108,8 @@ def __init__(
95108
self.gcp_keyfile_dict = gcp_keyfile_dict
96109
self.scopes = gcp_scopes
97110
self.project_id = project_id
98-
self.delete_local_copy = kwargs.get(
99-
"delete_local_copy", conf.getboolean("logging", "delete_local_logs")
111+
self.delete_local_copy = (
112+
kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy()
100113
)
101114

102115
@cached_property
@@ -205,6 +218,30 @@ def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], l
205218
messages.append(f"Unable to read remote log {e}")
206219
return messages, logs
207220

221+
def _read(self, ti, try_number, metadata=None):
222+
"""
223+
Read logs of given task instance and try_number from GCS.
224+
225+
If failed, read the log from task instance host machine.
226+
227+
todo: when min airflow version >= 2.6, remove this method
228+
229+
:param ti: task instance object
230+
:param try_number: task instance try_number to read logs from
231+
:param metadata: log metadata,
232+
can be used for steaming log reading and auto-tailing.
233+
"""
234+
if hasattr(super(), "_read_remote_logs"):
235+
# from Airflow 2.6, we don't implement the `_read` method.
236+
# if parent has _read_remote_logs, we're >= 2.6
237+
return super()._read(ti, try_number, metadata)
238+
239+
messages, logs = self._read_remote_logs(ti, try_number, metadata)
240+
if not logs:
241+
return super()._read(ti, try_number, metadata)
242+
243+
return "".join([f"*** {x}\n" for x in messages]) + "\n".join(logs), {"end_of_log": True}
244+
208245
def gcs_write(self, log, remote_log_location) -> bool:
209246
"""
210247
Write the log to the remote location and return `True`; fail silently and return `False` on error.

tests/providers/google/cloud/log/test_gcs_task_handler.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,8 @@ def test_write_to_remote_on_close_failed_read_old_logs(self, mock_blob, mock_cli
248248
)
249249

250250
@pytest.mark.parametrize(
251-
"delete_local_copy, expected_existence_of_local_copy",
252-
[(True, False), (False, True)],
251+
"delete_local_copy, expected_existence_of_local_copy, airflow_version",
252+
[(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True, "2.5.0"), (False, True, "2.5.0")],
253253
)
254254
@mock.patch(
255255
"airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id",
@@ -265,9 +265,12 @@ def test_close_with_delete_local_copy_conf(
265265
local_log_location,
266266
delete_local_copy,
267267
expected_existence_of_local_copy,
268+
airflow_version,
268269
):
269270
mock_blob.from_string.return_value.download_as_bytes.return_value = b"CONTENT"
270-
with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}):
271+
with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}), mock.patch(
272+
"airflow.version.version", airflow_version
273+
):
271274
handler = GCSTaskHandler(
272275
base_log_folder=local_log_location,
273276
gcs_log_folder="gs://bucket/remote/log/location",

0 commit comments

Comments
 (0)