From 5514295d7b9be579614268c3138835f03016e60d Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Tue, 1 Oct 2024 16:47:51 -0700 Subject: [PATCH 1/4] Use qorb within the Oximeter collector --- Cargo.lock | 4 + oximeter/collector/Cargo.toml | 2 + oximeter/collector/src/agent.rs | 55 +++++------- oximeter/collector/src/lib.rs | 129 +++++++++++++++++++++------ oximeter/db/Cargo.toml | 2 + oximeter/db/src/client/mod.rs | 153 +++++++++++++++++++++++++++++--- oximeter/db/src/client/sql.rs | 8 +- oximeter/db/src/lib.rs | 6 ++ 8 files changed, 286 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2c4a71d510..292461600b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7436,6 +7436,7 @@ name = "oximeter-collector" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "camino", "chrono", "clap", @@ -7455,6 +7456,7 @@ dependencies = [ "oximeter-api", "oximeter-client", "oximeter-db", + "qorb", "rand", "reqwest 0.12.7", "schemars", @@ -7488,6 +7490,7 @@ dependencies = [ "clickward", "criterion", "crossterm 0.28.1", + "debug-ignore", "display-error-chain", "dropshot 0.12.0", "expectorate", @@ -7505,6 +7508,7 @@ dependencies = [ "oximeter-test-utils", "oxql-types", "peg", + "qorb", "reedline", "regex", "reqwest 0.12.7", diff --git a/oximeter/collector/Cargo.toml b/oximeter/collector/Cargo.toml index f8f8e147ec..c6852b5599 100644 --- a/oximeter/collector/Cargo.toml +++ b/oximeter/collector/Cargo.toml @@ -10,6 +10,7 @@ workspace = true [dependencies] anyhow.workspace = true +async-trait.workspace = true camino.workspace = true chrono.workspace = true clap.workspace = true @@ -22,6 +23,7 @@ oximeter.workspace = true oximeter-api.workspace = true oximeter-client.workspace = true oximeter-db.workspace = true +qorb.workspace = true rand.workspace = true reqwest = { workspace = true, features = [ "json" ] } schemars.workspace = true diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index 60d5a7766b..ce0ab78e61 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -14,15 +14,17 @@ use anyhow::anyhow; use chrono::DateTime; use chrono::Utc; use futures::TryStreamExt; -use internal_dns::resolver::Resolver; -use internal_dns::ServiceName; use nexus_client::types::IdSortMode; +use nexus_client::Client as NexusClient; use omicron_common::backoff; use omicron_common::backoff::BackoffError; use oximeter::types::ProducerResults; use oximeter::types::ProducerResultsItem; use oximeter_db::Client; use oximeter_db::DbWrite; +use qorb::claim::Handle; +use qorb::pool::Pool; +use qorb::resolver::BoxedResolver; use slog::debug; use slog::error; use slog::info; @@ -32,7 +34,6 @@ use slog::warn; use slog::Logger; use std::collections::btree_map::Entry; use std::collections::BTreeMap; -use std::net::SocketAddr; use std::net::SocketAddrV6; use std::ops::Bound; use std::sync::Arc; @@ -387,7 +388,7 @@ impl OximeterAgent { address: SocketAddrV6, refresh_interval: Duration, db_config: DbConfig, - resolver: &Resolver, + resolver: BoxedResolver, log: &Logger, replicated: bool, ) -> Result { @@ -399,22 +400,6 @@ impl OximeterAgent { )); let insertion_log = log.new(o!("component" => "results-sink")); - // Construct the ClickHouse client first, propagate an error if we can't reach the - // database. 
- let db_address = if let Some(address) = db_config.address { - address - } else if replicated { - SocketAddr::V6( - resolver - .lookup_socket_v6(ServiceName::ClickhouseServer) - .await?, - ) - } else { - SocketAddr::V6( - resolver.lookup_socket_v6(ServiceName::Clickhouse).await?, - ) - }; - // Determine the version of the database. // // There are three cases @@ -429,7 +414,7 @@ impl OximeterAgent { // - The DB doesn't exist at all. This reports a version number of 0. We // need to create the DB here, at the latest version. This is used in // fresh installations and tests. - let client = Client::new(db_address, &log); + let client = Client::new_with_pool(resolver, &log); match client.check_db_is_at_expected_version().await { Ok(_) => {} Err(oximeter_db::Error::DatabaseVersionMismatch { @@ -482,11 +467,14 @@ impl OximeterAgent { /// Ensure the background task that polls Nexus periodically for our list of /// assigned producers is running. - pub(crate) fn ensure_producer_refresh_task(&self, resolver: Resolver) { + pub(crate) fn ensure_producer_refresh_task( + &self, + nexus_pool: Pool, + ) { let mut task = self.refresh_task.lock().unwrap(); if task.is_none() { let refresh_task = - tokio::spawn(refresh_producer_list(self.clone(), resolver)); + tokio::spawn(refresh_producer_list(self.clone(), nexus_pool)); *task = Some(refresh_task); } } @@ -763,15 +751,16 @@ impl OximeterAgent { } // A task which periodically updates our list of producers from Nexus. -async fn refresh_producer_list(agent: OximeterAgent, resolver: Resolver) { +async fn refresh_producer_list( + agent: OximeterAgent, + nexus_pool: Pool, +) { let mut interval = tokio::time::interval(agent.refresh_interval); loop { interval.tick().await; info!(agent.log, "refreshing list of producers from Nexus"); - let nexus_addr = - resolve_nexus_with_backoff(&agent.log, &resolver).await; - let url = format!("http://{}", nexus_addr); - let client = nexus_client::Client::new(&url, agent.log.clone()); + + let client = claim_nexus_with_backoff(&agent.log, &nexus_pool).await; let mut stream = client.cpapi_assigned_producers_list_stream( &agent.id, // This is a _total_ limit, not a page size, so `None` means "get @@ -826,10 +815,10 @@ async fn refresh_producer_list(agent: OximeterAgent, resolver: Resolver) { } } -async fn resolve_nexus_with_backoff( +async fn claim_nexus_with_backoff( log: &Logger, - resolver: &Resolver, -) -> SocketAddrV6 { + nexus_pool: &Pool, +) -> Handle { let log_failure = |error, delay| { warn!( log, @@ -839,8 +828,8 @@ async fn resolve_nexus_with_backoff( ); }; let do_lookup = || async { - resolver - .lookup_socket_v6(ServiceName::Nexus) + nexus_pool + .claim() .await .map_err(|e| BackoffError::transient(e.to_string())) }; diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 0576c7d532..cbe2678486 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -11,12 +11,15 @@ use dropshot::ConfigLogging; use dropshot::HttpError; use dropshot::HttpServer; use dropshot::HttpServerStarter; -use internal_dns::resolver::ResolveError; -use internal_dns::resolver::Resolver; use internal_dns::ServiceName; +use omicron_common::address::{get_internal_dns_server_addresses, DNS_PORT}; use omicron_common::api::internal::nexus::ProducerEndpoint; use omicron_common::backoff; use omicron_common::FileKv; +use qorb::backend; +use qorb::resolver::{AllBackends, BoxedResolver, Resolver}; +use qorb::resolvers::dns::{DnsResolver, DnsResolverConfig}; +use qorb::service; use serde::Deserialize; use 
serde::Serialize; use slog::debug; @@ -26,12 +29,14 @@ use slog::o; use slog::warn; use slog::Drain; use slog::Logger; +use std::collections::BTreeMap; use std::net::SocketAddr; use std::net::SocketAddrV6; use std::path::Path; use std::sync::Arc; use std::time::Duration; use thiserror::Error; +use tokio::sync::watch; use uuid::Uuid; mod agent; @@ -56,9 +61,6 @@ pub enum Error { #[error(transparent)] Database(#[from] oximeter_db::Error), - #[error(transparent)] - ResolveError(#[from] ResolveError), - #[error("Error running standalone")] Standalone(#[from] anyhow::Error), } @@ -157,6 +159,49 @@ pub struct OximeterArguments { pub address: SocketAddrV6, } +// Provides an alternative to the DNS resolver for cases where we want to +// contact a backend without performing resolution. +struct SingleHostResolver { + tx: watch::Sender, +} + +impl SingleHostResolver { + fn new(address: SocketAddr) -> Self { + let backends = Arc::new(BTreeMap::from([( + backend::Name::new("singleton"), + backend::Backend { address }, + )])); + let (tx, _rx) = watch::channel(backends.clone()); + Self { tx } + } +} + +impl Resolver for SingleHostResolver { + fn monitor(&mut self) -> watch::Receiver { + self.tx.subscribe() + } +} + +// A "qorb connector" which converts a SocketAddr into a nexus_client::Client. +struct NexusConnector { + log: Logger, +} + +#[async_trait::async_trait] +impl backend::Connector for NexusConnector { + type Connection = nexus_client::Client; + + async fn connect( + &self, + backend: &backend::Backend, + ) -> Result { + Ok(nexus_client::Client::new( + &format!("http://{}", backend.address), + self.log.clone(), + )) + } +} + /// A server used to collect metrics from components in the control plane. pub struct Oximeter { agent: Arc, @@ -202,10 +247,32 @@ impl Oximeter { } info!(log, "starting oximeter server"); - let resolver = Resolver::new_from_ip( - log.new(o!("component" => "DnsResolver")), - *args.address.ip(), - )?; + // Use the address for Oximeter to infer the bootstrap DNS address + let bootstrap_dns: Vec = + get_internal_dns_server_addresses(*args.address.ip()) + .into_iter() + .map(|ip| SocketAddr::new(ip, DNS_PORT)) + .collect(); + + let make_clickhouse_resolver = || -> BoxedResolver { + if let Some(address) = config.db.address { + Box::new(SingleHostResolver::new(address)) + } else { + let service = if config.db.replicated { + ServiceName::ClickhouseServer + } else { + ServiceName::Clickhouse + }; + Box::new(DnsResolver::new( + service::Name(service.srv_name()), + bootstrap_dns.clone(), + DnsResolverConfig { + hardcoded_ttl: Some(tokio::time::Duration::MAX), + ..Default::default() + }, + )) + } + }; let make_agent = || async { debug!(log, "creating ClickHouse client"); @@ -215,7 +282,7 @@ impl Oximeter { args.address, config.refresh_interval, config.db, - &resolver, + make_clickhouse_resolver(), &log, config.db.replicated, ) @@ -256,24 +323,32 @@ impl Oximeter { address: server.local_addr().to_string(), collector_id: agent.id, }; + + let nexus_pool = { + let nexus_resolver: BoxedResolver = + if let Some(address) = config.nexus_address { + Box::new(SingleHostResolver::new(address)) + } else { + Box::new(DnsResolver::new( + service::Name(ServiceName::Nexus.srv_name()), + bootstrap_dns, + DnsResolverConfig { + hardcoded_ttl: Some(tokio::time::Duration::MAX), + ..Default::default() + }, + )) + }; + + qorb::pool::Pool::new( + nexus_resolver, + Arc::new(NexusConnector { log: log.clone() }), + qorb::policy::Policy::default(), + ) + }; + let notify_nexus = || async { debug!(log, "contacting 
nexus"); - let nexus_address = if let Some(address) = config.nexus_address { - address - } else { - SocketAddr::V6( - resolver - .lookup_socket_v6(ServiceName::Nexus) - .await - .map_err(|e| { - backoff::BackoffError::transient(e.to_string()) - })?, - ) - }; - let client = nexus_client::Client::new( - &format!("http://{nexus_address}"), - log.clone(), - ); + let client = nexus_pool.claim().await.map_err(|e| e.to_string())?; client.cpapi_collectors_post(&our_info).await.map_err(|e| { match &e { // Failures to reach nexus, or server errors on its side @@ -307,7 +382,7 @@ impl Oximeter { // Now that we've successfully registered, we'll start periodically // polling for our list of producers from Nexus. - agent.ensure_producer_refresh_task(resolver); + agent.ensure_producer_refresh_task(nexus_pool); info!(log, "oximeter registered with nexus"; "id" => ?agent.id); Ok(Self { agent, server }) diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index a1750e3fc9..334acee5cc 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -17,6 +17,7 @@ camino.workspace = true chrono.workspace = true clap.workspace = true clickward.workspace = true +debug-ignore.workspace = true dropshot.workspace = true futures.workspace = true gethostname.workspace = true @@ -27,6 +28,7 @@ omicron-common.workspace = true omicron-workspace-hack.workspace = true oximeter.workspace = true oxql-types.workspace = true +qorb.workspace = true regex.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index c95c2aa013..22caaa7312 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -25,6 +25,8 @@ use crate::Timeseries; use crate::TimeseriesPageSelector; use crate::TimeseriesScanParams; use crate::TimeseriesSchema; +use anyhow::anyhow; +use debug_ignore::DebugIgnore; use dropshot::EmptyScanParams; use dropshot::PaginationOrder; use dropshot::ResultsPage; @@ -33,6 +35,10 @@ use omicron_common::backoff; use oximeter::schema::TimeseriesKey; use oximeter::types::Sample; use oximeter::TimeseriesName; +use qorb::backend; +use qorb::backend::Error as QorbError; +use qorb::pool::Pool; +use qorb::resolver::BoxedResolver; use regex::Regex; use regex::RegexBuilder; use slog::debug; @@ -51,6 +57,7 @@ use std::num::NonZeroU32; use std::ops::Bound; use std::path::Path; use std::path::PathBuf; +use std::sync::Arc; use std::sync::OnceLock; use std::time::Duration; use std::time::Instant; @@ -72,18 +79,127 @@ mod probes { fn sql__query__done(_: &usdt::UniqueId) {} } +// A "qorb connector" which creates a ReqwestClient for the backend. +// +// This also keeps track of the underlying address, so we can use it +// for making HTTP requests directly to the backend. 
+struct ReqwestConnector {} + +#[async_trait::async_trait] +impl backend::Connector for ReqwestConnector { + type Connection = ReqwestClient; + + async fn connect( + &self, + backend: &backend::Backend, + ) -> Result { + Ok(ReqwestClient { + client: reqwest::Client::builder() + .pool_max_idle_per_host(1) + .build() + .map_err(|e| QorbError::Other(anyhow!(e)))?, + url: format!("http://{}", backend.address), + }) + } + + async fn is_valid( + &self, + conn: &mut Self::Connection, + ) -> Result<(), backend::Error> { + handle_db_response( + conn.client + .get(format!("{}/ping", conn.url)) + .send() + .await + .map_err(|err| QorbError::Other(anyhow!(err.to_string())))?, + ) + .await + .map_err(|e| QorbError::Other(anyhow!(e)))?; + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub(crate) struct ReqwestClient { + url: String, + client: reqwest::Client, +} + +#[derive(Debug)] +pub(crate) enum ClientSource { + Static(ReqwestClient), + Pool { pool: DebugIgnore> }, +} + +pub(crate) enum ClientVariant { + Static(ReqwestClient), + Handle(qorb::claim::Handle), +} + +impl ClientVariant { + pub(crate) async fn new(source: &ClientSource) -> Result { + let client = match source { + ClientSource::Static(client) => { + ClientVariant::Static(client.clone()) + } + ClientSource::Pool { pool } => { + let handle = pool.claim().await?; + ClientVariant::Handle(handle) + } + }; + Ok(client) + } + + pub(crate) fn url(&self) -> &str { + match self { + ClientVariant::Static(client) => &client.url, + ClientVariant::Handle(handle) => &handle.url, + } + } + + pub(crate) fn reqwest(&self) -> &reqwest::Client { + match self { + ClientVariant::Static(client) => &client.client, + ClientVariant::Handle(handle) => &handle.client, + } + } +} + /// A `Client` to the ClickHouse metrics database. #[derive(Debug)] pub struct Client { _id: Uuid, log: Logger, - url: String, - client: reqwest::Client, + source: ClientSource, schema: Mutex>, request_timeout: Duration, } impl Client { + /// Construct a Clickhouse client of the database with a connection pool. + pub fn new_with_pool(resolver: BoxedResolver, log: &Logger) -> Self { + let id = Uuid::new_v4(); + let log = log.new(slog::o!( + "component" => "clickhouse-client", + "id" => id.to_string(), + )); + let schema = Mutex::new(BTreeMap::new()); + let request_timeout = DEFAULT_REQUEST_TIMEOUT; + Self { + _id: id, + log, + source: ClientSource::Pool { + pool: DebugIgnore(Pool::new( + resolver, + Arc::new(ReqwestConnector {}), + qorb::policy::Policy::default(), + )), + }, + schema, + request_timeout, + } + } + /// Construct a new ClickHouse client of the database at `address`. pub fn new(address: SocketAddr, log: &Logger) -> Self { Self::new_with_request_timeout(address, log, DEFAULT_REQUEST_TIMEOUT) @@ -104,19 +220,34 @@ impl Client { let client = reqwest::Client::new(); let url = format!("http://{}", address); let schema = Mutex::new(BTreeMap::new()); - Self { _id: id, log, url, client, schema, request_timeout } + Self { + _id: id, + log, + source: ClientSource::Static(ReqwestClient { url, client }), + schema, + request_timeout, + } } - /// Return the url the client is trying to connect to + /// Return the url the client is trying to connect to. + /// + /// For pool-based clients, this returns "dynamic", as the URL may change + /// between accesses. pub fn url(&self) -> &str { - &self.url + match &self.source { + ClientSource::Static(client) => &client.url, + ClientSource::Pool { .. } => "dynamic", + } } /// Ping the ClickHouse server to verify connectivitiy. 
pub async fn ping(&self) -> Result<(), Error> { + let client = ClientVariant::new(&self.source).await?; + handle_db_response( - self.client - .get(format!("{}/ping", self.url)) + client + .reqwest() + .get(format!("{}/ping", client.url())) .send() .await .map_err(|err| Error::DatabaseUnavailable(err.to_string()))?, @@ -920,9 +1051,11 @@ impl Client { let start = Instant::now(); // Submit the SQL request itself. - let response = self - .client - .post(&self.url) + let client = ClientVariant::new(&self.source).await?; + + let response = client + .reqwest() + .post(client.url()) .timeout(self.request_timeout) .query(&[ ("output_format_json_quote_64bit_integers", "0"), diff --git a/oximeter/db/src/client/sql.rs b/oximeter/db/src/client/sql.rs index 236faa7aa4..10c234c9cc 100644 --- a/oximeter/db/src/client/sql.rs +++ b/oximeter/db/src/client/sql.rs @@ -55,9 +55,11 @@ impl Client { "original_sql" => &original_query, "rewritten_sql" => &rewritten, ); - let request = self - .client - .post(&self.url) + let client = crate::client::ClientVariant::new(&self.source).await?; + + let request = client + .reqwest() + .post(client.url()) .query(&[ ("output_format_json_quote_64bit_integers", "0"), ("database", crate::DATABASE_NAME), diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index cd51c92c18..d7d3c3e730 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -54,6 +54,12 @@ pub use model::OXIMETER_VERSION; #[derive(Debug, Error)] pub enum Error { + #[error("Failed to create reqwest client")] + Reqwest(#[from] reqwest::Error), + + #[error("Failed to check out connection to database")] + Connection(#[from] qorb::pool::Error), + #[error("Oximeter core error: {0}")] Oximeter(#[from] oximeter::MetricsError), From 20a16b818d6f53b3d4b4ca5d5266a8ce02b5f6a7 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Tue, 1 Oct 2024 17:01:40 -0700 Subject: [PATCH 2/4] hakari --- Cargo.lock | 1 + workspace-hack/Cargo.toml | 2 ++ 2 files changed, 3 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 292461600b..6adf89acf3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7067,6 +7067,7 @@ dependencies = [ "postgres-types", "predicates", "proc-macro2", + "qorb", "quote", "regex", "regex-automata", diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 72fc65ed06..a356369467 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -86,6 +86,7 @@ pkcs8 = { version = "0.10.2", default-features = false, features = ["encryption" postgres-types = { version = "0.2.8", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] } predicates = { version = "3.1.2" } proc-macro2 = { version = "1.0.86" } +qorb = { git = "https://github.com/oxidecomputer/qorb", branch = "master", features = ["qtop"] } quote = { version = "1.0.37" } regex = { version = "1.10.6" } regex-automata = { version = "0.4.6", default-features = false, features = ["dfa", "hybrid", "meta", "nfa", "perf", "unicode"] } @@ -196,6 +197,7 @@ pkcs8 = { version = "0.10.2", default-features = false, features = ["encryption" postgres-types = { version = "0.2.8", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] } predicates = { version = "3.1.2" } proc-macro2 = { version = "1.0.86" } +qorb = { git = "https://github.com/oxidecomputer/qorb", branch = "master", features = ["qtop"] } quote = { version = "1.0.37" } regex = { version = "1.10.6" } regex-automata = { version = "0.4.6", default-features = false, features = ["dfa", "hybrid", "meta", 
"nfa", "perf", "unicode"] } From 834ea859e3a87204b4f370c42ae888c1ca966d22 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Wed, 2 Oct 2024 09:53:00 -0700 Subject: [PATCH 3/4] import style --- oximeter/collector/src/lib.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index cbe2678486..a9f03a5b09 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -12,13 +12,17 @@ use dropshot::HttpError; use dropshot::HttpServer; use dropshot::HttpServerStarter; use internal_dns::ServiceName; -use omicron_common::address::{get_internal_dns_server_addresses, DNS_PORT}; +use omicron_common::address::get_internal_dns_server_addresses; +use omicron_common::address::DNS_PORT; use omicron_common::api::internal::nexus::ProducerEndpoint; use omicron_common::backoff; use omicron_common::FileKv; use qorb::backend; -use qorb::resolver::{AllBackends, BoxedResolver, Resolver}; -use qorb::resolvers::dns::{DnsResolver, DnsResolverConfig}; +use qorb::resolver::AllBackends; +use qorb::resolver::BoxedResolver; +use qorb::resolver::Resolver; +use qorb::resolvers::dns::DnsResolver; +use qorb::resolvers::dns::DnsResolverConfig; use qorb::service; use serde::Deserialize; use serde::Serialize; From ded7d759b7a7d21ff3ab98f18ca6a3d07431262a Mon Sep 17 00:00:00 2001 From: Alex Plotnick Date: Mon, 14 Oct 2024 19:03:01 -0600 Subject: [PATCH 4/4] Initialize timeseries database on qorb acquire --- nexus/tests/integration_tests/oximeter.rs | 2 +- oximeter/collector/src/agent.rs | 2 +- oximeter/db/src/bin/oxdb/main.rs | 2 +- oximeter/db/src/client/dbwrite.rs | 47 +++--- oximeter/db/src/client/mod.rs | 179 +++++++++++++++++++++- oximeter/db/src/client/oxql.rs | 2 +- oximeter/db/src/lib.rs | 2 +- oximeter/db/tests/integration_test.rs | 2 +- 8 files changed, 211 insertions(+), 27 deletions(-) diff --git a/nexus/tests/integration_tests/oximeter.rs b/nexus/tests/integration_tests/oximeter.rs index 6a63799790..fa22761441 100644 --- a/nexus/tests/integration_tests/oximeter.rs +++ b/nexus/tests/integration_tests/oximeter.rs @@ -8,7 +8,7 @@ use crate::integration_tests::metrics::wait_for_producer; use nexus_test_interface::NexusServer; use nexus_test_utils_macros::nexus_test; use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError}; -use oximeter_db::DbWrite; +use oximeter_db::DbInit; use std::time::Duration; use uuid::Uuid; diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index ce0ab78e61..995657fff2 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -21,7 +21,7 @@ use omicron_common::backoff::BackoffError; use oximeter::types::ProducerResults; use oximeter::types::ProducerResultsItem; use oximeter_db::Client; -use oximeter_db::DbWrite; +use oximeter_db::{DbInit, DbWrite}; use qorb::claim::Handle; use qorb::pool::Pool; use qorb::resolver::BoxedResolver; diff --git a/oximeter/db/src/bin/oxdb/main.rs b/oximeter/db/src/bin/oxdb/main.rs index 3ad8959e66..c9675f1d45 100644 --- a/oximeter/db/src/bin/oxdb/main.rs +++ b/oximeter/db/src/bin/oxdb/main.rs @@ -14,7 +14,7 @@ use oximeter::{ types::{Cumulative, Sample}, Metric, Target, }; -use oximeter_db::{make_client, query, Client, DbWrite}; +use oximeter_db::{make_client, query, Client, DbInit, DbWrite}; use slog::{debug, info, o, Drain, Level, Logger}; use std::net::IpAddr; use uuid::Uuid; diff --git a/oximeter/db/src/client/dbwrite.rs b/oximeter/db/src/client/dbwrite.rs index 2e771df74e..eb28eedd86 100644 
--- a/oximeter/db/src/client/dbwrite.rs +++ b/oximeter/db/src/client/dbwrite.rs @@ -24,17 +24,9 @@ pub(super) struct UnrolledSampleRows { pub rows: BTreeMap>, } -/// A trait allowing a [`Client`] to write data into the timeseries database. -/// -/// The vanilla [`Client`] object allows users to query the timeseries database, returning -/// timeseries samples corresponding to various filtering criteria. This trait segregates the -/// methods required for _writing_ new data into the database, and is intended only for use by the -/// `oximeter-collector` crate. +/// A trait allowing a client to initialize the timeseries database. #[async_trait::async_trait] -pub trait DbWrite { - /// Insert the given samples into the database. - async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>; - +pub trait DbInit { /// Initialize the replicated telemetry database, creating tables as needed. async fn init_replicated_db(&self) -> Result<(), Error>; @@ -49,16 +41,7 @@ pub trait DbWrite { } #[async_trait::async_trait] -impl DbWrite for Client { - /// Insert the given samples into the database. - async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> { - debug!(self.log, "unrolling {} total samples", samples.len()); - let UnrolledSampleRows { new_schema, rows } = - self.unroll_samples(samples).await; - self.save_new_schema_or_remove(new_schema).await?; - self.insert_unrolled_samples(rows).await - } - +impl DbInit for Client { /// Initialize the replicated telemetry database, creating tables as needed. /// /// We run both db-init files since we want all tables in production. @@ -109,6 +92,30 @@ impl DbWrite for Client { } } +/// A trait allowing a [`Client`] to write data into the timeseries database. +/// +/// The vanilla [`Client`] object allows users to query the timeseries database, returning +/// timeseries samples corresponding to various filtering criteria. This trait segregates the +/// methods required for _writing_ new data into the database, and is intended only for use by the +/// `oximeter-collector` crate. +#[async_trait::async_trait] +pub trait DbWrite { + /// Insert the given samples into the database. + async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>; +} + +#[async_trait::async_trait] +impl DbWrite for Client { + /// Insert the given samples into the database. 
+ async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> { + debug!(self.log, "unrolling {} total samples", samples.len()); + let UnrolledSampleRows { new_schema, rows } = + self.unroll_samples(samples).await; + self.save_new_schema_or_remove(new_schema).await?; + self.insert_unrolled_samples(rows).await + } +} + /// Allow initializing a minimal subset of db tables for replicated cluster /// testing #[async_trait::async_trait] diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 22caaa7312..b227e76809 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -13,6 +13,7 @@ pub(crate) mod query_summary; #[cfg(any(feature = "sql", test))] mod sql; +pub use self::dbwrite::DbInit; pub use self::dbwrite::DbWrite; pub use self::dbwrite::TestDbWrite; use crate::client::query_summary::QuerySummary; @@ -26,6 +27,7 @@ use crate::TimeseriesPageSelector; use crate::TimeseriesScanParams; use crate::TimeseriesSchema; use anyhow::anyhow; +use anyhow::Context; use debug_ignore::DebugIgnore; use dropshot::EmptyScanParams; use dropshot::PaginationOrder; @@ -97,6 +99,7 @@ impl backend::Connector for ReqwestConnector { client: reqwest::Client::builder() .pool_max_idle_per_host(1) .build() + .context("can't connect to database") .map_err(|e| QorbError::Other(anyhow!(e)))?, url: format!("http://{}", backend.address), }) @@ -111,12 +114,25 @@ impl backend::Connector for ReqwestConnector { .get(format!("{}/ping", conn.url)) .send() .await - .map_err(|err| QorbError::Other(anyhow!(err.to_string())))?, + .context("can't ping database") + .map_err(|e| QorbError::Other(anyhow!(e)))?, ) .await .map_err(|e| QorbError::Other(anyhow!(e)))?; Ok(()) } + + async fn on_acquire( + &self, + conn: &mut Self::Connection, + ) -> Result<(), backend::Error> { + let replicated = false; // TODO + conn.init_db(replicated, model::OXIMETER_VERSION) + .await + .context("can't initialize database") + .map_err(|e| QorbError::Other(anyhow!(e)))?; + Ok(()) + } } #[derive(Clone, Debug)] @@ -125,6 +141,167 @@ pub(crate) struct ReqwestClient { client: reqwest::Client, } +/// If we acquire a connection to a fresh database, we must ensure that +/// it is initialized (i.e., that the database exists and has the current +/// schema) before we can insert samples into it. These methods currently +/// duplicate the work of the [Client] database initialization, but with +/// somewhat simpler machinery. +/// +/// TODO: De-duplicate with [Client] `impl`. +impl ReqwestClient { + /// Ensure that the acquired database exists and is up-to-date. + async fn init_db( + &self, + replicated: bool, + expected_version: u64, + ) -> Result<(), Error> { + // Read the version from the DB + let version = self.read_latest_version().await?; + + // Decide how to conform the on-disk version with this version of + // Oximeter. + if version < expected_version { + // If the on-storage version is less than the constant embedded into + // this binary, the DB is out-of-date. Drop it, and re-populate it + // later. + if !replicated { + self.wipe_single_node_db().await?; + self.init_single_node_db().await?; + } else { + self.wipe_replicated_db().await?; + self.init_replicated_db().await?; + } + } else if version > expected_version { + // If the on-storage version is greater than the constant embedded + // into this binary, we may have downgraded. 
+ return Err(Error::DatabaseVersionMismatch { + expected: model::OXIMETER_VERSION, + found: version, + }); + } else { + // If the version matches, we don't need to update the DB + return Ok(()); + } + + self.insert_version(expected_version).await?; + Ok(()) + } + + /// Read the latest version applied in the database. + pub async fn read_latest_version(&self) -> Result { + let sql = format!( + "SELECT MAX(value) FROM {db_name}.version;", + db_name = crate::DATABASE_NAME, + ); + + let version = match self.execute(sql).await { + Ok(body) if body.is_empty() => 0, + Ok(body) => body.trim().parse::().map_err(|err| { + Error::Database(format!("Cannot read version: {err}")) + })?, + Err(Error::Database(err)) + // Case 1: The database has not been created. + if err.contains(CLICKHOUSE_DB_MISSING) || + // Case 2: The database has been created, but it's old (exists + // prior to the version table). + err.contains(CLICKHOUSE_DB_VERSION_MISSING) => 0, + Err(err) => return Err(err), + }; + Ok(version) + } + + async fn insert_version(&self, version: u64) -> Result<(), Error> { + let sql = format!( + "INSERT INTO {db_name}.version (*) VALUES ({version}, now());", + db_name = crate::DATABASE_NAME, + ); + self.execute(sql).await?; + Ok(()) + } + + /// Execute a SQL statement, awaiting the response as text. + /// + /// This is intended only to be used for initializing the database. + /// It does not implement the logging, probing, or query summarization + /// of [`Client::execute_with_body`]. + async fn execute(&self, sql: String) -> Result { + let response = self + .client + .post(&self.url) + .timeout(DEFAULT_REQUEST_TIMEOUT) + .query(&[("output_format_json_quote_64bit_integers", "0")]) + .body(sql) + .send() + .await + .map_err(|err| Error::DatabaseUnavailable(err.to_string()))?; + + // Convert the HTTP response into a database response. + let response = handle_db_response(response).await?; + response.text().await.map_err(|err| Error::Database(err.to_string())) + } + + /// Run one or more SQL statements. + /// + /// This is intended only to be used for initializing the database. + async fn run_many_sql_statements( + &self, + sql: impl AsRef, + ) -> Result<(), Error> { + for stmt in sql.as_ref().split(';').filter(|s| !s.trim().is_empty()) { + self.execute(stmt.to_string()).await?; + } + Ok(()) + } +} + +#[async_trait::async_trait] +impl DbInit for ReqwestClient { + /// Initialize the replicated telemetry database, creating tables as needed. + /// + /// We run both db-init files since we want all tables in production. + /// These files are intentionally disjoint so that we don't have to + /// duplicate any setup. + async fn init_replicated_db(&self) -> Result<(), Error> { + self.run_many_sql_statements(include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/schema/replicated/db-init-1.sql" + ))) + .await?; + self.run_many_sql_statements(include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/schema/replicated/db-init-2.sql" + ))) + .await + } + + /// Wipe the ClickHouse database entirely from a replicated set up. + async fn wipe_replicated_db(&self) -> Result<(), Error> { + self.run_many_sql_statements(include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/schema/replicated/db-wipe.sql" + ))) + .await + } + + /// Initialize a single node telemetry database, creating tables as needed. 
+ async fn init_single_node_db(&self) -> Result<(), Error> { + self.run_many_sql_statements(include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/schema/single-node/db-init.sql" + ))) + .await + } + + /// Wipe the ClickHouse database entirely from a single node set up. + async fn wipe_single_node_db(&self) -> Result<(), Error> { + self.run_many_sql_statements(include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/schema/single-node/db-wipe.sql" + ))) + .await + } +} + #[derive(Debug)] pub(crate) enum ClientSource { Static(ReqwestClient), diff --git a/oximeter/db/src/client/oxql.rs b/oximeter/db/src/client/oxql.rs index 7c23eb0517..aef0be6c9e 100644 --- a/oximeter/db/src/client/oxql.rs +++ b/oximeter/db/src/client/oxql.rs @@ -1110,7 +1110,7 @@ fn update_total_rows_and_check( mod tests { use super::ConsistentKeyGroup; use crate::client::oxql::chunk_consistent_key_groups_impl; - use crate::{Client, DbWrite}; + use crate::{Client, DbInit, DbWrite}; use crate::{Metric, Target}; use chrono::{DateTime, Utc}; use dropshot::test_util::LogContext; diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index d7d3c3e730..c3fb078c8f 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -48,8 +48,8 @@ pub mod sql; pub use client::oxql::OxqlResult; pub use client::query_summary::QuerySummary; pub use client::Client; -pub use client::DbWrite; pub use client::TestDbWrite; +pub use client::{DbInit, DbWrite}; pub use model::OXIMETER_VERSION; #[derive(Debug, Error)] diff --git a/oximeter/db/tests/integration_test.rs b/oximeter/db/tests/integration_test.rs index 35f96dfd50..33895a0a1c 100644 --- a/oximeter/db/tests/integration_test.rs +++ b/oximeter/db/tests/integration_test.rs @@ -7,7 +7,7 @@ use clickward::{BasePorts, Deployment, DeploymentConfig, KeeperId}; use dropshot::test_util::log_prefix_for_test; use omicron_test_utils::dev::poll; use omicron_test_utils::dev::test_setup_log; -use oximeter_db::{Client, DbWrite, OxqlResult, Sample, TestDbWrite}; +use oximeter_db::{Client, DbInit, DbWrite, OxqlResult, Sample, TestDbWrite}; use oximeter_test_utils::wait_for_keepers; use slog::{info, Logger}; use std::collections::BTreeSet;
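
A minimal, self-contained sketch of the qorb pattern this series introduces: a resolver publishes the set of backends, a connector turns a backend into a connection, and a pool hands out claims on demand. It mirrors the SingleHostResolver added in oximeter/collector/src/lib.rs; the EchoConnector type, the example address, and the extra dependencies (tokio, async-trait, anyhow) are illustrative assumptions, and the qorb calls are used only as the patches themselves exercise them (Pool::new, claim, and the Resolver and Connector traits).

use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::sync::Arc;

use qorb::backend;
use qorb::pool::Pool;
use qorb::resolver::{AllBackends, BoxedResolver, Resolver};
use tokio::sync::watch;

// Publishes a single, never-changing backend, like the collector's
// SingleHostResolver used for the `db.address` / `nexus_address` overrides.
struct SingleHostResolver {
    tx: watch::Sender<AllBackends>,
}

impl SingleHostResolver {
    fn new(address: SocketAddr) -> Self {
        let backends = Arc::new(BTreeMap::from([(
            backend::Name::new("singleton"),
            backend::Backend { address },
        )]));
        let (tx, _rx) = watch::channel(backends);
        Self { tx }
    }
}

impl Resolver for SingleHostResolver {
    fn monitor(&mut self) -> watch::Receiver<AllBackends> {
        self.tx.subscribe()
    }
}

// A hypothetical connector whose "connection" is just the backend address,
// standing in for the NexusConnector / ReqwestConnector in the patches.
struct EchoConnector;

#[async_trait::async_trait]
impl backend::Connector for EchoConnector {
    type Connection = SocketAddr;

    async fn connect(
        &self,
        backend: &backend::Backend,
    ) -> Result<Self::Connection, backend::Error> {
        Ok(backend.address)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Hypothetical ClickHouse-style address; any reachable backend works.
    let address: SocketAddr = "[::1]:8123".parse()?;
    let resolver: BoxedResolver = Box::new(SingleHostResolver::new(address));
    let pool = Pool::new(
        resolver,
        Arc::new(EchoConnector),
        qorb::policy::Policy::default(),
    );
    // Callers claim a connection instead of resolving DNS themselves, as
    // refresh_producer_list and notify_nexus now do via the Nexus pool.
    let handle =
        pool.claim().await.map_err(|e| anyhow::anyhow!(e.to_string()))?;
    println!("claimed a connection to {}", *handle);
    Ok(())
}

The same shape is what patch 4 builds on: moving database initialization into the connector's on_acquire hook means every connection claimed from the pool has already been checked against the expected schema version before the collector inserts samples.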