[Task] add NonCachingRotatingFileHandler for worker task (apache#41064)
* [Task] add NonCachingRotatingFileHandler for worker task

* [Task] format the change
HuanjieGuo authored Jul 28, 2024
1 parent 7e910e5 commit b6ed1a9
Showing 2 changed files with 43 additions and 3 deletions.
25 changes: 22 additions & 3 deletions airflow/utils/log/file_task_handler.py
@@ -39,7 +39,7 @@
 from airflow.utils.context import Context
 from airflow.utils.helpers import parse_template_string, render_template_to_string
 from airflow.utils.log.logging_mixin import SetContextPropagate
-from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
+from airflow.utils.log.non_caching_file_handler import NonCachingRotatingFileHandler
 from airflow.utils.session import provide_session
 from airflow.utils.state import State, TaskInstanceState

@@ -176,14 +176,24 @@ class FileTaskHandler(logging.Handler):
     :param base_log_folder: Base log folder to place logs.
     :param filename_template: template filename string
+    :param max_bytes: maximum size in bytes of a single log file before it is rolled over (0 disables rollover)
+    :param backup_count: number of rotated backup log files to keep (0 keeps none)
+    :param delay: if True, defer opening the log file until the first record is emitted
     """

     trigger_should_wrap = True
     inherits_from_empty_operator_log_message = (
         "Operator inherits from empty operator and thus does not have logs"
     )

-    def __init__(self, base_log_folder: str, filename_template: str | None = None):
+    def __init__(
+        self,
+        base_log_folder: str,
+        filename_template: str | None = None,
+        max_bytes: int = 0,
+        backup_count: int = 0,
+        delay: bool = False,
+    ):
         super().__init__()
         self.handler: logging.Handler | None = None
         self.local_base = base_log_folder
@@ -196,6 +206,9 @@ def __init__(self, base_log_folder: str, filename_template: str | None = None):
                 stacklevel=(2 if type(self) == FileTaskHandler else 3),
             )
         self.maintain_propagate: bool = False
+        self.max_bytes = max_bytes
+        self.backup_count = backup_count
+        self.delay = delay
         """
         If true, overrides default behavior of setting propagate=False
@@ -224,7 +237,13 @@ def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> Non
         to task logs from a context other than task or trigger run
         """
         local_loc = self._init_file(ti, identifier=identifier)
-        self.handler = NonCachingFileHandler(local_loc, encoding="utf-8")
+        self.handler = NonCachingRotatingFileHandler(
+            local_loc,
+            encoding="utf-8",
+            maxBytes=self.max_bytes,
+            backupCount=self.backup_count,
+            delay=self.delay,
+        )
         if self.formatter:
             self.handler.setFormatter(self.formatter)
         self.handler.setLevel(self.level)
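
For reference, the new handler class itself is not part of this excerpt. Below is a minimal sketch of what NonCachingRotatingFileHandler could look like, assuming it mirrors the existing NonCachingFileHandler: a stock RotatingFileHandler whose _open() advises the kernel not to keep the log file in the page cache. The helper name make_file_io_non_caching and the posix_fadvise call are assumptions, not the commit's code.

from __future__ import annotations

import logging.handlers
import os
from typing import IO


def make_file_io_non_caching(io: IO[str]) -> IO[str]:
    # Hint to the kernel that pages cached for this file can be dropped,
    # so long-running workers do not fill the page cache with log data.
    # posix_fadvise is POSIX-only; silently skip on other platforms.
    try:
        os.posix_fadvise(io.fileno(), 0, 0, os.POSIX_FADV_DONTNEED)
    except Exception:
        pass
    return io


class NonCachingRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """RotatingFileHandler that opens its stream in a non-caching way (sketch)."""

    def _open(self) -> IO[str]:
        return make_file_io_non_caching(super()._open())

With maxBytes and backupCount forwarded from set_context above, the handler rotates the task log to <name>.1, <name>.2, ... once the size limit is reached; maxBytes=0 keeps the old single-file behavior.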
The second changed file (documentation; file path not shown in this excerpt):

@@ -161,3 +161,24 @@ Example of custom logger name:
}
},
)
If you want to limit the size of task log files, you can add the ``handlers.task.max_bytes`` parameter.

Example of limiting the size of task logs:

.. code-block:: python

    from copy import deepcopy
    from pydantic.utils import deep_update
    from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

    LOGGING_CONFIG = deep_update(
        deepcopy(DEFAULT_LOGGING_CONFIG),
        {
            "handlers": {
                "task": {
                    "max_bytes": 104857600,  # 100MB
                }
            }
        },
    )
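
Since the new FileTaskHandler constructor also accepts backup_count and delay, the same deep_update pattern can presumably carry those keys as well (the standard library's logging.config.dictConfig passes extra handler keys to the handler's constructor). The following is a hedged sketch, not part of the commit's documentation; the 50 MB / 5-file values are illustrative only.

from copy import deepcopy

from pydantic.utils import deep_update

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

LOGGING_CONFIG = deep_update(
    deepcopy(DEFAULT_LOGGING_CONFIG),
    {
        "handlers": {
            "task": {
                "max_bytes": 50 * 1024 * 1024,  # roll over after ~50 MB
                "backup_count": 5,  # keep at most five rotated files
            }
        }
    },
)

With the defaults (max_bytes=0, backup_count=0), RotatingFileHandler never rolls over, so existing deployments keep their single-file behavior.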
