From 1b77125f5696cdb8aeaff395ab1740b818f6edaa Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Fri, 5 Jul 2024 23:00:38 +0000 Subject: [PATCH] Move transaction retry timeseries to TOML --- nexus/db-queries/src/transaction_retry.rs | 22 +- oximeter/impl/src/schema/codegen.rs | 311 +++++++++++++++++- .../oximeter/schema/database-transaction.toml | 26 ++ oximeter/oximeter/src/lib.rs | 9 +- 4 files changed, 338 insertions(+), 30 deletions(-) create mode 100644 oximeter/oximeter/schema/database-transaction.toml diff --git a/nexus/db-queries/src/transaction_retry.rs b/nexus/db-queries/src/transaction_retry.rs index 558bb574c9..03e0574f52 100644 --- a/nexus/db-queries/src/transaction_retry.rs +++ b/nexus/db-queries/src/transaction_retry.rs @@ -7,27 +7,15 @@ use async_bb8_diesel::AsyncConnection; use chrono::Utc; use diesel::result::Error as DieselError; -use oximeter::{types::Sample, Metric, MetricsError, Target}; +use oximeter::{types::Sample, MetricsError}; use rand::{thread_rng, Rng}; use slog::{info, warn, Logger}; use std::sync::{Arc, Mutex}; use std::time::Duration; -// Identifies "which" transaction is retrying -#[derive(Debug, Clone, Target)] -struct DatabaseTransaction { - name: String, -} - -// Identifies that a retry has occurred, and track how long -// the transaction took (either since starting, or since the last -// retry failure was recorded). -#[derive(Debug, Clone, Metric)] -struct RetryData { - #[datum] - latency: f64, - attempt: u32, -} +oximeter::use_timeseries!("database-transaction.toml"); +use database_transaction::DatabaseTransaction; +use database_transaction::RetryData; // Collects all transaction retry samples #[derive(Debug, Default, Clone)] @@ -156,7 +144,7 @@ impl RetryHelper { let _ = self.producer.append( &DatabaseTransaction { name: self.name.into() }, - &RetryData { latency, attempt }, + &RetryData { datum: latency, attempt }, ); // This backoff is not exponential, but I'm not sure we actually want diff --git a/oximeter/impl/src/schema/codegen.rs b/oximeter/impl/src/schema/codegen.rs index f9cc62560b..c29fa2b1f5 100644 --- a/oximeter/impl/src/schema/codegen.rs +++ b/oximeter/impl/src/schema/codegen.rs @@ -83,6 +83,140 @@ fn fields_are_copyable<'a>( fields.all(FieldSchema::is_copyable) } +/// Return true if the datum type is copyable. +fn datum_type_is_copyable(datum_type: DatumType) -> bool { + match datum_type { + DatumType::Bool + | DatumType::I8 + | DatumType::U8 + | DatumType::I16 + | DatumType::U16 + | DatumType::I32 + | DatumType::U32 + | DatumType::I64 + | DatumType::U64 + | DatumType::CumulativeI64 + | DatumType::CumulativeU64 + | DatumType::CumulativeF32 + | DatumType::CumulativeF64 + | DatumType::F32 + | DatumType::F64 => true, + DatumType::String + | DatumType::Bytes + | DatumType::HistogramI8 + | DatumType::HistogramU8 + | DatumType::HistogramI16 + | DatumType::HistogramU16 + | DatumType::HistogramI32 + | DatumType::HistogramU32 + | DatumType::HistogramI64 + | DatumType::HistogramU64 + | DatumType::HistogramF32 + | DatumType::HistogramF64 => false, + } +} + +/// Return `true` if values of this datum are partially ordered (can derive +/// `PartialOrd`.) +fn datum_type_is_partially_ordered(datum_type: DatumType) -> bool { + match datum_type { + DatumType::Bool + | DatumType::I8 + | DatumType::U8 + | DatumType::I16 + | DatumType::U16 + | DatumType::I32 + | DatumType::U32 + | DatumType::I64 + | DatumType::U64 + | DatumType::String + | DatumType::Bytes + | DatumType::CumulativeI64 + | DatumType::CumulativeU64 + | DatumType::CumulativeF32 + | DatumType::CumulativeF64 + | DatumType::F32 + | DatumType::F64 => true, + DatumType::HistogramI8 + | DatumType::HistogramU8 + | DatumType::HistogramI16 + | DatumType::HistogramU16 + | DatumType::HistogramI32 + | DatumType::HistogramU32 + | DatumType::HistogramI64 + | DatumType::HistogramU64 + | DatumType::HistogramF32 + | DatumType::HistogramF64 => false, + } +} + +/// Return `true` if values of this datum are totally ordered (can derive +/// `Ord`.) +fn datum_type_is_totally_ordered(datum_type: DatumType) -> bool { + match datum_type { + DatumType::Bool + | DatumType::I8 + | DatumType::U8 + | DatumType::I16 + | DatumType::U16 + | DatumType::I32 + | DatumType::U32 + | DatumType::I64 + | DatumType::U64 + | DatumType::String + | DatumType::Bytes + | DatumType::CumulativeI64 + | DatumType::CumulativeU64 + | DatumType::CumulativeF32 + | DatumType::CumulativeF64 => true, + DatumType::F32 + | DatumType::F64 + | DatumType::HistogramI8 + | DatumType::HistogramU8 + | DatumType::HistogramI16 + | DatumType::HistogramU16 + | DatumType::HistogramI32 + | DatumType::HistogramU32 + | DatumType::HistogramI64 + | DatumType::HistogramU64 + | DatumType::HistogramF32 + | DatumType::HistogramF64 => false, + } +} + +/// Return `true` if values of this datum are hashable (can derive `Hash`). +fn datum_type_is_hashable(datum_type: DatumType) -> bool { + match datum_type { + DatumType::Bool + | DatumType::I8 + | DatumType::U8 + | DatumType::I16 + | DatumType::U16 + | DatumType::I32 + | DatumType::U32 + | DatumType::I64 + | DatumType::U64 + | DatumType::String + | DatumType::Bytes + | DatumType::CumulativeI64 + | DatumType::CumulativeU64 + | DatumType::CumulativeF32 + | DatumType::CumulativeF64 => true, + DatumType::F32 + | DatumType::F64 + | DatumType::HistogramI8 + | DatumType::HistogramU8 + | DatumType::HistogramI16 + | DatumType::HistogramU16 + | DatumType::HistogramI32 + | DatumType::HistogramU32 + | DatumType::HistogramI64 + | DatumType::HistogramU64 + | DatumType::HistogramF32 + | DatumType::HistogramF64 => false, + } +} + fn compute_extra_derives( source: FieldSource, schema: &TimeseriesSchema, @@ -96,16 +230,25 @@ fn compute_extra_derives( } } FieldSource::Metric => { - if schema.datum_type.is_histogram() { - // No other traits can be derived, since the Histogram won't - // satisfy them. + let mut derives = Vec::new(); + if fields_are_copyable(schema.metric_fields()) + && datum_type_is_copyable(schema.datum_type) + { + derives.push(quote! { Copy }); + } + if datum_type_is_partially_ordered(schema.datum_type) { + derives.push(quote! { PartialOrd }); + if datum_type_is_totally_ordered(schema.datum_type) { + derives.push(quote! { Eq, Ord }); + } + } + if datum_type_is_hashable(schema.datum_type) { + derives.push(quote! { Hash }) + } + if derives.is_empty() { quote! {} } else { - if fields_are_copyable(schema.metric_fields()) { - quote! { #[derive(Copy, Eq, Hash, Ord, PartialOrd)] } - } else { - quote! { #[derive(Eq, Hash, Ord, PartialOrd)] } - } + quote! { #[derive(#(#derives),*)] } } } } @@ -479,7 +622,7 @@ mod tests { #[doc = "a metric"] #[derive(Clone, Debug, PartialEq, ::oximeter::Metric)] - #[derive(Copy, Eq, Hash, Ord, PartialOrd)] + #[derive(Copy, PartialOrd, Eq, Ord, Hash)] pub struct Bar { #[doc = "metric field"] pub f1: ::uuid::Uuid, @@ -524,7 +667,7 @@ mod tests { #[doc = "a metric"] #[derive(Clone, Debug, PartialEq, ::oximeter::Metric)] - #[derive(Copy, Eq, Hash, Ord, PartialOrd)] + #[derive(Copy, PartialOrd, Eq, Ord, Hash)] pub struct Bar { pub datum: ::oximeter::types::Cumulative, } @@ -532,4 +675,152 @@ mod tests { assert_eq!(tokens.to_string(), expected.to_string()); } + + #[test] + fn compute_extra_derives_respects_non_copy_fields() { + // Metric fields are not copy, even though datum is. + let schema = TimeseriesSchema { + timeseries_name: "foo:bar".parse().unwrap(), + description: TimeseriesDescription { + target: "a target".into(), + metric: "a metric".into(), + }, + field_schema: BTreeSet::from([FieldSchema { + name: "f0".into(), + field_type: FieldType::String, + source: FieldSource::Metric, + description: "metric field".into(), + }]), + datum_type: DatumType::CumulativeU64, + version: NonZeroU8::new(1).unwrap(), + authz_scope: AuthzScope::Fleet, + units: Units::Bytes, + created: Utc::now(), + }; + let tokens = compute_extra_derives(FieldSource::Metric, &schema); + assert_eq!( + tokens.to_string(), + quote! { #[derive(PartialOrd, Eq, Ord, Hash)] }.to_string(), + "Copy should not be derived for a datum type that is copy, \ + when the fields themselves are not copy." + ); + } + + #[test] + fn compute_extra_derives_respects_non_copy_datum_types() { + // Fields are copy, but datum is not. + let schema = TimeseriesSchema { + timeseries_name: "foo:bar".parse().unwrap(), + description: TimeseriesDescription { + target: "a target".into(), + metric: "a metric".into(), + }, + field_schema: BTreeSet::from([FieldSchema { + name: "f0".into(), + field_type: FieldType::Uuid, + source: FieldSource::Metric, + description: "metric field".into(), + }]), + datum_type: DatumType::String, + version: NonZeroU8::new(1).unwrap(), + authz_scope: AuthzScope::Fleet, + units: Units::Bytes, + created: Utc::now(), + }; + let tokens = compute_extra_derives(FieldSource::Metric, &schema); + assert_eq!( + tokens.to_string(), + quote! { #[derive(PartialOrd, Eq, Ord, Hash)] }.to_string(), + "Copy should not be derived for a datum type that is not copy, \ + when the fields themselves are copy." + ); + } + + #[test] + fn compute_extra_derives_respects_partially_ordered_datum_types() { + // No fields, datum is partially- but not totally-ordered. + let schema = TimeseriesSchema { + timeseries_name: "foo:bar".parse().unwrap(), + description: TimeseriesDescription { + target: "a target".into(), + metric: "a metric".into(), + }, + field_schema: BTreeSet::from([FieldSchema { + name: "f0".into(), + field_type: FieldType::Uuid, + source: FieldSource::Target, + description: "target field".into(), + }]), + datum_type: DatumType::F64, + version: NonZeroU8::new(1).unwrap(), + authz_scope: AuthzScope::Fleet, + units: Units::Bytes, + created: Utc::now(), + }; + let tokens = compute_extra_derives(FieldSource::Metric, &schema); + assert_eq!( + tokens.to_string(), + quote! { #[derive(Copy, PartialOrd)] }.to_string(), + "Should derive only PartialOrd for a metric type that is \ + not totally-ordered." + ); + } + + #[test] + fn compute_extra_derives_respects_totally_ordered_datum_types() { + // No fields, datum is also totally-ordered + let schema = TimeseriesSchema { + timeseries_name: "foo:bar".parse().unwrap(), + description: TimeseriesDescription { + target: "a target".into(), + metric: "a metric".into(), + }, + field_schema: BTreeSet::from([FieldSchema { + name: "f0".into(), + field_type: FieldType::Uuid, + source: FieldSource::Target, + description: "target field".into(), + }]), + datum_type: DatumType::U64, + version: NonZeroU8::new(1).unwrap(), + authz_scope: AuthzScope::Fleet, + units: Units::Bytes, + created: Utc::now(), + }; + let tokens = compute_extra_derives(FieldSource::Metric, &schema); + assert_eq!( + tokens.to_string(), + quote! { #[derive(Copy, PartialOrd, Eq, Ord, Hash)] }.to_string(), + "Should derive Ord for a metric type that is totally-ordered." + ); + } + + #[test] + fn compute_extra_derives_respects_datum_type_with_no_extra_derives() { + // No metric fields, and histograms don't admit any other derives. + let schema = TimeseriesSchema { + timeseries_name: "foo:bar".parse().unwrap(), + description: TimeseriesDescription { + target: "a target".into(), + metric: "a metric".into(), + }, + field_schema: BTreeSet::from([FieldSchema { + name: "f0".into(), + field_type: FieldType::String, + source: FieldSource::Target, + description: "target field".into(), + }]), + datum_type: DatumType::HistogramF64, + version: NonZeroU8::new(1).unwrap(), + authz_scope: AuthzScope::Fleet, + units: Units::Bytes, + created: Utc::now(), + }; + let tokens = compute_extra_derives(FieldSource::Metric, &schema); + assert!( + tokens.is_empty(), + "A histogram has no extra derives, so a timeseries schema \ + with no metric fields should also have no extra derives." + ); + } } diff --git a/oximeter/oximeter/schema/database-transaction.toml b/oximeter/oximeter/schema/database-transaction.toml new file mode 100644 index 0000000000..bfea3fd81d --- /dev/null +++ b/oximeter/oximeter/schema/database-transaction.toml @@ -0,0 +1,26 @@ +format_version = 1 + +[target] +name = "database_transaction" +description = "A named transaction run in the control plane database" +authz_scope = "fleet" +versions = [ + { version = 1, fields = [ "name" ] } +] + +[[metrics]] +name = "retry_data" +description = "Information about a retried transaction" +units = "seconds" +datum_type = "f64" +versions = [ + { added_in = 1, fields = [ "attempt" ] } +] + +[fields.name] +type = "string" +description = "The name of the transaction" + +[fields.attempt] +type = "u32" +description = "The attempt at running the transaction" diff --git a/oximeter/oximeter/src/lib.rs b/oximeter/oximeter/src/lib.rs index 9dd8fab47a..5ec6a49e5c 100644 --- a/oximeter/oximeter/src/lib.rs +++ b/oximeter/oximeter/src/lib.rs @@ -212,15 +212,18 @@ mod test { entry.path().canonicalize().unwrap().display() ); let contents = fs::read_to_string(entry.path()).unwrap(); - let list = load_schema(&contents).unwrap_or_else(|_| { + let list = load_schema(&contents).unwrap_or_else(|e| { panic!( - "Expected a valid timeseries definition in {}", - entry.path().canonicalize().unwrap().display() + "Expected a valid timeseries definition in {}, \ + but found error: {}", + entry.path().canonicalize().unwrap().display(), + e, ) }); println!("found {} schema", list.len()); for schema in list.into_iter() { let key = (schema.timeseries_name.clone(), schema.version); + println!(" {} v{}", key.0, key.1); if let Some(dup) = all_schema.insert(key, schema.clone()) { panic!( "Timeseries '{}' version {} is duplicated.\