Simplify logging (#1108)
This PR addresses #906 and fixes the issues with Cosmos logging once and
for all.\*

> \* Actually, there is another issue with logs being polluted with
warnings about "source" nodes in 1.5.0, but that is a separate matter!

I have a long explanation of how the `logging` module in Python works,
and the sort of idioms it expects of end users of the module, here:
#906 (comment)

The choices I made, explained:

- Although I don't know that I entirely agree with adding
`(astronomer-cosmos)` to all the logs, clearly at least one user, and
possibly many more, want it, and I don't believe we should remove it.
The objective of this PR was therefore to preserve the feature while
future-proofing it against further issues.
- Why I can't say I'm a fan of it: It seems that adding
`(astronomer-cosmos)` to logs is a symptom of other problems with the
Cosmos library, specifically how it impacts performance when users do
not set it up effectively. And the prefix was added as a way to assist
people in diagnosing these issues. I think ultimately we want to move
away from this. Other components of the Airflow ecosystem do not feel
compelled to do things like this. Also, the module path is something
that can be handled in the `log_format` if users really want it.
- How I future-proofed: As per the long post I link above, the core issue
is that there should not be tons of StreamHandlers being created. The
proper and typical use of the logging module, with few exceptions, is to
let logs propagate upward to the root logger (see the first sketch after
this list). The reason the Cosmos logs presented issues for so long is
that they deviated a lot from this.
- I think the "least astonishing" default behavior is to make no
modifications to the base logging behavior whatsoever. This is also less
likely to morph into future issues if any further changes are made to
the custom logging.
- One thing I never mentioned: I found it odd that by default Cosmos did
not "work out of the box" and that, despite using Astronomer's own
Airflow platform (!), I had to set a config option that made Cosmos
logging not be a nightmare (i.e. set `propagate_logs` = false). Previous
logs referenced the Celery Executor as having issues, even though this
is one of the two most popular production ways to run Airflow.
Something like this should just work out of the box for a majority of
users!
- For task execution, Cosmos should make use of the more
Airflow-idiomatic `LoggingMixin` class whenever appropriate (see the
second sketch after this list). This can also be used in scheduler- or
webserver-related logging contexts, but there it is less out-of-place to
use globally scoped loggers.
- These will not use the `get_logger()` implementation. That is
intentional and probably desirable. These logs do not need to be
"enriched" because they are isolated in the task execution logs.

Oh also, I fixed an issue with the `project.entry_points` in
`pyproject.toml` while I was at it.

## Breaking Change?

- Removes the `propagate_logs` conf option, although removing it will not
break users' builds. There is now a `rich_logging` conf option instead,
which is disabled by default.
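
For anyone who does want the prefix back, a minimal sketch of opting in, assuming the option is read like other Cosmos settings (i.e. from the `cosmos` section of the Airflow config, or the matching environment variable):

```python
import os

# Equivalent to setting rich_logging = True under [cosmos] in airflow.cfg
# (assumed section/variable name; hypothetical, check the Cosmos docs).
os.environ["AIRFLOW__COSMOS__RICH_LOGGING"] = "True"
```
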
dwreeves authored Aug 15, 2024
1 parent f7354e5 commit 89f5999
Showing 14 changed files with 110 additions and 113 deletions.
4 changes: 4 additions & 0 deletions cosmos/__init__.py
@@ -177,6 +177,10 @@ def get_provider_info():
                     "propagate_logs": {
                         "description": "Enable log propagation from Cosmos custom logger\n",
                         "version_added": "1.3.0a1",
+                        "version_deprecated": "1.6.0a1",
+                        "deprecation_reason": "`propagate_logs` is no longer necessary as of Cosmos 1.6.0"
+                        " because the issue this option was meant to address is no longer an"
+                        " issue with Cosmos's new logging approach.",
                         "type": "boolean",
                         "example": None,
                         "default": "True",
18 changes: 7 additions & 11 deletions cosmos/hooks/subprocess.py
@@ -13,10 +13,6 @@
 
 from airflow.hooks.base import BaseHook
 
-from cosmos.log import get_logger
-
-logger = get_logger(__name__)
-
 
 class FullOutputSubprocessResult(NamedTuple):
     exit_code: int
@@ -57,7 +53,7 @@ def run_command(
         ``output``: the last line from stderr or stdout
         ``full_output``: all lines from stderr or stdout.
         """
-        logger.info("Tmp dir root location: \n %s", gettempdir())
+        self.log.info("Tmp dir root location: \n %s", gettempdir())
         log_lines = []
         with contextlib.ExitStack() as stack:
             if cwd is None:
@@ -70,7 +66,7 @@ def pre_exec() -> None:
                 signal.signal(getattr(signal, sig), signal.SIG_DFL)
             os.setsid()
 
-        logger.info("Running command: %s", command)
+        self.log.info("Running command: %s", command)
 
         self.sub_process = Popen(
             command,
@@ -81,7 +77,7 @@ def pre_exec() -> None:
             preexec_fn=pre_exec,
         )
 
-        logger.info("Command output:")
+        self.log.info("Command output:")
         line = ""
 
         if self.sub_process is None:
@@ -91,23 +87,23 @@ def pre_exec() -> None:
                 line = raw_line.decode(output_encoding, errors="backslashreplace").rstrip()
                 # storing the warn & error lines to be used later
                 log_lines.append(line)
-                logger.info("%s", line)
+                self.log.info("%s", line)
 
             self.sub_process.wait()
 
-            logger.info("Command exited with return code %s", self.sub_process.returncode)
+            self.log.info("Command exited with return code %s", self.sub_process.returncode)
             return_code: int = self.sub_process.returncode
 
         return FullOutputSubprocessResult(exit_code=return_code, output=line, full_output=log_lines)
 
     def send_sigterm(self) -> None:
         """Sends SIGTERM signal to ``self.sub_process`` if one exists."""
-        logger.info("Sending SIGTERM signal to process group")
+        self.log.info("Sending SIGTERM signal to process group")
         if self.sub_process and hasattr(self.sub_process, "pid"):
             os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)
 
     def send_sigint(self) -> None:
         """Sends SIGINT signal to ``self.sub_process`` if one exists."""
-        logger.info("Sending SIGINT signal to process group")
+        self.log.info("Sending SIGINT signal to process group")
         if self.sub_process and hasattr(self.sub_process, "pid"):
             os.killpg(os.getpgid(self.sub_process.pid), signal.SIGINT)
35 changes: 17 additions & 18 deletions cosmos/log.py
@@ -2,19 +2,15 @@
 
 import logging
 
-from airflow.utils.log.colored_log import CustomTTYColoredFormatter
+from cosmos.settings import rich_logging
 
-from cosmos.settings import propagate_logs
 
-LOG_FORMAT: str = (
-    "[%(blue)s%(asctime)s%(reset)s] "
-    "{%(blue)s%(filename)s:%(reset)s%(lineno)d} "
-    "%(log_color)s%(levelname)s%(reset)s - "
-    "%(purple)s(astronomer-cosmos)%(reset)s - "
-    "%(log_color)s%(message)s%(reset)s"
-)
+class CosmosRichLogger(logging.Logger):
+    """Custom Logger that prepends ``(astronomer-cosmos)`` to each log message in the scheduler."""
 
-LOGGER_NAME_TEMPLATE = "astronomer-cosmos-{}"
+    def handle(self, record: logging.LogRecord) -> None:
+        record.msg = "\x1b[35m(astronomer-cosmos)\x1b[0m " + record.msg
+        return super().handle(record)
 
 
 def get_logger(name: str) -> logging.Logger:
@@ -24,13 +20,16 @@ def get_logger(name: str) -> logging.Logger:
     Airflow logs usually look like:
     [2023-08-09T14:20:55.532+0100] {subprocess.py:94} INFO - 13:20:55 Completed successfully
-    By using this logger, we introduce a (yellow) astronomer-cosmos string into the project's log messages:
+    This logger introduces a (magenta) astronomer-cosmos string into the project's log messages,
+    as long as the ``rich_logging`` setting is True:
     [2023-08-09T14:20:55.532+0100] {subprocess.py:94} INFO - (astronomer-cosmos) - 13:20:55 Completed successfully
     """
-    logger = logging.getLogger(LOGGER_NAME_TEMPLATE.format(name))
-    formatter: logging.Formatter = CustomTTYColoredFormatter(fmt=LOG_FORMAT)  # type: ignore
-    handler = logging.StreamHandler()
-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
-    logger.propagate = propagate_logs
-    return logger
+    if rich_logging:
+        cls = logging.getLoggerClass()
+        try:
+            logging.setLoggerClass(CosmosRichLogger)
+            return logging.getLogger(name)
+        finally:
+            logging.setLoggerClass(cls)
+    else:
+        return logging.getLogger(name)
5 changes: 1 addition & 4 deletions cosmos/operators/azure_container_instance.py
@@ -5,7 +5,6 @@
 from airflow.utils.context import Context
 
 from cosmos.config import ProfileConfig
-from cosmos.log import get_logger
 from cosmos.operators.base import (
     AbstractDbtBaseOperator,
     DbtLSMixin,
@@ -17,8 +16,6 @@
     DbtTestMixin,
 )
 
-logger = get_logger(__name__)
-
 # ACI is an optional dependency, so we need to check if it's installed
 try:
     from airflow.providers.microsoft.azure.operators.container_instances import AzureContainerInstancesOperator
@@ -68,7 +65,7 @@ def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None
         self.build_command(context, cmd_flags)
         self.log.info(f"Running command: {self.command}")
         result = AzureContainerInstancesOperator.execute(self, context)
-        logger.info(result)
+        self.log.info(result)
 
     def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None:
         # For the first round, we're going to assume that the command is dbt
7 changes: 2 additions & 5 deletions cosmos/operators/base.py
@@ -12,9 +12,6 @@
 from airflow.utils.strings import to_boolean
 
 from cosmos.dbt.executable import get_system_dbt
-from cosmos.log import get_logger
-
-logger = get_logger(__name__)
 
 
 class AbstractDbtBaseOperator(BaseOperator, metaclass=ABCMeta):
@@ -178,14 +175,14 @@ def get_env(self, context: Context) -> dict[str, str | bytes | os.PathLike[Any]]
                 filtered_env[key] = val
             else:
                 if isinstance(key, accepted_types):
-                    logger.warning(
+                    self.log.warning(
                         "Env var %s was ignored because its key is not a valid type. Must be one of %s",
                         key,
                         accepted_types,
                     )
 
                 if isinstance(val, accepted_types):
-                    logger.warning(
+                    self.log.warning(
                         "Env var %s was ignored because its value is not a valid type. Must be one of %s",
                         key,
                         accepted_types,
5 changes: 1 addition & 4 deletions cosmos/operators/docker.py
@@ -4,7 +4,6 @@
 
 from airflow.utils.context import Context
 
-from cosmos.log import get_logger
 from cosmos.operators.base import (
     AbstractDbtBaseOperator,
     DbtBuildMixin,
@@ -17,8 +16,6 @@
     DbtTestMixin,
 )
 
-logger = get_logger(__name__)
-
 # docker is an optional dependency, so we need to check if it's installed
 try:
     from airflow.providers.docker.operators.docker import DockerOperator
@@ -52,7 +49,7 @@ def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None
         self.build_command(context, cmd_flags)
         self.log.info(f"Running command: {self.command}")
         result = DockerOperator.execute(self, context)
-        logger.info(result)
+        self.log.info(result)
 
     def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None:
         # For the first round, we're going to assume that the command is dbt
5 changes: 1 addition & 4 deletions cosmos/operators/kubernetes.py
@@ -8,7 +8,6 @@
 
 from cosmos.config import ProfileConfig
 from cosmos.dbt.parser.output import extract_log_issues
-from cosmos.log import get_logger
 from cosmos.operators.base import (
     AbstractDbtBaseOperator,
     DbtBuildMixin,
@@ -24,8 +23,6 @@
 DBT_NO_TESTS_MSG = "Nothing to do"
 DBT_WARN_MSG = "WARN"
 
-logger = get_logger(__name__)
-
 try:
     # apache-airflow-providers-cncf-kubernetes >= 7.4.0
     from airflow.providers.cncf.kubernetes.backcompat.backwards_compat_converters import (
@@ -74,7 +71,7 @@ def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None
         self.build_kube_args(context, cmd_flags)
         self.log.info(f"Running command: {self.arguments}")
         result = KubernetesPodOperator.execute(self, context)
-        logger.info(result)
+        self.log.info(result)
 
     def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None) -> None:
         # For the first round, we're going to assume that the command is dbt