From b3eca87a28710ab92ec07ad92e36e880988890fe Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Tue, 24 Oct 2023 10:14:08 -0500 Subject: [PATCH] Python log forwarding Fixes #311 --- temporalio/bridge/Cargo.lock | 48 ++++++++------- temporalio/bridge/Cargo.toml | 5 +- temporalio/bridge/runtime.py | 4 ++ temporalio/bridge/src/lib.rs | 1 + temporalio/bridge/src/runtime.rs | 57 +++++++++++++++++- temporalio/runtime.py | 100 +++++++++++++++++++++++++++++-- tests/worker/test_workflow.py | 63 ++++++++++++++++++- 7 files changed, 246 insertions(+), 32 deletions(-) diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index 4fa15f8f..b1ab65e1 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -381,7 +381,7 @@ dependencies = [ "autocfg", "cfg-if", "crossbeam-utils", - "memoffset 0.9.0", + "memoffset", "scopeguard", ] @@ -1160,15 +1160,6 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" -[[package]] -name = "memoffset" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d61c719bcfbcf5d62b3a09efa6088de8c54bc0bfcd3ea7ae39fcc186108b8de1" -dependencies = [ - "autocfg", -] - [[package]] name = "memoffset" version = "0.9.0" @@ -1711,14 +1702,14 @@ checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" [[package]] name = "pyo3" -version = "0.18.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3b1ac5b3731ba34fdaa9785f8d74d17448cd18f30cf19e0c7e7b1fdb5272109" +checksum = "e681a6cfdc4adcc93b4d3cf993749a4552018ee0a9b65fc0ccfad74352c72a38" dependencies = [ "cfg-if", "indoc", "libc", - "memoffset 0.8.0", + "memoffset", "parking_lot", "pyo3-build-config", "pyo3-ffi", @@ -1728,9 +1719,9 @@ dependencies = [ [[package]] name = "pyo3-asyncio" -version = "0.18.0" +version = "0.19.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3564762e37035cfc486228e10b0528460fa026d681b5763873c693aa0d5c260" +checksum = "a2cc34c1f907ca090d7add03dc523acdd91f3a4dab12286604951e2f5152edad" dependencies = [ "futures", "once_cell", @@ -1741,9 +1732,9 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.18.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cb946f5ac61bb61a5014924910d936ebd2b23b705f7a4a3c40b05c720b079a3" +checksum = "076c73d0bc438f7a4ef6fdd0c3bb4732149136abd952b110ac93e4edb13a6ba5" dependencies = [ "once_cell", "target-lexicon", @@ -1751,9 +1742,9 @@ dependencies = [ [[package]] name = "pyo3-ffi" -version = "0.18.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd4d7c5337821916ea2a1d21d1092e8443cf34879e53a0ac653fbb98f44ff65c" +checksum = "e53cee42e77ebe256066ba8aa77eff722b3bb91f3419177cf4cd0f304d3284d9" dependencies = [ "libc", "pyo3-build-config", @@ -1761,9 +1752,9 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.18.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9d39c55dab3fc5a4b25bbd1ac10a2da452c4aca13bb450f22818a002e29648d" +checksum = "dfeb4c99597e136528c6dd7d5e3de5434d1ceaf487436a3f03b2d56b6fc9efd1" dependencies = [ "proc-macro2", "pyo3-macros-backend", @@ -1773,15 +1764,25 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" -version = "0.18.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97daff08a4c48320587b5224cc98d609e3c27b6d437315bd40b605c98eeb5918" +checksum = "947dc12175c254889edc0c02e399476c2f652b4b9ebd123aa655c224de259536" dependencies = [ "proc-macro2", "quote", "syn 1.0.109", ] +[[package]] +name = "pythonize" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"8e35b716d430ace57e2d1b4afb51c9e5b7c46d2bce72926e07f9be6a98ced03e" +dependencies = [ + "pyo3", + "serde", +] + [[package]] name = "quanta" version = "0.11.1" @@ -2371,6 +2372,7 @@ dependencies = [ "prost-types", "pyo3", "pyo3-asyncio", + "pythonize", "temporal-client", "temporal-sdk-core", "temporal-sdk-core-api", diff --git a/temporalio/bridge/Cargo.toml b/temporalio/bridge/Cargo.toml index a2fa315b..c957b92b 100644 --- a/temporalio/bridge/Cargo.toml +++ b/temporalio/bridge/Cargo.toml @@ -13,8 +13,9 @@ once_cell = "1.16.0" parking_lot = "0.12" prost = "0.11" prost-types = "0.11" -pyo3 = { version = "0.18", features = ["extension-module", "abi3-py37"] } -pyo3-asyncio = { version = "0.18", features = ["tokio-runtime"] } +pyo3 = { version = "0.19", features = ["extension-module", "abi3-py37"] } +pyo3-asyncio = { version = "0.19", features = ["tokio-runtime"] } +pythonize = "0.19" temporal-client = { version = "0.1.0", path = "./sdk-core/client" } temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["ephemeral-server"] } temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" } diff --git a/temporalio/bridge/runtime.py b/temporalio/bridge/runtime.py index f86c4a87..6998938a 100644 --- a/temporalio/bridge/runtime.py +++ b/temporalio/bridge/runtime.py @@ -29,6 +29,10 @@ def retrieve_buffered_metrics(self) -> Sequence[Any]: """Get buffered metrics.""" return self._ref.retrieve_buffered_metrics() + def retrieve_buffered_logs(self) -> Sequence[Any]: + """Get buffered logs.""" + return self._ref.retrieve_buffered_logs() + @dataclass(frozen=True) class LoggingConfig: diff --git a/temporalio/bridge/src/lib.rs b/temporalio/bridge/src/lib.rs index 660ce94a..6d10dea4 100644 --- a/temporalio/bridge/src/lib.rs +++ b/temporalio/bridge/src/lib.rs @@ -26,6 +26,7 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> { // Runtime stuff m.add_class::()?; + m.add_class::()?; m.add_function(wrap_pyfunction!(init_runtime, m)?)?; 
m.add_function(wrap_pyfunction!(raise_in_thread, m)?)?; diff --git a/temporalio/bridge/src/runtime.rs b/temporalio/bridge/src/runtime.rs index dff05921..8f8ac4aa 100644 --- a/temporalio/bridge/src/runtime.rs +++ b/temporalio/bridge/src/runtime.rs @@ -1,6 +1,7 @@ use pyo3::exceptions::{PyRuntimeError, PyValueError}; use pyo3::prelude::*; use pyo3::AsPyPointer; +use pythonize::pythonize; use std::collections::HashMap; use std::future::Future; use std::net::SocketAddr; @@ -14,8 +15,8 @@ use temporal_sdk_core::telemetry::{ use temporal_sdk_core::CoreRuntime; use temporal_sdk_core_api::telemetry::metrics::{CoreMeter, MetricCallBufferer}; use temporal_sdk_core_api::telemetry::{ - Logger, MetricTemporality, OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder, - TelemetryOptions, TelemetryOptionsBuilder, + CoreLog, CoreTelemetry, Logger, MetricTemporality, OtelCollectorOptionsBuilder, + PrometheusExporterOptionsBuilder, TelemetryOptions, TelemetryOptionsBuilder, }; use url::Url; @@ -32,6 +33,12 @@ pub(crate) struct Runtime { metrics_call_buffer: Option>>, } +// WARNING: This must match temporalio.runtime.BufferedLogEntry protocol +#[pyclass] +pub struct BufferedLogEntry { + core_log: CoreLog, +} + #[derive(FromPyObject)] pub struct TelemetryConfig { logging: Option, @@ -138,6 +145,52 @@ impl RuntimeRef { .retrieve(), ) } + + fn retrieve_buffered_logs(&self) -> Vec<BufferedLogEntry> { + self.runtime + .core + .telemetry() + .fetch_buffered_logs() + .into_iter() + .map(|core_log| BufferedLogEntry { core_log }) + .collect() + } +} + +// WARNING: This must match temporalio.runtime.BufferedLogEntry protocol +#[pymethods] +impl BufferedLogEntry { + #[getter] + fn target(&self) -> &str { + &self.core_log.target + } + + #[getter] + fn message(&self) -> &str { + &self.core_log.message + } + + #[getter] + fn timestamp_millis(&self) -> u128 { + self.core_log.millis_since_epoch() + } + + #[getter] + fn level(&self) -> &str { + self.core_log.level.as_str() + } + + #[getter] + fn fields(&self, 
py: Python<'_>) -> PyResult<HashMap<&str, PyObject>> { + self.core_log + .fields + .iter() + .map(|(key, value)| match pythonize(py, value) { + Ok(value) => Ok((key.as_str(), value)), + Err(err) => Err(err.into()), + }) + .collect() + } } impl TryFrom<&TelemetryConfig> for TelemetryOptions { diff --git a/temporalio/runtime.py b/temporalio/runtime.py index b2650e9d..72c01d18 100644 --- a/temporalio/runtime.py +++ b/temporalio/runtime.py @@ -8,9 +8,9 @@ from dataclasses import dataclass, field from datetime import timedelta from enum import Enum -from typing import ClassVar, Mapping, NewType, Optional, Sequence, Union +from typing import Any, ClassVar, Dict, Mapping, NewType, Optional, Sequence, Union -from typing_extensions import Literal, Protocol +from typing_extensions import Protocol import temporalio.bridge.metric import temporalio.bridge.runtime @@ -70,6 +70,8 @@ def __init__(self, *, telemetry: TelemetryConfig) -> None: self._core_runtime = temporalio.bridge.runtime.Runtime( telemetry=telemetry._to_bridge_config() ) + if telemetry.logging and isinstance(telemetry.logging.buffer, LogBuffer): + telemetry.logging.buffer._runtime = self if isinstance(telemetry.metrics, MetricBuffer): telemetry.metrics._runtime = self core_meter = temporalio.bridge.metric.MetricMeter.create(self._core_runtime) @@ -114,6 +116,10 @@ class LoggingConfig: filter: Union[TelemetryFilter, str] """Filter for logging. Can use :py:class:`TelemetryFilter` or raw string.""" + buffer: Optional[LogBuffer] = None + """Buffer to send logs to. If set, logs are sent to this buffer instead of + console.""" + default: ClassVar[LoggingConfig] """Default logging configuration of Core WARN level and other ERROR level. 
@@ -124,8 +130,7 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.LoggingConfig: filter=self.filter if isinstance(self.filter, str) else self.filter.formatted(), - # Log forwarding not currently supported in Python - forward=False, + forward=self.buffer is not None, ) @@ -220,6 +225,93 @@ def retrieve_updates(self) -> Sequence[BufferedMetricUpdate]: return self._runtime._core_runtime.retrieve_buffered_metrics() +class LogBuffer: + """A buffer that can be set on :py:class:`LoggingConfig` to record logs + instead of having them sent to console. + + .. warning:: + It is important that :py:meth:`retrieve_logs` is called regularly to + drain the buffer or log entries may be lost. + """ + + def __init__(self) -> None: + """Create a log buffer.""" + self._runtime: Optional[Runtime] = None + + def retrieve_logs(self) -> Sequence[BufferedLogEntry]: + """Drain the buffer and return all log entries. + + .. warning:: + It is important that this is called regularly. See + :py:class:`LogBuffer` warning. + + Returns: + A sequence of log entries. + """ + if not self._runtime: + raise RuntimeError("Attempting to retrieve logs before runtime created") + return self._runtime._core_runtime.retrieve_buffered_logs() + + +BufferedLogLevel = NewType("BufferedLogLevel", str) +"""Representation of a log level for a buffered log entry.""" + +BUFFERED_LOG_LEVEL_TRACE = BufferedLogLevel("TRACE") +"""Trace log level.""" + +BUFFERED_LOG_LEVEL_DEBUG = BufferedLogLevel("DEBUG") +"""Debug log level.""" + +BUFFERED_LOG_LEVEL_INFO = BufferedLogLevel("INFO") +"""Info log level.""" + +BUFFERED_LOG_LEVEL_WARN = BufferedLogLevel("WARN") +"""Warn log level.""" + +BUFFERED_LOG_LEVEL_ERROR = BufferedLogLevel("ERROR") +"""Error log level.""" + + +# WARNING: This must match Rust runtime::BufferedLogEntry +class BufferedLogEntry(Protocol): + """A buffered log entry.""" + + @property + def target(self) -> str: + """Target category for the log entry.""" + ... 
+ + @property + def message(self) -> str: + """Log message.""" + ... + + @property + def timestamp_millis(self) -> int: + """Milliseconds since Unix epoch.""" + ... + + @property + def level(self) -> BufferedLogLevel: + """Log level.""" + ... + + @property + def fields(self) -> Dict[str, Any]: + """Additional log entry fields. + + Requesting this property performs a conversion from the internal + representation to the Python representation on every request. Therefore + callers should store the result instead of repeatedly calling. + + Raises: + Exception: If the internal representation cannot be converted. This + should not happen and if it does it is considered a bug in the + SDK and should be reported. + """ + ... + + @dataclass(frozen=True) class TelemetryConfig: """Configuration for Core telemetry.""" diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 666863e2..71c1b5fc 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -3,7 +3,6 @@ import json import logging import logging.handlers -import pickle import queue import sys import threading @@ -67,8 +66,11 @@ WorkflowAlreadyStartedError, ) from temporalio.runtime import ( + BUFFERED_LOG_LEVEL_WARN, BUFFERED_METRIC_KIND_COUNTER, BUFFERED_METRIC_KIND_HISTOGRAM, + LogBuffer, + LoggingConfig, MetricBuffer, PrometheusConfig, Runtime, @@ -3503,3 +3505,62 @@ async def test_workflow_buffered_metrics(client: Client): and update.value == 1 for update in updates ) + + +buffered_log_workflow_first_run = True + + +@workflow.defn(sandboxed=False) +class BufferedLogWorkflow: + @workflow.run + async def run(self) -> None: + # We will fail the task only once to generate a log + global buffered_log_workflow_first_run + if buffered_log_workflow_first_run: + buffered_log_workflow_first_run = False + raise RuntimeError("Intentional task failure") + + +async def test_workflow_buffered_logs(client: Client): + # Create client with log buffer runtime + buffer = LogBuffer() + 
runtime = Runtime( + telemetry=TelemetryConfig( + logging=dataclasses.replace(LoggingConfig.default, buffer=buffer) + ) + ) + client = await Client.connect( + client.service_client.config.target_host, + namespace=client.namespace, + runtime=runtime, + ) + + # Run workflow which will issue one failing task log + async with new_worker(client, BufferedLogWorkflow) as worker: + handle = await client.start_workflow( + BufferedLogWorkflow.run, + id=f"wf-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + await handle.result() + + # Check the log + # XXX: We accept that we're coding this test to internal core expectations + # of log format. Developers will have to update these assertions if/when + # that logic changes. + logs = buffer.retrieve_logs() + assert len(logs) == 1 + assert logs[0].target == "temporal_sdk_core::worker::workflow" + assert logs[0].message == "Failing workflow task" + # Confirm we're within 5m of accurate time + timestamp = datetime.fromtimestamp( + logs[0].timestamp_millis / 1000.0, tz=timezone.utc + ) + assert abs(datetime.now(tz=timezone.utc) - timestamp).total_seconds() < 300 + assert logs[0].level == BUFFERED_LOG_LEVEL_WARN + fields = logs[0].fields + assert ( + isinstance(fields["failure"], str) + and "Intentional task failure" in fields["failure"] + ) + assert fields["run_id"] == handle.result_run_id