Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffered metrics #391

Merged
merged 4 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 5 additions & 23 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions temporalio/bridge/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(
) -> None:
"""Initialize metric meter."""
self._ref = ref
self._default_attributes = MetricAttributes(ref.default_attributes)
self._default_attributes = MetricAttributes(self, ref.default_attributes)

@property
def default_attributes(self) -> MetricAttributes:
Expand Down Expand Up @@ -99,13 +99,19 @@ class MetricAttributes:
"""Metric attributes using SDK Core."""

def __init__(
self, ref: temporalio.bridge.temporal_sdk_bridge.MetricAttributesRef
self,
meter: MetricMeter,
ref: temporalio.bridge.temporal_sdk_bridge.MetricAttributesRef,
) -> None:
"""Initialize attributes."""
self._meter = meter
self._ref = ref

def with_additional_attributes(
self, new_attrs: Mapping[str, Union[str, int, float, bool]]
) -> MetricAttributes:
"""Create new attributes with new attributes appended."""
return MetricAttributes(self._ref.with_additional_attributes(new_attrs))
return MetricAttributes(
self._meter,
self._ref.with_additional_attributes(self._meter._ref, new_attrs),
)
7 changes: 6 additions & 1 deletion temporalio/bridge/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Mapping, Optional, Type
from typing import Any, Mapping, Optional, Sequence, Type

import temporalio.bridge.temporal_sdk_bridge

Expand All @@ -25,6 +25,10 @@ def __init__(self, *, telemetry: TelemetryConfig) -> None:
"""Create SDK Core runtime."""
self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry)

def retrieve_buffered_metrics(self) -> Sequence[Any]:
"""Get buffered metrics."""
return self._ref.retrieve_buffered_metrics()


@dataclass(frozen=True)
class LoggingConfig:
Expand All @@ -40,6 +44,7 @@ class MetricsConfig:

opentelemetry: Optional[OpenTelemetryConfig]
prometheus: Optional[PrometheusConfig]
buffered_with_size: int
attach_service_name: bool
global_tags: Optional[Mapping[str, str]]
metric_prefix: Optional[str]
Expand Down
2 changes: 2 additions & 0 deletions temporalio/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<metric::MetricCounterRef>()?;
m.add_class::<metric::MetricHistogramRef>()?;
m.add_class::<metric::MetricGaugeRef>()?;
m.add_class::<metric::BufferedMetricUpdate>()?;
m.add_class::<metric::BufferedMetric>()?;
m.add_function(wrap_pyfunction!(new_metric_meter, m)?)?;

// Runtime stuff
Expand Down
165 changes: 157 additions & 8 deletions temporalio/bridge/src/metric.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::any::Any;
use std::{collections::HashMap, sync::Arc};

use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use temporal_sdk_core_api::telemetry::metrics;
use pyo3::{exceptions::PyTypeError, types::PyDict};
use temporal_sdk_core_api::telemetry::metrics::{
self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent, NewAttributes,
};

use crate::runtime;

Expand Down Expand Up @@ -139,14 +142,17 @@ impl MetricAttributesRef {
fn with_additional_attributes<'p>(
&self,
py: Python<'p>,
meter: &MetricMeterRef,
new_attrs: HashMap<String, PyObject>,
) -> PyResult<Self> {
let mut attrs = self.attrs.clone();
attrs.add_new_attrs(
new_attrs
.into_iter()
.map(|(k, obj)| metric_key_value_from_py(py, k, obj))
.collect::<PyResult<Vec<metrics::MetricKeyValue>>>()?,
let attrs = meter.meter.inner.extend_attributes(
self.attrs.clone(),
NewAttributes {
attributes: new_attrs
.into_iter()
.map(|(k, obj)| metric_key_value_from_py(py, k, obj))
.collect::<PyResult<Vec<metrics::MetricKeyValue>>>()?,
},
);
Ok(MetricAttributesRef { attrs })
}
Expand All @@ -173,3 +179,146 @@ fn metric_key_value_from_py<'p>(
};
Ok(metrics::MetricKeyValue::new(k, val))
}

// WARNING: This must match temporalio.runtime.BufferedMetricUpdate protocol
#[pyclass]
pub struct BufferedMetricUpdate {
#[pyo3(get)]
pub metric: Py<BufferedMetric>,
#[pyo3(get)]
pub value: u64,
#[pyo3(get)]
pub attributes: Py<PyDict>,
}

// WARNING: This must match temporalio.runtime.BufferedMetric protocol
#[pyclass]
pub struct BufferedMetric {
#[pyo3(get)]
pub name: String,
#[pyo3(get)]
pub description: Option<String>,
#[pyo3(get)]
pub unit: Option<String>,
#[pyo3(get)]
pub kind: u8, // 0 - counter, 1 - gauge, 2 - histogram
}

#[derive(Debug)]
struct BufferedMetricAttributes(Py<PyDict>);

#[derive(Clone, Debug)]
pub struct BufferedMetricRef(Py<BufferedMetric>);

impl BufferInstrumentRef for BufferedMetricRef {}

impl CustomMetricAttributes for BufferedMetricAttributes {
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
self as Arc<dyn Any + Send + Sync>
}
}

pub fn convert_metric_events<'p>(
py: Python<'p>,
events: Vec<MetricEvent<BufferedMetricRef>>,
) -> Vec<BufferedMetricUpdate> {
events
.into_iter()
.filter_map(|e| convert_metric_event(py, e))
.collect()
}

fn convert_metric_event<'p>(
py: Python<'p>,
event: MetricEvent<BufferedMetricRef>,
) -> Option<BufferedMetricUpdate> {
match event {
// Create the metric and put it on the lazy ref
MetricEvent::Create {
params,
populate_into,
kind,
} => {
let buffered_ref = BufferedMetricRef(
Py::new(
py,
BufferedMetric {
name: params.name.to_string(),
description: Some(params.description)
.filter(|s| !s.is_empty())
.map(|s| s.to_string()),
unit: Some(params.unit)
.filter(|s| !s.is_empty())
.map(|s| s.to_string()),
kind: match kind {
metrics::MetricKind::Counter => 0,
metrics::MetricKind::Gauge => 1,
metrics::MetricKind::Histogram => 2,
},
},
)
.expect("Unable to create buffered metric"),
);
populate_into.set(Arc::new(buffered_ref)).unwrap();
None
}
// Create the attributes and put it on the lazy ref
MetricEvent::CreateAttributes {
populate_into,
append_from,
attributes,
} => {
// Create the dictionary (as copy from existing if needed)
let new_attrs_ref: Py<PyDict> = match append_from {
Some(existing) => existing
.get()
.clone()
.as_any()
.downcast::<BufferedMetricAttributes>()
.expect("Unable to downcast to expected buffered metric attributes")
.0
.as_ref(py)
.copy()
.expect("Failed to copy metric attribute dictionary")
.into(),
None => PyDict::new(py).into(),
};
// Add attributes
let new_attrs = new_attrs_ref.as_ref(py);
for kv in attributes.into_iter() {
match kv.value {
metrics::MetricValue::String(v) => new_attrs.set_item(kv.key, v),
metrics::MetricValue::Int(v) => new_attrs.set_item(kv.key, v),
metrics::MetricValue::Float(v) => new_attrs.set_item(kv.key, v),
metrics::MetricValue::Bool(v) => new_attrs.set_item(kv.key, v),
Comment on lines +290 to +293
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to do this all with one branch like:

metrics::MetricValue::String(v) | metrics::MetricValue::Int(v) | metrics::MetricValue::Float(v) | metrics::MetricValue::Bool(v) => new_attrs.set_item(kv.key, v)

Copy link
Member Author

@cretz cretz Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a thing where I had the match just in the value but Rust was struggling to confirm they all impl'd ToPyObject w/out a manual dyn hint. I just figured this was the easiest, but will try to change to that if it works on next commit.

EDIT: Confirmed Rust is not smart enough to do proper inference of v here, you get something like:

294 |                     metrics::MetricValue::String(v) | metrics::MetricValue::Int(v) | metrics::MetricValue::Float(v) | metrics::MetricValue::Bool(v) => new_attrs.set_item(kv.key, v)
    |                                                  -                              ^ expected `String`, found `i64`
    |                                                  |
    |                                                  first introduced with type `std::string::String` here
    |
    = note: in the same arm, a binding must have the same type in all alternatives

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, yeah, they'd need to be turned into trait objects. Not worth it.

}
.expect("Unable to set metric key/value on dictionary");
}
// Put on lazy ref
populate_into
.set(Arc::new(BufferedMetricAttributes(new_attrs_ref)))
.expect("Unable to set buffered metric attributes on reference");
None
}
// Convert to Python metric event
MetricEvent::Update {
instrument,
attributes,
update,
} => Some(BufferedMetricUpdate {
metric: instrument.get().clone().0.clone(),
value: match update {
metrics::MetricUpdateVal::Delta(v) => v,
metrics::MetricUpdateVal::Value(v) => v,
},
attributes: attributes
.get()
.clone()
.as_any()
.downcast::<BufferedMetricAttributes>()
.expect("Unable to downcast to expected buffered metric attributes")
.0
.clone(),
}),
}
}
Loading
Loading