Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom metric support #384

Merged
merged 2 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions temporalio/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,10 @@ class _Context:
Type[temporalio.converter.PayloadConverter],
temporalio.converter.PayloadConverter,
]
runtime_metric_meter: Optional[temporalio.common.MetricMeter]
_logger_details: Optional[Mapping[str, Any]] = None
_payload_converter: Optional[temporalio.converter.PayloadConverter] = None
_metric_meter: Optional[temporalio.common.MetricMeter] = None

@staticmethod
def current() -> _Context:
Expand Down Expand Up @@ -185,6 +187,29 @@ def payload_converter(self) -> temporalio.converter.PayloadConverter:
self._payload_converter = self.payload_converter_class_or_instance()
return self._payload_converter

@property
def metric_meter(self) -> temporalio.common.MetricMeter:
# If there isn't a runtime metric meter, then we're in a non-threaded
# sync function and we don't support cross-process metrics
if not self.runtime_metric_meter:
raise RuntimeError(
"Metrics meter not available in non-threaded sync activities like mulitprocess"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Metrics meter not available in non-threaded sync activities like mulitprocess"
"Metrics meter not available in non-threaded sync activities like multiprocess"

)
# Create the meter lazily if not already created. We are ok creating
# multiple in the rare race where a user calls this property on
# different threads inside the same activity. The meter is immutable and
# it's better than a lock.
if not self._metric_meter:
info = self.info()
self._metric_meter = self.runtime_metric_meter.with_additional_attributes(
{
"namespace": info.workflow_namespace,
"task_queue": info.task_queue,
"activity_type": info.activity_type,
}
)
return self._metric_meter


@dataclass
class _CompositeEvent:
Expand Down Expand Up @@ -377,6 +402,24 @@ def payload_converter() -> temporalio.converter.PayloadConverter:
return _Context.current().payload_converter


def metric_meter() -> temporalio.common.MetricMeter:
"""Get the metric meter for the current activity.

.. warning::
This is only available in async or synchronous threaded activities. An
error is raised on non-thread-based sync activities when trying to
access this.

Returns:
Current metric meter for this activity for recording metrics.

Raises:
RuntimeError: When not in an activity or in a non-thread-based
synchronous activity.
"""
return _Context.current().metric_meter


class LoggerAdapter(logging.LoggerAdapter):
"""Adapter that adds details to the log about the running activity.

Expand Down
111 changes: 111 additions & 0 deletions temporalio/bridge/metric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Metrics using SDK Core. (unstable)

Nothing in this module should be considered stable. The API may change.
"""

from __future__ import annotations

from typing import Mapping, Optional, Union

import temporalio.bridge.runtime
import temporalio.bridge.temporal_sdk_bridge


class MetricMeter:
"""Metric meter using SDK Core."""

@staticmethod
def create(runtime: temporalio.bridge.runtime.Runtime) -> Optional[MetricMeter]:
"""Create optional metric meter."""
ref = temporalio.bridge.temporal_sdk_bridge.new_metric_meter(runtime._ref)
if not ref:
return None
return MetricMeter(ref)
Comment on lines +21 to +23
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a personal Python style suggestion, obviously FFTI.

Suggested change
if not ref:
return None
return MetricMeter(ref)
return MetricMeter(ref) if ref else None

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I try to avoid the RHS ternary in some of these cases, but you'll see I use it in others. I may use it here.


def __init__(
self, ref: temporalio.bridge.temporal_sdk_bridge.MetricMeterRef
) -> None:
"""Initialize metric meter."""
self._ref = ref
self._default_attributes = MetricAttributes(ref.default_attributes)

@property
def default_attributes(self) -> MetricAttributes:
"""Default attributes for the metric meter."""
return self._default_attributes


class MetricCounter:
"""Metric counter using SDK Core."""

def __init__(
self,
meter: MetricMeter,
name: str,
description: Optional[str],
unit: Optional[str],
) -> None:
"""Initialize counter metric."""
self._ref = meter._ref.new_counter(name, description, unit)

def add(self, value: int, attrs: MetricAttributes) -> None:
"""Add value to counter."""
if value < 0:
raise ValueError("Metric value must be non-negative value")
self._ref.add(value, attrs._ref)


class MetricHistogram:
"""Metric histogram using SDK Core."""

def __init__(
self,
meter: MetricMeter,
name: str,
description: Optional[str],
unit: Optional[str],
) -> None:
"""Initialize histogram."""
self._ref = meter._ref.new_histogram(name, description, unit)

def record(self, value: int, attrs: MetricAttributes) -> None:
"""Record value on histogram."""
if value < 0:
raise ValueError("Metric value must be non-negative value")
self._ref.record(value, attrs._ref)


class MetricGauge:
"""Metric gauge using SDK Core."""

def __init__(
self,
meter: MetricMeter,
name: str,
description: Optional[str],
unit: Optional[str],
) -> None:
"""Initialize gauge."""
self._ref = meter._ref.new_gauge(name, description, unit)

def set(self, value: int, attrs: MetricAttributes) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't value supposed to be a float here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened an issue in sdk-core temporalio/sdk-core#604

Copy link
Member Author

@cretz cretz Sep 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that should be a blocker for this PR

"""Set value on gauge."""
if value < 0:
raise ValueError("Metric value must be non-negative value")
self._ref.set(value, attrs._ref)


class MetricAttributes:
"""Metric attributes using SDK Core."""

def __init__(
self, ref: temporalio.bridge.temporal_sdk_bridge.MetricAttributesRef
) -> None:
"""Initialize attributes."""
self._ref = ref

def with_additional_attributes(
self, new_attrs: Mapping[str, Union[str, int, float, bool]]
) -> MetricAttributes:
"""Create new attributes with new attributes appended."""
return MetricAttributes(self._ref.with_additional_attributes(new_attrs))
14 changes: 14 additions & 0 deletions temporalio/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use pyo3::prelude::*;
use pyo3::types::PyTuple;

mod client;
mod metric;
mod runtime;
mod testing;
mod worker;
Expand All @@ -13,6 +14,14 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<client::ClientRef>()?;
m.add_function(wrap_pyfunction!(connect_client, m)?)?;

// Metric stuff
m.add_class::<metric::MetricMeterRef>()?;
m.add_class::<metric::MetricAttributesRef>()?;
m.add_class::<metric::MetricCounterRef>()?;
m.add_class::<metric::MetricHistogramRef>()?;
m.add_class::<metric::MetricGaugeRef>()?;
m.add_function(wrap_pyfunction!(new_metric_meter, m)?)?;

// Runtime stuff
m.add_class::<runtime::RuntimeRef>()?;
m.add_function(wrap_pyfunction!(init_runtime, m)?)?;
Expand Down Expand Up @@ -44,6 +53,11 @@ fn connect_client<'a>(
client::connect_client(py, &runtime_ref, config)
}

#[pyfunction]
fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option<metric::MetricMeterRef> {
metric::new_metric_meter(&runtime_ref)
}

#[pyfunction]
fn init_runtime(telemetry_config: runtime::TelemetryConfig) -> PyResult<runtime::RuntimeRef> {
runtime::init_runtime(telemetry_config)
Expand Down
175 changes: 175 additions & 0 deletions temporalio/bridge/src/metric.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
use std::{collections::HashMap, sync::Arc};

use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use temporal_sdk_core_api::telemetry::metrics;

use crate::runtime;

#[pyclass]
pub struct MetricMeterRef {
meter: metrics::TemporalMeter,
#[pyo3(get)]
default_attributes: MetricAttributesRef,
}

#[pyclass]
#[derive(Clone)]
pub struct MetricAttributesRef {
attrs: metrics::MetricAttributes,
}

#[pyclass]
pub struct MetricCounterRef {
counter: Arc<dyn metrics::Counter>,
}

#[pyclass]
pub struct MetricHistogramRef {
histogram: Arc<dyn metrics::Histogram>,
}

#[pyclass]
pub struct MetricGaugeRef {
gauge: Arc<dyn metrics::Gauge>,
}

pub fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option<MetricMeterRef> {
runtime_ref
.runtime
.core
.telemetry()
.get_metric_meter()
.map(|meter| {
let default_attributes = MetricAttributesRef {
attrs: meter.inner.new_attributes(meter.default_attribs.clone()),
};
MetricMeterRef {
meter,
default_attributes,
}
})
}

#[pymethods]
impl MetricMeterRef {
fn new_counter(
&self,
name: String,
description: Option<String>,
unit: Option<String>,
) -> MetricCounterRef {
MetricCounterRef {
counter: self
.meter
.inner
.counter(build_metric_parameters(name, description, unit)),
}
}

fn new_histogram(
&self,
name: String,
description: Option<String>,
unit: Option<String>,
) -> MetricHistogramRef {
MetricHistogramRef {
histogram: self
.meter
.inner
.histogram(build_metric_parameters(name, description, unit)),
}
}

fn new_gauge(
&self,
name: String,
description: Option<String>,
unit: Option<String>,
) -> MetricGaugeRef {
MetricGaugeRef {
gauge: self
.meter
.inner
.gauge(build_metric_parameters(name, description, unit)),
}
}
}

#[pymethods]
impl MetricCounterRef {
fn add(&self, value: u64, attrs_ref: &MetricAttributesRef) {
self.counter.add(value, &attrs_ref.attrs);
}
}

#[pymethods]
impl MetricHistogramRef {
fn record(&self, value: u64, attrs_ref: &MetricAttributesRef) {
self.histogram.record(value, &attrs_ref.attrs);
}
}

#[pymethods]
impl MetricGaugeRef {
fn set(&self, value: u64, attrs_ref: &MetricAttributesRef) {
self.gauge.record(value, &attrs_ref.attrs);
}
}

fn build_metric_parameters(
name: String,
description: Option<String>,
unit: Option<String>,
) -> metrics::MetricParameters {
let mut build = metrics::MetricParametersBuilder::default();
build.name(name);
if let Some(description) = description {
build.description(description);
}
if let Some(unit) = unit {
build.unit(unit);
}
// Should be nothing that would fail validation here
build.build().unwrap()
}

#[pymethods]
impl MetricAttributesRef {
fn with_additional_attributes<'p>(
&self,
py: Python<'p>,
new_attrs: HashMap<String, PyObject>,
) -> PyResult<Self> {
let mut attrs = self.attrs.clone();
attrs.add_new_attrs(
new_attrs
.into_iter()
.map(|(k, obj)| metric_key_value_from_py(py, k, obj))
.collect::<PyResult<Vec<metrics::MetricKeyValue>>>()?,
);
Ok(MetricAttributesRef { attrs })
}
}

fn metric_key_value_from_py<'p>(
py: Python<'p>,
k: String,
obj: PyObject,
) -> PyResult<metrics::MetricKeyValue> {
let val = if let Ok(v) = obj.extract::<String>(py) {
metrics::MetricValue::String(v)
} else if let Ok(v) = obj.extract::<bool>(py) {
metrics::MetricValue::Bool(v)
} else if let Ok(v) = obj.extract::<i64>(py) {
metrics::MetricValue::Int(v)
} else if let Ok(v) = obj.extract::<f64>(py) {
metrics::MetricValue::Float(v)
} else {
return Err(PyTypeError::new_err(format!(
"Invalid value type for key {}, must be str, int, float, or bool",
k
)));
};
Ok(metrics::MetricKeyValue::new(k, val))
}
Loading