Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log forwarding #405

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 25 additions & 23 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions temporalio/bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ once_cell = "1.16.0"
parking_lot = "0.12"
prost = "0.11"
prost-types = "0.11"
pyo3 = { version = "0.18", features = ["extension-module", "abi3-py37"] }
pyo3-asyncio = { version = "0.18", features = ["tokio-runtime"] }
pyo3 = { version = "0.19", features = ["extension-module", "abi3-py37"] }
pyo3-asyncio = { version = "0.19", features = ["tokio-runtime"] }
pythonize = "0.19"
temporal-client = { version = "0.1.0", path = "./sdk-core/client" }
temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["ephemeral-server"] }
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }
Expand Down
4 changes: 4 additions & 0 deletions temporalio/bridge/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ def retrieve_buffered_metrics(self) -> Sequence[Any]:
"""Get buffered metrics."""
return self._ref.retrieve_buffered_metrics()

def retrieve_buffered_logs(self) -> Sequence[Any]:
"""Get buffered logs."""
return self._ref.retrieve_buffered_logs()


@dataclass(frozen=True)
class LoggingConfig:
Expand Down
1 change: 1 addition & 0 deletions temporalio/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {

// Runtime stuff
m.add_class::<runtime::RuntimeRef>()?;
m.add_class::<runtime::BufferedLogEntry>()?;
m.add_function(wrap_pyfunction!(init_runtime, m)?)?;
m.add_function(wrap_pyfunction!(raise_in_thread, m)?)?;

Expand Down
57 changes: 55 additions & 2 deletions temporalio/bridge/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use pyo3::exceptions::{PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::AsPyPointer;
use pythonize::pythonize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nifty

use std::collections::HashMap;
use std::future::Future;
use std::net::SocketAddr;
Expand All @@ -14,8 +15,8 @@ use temporal_sdk_core::telemetry::{
use temporal_sdk_core::CoreRuntime;
use temporal_sdk_core_api::telemetry::metrics::{CoreMeter, MetricCallBufferer};
use temporal_sdk_core_api::telemetry::{
Logger, MetricTemporality, OtelCollectorOptionsBuilder, PrometheusExporterOptionsBuilder,
TelemetryOptions, TelemetryOptionsBuilder,
CoreLog, CoreTelemetry, Logger, MetricTemporality, OtelCollectorOptionsBuilder,
PrometheusExporterOptionsBuilder, TelemetryOptions, TelemetryOptionsBuilder,
};
use url::Url;

Expand All @@ -32,6 +33,12 @@ pub(crate) struct Runtime {
metrics_call_buffer: Option<Arc<MetricsCallBuffer<BufferedMetricRef>>>,
}

// WARNING: This must match temporalio.runtime.BufferedLogEntry protocol
#[pyclass]
pub struct BufferedLogEntry {
core_log: CoreLog,
}

#[derive(FromPyObject)]
pub struct TelemetryConfig {
logging: Option<LoggingConfig>,
Expand Down Expand Up @@ -138,6 +145,52 @@ impl RuntimeRef {
.retrieve(),
)
}

fn retrieve_buffered_logs(&self) -> Vec<BufferedLogEntry> {
self.runtime
.core
.telemetry()
.fetch_buffered_logs()
.into_iter()
.map(|core_log| BufferedLogEntry { core_log })
.collect()
}
}

// WARNING: This must match temporalio.runtime.BufferedLogEntry protocol
#[pymethods]
impl BufferedLogEntry {
#[getter]
fn target(&self) -> &str {
&self.core_log.target
}

#[getter]
fn message(&self) -> &str {
&self.core_log.message
}

#[getter]
fn timestamp_millis(&self) -> u128 {
self.core_log.millis_since_epoch()
}

#[getter]
fn level(&self) -> &str {
self.core_log.level.as_str()
}

#[getter]
fn fields(&self, py: Python<'_>) -> PyResult<HashMap<&str, PyObject>> {
self.core_log
.fields
.iter()
.map(|(key, value)| match pythonize(py, value) {
Ok(value) => Ok((key.as_str(), value)),
Err(err) => Err(err.into()),
})
.collect()
}
}

impl TryFrom<&TelemetryConfig> for TelemetryOptions {
Expand Down
100 changes: 96 additions & 4 deletions temporalio/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from typing import ClassVar, Mapping, NewType, Optional, Sequence, Union
from typing import Any, ClassVar, Dict, Mapping, NewType, Optional, Sequence, Union

from typing_extensions import Literal, Protocol
from typing_extensions import Protocol

import temporalio.bridge.metric
import temporalio.bridge.runtime
Expand Down Expand Up @@ -70,6 +70,8 @@ def __init__(self, *, telemetry: TelemetryConfig) -> None:
self._core_runtime = temporalio.bridge.runtime.Runtime(
telemetry=telemetry._to_bridge_config()
)
if telemetry.logging and isinstance(telemetry.logging.buffer, LogBuffer):
telemetry.logging.buffer._runtime = self
if isinstance(telemetry.metrics, MetricBuffer):
telemetry.metrics._runtime = self
core_meter = temporalio.bridge.metric.MetricMeter.create(self._core_runtime)
Expand Down Expand Up @@ -114,6 +116,10 @@ class LoggingConfig:
filter: Union[TelemetryFilter, str]
"""Filter for logging. Can use :py:class:`TelemetryFilter` or raw string."""

buffer: Optional[LogBuffer] = None
"""Buffer to send logs too. If set, logs are sent to this buffer instead of
console."""

default: ClassVar[LoggingConfig]
"""Default logging configuration of Core WARN level and other ERROR
level.
Expand All @@ -124,8 +130,7 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.LoggingConfig:
filter=self.filter
if isinstance(self.filter, str)
else self.filter.formatted(),
# Log forwarding not currently supported in Python
forward=False,
forward=self.buffer is not None,
)


Expand Down Expand Up @@ -220,6 +225,93 @@ def retrieve_updates(self) -> Sequence[BufferedMetricUpdate]:
return self._runtime._core_runtime.retrieve_buffered_metrics()


class LogBuffer:
"""A buffer that can be set on :py:class:`LoggnigConfig` to record logs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""A buffer that can be set on :py:class:`LoggnigConfig` to record logs
"""A buffer that can be set on :py:class:`LoggingConfig` to record logs

instead of having them sent to console.

.. warning::
It is important that :py:meth:`retrieve_logs` is called regularly to
drain the buffer or log entries may be lost.
"""

def __init__(self) -> None:
"""Create a log buffer."""
self._runtime: Optional[Runtime] = None

def retrieve_logs(self) -> Sequence[BufferedLogEntry]:
"""Drain the buffer and return all log entries.

.. warning::
It is important that this is called regularly. See
:py:class:`LogBuffer` warning.

Returns:
A sequence of log entries.
"""
if not self._runtime:
raise RuntimeError("Attempting to retrieve logs before runtime created")
return self._runtime._core_runtime.retrieve_buffered_logs()


BufferedLogLevel = NewType("BufferedLogLevel", str)
"""Representation of a log level for a buffered log entry."""

BUFFERED_LOG_LEVEL_TRACE = BufferedLogLevel("TRACE")
"""Trace log level."""

BUFFERED_LOG_LEVEL_DEBUG = BufferedLogLevel("DEBUG")
"""Debug log level."""

BUFFERED_LOG_LEVEL_INFO = BufferedLogLevel("INFO")
"""Info log level."""

BUFFERED_LOG_LEVEL_WARN = BufferedLogLevel("WARN")
"""Warn log level."""

BUFFERED_LOG_LEVEL_ERROR = BufferedLogLevel("ERROR")
"""Error log level."""


# WARNING: This must match Rust runtime::BufferedLogEntry
class BufferedLogEntry(Protocol):
"""A buffered log entry."""

@property
def target(self) -> str:
"""Target category for the log entry."""
...

@property
def message(self) -> str:
"""Log message."""
...

@property
def timestamp_millis(self) -> int:
"""Milliseconds since Unix epoch."""
...

@property
def level(self) -> BufferedLogLevel:
"""Log level."""
...

@property
def fields(self) -> Dict[str, Any]:
"""Additional log entry fields.

Requesting this property performs a conversion from the internal
representation to the Python representation on every request. Therefore
callers should store the result instead of repeatedly calling.

Raises:
Exception: If the internal representation cannot be converted. This
should not happen and if it does it is considered a bug in the
SDK and should be reported.
"""
...


@dataclass(frozen=True)
class TelemetryConfig:
"""Configuration for Core telemetry."""
Expand Down
Loading
Loading