diff --git a/Cargo.lock b/Cargo.lock index 29f5a982d0..8c9ce86011 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7425,6 +7425,7 @@ name = "oximeter-collector" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "camino", "chrono", "clap", @@ -7445,6 +7446,7 @@ dependencies = [ "oximeter-api", "oximeter-client", "oximeter-db", + "qorb", "rand", "reqwest 0.12.8", "schemars", @@ -7478,6 +7480,7 @@ dependencies = [ "clickward", "criterion", "crossterm", + "debug-ignore", "display-error-chain", "dropshot", "expectorate", @@ -7495,6 +7498,7 @@ dependencies = [ "oximeter-test-utils", "oxql-types", "peg", + "qorb", "reedline", "regex", "reqwest 0.12.8", diff --git a/oximeter/collector/Cargo.toml b/oximeter/collector/Cargo.toml index a70ddd51cd..334b091770 100644 --- a/oximeter/collector/Cargo.toml +++ b/oximeter/collector/Cargo.toml @@ -10,6 +10,7 @@ workspace = true [dependencies] anyhow.workspace = true +async-trait.workspace = true camino.workspace = true chrono.workspace = true clap.workspace = true @@ -23,6 +24,7 @@ oximeter.workspace = true oximeter-api.workspace = true oximeter-client.workspace = true oximeter-db.workspace = true +qorb.workspace = true rand.workspace = true reqwest = { workspace = true, features = [ "json" ] } schemars.workspace = true diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index 7b1574ca1f..ce0ab78e61 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -14,15 +14,17 @@ use anyhow::anyhow; use chrono::DateTime; use chrono::Utc; use futures::TryStreamExt; -use internal_dns_resolver::Resolver; -use internal_dns_types::names::ServiceName; use nexus_client::types::IdSortMode; +use nexus_client::Client as NexusClient; use omicron_common::backoff; use omicron_common::backoff::BackoffError; use oximeter::types::ProducerResults; use oximeter::types::ProducerResultsItem; use oximeter_db::Client; use oximeter_db::DbWrite; +use qorb::claim::Handle; +use qorb::pool::Pool; +use qorb::resolver::BoxedResolver; use slog::debug; use slog::error; use slog::info; @@ -32,7 +34,6 @@ use slog::warn; use slog::Logger; use std::collections::btree_map::Entry; use std::collections::BTreeMap; -use std::net::SocketAddr; use std::net::SocketAddrV6; use std::ops::Bound; use std::sync::Arc; @@ -387,7 +388,7 @@ impl OximeterAgent { address: SocketAddrV6, refresh_interval: Duration, db_config: DbConfig, - resolver: &Resolver, + resolver: BoxedResolver, log: &Logger, replicated: bool, ) -> Result { @@ -399,22 +400,6 @@ impl OximeterAgent { )); let insertion_log = log.new(o!("component" => "results-sink")); - // Construct the ClickHouse client first, propagate an error if we can't reach the - // database. - let db_address = if let Some(address) = db_config.address { - address - } else if replicated { - SocketAddr::V6( - resolver - .lookup_socket_v6(ServiceName::ClickhouseServer) - .await?, - ) - } else { - SocketAddr::V6( - resolver.lookup_socket_v6(ServiceName::Clickhouse).await?, - ) - }; - // Determine the version of the database. // // There are three cases @@ -429,7 +414,7 @@ impl OximeterAgent { // - The DB doesn't exist at all. This reports a version number of 0. We // need to create the DB here, at the latest version. This is used in // fresh installations and tests. - let client = Client::new(db_address, &log); + let client = Client::new_with_pool(resolver, &log); match client.check_db_is_at_expected_version().await { Ok(_) => {} Err(oximeter_db::Error::DatabaseVersionMismatch { @@ -482,11 +467,14 @@ impl OximeterAgent { /// Ensure the background task that polls Nexus periodically for our list of /// assigned producers is running. - pub(crate) fn ensure_producer_refresh_task(&self, resolver: Resolver) { + pub(crate) fn ensure_producer_refresh_task( + &self, + nexus_pool: Pool, + ) { let mut task = self.refresh_task.lock().unwrap(); if task.is_none() { let refresh_task = - tokio::spawn(refresh_producer_list(self.clone(), resolver)); + tokio::spawn(refresh_producer_list(self.clone(), nexus_pool)); *task = Some(refresh_task); } } @@ -763,15 +751,16 @@ impl OximeterAgent { } // A task which periodically updates our list of producers from Nexus. -async fn refresh_producer_list(agent: OximeterAgent, resolver: Resolver) { +async fn refresh_producer_list( + agent: OximeterAgent, + nexus_pool: Pool, +) { let mut interval = tokio::time::interval(agent.refresh_interval); loop { interval.tick().await; info!(agent.log, "refreshing list of producers from Nexus"); - let nexus_addr = - resolve_nexus_with_backoff(&agent.log, &resolver).await; - let url = format!("http://{}", nexus_addr); - let client = nexus_client::Client::new(&url, agent.log.clone()); + + let client = claim_nexus_with_backoff(&agent.log, &nexus_pool).await; let mut stream = client.cpapi_assigned_producers_list_stream( &agent.id, // This is a _total_ limit, not a page size, so `None` means "get @@ -826,10 +815,10 @@ async fn refresh_producer_list(agent: OximeterAgent, resolver: Resolver) { } } -async fn resolve_nexus_with_backoff( +async fn claim_nexus_with_backoff( log: &Logger, - resolver: &Resolver, -) -> SocketAddrV6 { + nexus_pool: &Pool, +) -> Handle { let log_failure = |error, delay| { warn!( log, @@ -839,8 +828,8 @@ async fn resolve_nexus_with_backoff( ); }; let do_lookup = || async { - resolver - .lookup_socket_v6(ServiceName::Nexus) + nexus_pool + .claim() .await .map_err(|e| BackoffError::transient(e.to_string())) }; diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index fa3d755dca..01770c9540 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -11,12 +11,19 @@ use dropshot::ConfigLogging; use dropshot::HttpError; use dropshot::HttpServer; use dropshot::HttpServerStarter; -use internal_dns_resolver::ResolveError; -use internal_dns_resolver::Resolver; use internal_dns_types::names::ServiceName; +use omicron_common::address::get_internal_dns_server_addresses; +use omicron_common::address::DNS_PORT; use omicron_common::api::internal::nexus::ProducerEndpoint; use omicron_common::backoff; use omicron_common::FileKv; +use qorb::backend; +use qorb::resolver::AllBackends; +use qorb::resolver::BoxedResolver; +use qorb::resolver::Resolver; +use qorb::resolvers::dns::DnsResolver; +use qorb::resolvers::dns::DnsResolverConfig; +use qorb::service; use serde::Deserialize; use serde::Serialize; use slog::debug; @@ -26,12 +33,14 @@ use slog::o; use slog::warn; use slog::Drain; use slog::Logger; +use std::collections::BTreeMap; use std::net::SocketAddr; use std::net::SocketAddrV6; use std::path::Path; use std::sync::Arc; use std::time::Duration; use thiserror::Error; +use tokio::sync::watch; use uuid::Uuid; mod agent; @@ -56,9 +65,6 @@ pub enum Error { #[error(transparent)] Database(#[from] oximeter_db::Error), - #[error(transparent)] - ResolveError(#[from] ResolveError), - #[error("Error running standalone")] Standalone(#[from] anyhow::Error), } @@ -157,6 +163,49 @@ pub struct OximeterArguments { pub address: SocketAddrV6, } +// Provides an alternative to the DNS resolver for cases where we want to +// contact a backend without performing resolution. +struct SingleHostResolver { + tx: watch::Sender, +} + +impl SingleHostResolver { + fn new(address: SocketAddr) -> Self { + let backends = Arc::new(BTreeMap::from([( + backend::Name::new("singleton"), + backend::Backend { address }, + )])); + let (tx, _rx) = watch::channel(backends.clone()); + Self { tx } + } +} + +impl Resolver for SingleHostResolver { + fn monitor(&mut self) -> watch::Receiver { + self.tx.subscribe() + } +} + +// A "qorb connector" which converts a SocketAddr into a nexus_client::Client. +struct NexusConnector { + log: Logger, +} + +#[async_trait::async_trait] +impl backend::Connector for NexusConnector { + type Connection = nexus_client::Client; + + async fn connect( + &self, + backend: &backend::Backend, + ) -> Result { + Ok(nexus_client::Client::new( + &format!("http://{}", backend.address), + self.log.clone(), + )) + } +} + /// A server used to collect metrics from components in the control plane. pub struct Oximeter { agent: Arc, @@ -202,10 +251,32 @@ impl Oximeter { } info!(log, "starting oximeter server"); - let resolver = Resolver::new_from_ip( - log.new(o!("component" => "DnsResolver")), - *args.address.ip(), - )?; + // Use the address for Oximeter to infer the bootstrap DNS address + let bootstrap_dns: Vec = + get_internal_dns_server_addresses(*args.address.ip()) + .into_iter() + .map(|ip| SocketAddr::new(ip, DNS_PORT)) + .collect(); + + let make_clickhouse_resolver = || -> BoxedResolver { + if let Some(address) = config.db.address { + Box::new(SingleHostResolver::new(address)) + } else { + let service = if config.db.replicated { + ServiceName::ClickhouseServer + } else { + ServiceName::Clickhouse + }; + Box::new(DnsResolver::new( + service::Name(service.srv_name()), + bootstrap_dns.clone(), + DnsResolverConfig { + hardcoded_ttl: Some(tokio::time::Duration::MAX), + ..Default::default() + }, + )) + } + }; let make_agent = || async { debug!(log, "creating ClickHouse client"); @@ -215,7 +286,7 @@ impl Oximeter { args.address, config.refresh_interval, config.db, - &resolver, + make_clickhouse_resolver(), &log, config.db.replicated, ) @@ -256,24 +327,32 @@ impl Oximeter { address: server.local_addr().to_string(), collector_id: agent.id, }; + + let nexus_pool = { + let nexus_resolver: BoxedResolver = + if let Some(address) = config.nexus_address { + Box::new(SingleHostResolver::new(address)) + } else { + Box::new(DnsResolver::new( + service::Name(ServiceName::Nexus.srv_name()), + bootstrap_dns, + DnsResolverConfig { + hardcoded_ttl: Some(tokio::time::Duration::MAX), + ..Default::default() + }, + )) + }; + + qorb::pool::Pool::new( + nexus_resolver, + Arc::new(NexusConnector { log: log.clone() }), + qorb::policy::Policy::default(), + ) + }; + let notify_nexus = || async { debug!(log, "contacting nexus"); - let nexus_address = if let Some(address) = config.nexus_address { - address - } else { - SocketAddr::V6( - resolver - .lookup_socket_v6(ServiceName::Nexus) - .await - .map_err(|e| { - backoff::BackoffError::transient(e.to_string()) - })?, - ) - }; - let client = nexus_client::Client::new( - &format!("http://{nexus_address}"), - log.clone(), - ); + let client = nexus_pool.claim().await.map_err(|e| e.to_string())?; client.cpapi_collectors_post(&our_info).await.map_err(|e| { match &e { // Failures to reach nexus, or server errors on its side @@ -307,7 +386,7 @@ impl Oximeter { // Now that we've successfully registered, we'll start periodically // polling for our list of producers from Nexus. - agent.ensure_producer_refresh_task(resolver); + agent.ensure_producer_refresh_task(nexus_pool); info!(log, "oximeter registered with nexus"; "id" => ?agent.id); Ok(Self { agent, server }) diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index a1750e3fc9..334acee5cc 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -17,6 +17,7 @@ camino.workspace = true chrono.workspace = true clap.workspace = true clickward.workspace = true +debug-ignore.workspace = true dropshot.workspace = true futures.workspace = true gethostname.workspace = true @@ -27,6 +28,7 @@ omicron-common.workspace = true omicron-workspace-hack.workspace = true oximeter.workspace = true oxql-types.workspace = true +qorb.workspace = true regex.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index c95c2aa013..22caaa7312 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -25,6 +25,8 @@ use crate::Timeseries; use crate::TimeseriesPageSelector; use crate::TimeseriesScanParams; use crate::TimeseriesSchema; +use anyhow::anyhow; +use debug_ignore::DebugIgnore; use dropshot::EmptyScanParams; use dropshot::PaginationOrder; use dropshot::ResultsPage; @@ -33,6 +35,10 @@ use omicron_common::backoff; use oximeter::schema::TimeseriesKey; use oximeter::types::Sample; use oximeter::TimeseriesName; +use qorb::backend; +use qorb::backend::Error as QorbError; +use qorb::pool::Pool; +use qorb::resolver::BoxedResolver; use regex::Regex; use regex::RegexBuilder; use slog::debug; @@ -51,6 +57,7 @@ use std::num::NonZeroU32; use std::ops::Bound; use std::path::Path; use std::path::PathBuf; +use std::sync::Arc; use std::sync::OnceLock; use std::time::Duration; use std::time::Instant; @@ -72,18 +79,127 @@ mod probes { fn sql__query__done(_: &usdt::UniqueId) {} } +// A "qorb connector" which creates a ReqwestClient for the backend. +// +// This also keeps track of the underlying address, so we can use it +// for making HTTP requests directly to the backend. +struct ReqwestConnector {} + +#[async_trait::async_trait] +impl backend::Connector for ReqwestConnector { + type Connection = ReqwestClient; + + async fn connect( + &self, + backend: &backend::Backend, + ) -> Result { + Ok(ReqwestClient { + client: reqwest::Client::builder() + .pool_max_idle_per_host(1) + .build() + .map_err(|e| QorbError::Other(anyhow!(e)))?, + url: format!("http://{}", backend.address), + }) + } + + async fn is_valid( + &self, + conn: &mut Self::Connection, + ) -> Result<(), backend::Error> { + handle_db_response( + conn.client + .get(format!("{}/ping", conn.url)) + .send() + .await + .map_err(|err| QorbError::Other(anyhow!(err.to_string())))?, + ) + .await + .map_err(|e| QorbError::Other(anyhow!(e)))?; + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub(crate) struct ReqwestClient { + url: String, + client: reqwest::Client, +} + +#[derive(Debug)] +pub(crate) enum ClientSource { + Static(ReqwestClient), + Pool { pool: DebugIgnore> }, +} + +pub(crate) enum ClientVariant { + Static(ReqwestClient), + Handle(qorb::claim::Handle), +} + +impl ClientVariant { + pub(crate) async fn new(source: &ClientSource) -> Result { + let client = match source { + ClientSource::Static(client) => { + ClientVariant::Static(client.clone()) + } + ClientSource::Pool { pool } => { + let handle = pool.claim().await?; + ClientVariant::Handle(handle) + } + }; + Ok(client) + } + + pub(crate) fn url(&self) -> &str { + match self { + ClientVariant::Static(client) => &client.url, + ClientVariant::Handle(handle) => &handle.url, + } + } + + pub(crate) fn reqwest(&self) -> &reqwest::Client { + match self { + ClientVariant::Static(client) => &client.client, + ClientVariant::Handle(handle) => &handle.client, + } + } +} + /// A `Client` to the ClickHouse metrics database. #[derive(Debug)] pub struct Client { _id: Uuid, log: Logger, - url: String, - client: reqwest::Client, + source: ClientSource, schema: Mutex>, request_timeout: Duration, } impl Client { + /// Construct a Clickhouse client of the database with a connection pool. + pub fn new_with_pool(resolver: BoxedResolver, log: &Logger) -> Self { + let id = Uuid::new_v4(); + let log = log.new(slog::o!( + "component" => "clickhouse-client", + "id" => id.to_string(), + )); + let schema = Mutex::new(BTreeMap::new()); + let request_timeout = DEFAULT_REQUEST_TIMEOUT; + Self { + _id: id, + log, + source: ClientSource::Pool { + pool: DebugIgnore(Pool::new( + resolver, + Arc::new(ReqwestConnector {}), + qorb::policy::Policy::default(), + )), + }, + schema, + request_timeout, + } + } + /// Construct a new ClickHouse client of the database at `address`. pub fn new(address: SocketAddr, log: &Logger) -> Self { Self::new_with_request_timeout(address, log, DEFAULT_REQUEST_TIMEOUT) @@ -104,19 +220,34 @@ impl Client { let client = reqwest::Client::new(); let url = format!("http://{}", address); let schema = Mutex::new(BTreeMap::new()); - Self { _id: id, log, url, client, schema, request_timeout } + Self { + _id: id, + log, + source: ClientSource::Static(ReqwestClient { url, client }), + schema, + request_timeout, + } } - /// Return the url the client is trying to connect to + /// Return the url the client is trying to connect to. + /// + /// For pool-based clients, this returns "dynamic", as the URL may change + /// between accesses. pub fn url(&self) -> &str { - &self.url + match &self.source { + ClientSource::Static(client) => &client.url, + ClientSource::Pool { .. } => "dynamic", + } } /// Ping the ClickHouse server to verify connectivitiy. pub async fn ping(&self) -> Result<(), Error> { + let client = ClientVariant::new(&self.source).await?; + handle_db_response( - self.client - .get(format!("{}/ping", self.url)) + client + .reqwest() + .get(format!("{}/ping", client.url())) .send() .await .map_err(|err| Error::DatabaseUnavailable(err.to_string()))?, @@ -920,9 +1051,11 @@ impl Client { let start = Instant::now(); // Submit the SQL request itself. - let response = self - .client - .post(&self.url) + let client = ClientVariant::new(&self.source).await?; + + let response = client + .reqwest() + .post(client.url()) .timeout(self.request_timeout) .query(&[ ("output_format_json_quote_64bit_integers", "0"), diff --git a/oximeter/db/src/client/sql.rs b/oximeter/db/src/client/sql.rs index 236faa7aa4..10c234c9cc 100644 --- a/oximeter/db/src/client/sql.rs +++ b/oximeter/db/src/client/sql.rs @@ -55,9 +55,11 @@ impl Client { "original_sql" => &original_query, "rewritten_sql" => &rewritten, ); - let request = self - .client - .post(&self.url) + let client = crate::client::ClientVariant::new(&self.source).await?; + + let request = client + .reqwest() + .post(client.url()) .query(&[ ("output_format_json_quote_64bit_integers", "0"), ("database", crate::DATABASE_NAME), diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index cd51c92c18..d7d3c3e730 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -54,6 +54,12 @@ pub use model::OXIMETER_VERSION; #[derive(Debug, Error)] pub enum Error { + #[error("Failed to create reqwest client")] + Reqwest(#[from] reqwest::Error), + + #[error("Failed to check out connection to database")] + Connection(#[from] qorb::pool::Error), + #[error("Oximeter core error: {0}")] Oximeter(#[from] oximeter::MetricsError),