Skip to content

Commit

Permalink
Python log forwarding
Browse files Browse the repository at this point in the history
Fixes #311
  • Loading branch information
cretz committed Oct 24, 2023
1 parent 4242dfb commit b3eca87
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 32 deletions.
48 changes: 25 additions & 23 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions temporalio/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ once_cell = "1.16.0"
parking_lot = "0.12"
prost = "0.11"
prost-types = "0.11"
pyo3 = { version = "0.18", features = ["extension-module", "abi3-py37"] }
pyo3-asyncio = { version = "0.18", features = ["tokio-runtime"] }
pyo3 = { version = "0.19", features = ["extension-module", "abi3-py37"] }
pyo3-asyncio = { version = "0.19", features = ["tokio-runtime"] }
pythonize = "0.19"
temporal-client = { version = "0.1.0", path = "./sdk-core/client" }
temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["ephemeral-server"] }
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }
Expand Down
4 changes: 4 additions & 0 deletions temporalio/bridge/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ def retrieve_buffered_metrics(self) -> Sequence[Any]:
"""Get buffered metrics."""
return self._ref.retrieve_buffered_metrics()

def retrieve_buffered_logs(self) -> Sequence[Any]:
"""Get buffered logs."""
return self._ref.retrieve_buffered_logs()


@dataclass(frozen=True)
class LoggingConfig:
Expand Down
1 change: 1 addition & 0 deletions temporalio/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {

// Runtime stuff
m.add_class::<runtime::RuntimeRef>()?;
m.add_class::<runtime::BufferedLogEntry>()?;
m.add_function(wrap_pyfunction!(init_runtime, m)?)?;
m.add_function(wrap_pyfunction!(raise_in_thread, m)?)?;

Expand Down
57 changes: 55 additions & 2 deletions temporalio/bridge/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::AsPyPointer;
use pythonize::pythonize;
use std::collections::HashMap;
use std::future::Future;
use std::net::SocketAddr;
Expand All @@ -14,8 +15,8 @@ use temporal_sdk_core::telemetry::{
use temporal_sdk_core::CoreRuntime;
use temporal_sdk_core_api::telemetry::metrics::{CoreMeter, MetricCallBufferer};
use temporal_sdk_core_api::telemetry::{
Logger, MetricTemporality, OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder,
TelemetryOptions, TelemetryOptionsBuilder,
CoreLog, CoreTelemetry, Logger, MetricTemporality, OtelCollectorOptionsBuilder,
PrometheusExporterOptionsBuilder, TelemetryOptions, TelemetryOptionsBuilder,
};
use url::Url;

Expand All @@ -32,6 +33,12 @@ pub(crate) struct Runtime {
metrics_call_buffer: Option<Arc<MetricsCallBuffer<BufferedMetricRef>>>,
}

// WARNING: This must match temporalio.runtime.BufferedLogEntry protocol
#[pyclass]
pub struct BufferedLogEntry {
core_log: CoreLog,
}

#[derive(FromPyObject)]
pub struct TelemetryConfig {
logging: Option<LoggingConfig>,
Expand Down Expand Up @@ -138,6 +145,52 @@ impl RuntimeRef {
.retrieve(),
)
}

fn retrieve_buffered_logs(&self) -> Vec<BufferedLogEntry> {
self.runtime
.core
.telemetry()
.fetch_buffered_logs()
.into_iter()
.map(|core_log| BufferedLogEntry { core_log })
.collect()
}
}

// WARNING: This must match temporalio.runtime.BufferedLogEntry protocol
#[pymethods]
impl BufferedLogEntry {
#[getter]
fn target(&self) -> &str {
&self.core_log.target
}

#[getter]
fn message(&self) -> &str {
&self.core_log.message
}

#[getter]
fn timestamp_millis(&self) -> u128 {
self.core_log.millis_since_epoch()
}

#[getter]
fn level(&self) -> &str {
self.core_log.level.as_str()
}

#[getter]
fn fields(&self, py: Python<'_>) -> PyResult<HashMap<&str, PyObject>> {
self.core_log
.fields
.iter()
.map(|(key, value)| match pythonize(py, value) {
Ok(value) => Ok((key.as_str(), value)),
Err(err) => Err(err.into()),
})
.collect()
}
}

impl TryFrom<&TelemetryConfig> for TelemetryOptions {
Expand Down
100 changes: 96 additions & 4 deletions temporalio/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from typing import ClassVar, Mapping, NewType, Optional, Sequence, Union
from typing import Any, ClassVar, Dict, Mapping, NewType, Optional, Sequence, Union

from typing_extensions import Literal, Protocol
from typing_extensions import Protocol

import temporalio.bridge.metric
import temporalio.bridge.runtime
Expand Down Expand Up @@ -70,6 +70,8 @@ def __init__(self, *, telemetry: TelemetryConfig) -> None:
self._core_runtime = temporalio.bridge.runtime.Runtime(
telemetry=telemetry._to_bridge_config()
)
if telemetry.logging and isinstance(telemetry.logging.buffer, LogBuffer):
telemetry.logging.buffer._runtime = self
if isinstance(telemetry.metrics, MetricBuffer):
telemetry.metrics._runtime = self
core_meter = temporalio.bridge.metric.MetricMeter.create(self._core_runtime)
Expand Down Expand Up @@ -114,6 +116,10 @@ class LoggingConfig:
filter: Union[TelemetryFilter, str]
"""Filter for logging. Can use :py:class:`TelemetryFilter` or raw string."""

buffer: Optional[LogBuffer] = None
"""Buffer to send logs too. If set, logs are sent to this buffer instead of
console."""

default: ClassVar[LoggingConfig]
"""Default logging configuration of Core WARN level and other ERROR
level.
Expand All @@ -124,8 +130,7 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.LoggingConfig:
filter=self.filter
if isinstance(self.filter, str)
else self.filter.formatted(),
# Log forwarding not currently supported in Python
forward=False,
forward=self.buffer is not None,
)


Expand Down Expand Up @@ -220,6 +225,93 @@ def retrieve_updates(self) -> Sequence[BufferedMetricUpdate]:
return self._runtime._core_runtime.retrieve_buffered_metrics()


class LogBuffer:
"""A buffer that can be set on :py:class:`LoggnigConfig` to record logs
instead of having them sent to console.
.. warning::
It is important that :py:meth:`retrieve_logs` is called regularly to
drain the buffer or log entries may be lost.
"""

def __init__(self) -> None:
"""Create a log buffer."""
self._runtime: Optional[Runtime] = None

def retrieve_logs(self) -> Sequence[BufferedLogEntry]:
"""Drain the buffer and return all log entries.
.. warning::
It is important that this is called regularly. See
:py:class:`LogBuffer` warning.
Returns:
A sequence of log entries.
"""
if not self._runtime:
raise RuntimeError("Attempting to retrieve logs before runtime created")
return self._runtime._core_runtime.retrieve_buffered_logs()


BufferedLogLevel = NewType("BufferedLogLevel", str)
"""Representation of a log level for a buffered log entry."""

BUFFERED_LOG_LEVEL_TRACE = BufferedLogLevel("TRACE")
"""Trace log level."""

BUFFERED_LOG_LEVEL_DEBUG = BufferedLogLevel("DEBUG")
"""Debug log level."""

BUFFERED_LOG_LEVEL_INFO = BufferedLogLevel("INFO")
"""Info log level."""

BUFFERED_LOG_LEVEL_WARN = BufferedLogLevel("WARN")
"""Warn log level."""

BUFFERED_LOG_LEVEL_ERROR = BufferedLogLevel("ERROR")
"""Error log level."""


# WARNING: This must match Rust runtime::BufferedLogEntry
class BufferedLogEntry(Protocol):
"""A buffered log entry."""

@property
def target(self) -> str:
"""Target category for the log entry."""
...

@property
def message(self) -> str:
"""Log message."""
...

@property
def timestamp_millis(self) -> int:
"""Milliseconds since Unix epoch."""
...

@property
def level(self) -> BufferedLogLevel:
"""Log level."""
...

@property
def fields(self) -> Dict[str, Any]:
"""Additional log entry fields.
Requesting this property performs a conversion from the internal
representation to the Python representation on every request. Therefore
callers should store the result instead of repeatedly calling.
Raises:
Exception: If the internal representation cannot be converted. This
should not happen and if it does it is considered a bug in the
SDK and should be reported.
"""
...


@dataclass(frozen=True)
class TelemetryConfig:
"""Configuration for Core telemetry."""
Expand Down
Loading

0 comments on commit b3eca87

Please sign in to comment.