diff --git a/nexus/tests/integration_tests/oximeter.rs b/nexus/tests/integration_tests/oximeter.rs index 6a63799790..fa22761441 100644 --- a/nexus/tests/integration_tests/oximeter.rs +++ b/nexus/tests/integration_tests/oximeter.rs @@ -8,7 +8,7 @@ use crate::integration_tests::metrics::wait_for_producer; use nexus_test_interface::NexusServer; use nexus_test_utils_macros::nexus_test; use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError}; -use oximeter_db::DbWrite; +use oximeter_db::DbInit; use std::time::Duration; use uuid::Uuid; diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index ce0ab78e61..995657fff2 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -21,7 +21,7 @@ use omicron_common::backoff::BackoffError; use oximeter::types::ProducerResults; use oximeter::types::ProducerResultsItem; use oximeter_db::Client; -use oximeter_db::DbWrite; +use oximeter_db::{DbInit, DbWrite}; use qorb::claim::Handle; use qorb::pool::Pool; use qorb::resolver::BoxedResolver; diff --git a/oximeter/db/src/bin/oxdb/main.rs b/oximeter/db/src/bin/oxdb/main.rs index 3ad8959e66..c9675f1d45 100644 --- a/oximeter/db/src/bin/oxdb/main.rs +++ b/oximeter/db/src/bin/oxdb/main.rs @@ -14,7 +14,7 @@ use oximeter::{ types::{Cumulative, Sample}, Metric, Target, }; -use oximeter_db::{make_client, query, Client, DbWrite}; +use oximeter_db::{make_client, query, Client, DbInit, DbWrite}; use slog::{debug, info, o, Drain, Level, Logger}; use std::net::IpAddr; use uuid::Uuid; diff --git a/oximeter/db/src/client/dbwrite.rs b/oximeter/db/src/client/dbwrite.rs index 2e771df74e..eb28eedd86 100644 --- a/oximeter/db/src/client/dbwrite.rs +++ b/oximeter/db/src/client/dbwrite.rs @@ -24,17 +24,9 @@ pub(super) struct UnrolledSampleRows { pub rows: BTreeMap>, } -/// A trait allowing a [`Client`] to write data into the timeseries database. -/// -/// The vanilla [`Client`] object allows users to query the timeseries database, returning -/// timeseries samples corresponding to various filtering criteria. This trait segregates the -/// methods required for _writing_ new data into the database, and is intended only for use by the -/// `oximeter-collector` crate. +/// A trait allowing a client to initialize the timeseries database. #[async_trait::async_trait] -pub trait DbWrite { - /// Insert the given samples into the database. - async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>; - +pub trait DbInit { /// Initialize the replicated telemetry database, creating tables as needed. async fn init_replicated_db(&self) -> Result<(), Error>; @@ -49,16 +41,7 @@ pub trait DbWrite { } #[async_trait::async_trait] -impl DbWrite for Client { - /// Insert the given samples into the database. - async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> { - debug!(self.log, "unrolling {} total samples", samples.len()); - let UnrolledSampleRows { new_schema, rows } = - self.unroll_samples(samples).await; - self.save_new_schema_or_remove(new_schema).await?; - self.insert_unrolled_samples(rows).await - } - +impl DbInit for Client { /// Initialize the replicated telemetry database, creating tables as needed. /// /// We run both db-init files since we want all tables in production. @@ -109,6 +92,30 @@ impl DbWrite for Client { } } +/// A trait allowing a [`Client`] to write data into the timeseries database. +/// +/// The vanilla [`Client`] object allows users to query the timeseries database, returning +/// timeseries samples corresponding to various filtering criteria. This trait segregates the +/// methods required for _writing_ new data into the database, and is intended only for use by the +/// `oximeter-collector` crate. +#[async_trait::async_trait] +pub trait DbWrite { + /// Insert the given samples into the database. + async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>; +} + +#[async_trait::async_trait] +impl DbWrite for Client { + /// Insert the given samples into the database. + async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> { + debug!(self.log, "unrolling {} total samples", samples.len()); + let UnrolledSampleRows { new_schema, rows } = + self.unroll_samples(samples).await; + self.save_new_schema_or_remove(new_schema).await?; + self.insert_unrolled_samples(rows).await + } +} + /// Allow initializing a minimal subset of db tables for replicated cluster /// testing #[async_trait::async_trait] diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 22caaa7312..b227e76809 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -13,6 +13,7 @@ pub(crate) mod query_summary; #[cfg(any(feature = "sql", test))] mod sql; +pub use self::dbwrite::DbInit; pub use self::dbwrite::DbWrite; pub use self::dbwrite::TestDbWrite; use crate::client::query_summary::QuerySummary; @@ -26,6 +27,7 @@ use crate::TimeseriesPageSelector; use crate::TimeseriesScanParams; use crate::TimeseriesSchema; use anyhow::anyhow; +use anyhow::Context; use debug_ignore::DebugIgnore; use dropshot::EmptyScanParams; use dropshot::PaginationOrder; @@ -97,6 +99,7 @@ impl backend::Connector for ReqwestConnector { client: reqwest::Client::builder() .pool_max_idle_per_host(1) .build() + .context("can't connect to database") .map_err(|e| QorbError::Other(anyhow!(e)))?, url: format!("http://{}", backend.address), }) @@ -111,12 +114,25 @@ impl backend::Connector for ReqwestConnector { .get(format!("{}/ping", conn.url)) .send() .await - .map_err(|err| QorbError::Other(anyhow!(err.to_string())))?, + .context("can't ping database") + .map_err(|e| QorbError::Other(anyhow!(e)))?, ) .await .map_err(|e| QorbError::Other(anyhow!(e)))?; Ok(()) } + + async fn on_acquire( + &self, + conn: &mut Self::Connection, + ) -> Result<(), backend::Error> { + let replicated = false; // TODO + conn.init_db(replicated, model::OXIMETER_VERSION) + .await + .context("can't initialize database") + .map_err(|e| QorbError::Other(anyhow!(e)))?; + Ok(()) + } } #[derive(Clone, Debug)] @@ -125,6 +141,167 @@ pub(crate) struct ReqwestClient { client: reqwest::Client, } +/// If we acquire a connection to a fresh database, we must ensure that +/// it is initialized (i.e., that the database exists and has the current +/// schema) before we can insert samples into it. These methods currently +/// duplicate the work of the [Client] database initialization, but with +/// somewhat simpler machinery. +/// +/// TODO: De-duplicate with [Client] `impl`. +impl ReqwestClient { + /// Ensure that the acquired database exists and is up-to-date. + async fn init_db( + &self, + replicated: bool, + expected_version: u64, + ) -> Result<(), Error> { + // Read the version from the DB + let version = self.read_latest_version().await?; + + // Decide how to conform the on-disk version with this version of + // Oximeter. + if version < expected_version { + // If the on-storage version is less than the constant embedded into + // this binary, the DB is out-of-date. Drop it, and re-populate it + // later. + if !replicated { + self.wipe_single_node_db().await?; + self.init_single_node_db().await?; + } else { + self.wipe_replicated_db().await?; + self.init_replicated_db().await?; + } + } else if version > expected_version { + // If the on-storage version is greater than the constant embedded + // into this binary, we may have downgraded. + return Err(Error::DatabaseVersionMismatch { + expected: model::OXIMETER_VERSION, + found: version, + }); + } else { + // If the version matches, we don't need to update the DB + return Ok(()); + } + + self.insert_version(expected_version).await?; + Ok(()) + } + + /// Read the latest version applied in the database. + pub async fn read_latest_version(&self) -> Result { + let sql = format!( + "SELECT MAX(value) FROM {db_name}.version;", + db_name = crate::DATABASE_NAME, + ); + + let version = match self.execute(sql).await { + Ok(body) if body.is_empty() => 0, + Ok(body) => body.trim().parse::().map_err(|err| { + Error::Database(format!("Cannot read version: {err}")) + })?, + Err(Error::Database(err)) + // Case 1: The database has not been created. + if err.contains(CLICKHOUSE_DB_MISSING) || + // Case 2: The database has been created, but it's old (exists + // prior to the version table). + err.contains(CLICKHOUSE_DB_VERSION_MISSING) => 0, + Err(err) => return Err(err), + }; + Ok(version) + } + + async fn insert_version(&self, version: u64) -> Result<(), Error> { + let sql = format!( + "INSERT INTO {db_name}.version (*) VALUES ({version}, now());", + db_name = crate::DATABASE_NAME, + ); + self.execute(sql).await?; + Ok(()) + } + + /// Execute a SQL statement, awaiting the response as text. + /// + /// This is intended only to be used for initializing the database. + /// It does not implement the logging, probing, or query summarization + /// of [`Client::execute_with_body`]. + async fn execute(&self, sql: String) -> Result { + let response = self + .client + .post(&self.url) + .timeout(DEFAULT_REQUEST_TIMEOUT) + .query(&[("output_format_json_quote_64bit_integers", "0")]) + .body(sql) + .send() + .await + .map_err(|err| Error::DatabaseUnavailable(err.to_string()))?; + + // Convert the HTTP response into a database response. + let response = handle_db_response(response).await?; + response.text().await.map_err(|err| Error::Database(err.to_string())) + } + + /// Run one or more SQL statements. + /// + /// This is intended only to be used for initializing the database. + async fn run_many_sql_statements( + &self, + sql: impl AsRef, + ) -> Result<(), Error> { + for stmt in sql.as_ref().split(';').filter(|s| !s.trim().is_empty()) { + self.execute(stmt.to_string()).await?; + } + Ok(()) + } +} + +#[async_trait::async_trait] +impl DbInit for ReqwestClient { + /// Initialize the replicated telemetry database, creating tables as needed. + /// + /// We run both db-init files since we want all tables in production. + /// These files are intentionally disjoint so that we don't have to + /// duplicate any setup. + async fn init_replicated_db(&self) -> Result<(), Error> { + self.run_many_sql_statements(include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/schema/replicated/db-init-1.sql" + ))) + .await?; + self.run_many_sql_statements(include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/schema/replicated/db-init-2.sql" + ))) + .await + } + + /// Wipe the ClickHouse database entirely from a replicated set up. + async fn wipe_replicated_db(&self) -> Result<(), Error> { + self.run_many_sql_statements(include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/schema/replicated/db-wipe.sql" + ))) + .await + } + + /// Initialize a single node telemetry database, creating tables as needed. + async fn init_single_node_db(&self) -> Result<(), Error> { + self.run_many_sql_statements(include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/schema/single-node/db-init.sql" + ))) + .await + } + + /// Wipe the ClickHouse database entirely from a single node set up. + async fn wipe_single_node_db(&self) -> Result<(), Error> { + self.run_many_sql_statements(include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/schema/single-node/db-wipe.sql" + ))) + .await + } +} + #[derive(Debug)] pub(crate) enum ClientSource { Static(ReqwestClient), diff --git a/oximeter/db/src/client/oxql.rs b/oximeter/db/src/client/oxql.rs index 7c23eb0517..aef0be6c9e 100644 --- a/oximeter/db/src/client/oxql.rs +++ b/oximeter/db/src/client/oxql.rs @@ -1110,7 +1110,7 @@ fn update_total_rows_and_check( mod tests { use super::ConsistentKeyGroup; use crate::client::oxql::chunk_consistent_key_groups_impl; - use crate::{Client, DbWrite}; + use crate::{Client, DbInit, DbWrite}; use crate::{Metric, Target}; use chrono::{DateTime, Utc}; use dropshot::test_util::LogContext; diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index d7d3c3e730..c3fb078c8f 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -48,8 +48,8 @@ pub mod sql; pub use client::oxql::OxqlResult; pub use client::query_summary::QuerySummary; pub use client::Client; -pub use client::DbWrite; pub use client::TestDbWrite; +pub use client::{DbInit, DbWrite}; pub use model::OXIMETER_VERSION; #[derive(Debug, Error)] diff --git a/oximeter/db/tests/integration_test.rs b/oximeter/db/tests/integration_test.rs index 35f96dfd50..33895a0a1c 100644 --- a/oximeter/db/tests/integration_test.rs +++ b/oximeter/db/tests/integration_test.rs @@ -7,7 +7,7 @@ use clickward::{BasePorts, Deployment, DeploymentConfig, KeeperId}; use dropshot::test_util::log_prefix_for_test; use omicron_test_utils::dev::poll; use omicron_test_utils::dev::test_setup_log; -use oximeter_db::{Client, DbWrite, OxqlResult, Sample, TestDbWrite}; +use oximeter_db::{Client, DbInit, DbWrite, OxqlResult, Sample, TestDbWrite}; use oximeter_test_utils::wait_for_keepers; use slog::{info, Logger}; use std::collections::BTreeSet;