From cab55b143e1ce5114ef3d246187e27adfa05118c Mon Sep 17 00:00:00 2001 From: koushiro Date: Thu, 29 Aug 2024 17:57:51 +0800 Subject: [PATCH 1/7] refactor(layers/prometheus): rewrite prometheus layer based on observe mod --- core/src/layers/prometheus.rs | 885 +++++++--------------------------- 1 file changed, 170 insertions(+), 715 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 8eedb002750e..0b0dbe002ab7 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -15,13 +15,9 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; -use std::fmt::Formatter; use std::sync::Arc; +use std::time::Duration; -use bytes::Buf; -use futures::TryFutureExt; -use log::debug; use prometheus::core::AtomicU64; use prometheus::core::GenericCounterVec; use prometheus::exponential_buckets; @@ -31,6 +27,7 @@ use prometheus::register_int_counter_vec_with_registry; use prometheus::HistogramVec; use prometheus::Registry; +use crate::layers::observe; use crate::raw::Access; use crate::raw::*; use crate::*; @@ -56,7 +53,7 @@ use crate::*; /// /// # Examples /// -/// ```no_build +/// ``` /// use log::debug; /// use log::info; /// use opendal::layers::PrometheusLayer; @@ -75,7 +72,7 @@ use crate::*; /// /// let op = Operator::new(builder) /// .expect("must init") -/// .layer(PrometheusLayer::with_registry(registry.clone())) +/// .layer(PrometheusLayer::new(registry.clone())) /// .finish(); /// debug!("operator: {op:?}"); /// @@ -83,7 +80,7 @@ use crate::*; /// op.write("test", "Hello, World!").await?; /// // Read data from object. /// let bs = op.read("test").await?; -/// info!("content: {}", String::from_utf8_lossy(&bs)); +/// info!("content: {}", String::from_utf8_lossy(&bs.to_bytes())); /// /// // Get object metadata. /// let meta = op.stat("test").await?; @@ -98,44 +95,47 @@ use crate::*; /// Ok(()) /// } /// ``` -#[derive(Default, Debug, Clone)] + +#[derive(Clone, Debug)] pub struct PrometheusLayer { registry: Registry, - requests_duration_seconds_buckets: Vec, - bytes_total_buckets: Vec, + operation_duration_seconds_buckets: Vec, + operation_bytes_buckets: Vec, path_label_level: usize, } impl PrometheusLayer { - /// create PrometheusLayer by incoming registry. - pub fn with_registry(registry: Registry) -> Self { + /// Create a [`PrometheusLayer`] while registering itself to this registry. + pub fn new(registry: Registry) -> Self { Self { registry, - requests_duration_seconds_buckets: exponential_buckets(0.01, 2.0, 16).unwrap(), - bytes_total_buckets: exponential_buckets(0.01, 2.0, 16).unwrap(), + operation_duration_seconds_buckets: exponential_buckets(0.01, 2.0, 16).unwrap(), + operation_bytes_buckets: exponential_buckets(1.0, 2.0, 16).unwrap(), path_label_level: 0, } } - /// set buckets for requests_duration_seconds - pub fn requests_duration_seconds_buckets(mut self, buckets: Vec) -> Self { + /// Set buckets for `operation_duration_seconds` histogram. + pub fn operation_duration_seconds_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { - self.requests_duration_seconds_buckets = buckets; + self.operation_duration_seconds_buckets = buckets; } self } - /// set buckets for bytes_total - pub fn bytes_total_buckets(mut self, buckets: Vec) -> Self { + /// Set buckets for `bytes` histogram. + pub fn operation_bytes_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { - self.bytes_total_buckets = buckets; + self.operation_bytes_buckets = buckets; } self } - /// set path label level - /// 0: no path label, the path label will be the "" - /// >0: the path label will be the path split by "/" and get the last n level, like "/abc/def/ghi", if n=1, the path label will be "/abc" + /// Set the level of path label. + /// + /// - level = 0: no path label, the path label will be the "". + /// - level > 0: the path label will be the path split by "/" and get the last n level, + /// like "/abc/def/ghi", if n=1, the path label will be "/abc". pub fn enable_path_label(mut self, level: usize) -> Self { self.path_label_level = level; self @@ -143,739 +143,194 @@ impl PrometheusLayer { } impl Layer for PrometheusLayer { - type LayeredAccess = PrometheusAccessor; + type LayeredAccess = observe::MetricsAccessor; fn layer(&self, inner: A) -> Self::LayeredAccess { - let meta = inner.info(); - let scheme = meta.scheme(); - - PrometheusAccessor { - inner, - stats: Arc::new(PrometheusMetrics::new( - self.registry.clone(), - self.requests_duration_seconds_buckets.clone(), - self.bytes_total_buckets.clone(), - self.path_label_level, - )), - scheme, - } + let interceptor = PrometheusInterceptor::register( + self.registry.clone(), + self.operation_duration_seconds_buckets.clone(), + self.operation_bytes_buckets.clone(), + self.path_label_level, + ); + observe::MetricsLayer::new(interceptor).layer(inner) } } -/// [`PrometheusMetrics`] provide the performance and IO metrics. -#[derive(Debug)] -pub struct PrometheusMetrics { - /// Total times of the specific operation be called. - pub requests_total: GenericCounterVec, - /// Latency of the specific operation be called. - pub requests_duration_seconds: HistogramVec, - /// Size of the specific metrics. - pub bytes_total: HistogramVec, - /// The Path Level we will keep in the path label. - pub path_label_level: usize, +#[derive(Clone, Debug)] +pub struct PrometheusInterceptor { + operation_duration_seconds: HistogramVec, + operation_bytes: HistogramVec, + operation_errors_total: GenericCounterVec, + path_label_level: usize, } -impl PrometheusMetrics { - /// new with prometheus register. - pub fn new( +impl PrometheusInterceptor { + fn register( registry: Registry, - requests_duration_seconds_buckets: Vec, - bytes_total_buckets: Vec, + operation_duration_seconds_buckets: Vec, + operation_bytes_buckets: Vec, path_label_level: usize, ) -> Self { - let labels = if path_label_level > 0 { - vec!["scheme", "operation", "path"] + let fixed_labels = vec![ + observe::LABEL_SCHEME, + observe::LABEL_NAMESPACE, + observe::LABEL_ROOT, + observe::LABEL_OPERATION, + ]; + let (labels, labels_with_error) = if path_label_level > 0 { + ( + { + let mut labels = fixed_labels.clone(); + labels.push(observe::LABEL_PATH); + labels + }, + { + let mut labels = fixed_labels.clone(); + labels.push(observe::LABEL_ERROR); + labels.push(observe::LABEL_PATH); + labels + }, + ) } else { - vec!["scheme", "operation"] + (fixed_labels.clone(), { + let mut labels = fixed_labels; + labels.push(observe::LABEL_ERROR); + labels + }) }; - let requests_total = register_int_counter_vec_with_registry!( - "requests_total", - "Total times of create be called", + + let operation_duration_seconds = register_histogram_vec_with_registry!( + histogram_opts!( + observe::METRIC_OPERATION_DURATION_SECONDS.name(), + observe::METRIC_OPERATION_DURATION_SECONDS.help(), + operation_duration_seconds_buckets + ), &labels, registry ) .unwrap(); - let opts = histogram_opts!( - "requests_duration_seconds", - "Histogram of the time spent on specific operation", - requests_duration_seconds_buckets - ); - - let requests_duration_seconds = - register_histogram_vec_with_registry!(opts, &labels, registry).unwrap(); - - let opts = histogram_opts!("bytes_total", "Total size of ", bytes_total_buckets); - let bytes_total = register_histogram_vec_with_registry!(opts, &labels, registry).unwrap(); + let operation_bytes = register_histogram_vec_with_registry!( + histogram_opts!( + observe::METRIC_OPERATION_BYTES.name(), + observe::METRIC_OPERATION_BYTES.help(), + operation_bytes_buckets + ), + &labels, + registry + ) + .unwrap(); + let operation_errors_total = register_int_counter_vec_with_registry!( + observe::METRIC_OPERATION_ERRORS_TOTAL.name(), + observe::METRIC_OPERATION_ERRORS_TOTAL.help(), + &labels_with_error, + registry + ) + .unwrap(); Self { - requests_total, - requests_duration_seconds, - bytes_total, + operation_duration_seconds, + operation_bytes, + operation_errors_total, path_label_level, } } - /// error handling is the cold path, so we will not init error counters - /// in advance. - #[inline] - fn increment_errors_total(&self, op: Operation, kind: ErrorKind) { - debug!( - "Prometheus statistics metrics error, operation {} error {}", - op.into_static(), - kind.into_static() - ); - } - - /// generate metric label - pub fn generate_metric_label<'a>( - &self, - scheme: &'a str, - operation: &'a str, - path_label: &'a str, + // labels: `["scheme", "namespace", "root", "op", "path"]` + // or `["scheme", "namespace", "root", "op"]` + fn gen_operation_labels<'a>( + &'a self, + scheme: Scheme, + namespace: &'a str, + root: &'a str, + op: Operation, + path: &'a str, ) -> Vec<&'a str> { - match self.path_label_level { - 0 => { - vec![scheme, operation] - } - n if n > 0 => { - let path_value = get_path_label(path_label, self.path_label_level); - vec![scheme, operation, path_value] - } - _ => { - vec![scheme, operation] - } - } - } -} - -#[derive(Clone)] -pub struct PrometheusAccessor { - inner: A, - stats: Arc, - scheme: Scheme, -} - -impl Debug for PrometheusAccessor { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PrometheusAccessor") - .field("inner", &self.inner) - .finish_non_exhaustive() - } -} - -impl LayeredAccess for PrometheusAccessor { - type Inner = A; - type Reader = PrometheusMetricWrapper; - type BlockingReader = PrometheusMetricWrapper; - type Writer = PrometheusMetricWrapper; - type BlockingWriter = PrometheusMetricWrapper; - type Lister = A::Lister; - type BlockingLister = A::BlockingLister; - - fn inner(&self) -> &Self::Inner { - &self.inner - } - - async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::CreateDir.into_static(), - path, - ); - - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let create_res = self.inner.create_dir(path, args).await; - - timer.observe_duration(); - create_res.map_err(|e| { - self.stats - .increment_errors_total(Operation::CreateDir, e.kind()); - e - }) - } - - async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::Read.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); + let mut labels = vec![scheme.into_static(), namespace, root, op.into_static()]; - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.read(path, args).await; - timer.observe_duration(); - - match res { - Ok((rp, r)) => Ok(( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Read, - self.stats.clone(), - self.scheme, - &path.to_string(), - ), - )), - Err(err) => { - self.stats - .increment_errors_total(Operation::Read, err.kind()); - Err(err) - } - } - } - - async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::Write.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.write(path, args).await; - timer.observe_duration(); - - match res { - Ok((rp, w)) => Ok(( - rp, - PrometheusMetricWrapper::new( - w, - Operation::Write, - self.stats.clone(), - self.scheme, - &path.to_string(), - ), - )), - Err(err) => { - self.stats - .increment_errors_total(Operation::Write, err.kind()); - Err(err) - } + if self.path_label_level > 0 { + let path_value = get_path_label(path, self.path_label_level); + labels.push(path_value); } - } - - async fn stat(&self, path: &str, args: OpStat) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::Stat.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - - let stat_res = self - .inner - .stat(path, args) - .inspect_err(|e| { - self.stats.increment_errors_total(Operation::Stat, e.kind()); - }) - .await; - timer.observe_duration(); - stat_res.map_err(|e| { - self.stats.increment_errors_total(Operation::Stat, e.kind()); - e - }) - } - - async fn delete(&self, path: &str, args: OpDelete) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::Delete.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - - let delete_res = self.inner.delete(path, args).await; - timer.observe_duration(); - delete_res.map_err(|e| { - self.stats - .increment_errors_total(Operation::Delete, e.kind()); - e - }) + labels } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::List.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - - let list_res = self.inner.list(path, args).await; - - timer.observe_duration(); - list_res.map_err(|e| { - self.stats.increment_errors_total(Operation::List, e.kind()); - e - }) - } - - async fn batch(&self, args: OpBatch) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::Batch.into_static(), - "", - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.batch(args).await; - - timer.observe_duration(); - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::Batch, e.kind()); - e - }) - } - - async fn presign(&self, path: &str, args: OpPresign) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::Presign.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.presign(path, args).await; - timer.observe_duration(); - - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::Presign, e.kind()); - e - }) - } - - fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingCreateDir.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.blocking_create_dir(path, args); - - timer.observe_duration(); - - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::BlockingCreateDir, e.kind()); - e - }) - } - - fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingRead.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.blocking_read(path, args).map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::BlockingRead, - self.stats.clone(), - self.scheme, - &path.to_string(), - ), - ) - }); - timer.observe_duration(); - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::BlockingRead, e.kind()); - e - }) - } - - fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingWrite.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.blocking_write(path, args).map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::BlockingWrite, - self.stats.clone(), - self.scheme, - &path.to_string(), - ), - ) - }); - timer.observe_duration(); - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::BlockingWrite, e.kind()); - e - }) - } - - fn blocking_stat(&self, path: &str, args: OpStat) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingStat.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.blocking_stat(path, args); - timer.observe_duration(); - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::BlockingStat, e.kind()); - e - }) - } - - fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingDelete.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.blocking_delete(path, args); - timer.observe_duration(); - - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::BlockingDelete, e.kind()); - e - }) - } - - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingList.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.blocking_list(path, args); - timer.observe_duration(); - - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::BlockingList, e.kind()); - e - }) - } -} - -pub struct PrometheusMetricWrapper { - inner: R, - - op: Operation, - stats: Arc, - scheme: Scheme, - path: String, -} - -impl PrometheusMetricWrapper { - fn new( - inner: R, - op: Operation, - stats: Arc, + // labels: `["scheme", "namespace", "root", "op", "error", "path"]` + // or `["scheme", "namespace", "root", "op", "error"]` + fn gen_error_labels<'a>( + &'a self, scheme: Scheme, - path: &String, - ) -> Self { - Self { - inner, - op, - stats, - scheme, - path: path.to_string(), - } - } -} - -impl oio::Read for PrometheusMetricWrapper { - async fn read(&mut self) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::ReaderRead.into_static(), - &self.path, - ); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.read().await; - timer.observe_duration(); + namespace: &'a str, + root: &'a str, + op: Operation, + error: ErrorKind, + path: &'a str, + ) -> Vec<&'a str> { + let mut labels = vec![ + scheme.into_static(), + namespace, + root, + op.into_static(), + error.into_static(), + ]; - match res { - Ok(bytes) => { - self.stats - .bytes_total - .with_label_values(&labels) - .observe(bytes.remaining() as f64); - Ok(bytes) - } - Err(e) => { - self.stats.increment_errors_total(self.op, e.kind()); - Err(e) - } + if self.path_label_level > 0 { + let path_value = get_path_label(path, self.path_label_level); + labels.push(path_value); } - } -} - -impl oio::BlockingRead for PrometheusMetricWrapper { - fn read(&mut self) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingReaderRead.into_static(), - &self.path, - ); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.read(); - timer.observe_duration(); - match res { - Ok(bs) => { - self.stats - .bytes_total - .with_label_values(&labels) - .observe(bs.remaining() as f64); - Ok(bs) - } - Err(err) => { - self.stats.increment_errors_total(self.op, err.kind()); - Err(err) - } - } + labels } } -impl oio::Write for PrometheusMetricWrapper { - async fn write(&mut self, bs: Buffer) -> Result<()> { - let size = bs.len(); - - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::WriterWrite.into_static(), - &self.path, - ); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.write(bs).await; - timer.observe_duration(); - - match res { - Ok(_) => { - self.stats - .bytes_total - .with_label_values(&labels) - .observe(size as f64); - Ok(()) - } - Err(err) => { - self.stats.increment_errors_total(self.op, err.kind()); - Err(err) - } - } - } - - async fn abort(&mut self) -> Result<()> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::WriterAbort.into_static(), - &self.path, - ); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.abort().await; - timer.observe_duration(); - - match res { - Ok(()) => Ok(()), - Err(err) => { - self.stats.increment_errors_total(self.op, err.kind()); - Err(err) - } - } - } - - async fn close(&mut self) -> Result<()> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::WriterClose.into_static(), - &self.path, - ); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.close().await; - timer.observe_duration(); - - match res { - Ok(()) => Ok(()), - Err(err) => { - self.stats.increment_errors_total(self.op, err.kind()); - Err(err) - } - } - } +#[inline] +fn duration_to_seconds(d: Duration) -> f64 { + let nanos = f64::from(d.subsec_nanos()) / 1e9; + d.as_secs() as f64 + nanos } -impl oio::BlockingWrite for PrometheusMetricWrapper { - fn write(&mut self, bs: Buffer) -> Result<()> { - let size = bs.len(); - - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingWrite.into_static(), - &self.path, - ); - - let timer = self - .stats - .requests_duration_seconds +impl observe::MetricsIntercept for PrometheusInterceptor { + fn observe_operation_duration_seconds( + &self, + scheme: Scheme, + namespace: Arc, + root: Arc, + path: &str, + op: Operation, + duration: Duration, + ) { + let labels = self.gen_operation_labels(scheme, &namespace, &root, op, path); + self.operation_duration_seconds .with_label_values(&labels) - .start_timer(); - let res = self.inner.write(bs); - timer.observe_duration(); - - match res { - Ok(_) => { - self.stats - .bytes_total - .with_label_values(&labels) - .observe(size as f64); - Ok(()) - } - Err(err) => { - self.stats.increment_errors_total(self.op, err.kind()); - Err(err) - } - } + .observe(duration_to_seconds(duration)) } - fn close(&mut self) -> Result<()> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingWriterClose.into_static(), - &self.path, - ); - - let timer = self - .stats - .requests_duration_seconds + fn observe_operation_bytes( + &self, + scheme: Scheme, + namespace: Arc, + root: Arc, + path: &str, + op: Operation, + bytes: usize, + ) { + let labels = self.gen_operation_labels(scheme, &namespace, &root, op, path); + self.operation_bytes .with_label_values(&labels) - .start_timer(); - let res = self.inner.close(); - timer.observe_duration(); + .observe(bytes as f64); + } - match res { - Ok(()) => Ok(()), - Err(err) => { - self.stats.increment_errors_total(self.op, err.kind()); - Err(err) - } - } + fn observe_operation_errors_total( + &self, + scheme: Scheme, + namespace: Arc, + root: Arc, + path: &str, + op: Operation, + error: ErrorKind, + ) { + let labels = self.gen_error_labels(scheme, &namespace, &root, op, error, path); + self.operation_errors_total.with_label_values(&labels).inc(); } } From b7ea2b9e1c33fae91c99a1ee8d03585e441ab391 Mon Sep 17 00:00:00 2001 From: koushiro Date: Thu, 29 Aug 2024 18:16:35 +0800 Subject: [PATCH 2/7] fix doc --- core/src/layers/prometheus.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 0b0dbe002ab7..462adcdc3122 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -53,7 +53,7 @@ use crate::*; /// /// # Examples /// -/// ``` +/// ```no_build /// use log::debug; /// use log::info; /// use opendal::layers::PrometheusLayer; @@ -95,7 +95,6 @@ use crate::*; /// Ok(()) /// } /// ``` - #[derive(Clone, Debug)] pub struct PrometheusLayer { registry: Registry, From a16deb6dc79c21e764cab7d32f9f536cf5c57a86 Mon Sep 17 00:00:00 2001 From: koushiro Date: Thu, 29 Aug 2024 19:49:55 +0800 Subject: [PATCH 3/7] use Duration::as_secs_f64 --- core/src/layers/prometheus.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 462adcdc3122..9ff1adff99fa 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -282,12 +282,6 @@ impl PrometheusInterceptor { } } -#[inline] -fn duration_to_seconds(d: Duration) -> f64 { - let nanos = f64::from(d.subsec_nanos()) / 1e9; - d.as_secs() as f64 + nanos -} - impl observe::MetricsIntercept for PrometheusInterceptor { fn observe_operation_duration_seconds( &self, @@ -301,7 +295,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { let labels = self.gen_operation_labels(scheme, &namespace, &root, op, path); self.operation_duration_seconds .with_label_values(&labels) - .observe(duration_to_seconds(duration)) + .observe(duration.as_secs_f64()) } fn observe_operation_bytes( From e1e89f4c60e8e0dfcedc1bd75b8e901798e722db Mon Sep 17 00:00:00 2001 From: koushiro Date: Thu, 29 Aug 2024 19:50:45 +0800 Subject: [PATCH 4/7] update doc --- core/src/layers/prometheus.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 9ff1adff99fa..e4352af17e11 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -122,7 +122,7 @@ impl PrometheusLayer { self } - /// Set buckets for `bytes` histogram. + /// Set buckets for `operation_bytes` histogram. pub fn operation_bytes_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { self.operation_bytes_buckets = buckets; @@ -207,7 +207,7 @@ impl PrometheusInterceptor { &labels, registry ) - .unwrap(); + .unwrap(); let operation_bytes = register_histogram_vec_with_registry!( histogram_opts!( observe::METRIC_OPERATION_BYTES.name(), @@ -217,14 +217,14 @@ impl PrometheusInterceptor { &labels, registry ) - .unwrap(); + .unwrap(); let operation_errors_total = register_int_counter_vec_with_registry!( observe::METRIC_OPERATION_ERRORS_TOTAL.name(), observe::METRIC_OPERATION_ERRORS_TOTAL.help(), &labels_with_error, registry ) - .unwrap(); + .unwrap(); Self { operation_duration_seconds, From 0bdb70fb332d4144b359ac269017343884b35d94 Mon Sep 17 00:00:00 2001 From: koushiro Date: Thu, 29 Aug 2024 20:54:37 +0800 Subject: [PATCH 5/7] apply review suggestions and improve labels --- core/src/layers/prometheus.rs | 190 +++++++++++++++++++--------------- 1 file changed, 106 insertions(+), 84 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index e4352af17e11..f80fa9f1c92d 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -115,6 +115,10 @@ impl PrometheusLayer { } /// Set buckets for `operation_duration_seconds` histogram. + /// + /// You could call the [`linear_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.linear_buckets.html) + /// or [`exponential_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.exponential_buckets.html) + /// to generate the buckets. pub fn operation_duration_seconds_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { self.operation_duration_seconds_buckets = buckets; @@ -123,6 +127,10 @@ impl PrometheusLayer { } /// Set buckets for `operation_bytes` histogram. + /// + /// You could call the [`linear_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.linear_buckets.html) + /// or [`exponential_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.exponential_buckets.html) + /// to generate the buckets. pub fn operation_bytes_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { self.operation_bytes_buckets = buckets; @@ -132,9 +140,9 @@ impl PrometheusLayer { /// Set the level of path label. /// - /// - level = 0: no path label, the path label will be the "". + /// - level = 0: we will ignore the path label. /// - level > 0: the path label will be the path split by "/" and get the last n level, - /// like "/abc/def/ghi", if n=1, the path label will be "/abc". + /// if n=1 and input path is "abc/def/ghi", and we will get "abc/". pub fn enable_path_label(mut self, level: usize) -> Self { self.path_label_level = level; self @@ -170,34 +178,7 @@ impl PrometheusInterceptor { operation_bytes_buckets: Vec, path_label_level: usize, ) -> Self { - let fixed_labels = vec![ - observe::LABEL_SCHEME, - observe::LABEL_NAMESPACE, - observe::LABEL_ROOT, - observe::LABEL_OPERATION, - ]; - let (labels, labels_with_error) = if path_label_level > 0 { - ( - { - let mut labels = fixed_labels.clone(); - labels.push(observe::LABEL_PATH); - labels - }, - { - let mut labels = fixed_labels.clone(); - labels.push(observe::LABEL_ERROR); - labels.push(observe::LABEL_PATH); - labels - }, - ) - } else { - (fixed_labels.clone(), { - let mut labels = fixed_labels; - labels.push(observe::LABEL_ERROR); - labels - }) - }; - + let labels = OperationLabels::names(false, path_label_level); let operation_duration_seconds = register_histogram_vec_with_registry!( histogram_opts!( observe::METRIC_OPERATION_DURATION_SECONDS.name(), @@ -207,7 +188,7 @@ impl PrometheusInterceptor { &labels, registry ) - .unwrap(); + .unwrap(); let operation_bytes = register_histogram_vec_with_registry!( histogram_opts!( observe::METRIC_OPERATION_BYTES.name(), @@ -217,14 +198,16 @@ impl PrometheusInterceptor { &labels, registry ) - .unwrap(); + .unwrap(); + + let labels = OperationLabels::names(true, path_label_level); let operation_errors_total = register_int_counter_vec_with_registry!( observe::METRIC_OPERATION_ERRORS_TOTAL.name(), observe::METRIC_OPERATION_ERRORS_TOTAL.help(), - &labels_with_error, + &labels, registry ) - .unwrap(); + .unwrap(); Self { operation_duration_seconds, @@ -233,53 +216,6 @@ impl PrometheusInterceptor { path_label_level, } } - - // labels: `["scheme", "namespace", "root", "op", "path"]` - // or `["scheme", "namespace", "root", "op"]` - fn gen_operation_labels<'a>( - &'a self, - scheme: Scheme, - namespace: &'a str, - root: &'a str, - op: Operation, - path: &'a str, - ) -> Vec<&'a str> { - let mut labels = vec![scheme.into_static(), namespace, root, op.into_static()]; - - if self.path_label_level > 0 { - let path_value = get_path_label(path, self.path_label_level); - labels.push(path_value); - } - - labels - } - - // labels: `["scheme", "namespace", "root", "op", "error", "path"]` - // or `["scheme", "namespace", "root", "op", "error"]` - fn gen_error_labels<'a>( - &'a self, - scheme: Scheme, - namespace: &'a str, - root: &'a str, - op: Operation, - error: ErrorKind, - path: &'a str, - ) -> Vec<&'a str> { - let mut labels = vec![ - scheme.into_static(), - namespace, - root, - op.into_static(), - error.into_static(), - ]; - - if self.path_label_level > 0 { - let path_value = get_path_label(path, self.path_label_level); - labels.push(path_value); - } - - labels - } } impl observe::MetricsIntercept for PrometheusInterceptor { @@ -292,7 +228,16 @@ impl observe::MetricsIntercept for PrometheusInterceptor { op: Operation, duration: Duration, ) { - let labels = self.gen_operation_labels(scheme, &namespace, &root, op, path); + let labels = OperationLabels { + scheme, + namespace: &namespace, + root: &root, + op, + error: None, + path, + } + .into_values(self.path_label_level); + self.operation_duration_seconds .with_label_values(&labels) .observe(duration.as_secs_f64()) @@ -307,7 +252,16 @@ impl observe::MetricsIntercept for PrometheusInterceptor { op: Operation, bytes: usize, ) { - let labels = self.gen_operation_labels(scheme, &namespace, &root, op, path); + let labels = OperationLabels { + scheme, + namespace: &namespace, + root: &root, + op, + error: None, + path, + } + .into_values(self.path_label_level); + self.operation_bytes .with_label_values(&labels) .observe(bytes as f64); @@ -322,11 +276,79 @@ impl observe::MetricsIntercept for PrometheusInterceptor { op: Operation, error: ErrorKind, ) { - let labels = self.gen_error_labels(scheme, &namespace, &root, op, error, path); + let labels = OperationLabels { + scheme, + namespace: &namespace, + root: &root, + op, + error: Some(error), + path, + } + .into_values(self.path_label_level); + self.operation_errors_total.with_label_values(&labels).inc(); } } +struct OperationLabels<'a> { + scheme: Scheme, + namespace: &'a str, + root: &'a str, + op: Operation, + error: Option, + path: &'a str, +} + +impl<'a> OperationLabels<'a> { + fn names(error: bool, path_label_level: usize) -> Vec<&'a str> { + let mut names = Vec::with_capacity(6); + + names.extend([ + observe::LABEL_SCHEME, + observe::LABEL_NAMESPACE, + observe::LABEL_ROOT, + observe::LABEL_OPERATION, + ]); + + if error { + names.push(observe::LABEL_ERROR); + } + + if path_label_level > 0 { + names.push(observe::LABEL_PATH); + } + + names + } + + /// labels: + /// + /// 1. `["scheme", "namespace", "root", "operation"]` + /// 2. `["scheme", "namespace", "root", "operation", "path"]` + /// 3. `["scheme", "namespace", "root", "operation", "error"]` + /// 4. `["scheme", "namespace", "root", "operation", "error", "path"]` + fn into_values(self, path_label_level: usize) -> Vec<&'a str> { + let mut labels = Vec::with_capacity(6); + + labels.extend([ + self.scheme.into_static(), + self.namespace, + self.root, + self.op.into_static(), + ]); + + if let Some(error) = self.error { + labels.push(error.into_static()); + } + + if path_label_level > 0 { + labels.push(get_path_label(self.path, path_label_level)); + } + + labels + } +} + fn get_path_label(path: &str, path_level: usize) -> &str { if path_level > 0 { return path From 89dc5b8c7425253770619ecd1f0734f55ae2a892 Mon Sep 17 00:00:00 2001 From: koushiro Date: Thu, 29 Aug 2024 21:00:51 +0800 Subject: [PATCH 6/7] update doc --- core/src/layers/prometheus.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index f80fa9f1c92d..b85c5db4b126 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -142,7 +142,7 @@ impl PrometheusLayer { /// /// - level = 0: we will ignore the path label. /// - level > 0: the path label will be the path split by "/" and get the last n level, - /// if n=1 and input path is "abc/def/ghi", and we will get "abc/". + /// if n=1 and input path is "abc/def/ghi", and then we will get "abc/" as the path label. pub fn enable_path_label(mut self, level: usize) -> Self { self.path_label_level = level; self From 7f58c8c636d950b43a68c6d367a43f3951b23782 Mon Sep 17 00:00:00 2001 From: koushiro Date: Thu, 29 Aug 2024 21:13:52 +0800 Subject: [PATCH 7/7] adjust label order --- core/src/layers/prometheus.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index b85c5db4b126..adbc436ceab0 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -295,8 +295,8 @@ struct OperationLabels<'a> { namespace: &'a str, root: &'a str, op: Operation, - error: Option, path: &'a str, + error: Option, } impl<'a> OperationLabels<'a> { @@ -310,14 +310,14 @@ impl<'a> OperationLabels<'a> { observe::LABEL_OPERATION, ]); - if error { - names.push(observe::LABEL_ERROR); - } - if path_label_level > 0 { names.push(observe::LABEL_PATH); } + if error { + names.push(observe::LABEL_ERROR); + } + names } @@ -326,7 +326,7 @@ impl<'a> OperationLabels<'a> { /// 1. `["scheme", "namespace", "root", "operation"]` /// 2. `["scheme", "namespace", "root", "operation", "path"]` /// 3. `["scheme", "namespace", "root", "operation", "error"]` - /// 4. `["scheme", "namespace", "root", "operation", "error", "path"]` + /// 4. `["scheme", "namespace", "root", "operation", "path", "error"]` fn into_values(self, path_label_level: usize) -> Vec<&'a str> { let mut labels = Vec::with_capacity(6); @@ -337,14 +337,14 @@ impl<'a> OperationLabels<'a> { self.op.into_static(), ]); - if let Some(error) = self.error { - labels.push(error.into_static()); - } - if path_label_level > 0 { labels.push(get_path_label(self.path, path_label_level)); } + if let Some(error) = self.error { + labels.push(error.into_static()); + } + labels } }