diff --git a/src/prefect/logging/configuration.py b/src/prefect/logging/configuration.py index 638c605e3a29..d3cc0341fcba 100644 --- a/src/prefect/logging/configuration.py +++ b/src/prefect/logging/configuration.py @@ -38,7 +38,9 @@ def load_logging_config(path: Path) -> dict[str, Any]: warnings.filterwarnings("ignore", category=DeprecationWarning) config = yaml.safe_load( # Substitute settings into the template in format $SETTING / ${SETTING} - template.substitute(current_settings.to_environment_variables()) + template.substitute( + current_settings.to_environment_variables(include_aliases=True) + ) ) # Load overrides from the environment diff --git a/src/prefect/settings/base.py b/src/prefect/settings/base.py index 57ebbbd06ed9..52283d8a2d8a 100644 --- a/src/prefect/settings/base.py +++ b/src/prefect/settings/base.py @@ -92,6 +92,7 @@ def to_environment_variables( self, exclude_unset: bool = False, include_secrets: bool = True, + include_aliases: bool = False, ) -> Dict[str, str]: """Convert the settings object to a dictionary of environment variables.""" env: Dict[str, Any] = self.model_dump( @@ -105,12 +106,26 @@ def to_environment_variables( child_env = child_settings.to_environment_variables( exclude_unset=exclude_unset, include_secrets=include_secrets, + include_aliases=include_aliases, ) env_variables.update(child_env) elif (value := env.get(key)) is not None: - env_variables[f"{self.model_config.get('env_prefix')}{key.upper()}"] = ( - _to_environment_variable_value(value) - ) + validation_alias = self.model_fields[key].validation_alias + if include_aliases and validation_alias is not None: + if isinstance(validation_alias, AliasChoices): + for alias in validation_alias.choices: + if isinstance(alias, str): + env_variables[alias.upper()] = ( + _to_environment_variable_value(value) + ) + elif isinstance(validation_alias, str): + env_variables[validation_alias.upper()] = ( + _to_environment_variable_value(value) + ) + else: + env_variables[ + f"{self.model_config.get('env_prefix')}{key.upper()}" + ] = _to_environment_variable_value(value) return env_variables @model_serializer( diff --git a/tests/test_logging.py b/tests/test_logging.py index 63d13d35b6af..940c24b0485f 100644 --- a/tests/test_logging.py +++ b/tests/test_logging.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import json import logging import sys @@ -6,7 +8,8 @@ from contextlib import nullcontext from functools import partial from io import StringIO -from typing import Type +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable, Generator from unittest import mock from unittest.mock import ANY, MagicMock @@ -21,6 +24,7 @@ import prefect.settings from prefect import flow, task from prefect._internal.concurrency.api import create_call, from_sync +from prefect.client.orchestration import PrefectClient from prefect.context import FlowRunContext, TaskRunContext from prefect.exceptions import MissingContextError from prefect.logging import LogEavesdropper @@ -62,6 +66,7 @@ PREFECT_LOGGING_TO_API_ENABLED, PREFECT_LOGGING_TO_API_MAX_LOG_SIZE, PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW, + PREFECT_SERVER_LOGGING_LEVEL, PREFECT_TEST_MODE, temporary_settings, ) @@ -71,6 +76,10 @@ from prefect.utilities.names import obfuscate from prefect.workers.base import BaseJobConfiguration, BaseWorker +if TYPE_CHECKING: + from prefect.client.schemas.objects import FlowRun, TaskRun + from prefect.server.events.pipeline import EventsPipeline + @pytest.fixture def dictConfigMock(monkeypatch: pytest.MonkeyPatch): @@ -84,18 +93,18 @@ def dictConfigMock(monkeypatch: pytest.MonkeyPatch): @pytest.fixture -async def logger_test_deployment(prefect_client): +async def logger_test_deployment(prefect_client: PrefectClient): """ A deployment with a flow that returns information about the given loggers """ @prefect.flow - def my_flow(loggers=["foo", "bar", "prefect"]): + def my_flow() -> dict[str, Any]: import logging - settings = {} + settings: dict[str, Any] = {} - for logger_name in loggers: + for logger_name in ["foo", "bar", "prefect"]: logger = logging.getLogger(logger_name) settings[logger_name] = { "handlers": [handler.name for handler in logger.handlers], @@ -115,7 +124,7 @@ def my_flow(loggers=["foo", "bar", "prefect"]): return deployment_id -def test_setup_logging_uses_default_path(tmp_path, dictConfigMock): +def test_setup_logging_uses_default_path(tmp_path: Path, dictConfigMock: MagicMock): with temporary_settings( {PREFECT_LOGGING_SETTINGS_PATH: tmp_path.joinpath("does-not-exist.yaml")} ): @@ -126,7 +135,7 @@ def test_setup_logging_uses_default_path(tmp_path, dictConfigMock): dictConfigMock.assert_called_once_with(expected_config) -def test_setup_logging_sets_incremental_on_repeated_calls(dictConfigMock): +def test_setup_logging_sets_incremental_on_repeated_calls(dictConfigMock: MagicMock): setup_logging() assert dictConfigMock.call_count == 1 setup_logging() @@ -135,7 +144,9 @@ def test_setup_logging_sets_incremental_on_repeated_calls(dictConfigMock): assert dictConfigMock.mock_calls[1][1][0]["incremental"] is True -def test_setup_logging_uses_settings_path_if_exists(tmp_path, dictConfigMock): +def test_setup_logging_uses_settings_path_if_exists( + tmp_path: Path, dictConfigMock: MagicMock +): config_file = tmp_path.joinpath("exists.yaml") config_file.write_text("foo: bar") @@ -147,12 +158,14 @@ def test_setup_logging_uses_settings_path_if_exists(tmp_path, dictConfigMock): dictConfigMock.assert_called_once_with(expected_config) -def test_setup_logging_uses_env_var_overrides(tmp_path, dictConfigMock, monkeypatch): +def test_setup_logging_uses_env_var_overrides( + tmp_path: Path, dictConfigMock: MagicMock, monkeypatch: pytest.MonkeyPatch +): with temporary_settings( {PREFECT_LOGGING_SETTINGS_PATH: tmp_path.joinpath("does-not-exist.yaml")} ): expected_config = load_logging_config(DEFAULT_LOGGING_SETTINGS_PATH) - env = {} + env: dict[str, Any] = {} expected_config["incremental"] = False @@ -188,8 +201,27 @@ def test_setup_logging_uses_env_var_overrides(tmp_path, dictConfigMock, monkeypa dictConfigMock.assert_called_once_with(expected_config) +def test_setting_aliases_respected_for_logging_config(tmp_path: Path): + logging_config_content = """ +loggers: + prefect: + level: "${PREFECT_LOGGING_SERVER_LEVEL}" +""" + config_file = tmp_path / "logging.yaml" + config_file.write_text(logging_config_content) + + with temporary_settings( + { + PREFECT_LOGGING_SETTINGS_PATH: config_file, + PREFECT_SERVER_LOGGING_LEVEL: "INFO", + } + ): + config = setup_logging() + assert config["loggers"]["prefect"]["level"] == "INFO" + + @pytest.mark.parametrize("name", ["default", None, ""]) -def test_get_logger_returns_prefect_logger_by_default(name): +def test_get_logger_returns_prefect_logger_by_default(name: str | None): if name == "default": logger = get_logger() else: @@ -208,7 +240,9 @@ def test_get_logger_does_not_duplicate_prefect_prefix(): assert logger.name == "prefect.foo" -def test_default_level_is_applied_to_interpolated_yaml_values(dictConfigMock): +def test_default_level_is_applied_to_interpolated_yaml_values( + dictConfigMock: MagicMock, +): with temporary_settings( {PREFECT_LOGGING_LEVEL: "WARNING", PREFECT_TEST_MODE: False} ): @@ -224,7 +258,7 @@ def test_default_level_is_applied_to_interpolated_yaml_values(dictConfigMock): @pytest.fixture() -def external_logger_setup(request): +def external_logger_setup(request: pytest.FixtureRequest): # This fixture will create a logger with the specified name, level, and propagate value name, level = request.param logger = logging.getLogger(name) @@ -254,10 +288,10 @@ def external_logger_setup(request): ("foo.child", logging.CRITICAL), ], indirect=True, - ids=lambda x: f"logger='{x[0]}'-level='{logging.getLevelName(x[1])}'", + ids=lambda x: f"logger='{x[0]}'-level='{logging._levelToName[x[1]]}'", # type: ignore[reportPrivateUsage] ) def test_setup_logging_extra_loggers_does_not_modify_external_logger_level( - dictConfigMock, external_logger_setup + dictConfigMock: MagicMock, external_logger_setup: tuple[str, int, bool] ): ext_name, ext_level, ext_propagate = external_logger_setup with temporary_settings( @@ -284,7 +318,7 @@ def test_setup_logging_extra_loggers_does_not_modify_external_logger_level( @pytest.fixture -def mock_log_worker(monkeypatch): +def mock_log_worker(monkeypatch: pytest.MonkeyPatch): mock = MagicMock() monkeypatch.setattr("prefect.logging.handlers.APILogWorker", mock) return mock @@ -293,24 +327,28 @@ def mock_log_worker(monkeypatch): @pytest.mark.enable_api_log_handler class TestAPILogHandler: @pytest.fixture - def handler(self): + def handler(self) -> Generator[APILogHandler, None, None]: yield APILogHandler() @pytest.fixture - def logger(self, handler): + def logger(self, handler: APILogHandler): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) logger.addHandler(handler) yield logger logger.removeHandler(handler) - def test_worker_is_not_flushed_on_handler_close(self, mock_log_worker): + def test_worker_is_not_flushed_on_handler_close(self, mock_log_worker: MagicMock): handler = APILogHandler() handler.close() mock_log_worker.drain_all.assert_not_called() async def test_logs_can_still_be_sent_after_close( - self, logger, handler, flow_run, prefect_client + self, + logger: logging.Logger, + handler: APILogHandler, + flow_run: "FlowRun", + prefect_client: PrefectClient, ): logger.info("Test", extra={"flow_run_id": flow_run.id}) handler.close() # Close it @@ -321,7 +359,11 @@ async def test_logs_can_still_be_sent_after_close( assert len(logs) == 2 async def test_logs_can_still_be_sent_after_flush( - self, logger, handler, flow_run, prefect_client + self, + logger: logging.Logger, + handler: APILogHandler, + flow_run: "FlowRun", + prefect_client: PrefectClient, ): logger.info("Test", extra={"flow_run_id": flow_run.id}) await handler.aflush() @@ -332,7 +374,11 @@ async def test_logs_can_still_be_sent_after_flush( assert len(logs) == 2 async def test_sync_flush_from_async_context( - self, logger, handler, flow_run, prefect_client + self, + logger: logging.Logger, + handler: APILogHandler, + flow_run: "FlowRun", + prefect_client: PrefectClient, ): logger.info("Test", extra={"flow_run_id": flow_run.id}) handler.flush() @@ -343,16 +389,22 @@ async def test_sync_flush_from_async_context( logs = await prefect_client.read_logs() assert len(logs) == 1 - def test_sync_flush_from_global_event_loop(self, logger, handler, flow_run): + def test_sync_flush_from_global_event_loop( + self, logger: logging.Logger, handler: APILogHandler, flow_run: "FlowRun" + ): logger.info("Test", extra={"flow_run_id": flow_run.id}) with pytest.raises(RuntimeError, match="would block"): from_sync.call_soon_in_loop_thread(create_call(handler.flush)).result() - def test_sync_flush_from_sync_context(self, logger, handler, flow_run): + def test_sync_flush_from_sync_context( + self, logger: logging.Logger, handler: APILogHandler, flow_run: "FlowRun" + ): logger.info("Test", extra={"flow_run_id": flow_run.id}) handler.flush() - def test_sends_task_run_log_to_worker(self, logger, mock_log_worker, task_run): + def test_sends_task_run_log_to_worker( + self, logger: logging.Logger, mock_log_worker: MagicMock, task_run: "TaskRun" + ): with TaskRunContext.model_construct(task_run=task_run): logger.info("test-task") @@ -368,7 +420,9 @@ def test_sends_task_run_log_to_worker(self, logger, mock_log_worker, task_run): mock_log_worker.instance().send.assert_called_once_with(expected) - def test_sends_flow_run_log_to_worker(self, logger, mock_log_worker, flow_run): + def test_sends_flow_run_log_to_worker( + self, logger: logging.Logger, mock_log_worker: MagicMock, flow_run: "FlowRun" + ): with FlowRunContext.model_construct(flow_run=flow_run): logger.info("test-flow") @@ -386,7 +440,11 @@ def test_sends_flow_run_log_to_worker(self, logger, mock_log_worker, flow_run): @pytest.mark.parametrize("with_context", [True, False]) def test_respects_explicit_flow_run_id( - self, logger, mock_log_worker, flow_run, with_context + self, + logger: logging.Logger, + mock_log_worker: MagicMock, + flow_run: "FlowRun", + with_context: bool, ): flow_run_id = uuid.uuid4() context = ( @@ -411,7 +469,12 @@ def test_respects_explicit_flow_run_id( @pytest.mark.parametrize("with_context", [True, False]) def test_respects_explicit_task_run_id( - self, logger, mock_log_worker, flow_run, with_context, task_run + self, + logger: logging.Logger, + mock_log_worker: MagicMock, + flow_run: "FlowRun", + with_context: bool, + task_run: "TaskRun", ): task_run_id = uuid.uuid4() context = ( @@ -435,13 +498,15 @@ def test_respects_explicit_task_run_id( mock_log_worker.instance().send.assert_called_once_with(expected) - def test_does_not_emit_logs_below_level(self, logger, mock_log_worker): + def test_does_not_emit_logs_below_level( + self, logger: logging.Logger, mock_log_worker: MagicMock + ): logger.setLevel(logging.WARNING) logger.info("test-task", extra={"flow_run_id": uuid.uuid4()}) mock_log_worker.instance().send.assert_not_called() def test_explicit_task_run_id_still_requires_flow_run_id( - self, logger, mock_log_worker + self, logger: logging.Logger, mock_log_worker: MagicMock ): task_run_id = uuid.uuid4() with pytest.warns( @@ -452,7 +517,11 @@ def test_explicit_task_run_id_still_requires_flow_run_id( mock_log_worker.instance().send.assert_not_called() def test_sets_timestamp_from_record_created_time( - self, logger, mock_log_worker, flow_run, handler + self, + logger: logging.Logger, + mock_log_worker: MagicMock, + flow_run: "FlowRun", + handler: APILogHandler, ): # Capture the record handler.emit = MagicMock(side_effect=handler.emit) @@ -468,10 +537,17 @@ def test_sets_timestamp_from_record_created_time( ) def test_sets_timestamp_from_time_if_missing_from_recrod( - self, logger, mock_log_worker, flow_run, handler, monkeypatch + self, + logger: logging.Logger, + mock_log_worker: MagicMock, + flow_run: "FlowRun", + handler: APILogHandler, + monkeypatch: pytest.MonkeyPatch, ): - def drop_created_and_emit(emit, record): - record.created = None + def drop_created_and_emit( + emit: Callable[[logging.LogRecord], None], record: logging.LogRecord + ): + record.created = None # type: ignore return emit(record) handler.emit = MagicMock( @@ -488,14 +564,22 @@ def drop_created_and_emit(emit, record): assert log_dict["timestamp"] == from_timestamp(now).to_iso8601_string() - def test_does_not_send_logs_that_opt_out(self, logger, mock_log_worker, task_run): + def test_does_not_send_logs_that_opt_out( + self, + logger: logging.Logger, + mock_log_worker: MagicMock, + task_run: "TaskRun", + ): with TaskRunContext.model_construct(task_run=task_run): logger.info("test", extra={"send_to_api": False}) mock_log_worker.instance().send.assert_not_called() def test_does_not_send_logs_when_handler_is_disabled( - self, logger, mock_log_worker, task_run + self, + logger: logging.Logger, + mock_log_worker: MagicMock, + task_run: "TaskRun", ): with temporary_settings( updates={PREFECT_LOGGING_TO_API_ENABLED: "False"}, @@ -506,7 +590,10 @@ def test_does_not_send_logs_when_handler_is_disabled( mock_log_worker.instance().send.assert_not_called() def test_does_not_send_logs_outside_of_run_context_with_default_setting( - self, logger, mock_log_worker, capsys + self, + logger: logging.Logger, + mock_log_worker: MagicMock, + capsys: pytest.CaptureFixture[str], ): # Warns in the main process with pytest.warns( @@ -522,7 +609,8 @@ def test_does_not_send_logs_outside_of_run_context_with_default_setting( def test_does_not_raise_when_logger_outside_of_run_context_with_default_setting( self, - logger, + logger: logging.Logger, + capsys: pytest.CaptureFixture[str], ): with pytest.warns( UserWarning, @@ -534,7 +622,10 @@ def test_does_not_raise_when_logger_outside_of_run_context_with_default_setting( logger.info("test") def test_does_not_send_logs_outside_of_run_context_with_error_setting( - self, logger, mock_log_worker, capsys + self, + logger: logging.Logger, + mock_log_worker: MagicMock, + capsys: pytest.CaptureFixture[str], ): with temporary_settings( updates={PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW: "error"}, @@ -553,7 +644,7 @@ def test_does_not_send_logs_outside_of_run_context_with_error_setting( def test_does_not_warn_when_logger_outside_of_run_context_with_error_setting( self, - logger, + logger: logging.Logger, ): with temporary_settings( updates={PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW: "error"}, @@ -568,7 +659,10 @@ def test_does_not_warn_when_logger_outside_of_run_context_with_error_setting( logger.info("test") def test_does_not_send_logs_outside_of_run_context_with_ignore_setting( - self, logger, mock_log_worker, capsys + self, + logger: logging.Logger, + mock_log_worker: MagicMock, + capsys: pytest.CaptureFixture[str], ): with temporary_settings( updates={PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW: "ignore"}, @@ -583,7 +677,7 @@ def test_does_not_send_logs_outside_of_run_context_with_ignore_setting( def test_does_not_raise_or_warn_when_logger_outside_of_run_context_with_ignore_setting( self, - logger, + logger: logging.Logger, ): with temporary_settings( updates={PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW: "ignore"}, @@ -591,7 +685,10 @@ def test_does_not_raise_or_warn_when_logger_outside_of_run_context_with_ignore_s logger.info("test") def test_does_not_send_logs_outside_of_run_context_with_warn_setting( - self, logger, mock_log_worker, capsys + self, + logger: logging.Logger, + mock_log_worker: MagicMock, + capsys: pytest.CaptureFixture[str], ): with temporary_settings( updates={PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW: "warn"}, @@ -609,7 +706,8 @@ def test_does_not_send_logs_outside_of_run_context_with_warn_setting( assert output.err == "" def test_does_not_raise_when_logger_outside_of_run_context_with_warn_setting( - self, logger + self, + logger: logging.Logger, ): with temporary_settings( updates={PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW: "warn"}, @@ -624,7 +722,9 @@ def test_does_not_raise_when_logger_outside_of_run_context_with_warn_setting( logger.info("test") def test_missing_context_warning_refers_to_caller_lineno( - self, logger, mock_log_worker + self, + logger: logging.Logger, + mock_log_worker: MagicMock, ): from inspect import currentframe, getframeinfo @@ -633,7 +733,7 @@ def test_missing_context_warning_refers_to_caller_lineno( UserWarning, match="attempted to send logs .* without a flow run id" ) as warnings: logger.info("test") - lineno = getframeinfo(currentframe()).lineno - 1 + lineno = getframeinfo(currentframe()).lineno - 1 # type: ignore # The above dynamic collects the line number so that added tests do not # break this test @@ -641,7 +741,11 @@ def test_missing_context_warning_refers_to_caller_lineno( assert warnings.pop().lineno == lineno def test_writes_logging_errors_to_stderr( - self, logger, mock_log_worker, capsys, monkeypatch + self, + logger: logging.Logger, + mock_log_worker: MagicMock, + capsys: pytest.CaptureFixture[str], + monkeypatch: pytest.MonkeyPatch, ): monkeypatch.setattr( "prefect.logging.handlers.APILogHandler.prepare", @@ -657,7 +761,10 @@ def test_writes_logging_errors_to_stderr( assert "RuntimeError: Oh no!" in output.err def test_does_not_write_error_for_logs_outside_run_context_that_opt_out( - self, logger, mock_log_worker, capsys + self, + logger: logging.Logger, + mock_log_worker: MagicMock, + capsys: pytest.CaptureFixture[str], ): logger.info("test", extra={"send_to_api": False}) @@ -669,7 +776,11 @@ def test_does_not_write_error_for_logs_outside_run_context_that_opt_out( ) async def test_does_not_enqueue_logs_that_are_too_big( - self, task_run, logger, capsys, mock_log_worker + self, + task_run: "TaskRun", + logger: logging.Logger, + capsys: pytest.CaptureFixture[str], + mock_log_worker: MagicMock, ): with TaskRunContext.model_construct(task_run=task_run): with temporary_settings(updates={PREFECT_LOGGING_TO_API_MAX_LOG_SIZE: "1"}): @@ -693,16 +804,16 @@ def test_handler_knows_how_large_logs_are(self): log_size = len(json.dumps(dict_log)) assert log_size == 211 handler = APILogHandler() - assert handler._get_payload_size(dict_log) == log_size + assert handler._get_payload_size(dict_log) == log_size # type: ignore[reportPrivateUsage] WORKER_ID = uuid.uuid4() class TestWorkerLogging: - class CloudWorkerTestImpl(BaseWorker): + class CloudWorkerTestImpl(BaseWorker[Any, Any, Any]): type: str = "cloud_logging_test" - job_configuration: Type[BaseJobConfiguration] = BaseJobConfiguration + job_configuration = BaseJobConfiguration async def _send_worker_heartbeat(self, *_, **__): """ @@ -714,9 +825,9 @@ async def _send_worker_heartbeat(self, *_, **__): async def run(self, *_, **__): pass - class ServerWorkerTestImpl(BaseWorker): + class ServerWorkerTestImpl(BaseWorker[Any, Any, Any]): type: str = "server_logging_test" - job_configuration: Type[BaseJobConfiguration] = BaseJobConfiguration + job_configuration = BaseJobConfiguration async def run(self, *_, **__): pass @@ -738,7 +849,7 @@ def worker_handler(self): yield WorkerAPILogHandler() @pytest.fixture - def logger(self, worker_handler): + def logger(self, worker_handler: WorkerAPILogHandler): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) logger.addHandler(worker_handler) @@ -761,7 +872,9 @@ async def test_get_worker_logger_works_with_backend_id(self): assert logger.name == "prefect.workers.cloud_logging_test.test" assert logger.extra["worker_id"] == str(WORKER_ID) - async def test_worker_emits_logs_with_worker_id(self, caplog): + async def test_worker_emits_logs_with_worker_id( + self, caplog: pytest.LogCaptureFixture + ): async with self.CloudWorkerTestImpl( name="test", work_pool_name="test-work-pool" ) as worker: @@ -776,8 +889,9 @@ async def test_worker_emits_logs_with_worker_id(self, caplog): assert record_with_extras[0].worker_id == str(worker.backend_id) assert worker._logger.extra["worker_id"] == str(worker.backend_id) + @pytest.mark.usefixtures("worker_handler", "logging_to_api_enabled") async def test_worker_logger_sends_log_to_api_worker_when_connected_to_cloud( - self, mock_log_worker, worker_handler, logging_to_api_enabled + self, mock_log_worker: MagicMock ): async with self.CloudWorkerTestImpl( name="test", work_pool_name="test-work-pool" @@ -796,8 +910,9 @@ async def test_worker_logger_sends_log_to_api_worker_when_connected_to_cloud( assert len(log_statement) == 1 assert log_statement[0]["worker_id"] == str(worker.backend_id) + @pytest.mark.usefixtures("worker_handler", "logging_to_api_enabled") async def test_worker_logger_does_not_send_logs_when_not_connected_to_cloud( - self, mock_log_worker, worker_handler, logging_to_api_enabled + self, mock_log_worker: MagicMock ): async with self.ServerWorkerTestImpl( name="test", work_pool_name="test-work-pool" @@ -824,14 +939,24 @@ def log_dict(self): message="hello", ).model_dump(mode="json") - async def test_send_logs_single_record(self, log_dict, prefect_client, worker): + async def test_send_logs_single_record( + self, + log_dict: dict[str, Any], + prefect_client: PrefectClient, + worker: APILogWorker, + ): worker.send(log_dict) await worker.drain() logs = await prefect_client.read_logs() assert len(logs) == 1 assert logs[0].model_dump(include=log_dict.keys(), mode="json") == log_dict - async def test_send_logs_many_records(self, log_dict, prefect_client, worker): + async def test_send_logs_many_records( + self, + log_dict: dict[str, Any], + prefect_client: PrefectClient, + worker: APILogWorker, + ): # Use the read limit as the count since we'd need multiple read calls otherwise count = prefect.settings.PREFECT_API_DEFAULT_LIMIT.value() log_dict.pop("message") @@ -854,7 +979,11 @@ async def test_send_logs_many_records(self, log_dict, prefect_client, worker): assert len(set(log.message for log in logs)) == count, "Each log is unique" async def test_send_logs_writes_exceptions_to_stderr( - self, log_dict, capsys, monkeypatch, worker + self, + log_dict: dict[str, Any], + capsys: pytest.CaptureFixture[str], + monkeypatch: pytest.MonkeyPatch, + worker: APILogWorker, ): monkeypatch.setattr( "prefect.client.orchestration.PrefectClient.create_logs", @@ -868,7 +997,9 @@ async def test_send_logs_writes_exceptions_to_stderr( assert "--- Error logging to API ---" in err assert "ValueError: Test" in err - async def test_send_logs_batches_by_size(self, log_dict, monkeypatch): + async def test_send_logs_batches_by_size( + self, log_dict: dict[str, Any], monkeypatch: pytest.MonkeyPatch + ): mock_create_logs = AsyncMock() monkeypatch.setattr( "prefect.client.orchestration.PrefectClient.create_logs", mock_create_logs @@ -891,7 +1022,7 @@ async def test_send_logs_batches_by_size(self, log_dict, monkeypatch): assert mock_create_logs.call_count == 3 async def test_logs_are_sent_immediately_when_stopped( - self, log_dict, prefect_client + self, log_dict: dict[str, Any], prefect_client: PrefectClient ): # Set a long interval start_time = time.time() @@ -910,7 +1041,10 @@ async def test_logs_are_sent_immediately_when_stopped( assert len(logs) == 2 async def test_logs_are_sent_immediately_when_flushed( - self, log_dict, prefect_client, worker + self, + log_dict: dict[str, Any], + prefect_client: PrefectClient, + worker: APILogWorker, ): # Set a long interval start_time = time.time() @@ -928,7 +1062,7 @@ async def test_logs_are_sent_immediately_when_flushed( assert len(logs) == 2 async def test_logs_include_worker_id_if_available( - self, worker, log_dict, prefect_client + self, worker: APILogWorker, log_dict: dict[str, Any] ): worker_id = str(uuid.uuid4()) log_dict["worker_id"] = worker_id @@ -944,7 +1078,7 @@ async def test_logs_include_worker_id_if_available( assert logs[0]["worker_id"] == worker_id -def test_flow_run_logger(flow_run): +def test_flow_run_logger(flow_run: "FlowRun"): logger = flow_run_logger(flow_run) assert logger.name == "prefect.flow_runs" assert logger.extra == { @@ -954,7 +1088,7 @@ def test_flow_run_logger(flow_run): } -def test_flow_run_logger_with_flow(flow_run): +def test_flow_run_logger_with_flow(flow_run: "FlowRun"): @flow(name="foo") def test_flow(): pass @@ -963,13 +1097,13 @@ def test_flow(): assert logger.extra["flow_name"] == "foo" -def test_flow_run_logger_with_kwargs(flow_run): +def test_flow_run_logger_with_kwargs(flow_run: "FlowRun"): logger = flow_run_logger(flow_run, foo="test", flow_run_name="bar") assert logger.extra["foo"] == "test" assert logger.extra["flow_run_name"] == "bar" -def test_task_run_logger(task_run): +def test_task_run_logger(task_run: "TaskRun"): logger = task_run_logger(task_run) assert logger.name == "prefect.task_runs" assert logger.extra == { @@ -982,7 +1116,7 @@ def test_task_run_logger(task_run): } -def test_task_run_logger_with_task(task_run): +def test_task_run_logger_with_task(task_run: "TaskRun"): @task(name="task_run_logger_with_task") def test_task(): pass @@ -991,13 +1125,13 @@ def test_task(): assert logger.extra["task_name"] == "task_run_logger_with_task" -def test_task_run_logger_with_flow_run(task_run, flow_run): +def test_task_run_logger_with_flow_run(task_run: "TaskRun", flow_run: "FlowRun"): logger = task_run_logger(task_run, flow_run=flow_run) assert logger.extra["flow_run_id"] == str(task_run.flow_run_id) assert logger.extra["flow_run_name"] == flow_run.name -def test_task_run_logger_with_flow(task_run): +def test_task_run_logger_with_flow(task_run: "TaskRun"): @flow(name="foo") def test_flow(): pass @@ -1006,7 +1140,9 @@ def test_flow(): assert logger.extra["flow_name"] == "foo" -def test_task_run_logger_with_flow_run_from_context(task_run, flow_run): +def test_task_run_logger_with_flow_run_from_context( + task_run: "TaskRun", flow_run: "FlowRun" +): @flow(name="foo") def test_flow(): pass @@ -1020,7 +1156,9 @@ def test_flow(): assert logger.extra["flow_name"] == test_flow.name == "foo" -def test_run_logger_with_flow_run_context_without_parent_flow_run_id(caplog): +def test_run_logger_with_flow_run_context_without_parent_flow_run_id( + caplog: pytest.LogCaptureFixture, +): """Test that get_run_logger works when called from a constructed FlowRunContext""" with FlowRunContext.model_construct(flow_run=None, flow=None): @@ -1038,7 +1176,7 @@ def test_run_logger_with_flow_run_context_without_parent_flow_run_id(caplog): async def test_run_logger_with_task_run_context_without_parent_flow_run_id( - prefect_client, caplog + prefect_client: PrefectClient, caplog: pytest.LogCaptureFixture ): """Test that get_run_logger works when passed a constructed TaskRunContext""" @@ -1063,7 +1201,7 @@ def foo(): assert "test3141592" in caplog.text -def test_task_run_logger_with_kwargs(task_run): +def test_task_run_logger_with_kwargs(task_run: "TaskRun"): logger = task_run_logger(task_run, foo="test", task_run_name="bar") assert logger.extra["foo"] == "test" assert logger.extra["task_run_name"] == "bar" @@ -1080,7 +1218,8 @@ async def test_run_logger_with_explicit_context_of_invalid_type(): async def test_run_logger_with_explicit_context( - prefect_client, flow_run, local_filesystem + prefect_client: PrefectClient, + flow_run: "FlowRun", ): @task def foo(): @@ -1107,7 +1246,8 @@ def foo(): async def test_run_logger_with_explicit_context_overrides_existing( - prefect_client, flow_run, local_filesystem + prefect_client: PrefectClient, + flow_run: "FlowRun", ): @task def foo(): @@ -1129,7 +1269,7 @@ def bar(): assert logger.extra["task_name"] == bar.name -async def test_run_logger_in_flow(prefect_client): +async def test_run_logger_in_flow(prefect_client: PrefectClient): @flow def test_flow(): return get_run_logger() @@ -1145,7 +1285,7 @@ def test_flow(): } -async def test_run_logger_extra_data(prefect_client): +async def test_run_logger_extra_data(prefect_client: PrefectClient): @flow def test_flow(): return get_run_logger(foo="test", flow_name="bar") @@ -1162,7 +1302,7 @@ def test_flow(): } -async def test_run_logger_in_nested_flow(prefect_client): +async def test_run_logger_in_nested_flow(prefect_client: PrefectClient): @flow def child_flow(): return get_run_logger() @@ -1182,7 +1322,9 @@ def test_flow(): } -async def test_run_logger_in_task(prefect_client, events_pipeline): +async def test_run_logger_in_task( + prefect_client: PrefectClient, events_pipeline: "EventsPipeline" +): @task def test_task(): return get_run_logger() @@ -1216,7 +1358,7 @@ def handler(self): yield PrefectConsoleHandler() @pytest.fixture - def logger(self, handler): + def logger(self, handler: PrefectConsoleHandler): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) logger.addHandler(handler) @@ -1252,7 +1394,7 @@ def test_init_override_kwargs(self): ] assert handler.level == logging.DEBUG - def test_uses_stderr_by_default(self, capsys): + def test_uses_stderr_by_default(self, capsys: pytest.CaptureFixture[str]): logger = get_logger(uuid.uuid4().hex) logger.handlers = [PrefectConsoleHandler()] logger.info("Test!") @@ -1260,7 +1402,7 @@ def test_uses_stderr_by_default(self, capsys): assert stdout == "" assert "Test!" in stderr - def test_respects_given_stream(self, capsys): + def test_respects_given_stream(self, capsys: pytest.CaptureFixture[str]): logger = get_logger(uuid.uuid4().hex) logger.handlers = [PrefectConsoleHandler(stream=sys.stdout)] logger.info("Test!") @@ -1268,7 +1410,9 @@ def test_respects_given_stream(self, capsys): assert stderr == "" assert "Test!" in stdout - def test_includes_tracebacks_during_exceptions(self, capsys): + def test_includes_tracebacks_during_exceptions( + self, capsys: pytest.CaptureFixture[str] + ): logger = get_logger(uuid.uuid4().hex) logger.handlers = [PrefectConsoleHandler()] @@ -1283,7 +1427,9 @@ def test_includes_tracebacks_during_exceptions(self, capsys): assert 'raise ValueError("oh my")' in stderr assert "ValueError: oh my" in stderr - def test_does_not_word_wrap_or_crop_messages(self, capsys): + def test_does_not_word_wrap_or_crop_messages( + self, capsys: pytest.CaptureFixture[str] + ): logger = get_logger(uuid.uuid4().hex) handler = PrefectConsoleHandler() logger.handlers = [handler] @@ -1296,7 +1442,7 @@ def test_does_not_word_wrap_or_crop_messages(self, capsys): # There will be newlines in the middle if cropped assert "x" * 1000 in stderr - def test_outputs_square_brackets_as_text(self, capsys): + def test_outputs_square_brackets_as_text(self, capsys: pytest.CaptureFixture[str]): logger = get_logger(uuid.uuid4().hex) handler = PrefectConsoleHandler() logger.handlers = [handler] @@ -1307,7 +1453,7 @@ def test_outputs_square_brackets_as_text(self, capsys): _, stderr = capsys.readouterr() assert msg in stderr - def test_outputs_square_brackets_as_style(self, capsys): + def test_outputs_square_brackets_as_style(self, capsys: pytest.CaptureFixture[str]): with temporary_settings({PREFECT_LOGGING_MARKUP: True}): logger = get_logger(uuid.uuid4().hex) handler = PrefectConsoleHandler() @@ -1410,7 +1556,9 @@ def test_current_api_key_is_not_logged(self, caplog): assert test_api_key not in caplog.text assert obfuscate(test_api_key) in caplog.text - def test_current_api_key_is_not_logged_from_flow(self, caplog): + def test_current_api_key_is_not_logged_from_flow( + self, caplog: pytest.LogCaptureFixture + ): test_api_key = "i-am-a-plaintext-api-key-and-i-dream-of-being-logged-one-day" with temporary_settings({PREFECT_API_KEY: test_api_key}): @@ -1424,7 +1572,9 @@ def test_flow(): assert test_api_key not in caplog.text assert obfuscate(test_api_key) in caplog.text - def test_current_api_key_is_not_logged_from_flow_log_prints(self, caplog): + def test_current_api_key_is_not_logged_from_flow_log_prints( + self, caplog: pytest.LogCaptureFixture + ): test_api_key = "i-am-a-sneaky-little-api-key" with temporary_settings({PREFECT_API_KEY: test_api_key}): @@ -1437,7 +1587,9 @@ def test_flow(): assert test_api_key not in caplog.text assert obfuscate(test_api_key) in caplog.text - def test_current_api_key_is_not_logged_from_task(self, caplog): + def test_current_api_key_is_not_logged_from_task( + self, caplog: pytest.LogCaptureFixture + ): test_api_key = "i-am-jacks-security-risk" with temporary_settings({PREFECT_API_KEY: test_api_key}): @@ -1485,7 +1637,10 @@ def test_flow(): ], ) def test_redact_substr_from_collections( - self, caplog, raw_log_record, expected_log_record + self, + caplog: pytest.LogCaptureFixture, + raw_log_record: Any, + expected_log_record: Any, ): """ This is a regression test for https://github.com/PrefectHQ/prefect/issues/12139 @@ -1502,7 +1657,7 @@ def test_log_list(): assert str(expected_log_record) in caplog.text -def test_log_in_flow(caplog): +def test_log_in_flow(caplog: pytest.LogCaptureFixture): msg = "Hello world!" @flow @@ -1520,7 +1675,7 @@ def test_flow(): raise AssertionError(f"{msg} was not found in records: {caplog.records}") -def test_log_in_task(caplog): +def test_log_in_task(caplog: pytest.LogCaptureFixture): msg = "Hello world!" @task @@ -1541,14 +1696,14 @@ def test_flow(): raise AssertionError(f"{msg} was not found in records") -def test_without_disable_logger(caplog): +def test_without_disable_logger(caplog: pytest.LogCaptureFixture): """ Sanity test to double check whether caplog actually works so can be more confident in the asserts in test_disable_logger. """ logger = logging.getLogger("griffe.agents.nodes") - def function_with_logging(logger): + def function_with_logging(logger: logging.Logger): assert not logger.disabled logger.critical("it's enabled!") return 42 @@ -1558,7 +1713,7 @@ def function_with_logging(logger): assert ("griffe.agents.nodes", 50, "it's enabled!") in caplog.record_tuples -def test_disable_logger(caplog): +def test_disable_logger(caplog: pytest.LogCaptureFixture): logger = logging.getLogger("griffe.agents.nodes") def function_with_logging(logger): @@ -1573,7 +1728,7 @@ def function_with_logging(logger): assert caplog.record_tuples == [] -def test_disable_run_logger(caplog): +def test_disable_run_logger(caplog: pytest.LogCaptureFixture): @task def task_with_run_logger(): logger = get_run_logger() @@ -1595,7 +1750,9 @@ def task_with_run_logger(): assert caplog.record_tuples == [("null", logging.CRITICAL, "won't show")] -def test_patch_print_writes_to_stdout_without_run_context(caplog, capsys): +def test_patch_print_writes_to_stdout_without_run_context( + caplog: pytest.LogCaptureFixture, capsys: pytest.CaptureFixture[str] +): with patch_print(): print("foo") @@ -1605,7 +1762,9 @@ def test_patch_print_writes_to_stdout_without_run_context(caplog, capsys): @pytest.mark.parametrize("run_context_cls", [TaskRunContext, FlowRunContext]) def test_patch_print_writes_to_stdout_with_run_context_and_no_log_prints( - caplog, capsys, run_context_cls + caplog: pytest.LogCaptureFixture, + capsys: pytest.CaptureFixture[str], + run_context_cls: type, ): with patch_print(): with run_context_cls.model_construct(log_prints=False): @@ -1616,7 +1775,9 @@ def test_patch_print_writes_to_stdout_with_run_context_and_no_log_prints( def test_patch_print_does_not_write_to_logger_with_custom_file( - caplog, capsys, task_run + caplog: pytest.LogCaptureFixture, + capsys: pytest.CaptureFixture[str], + task_run: "TaskRun", ): string_io = StringIO() @@ -1635,7 +1796,11 @@ def my_task(): assert string_io.getvalue().rstrip() == "foo" -def test_patch_print_writes_to_logger_with_task_run_context(caplog, capsys, task_run): +def test_patch_print_writes_to_logger_with_task_run_context( + caplog: pytest.LogCaptureFixture, + capsys: pytest.CaptureFixture[str], + task_run: "TaskRun", +): @task def my_task(): pass @@ -1661,7 +1826,10 @@ def my_task(): @pytest.mark.parametrize("file", ["stdout", "stderr"]) def test_patch_print_writes_to_logger_with_explicit_file( - caplog, capsys, task_run, file + caplog: pytest.LogCaptureFixture, + capsys: pytest.CaptureFixture[str], + task_run: "TaskRun", + file: str, ): @task def my_task(): @@ -1689,7 +1857,11 @@ def my_task(): assert record.task_name == my_task.name -def test_patch_print_writes_to_logger_with_flow_run_context(caplog, capsys, flow_run): +def test_patch_print_writes_to_logger_with_flow_run_context( + caplog: pytest.LogCaptureFixture, + capsys: pytest.CaptureFixture[str], + flow_run: "FlowRun", +): @flow def my_flow(): pass @@ -1713,7 +1885,7 @@ def my_flow(): assert record.flow_name == my_flow.name -def test_log_adapter_get_child(flow_run): +def test_log_adapter_get_child(): logger = PrefectLogAdapter(get_logger("prefect.parent"), {"hello": "world"}) assert logger.extra == {"hello": "world"} diff --git a/tests/test_settings.py b/tests/test_settings.py index 0bc5063aadf0..583ade3d83eb 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -65,6 +65,7 @@ from prefect.settings.models.api import APISettings from prefect.settings.models.client import ClientSettings from prefect.settings.models.logging import LoggingSettings +from prefect.settings.models.results import ResultsSettings from prefect.settings.models.server import ServerSettings from prefect.settings.models.server.api import ServerAPISettings from prefect.settings.models.server.database import ( @@ -655,6 +656,41 @@ def test_settings_to_environment_roundtrip( new_settings = Settings() assert settings.model_dump() == new_settings.model_dump() + @pytest.mark.parametrize( + "settings,alias,expected", + [ + ( + Settings(server=ServerSettings(logging_level="DEBUG")), + "PREFECT_LOGGING_SERVER_LEVEL", + "DEBUG", + ), + ( + Settings(results=ResultsSettings(default_storage_block="foo/bar")), + "PREFECT_DEFAULT_RESULT_STORAGE_BLOCK", + "foo/bar", + ), + ], + ) + def test_settings_to_environment_includes_aliases( + self, settings: Settings, alias: str, expected: str + ): + assert ( + settings.to_environment_variables(include_aliases=True).get(alias) + == expected + ) + + @pytest.mark.parametrize( + "alias", + ["PREFECT_LOGGING_SERVER_LEVEL", "PREFECT_DEFAULT_RESULT_STORAGE_BLOCK"], + ) + def test_settings_to_environment_doesnt_include_aliases_by_default( + self, alias: str + ): + assert ( + Settings().to_environment_variables(include_aliases=False).get(alias) + is None + ) + def test_settings_hash_key(self): settings = Settings(testing=dict(test_mode=True)) # type: ignore diff_settings = Settings(testing=dict(test_mode=False)) # type: ignore