From 516ee34596f406920297a33eecf93c88bcbcbfe0 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Wed, 16 Oct 2024 16:17:32 -0700 Subject: [PATCH 1/4] Add ClickHouse native connection pool - Implement a connector and connection pool for talking to ClickHouse over the native TCP protocol - Add the native TCP address or a `BoxedResolver` for resolving it to all code that connects to ClickHouse. This is a pretty noisy change, since we now need two addresses / objects everywhere. We'll take it back down to one when we completely make the switch to the native protocol and remove the HTTP interface. - Remove the feature flag gating the native code, keeping that just for the prototype SQL client shell. - NFC formatting change to bring the `oximeter_db::native` import style in line with the rest of the crate. --- Cargo.lock | 39 ++- Cargo.toml | 4 +- dev-tools/omdb/src/bin/omdb/oxql.rs | 56 +++- nexus/src/app/oximeter.rs | 27 +- nexus/test-utils/src/lib.rs | 10 +- nexus/tests/integration_tests/oximeter.rs | 8 +- oximeter/collector/src/agent.rs | 20 +- .../src/bin/clickhouse-schema-updater.rs | 19 +- oximeter/collector/src/lib.rs | 72 ++-- oximeter/db/Cargo.toml | 12 +- oximeter/db/src/bin/oxdb/main.rs | 63 +++- oximeter/db/src/client/mod.rs | 312 ++++++++++++++---- oximeter/db/src/client/oxql.rs | 6 +- oximeter/db/src/lib.rs | 19 +- oximeter/db/src/native/block.rs | 47 +-- oximeter/db/src/native/connection.rs | 103 ++++-- oximeter/db/src/native/io/block.rs | 10 +- oximeter/db/src/native/io/column.rs | 20 +- oximeter/db/src/native/io/exception.rs | 4 +- oximeter/db/src/native/io/packet/client.rs | 20 +- oximeter/db/src/native/io/packet/server.rs | 18 +- oximeter/db/src/native/io/progress.rs | 3 +- oximeter/db/src/native/io/string.rs | 3 +- oximeter/db/src/native/io/varuint.rs | 3 +- oximeter/db/src/native/mod.rs | 1 + oximeter/db/src/native/packets/client.rs | 10 +- oximeter/db/src/native/packets/server.rs | 3 +- oximeter/db/src/shells/mod.rs | 2 +- oximeter/db/src/shells/native.rs | 5 +- oximeter/db/src/shells/oxql.rs | 5 +- oximeter/db/src/shells/sql.rs | 5 +- oximeter/db/tests/integration_test.rs | 15 +- workspace-hack/Cargo.toml | 4 +- 33 files changed, 686 insertions(+), 262 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ed5eb5828..5a196d4b4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1362,12 +1362,13 @@ dependencies = [ [[package]] name = "clickward" version = "0.1.0" -source = "git+https://github.com/oxidecomputer/clickward?rev=ceec762e6a87d2a22bf56792a3025e145caa095e#ceec762e6a87d2a22bf56792a3025e145caa095e" +source = "git+https://github.com/oxidecomputer/clickward?rev=a1b342c2558e835d09e6e39a40d3de798a29c2f#a1b342c2558e835d09e6e39a40d3de798a29c2f5" dependencies = [ "anyhow", "camino", "clap", "derive_more", + "schemars", "serde", "serde_json", "thiserror", @@ -8718,9 +8719,9 @@ dependencies = [ [[package]] name = "qorb" -version = "0.0.2" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "104066c20d7277d0af40a7333c579a2a71cc6b68c14982d1da2e5747a381a3ed" +checksum = "675f442a5904b8b5dc9f5d298be36676b29e2e852eace78a3d3d00822469c88e" dependencies = [ "anyhow", "async-trait", @@ -8736,7 +8737,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tokio-tungstenite 0.23.1", + "tokio-tungstenite 0.24.0", "tracing", ] @@ -11556,6 +11557,18 @@ dependencies = [ "tungstenite 0.23.0", ] +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.24.0", +] + [[package]] name = "tokio-util" version = "0.7.12" @@ -11873,6 +11886,24 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "twox-hash" version = "1.6.3" diff --git a/Cargo.toml b/Cargo.toml index 6168caee77..4caf971602 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -319,7 +319,7 @@ clickhouse-admin-api = { path = "clickhouse-admin/api" } clickhouse-admin-keeper-client = { path = "clients/clickhouse-admin-keeper-client" } clickhouse-admin-server-client = { path = "clients/clickhouse-admin-server-client" } clickhouse-admin-types = { path = "clickhouse-admin/types" } -clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "ceec762e6a87d2a22bf56792a3025e145caa095e" } +clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "a1b342c2558e835d09e6e39a40d3de798a29c2f" } cockroach-admin-api = { path = "cockroach-admin/api" } cockroach-admin-client = { path = "clients/cockroach-admin-client" } cockroach-admin-types = { path = "cockroach-admin/types" } @@ -520,7 +520,7 @@ propolis_api_types = { git = "https://github.com/oxidecomputer/propolis", rev = propolis-client = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" } propolis-mock-server = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" } proptest = "1.5.0" -qorb = "0.0.2" +qorb = "0.1.1" quote = "1.0" rand = "0.8.5" rand_core = "0.6.4" diff --git a/dev-tools/omdb/src/bin/omdb/oxql.rs b/dev-tools/omdb/src/bin/omdb/oxql.rs index 28f405e067..172344b56c 100644 --- a/dev-tools/omdb/src/bin/omdb/oxql.rs +++ b/dev-tools/omdb/src/bin/omdb/oxql.rs @@ -31,6 +31,15 @@ pub struct OxqlArgs { )] clickhouse_url: Option, + /// URL of the ClickHouse server to connect to for the native protcol. + #[arg( + long, + env = "OMDB_CLICKHOUSE_NATIVE_URL", + global = true, + help_heading = CONNECTION_OPTIONS_HEADING, + )] + clickhouse_native_url: Option, + /// Print summaries of each SQL query run against the database. #[clap(long = "summaries")] print_summaries: bool, @@ -47,7 +56,8 @@ impl OxqlArgs { omdb: &Omdb, log: &Logger, ) -> anyhow::Result<()> { - let addr = self.addr(omdb, log).await?; + let http_addr = self.resolve_http_addr(omdb, log).await?; + let native_addr = self.resolve_native_addr(omdb, log).await?; let opts = ShellOptions { print_summaries: self.print_summaries, @@ -55,21 +65,53 @@ impl OxqlArgs { }; oxql::shell( - addr.ip(), - addr.port(), + http_addr.ip(), + http_addr.port(), + native_addr.port(), log.new(slog::o!("component" => "clickhouse-client")), opts, ) .await } - /// Resolve the ClickHouse URL to a socket address. - async fn addr( + /// Resolve the ClickHouse native TCP socket address. + async fn resolve_native_addr( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result { + self.resolve_addr( + omdb, + log, + self.clickhouse_native_url.as_deref(), + ServiceName::ClickhouseNative, + ) + .await + } + + /// Resolve the ClickHouse HTTP URL to a socket address. 
+ async fn resolve_http_addr( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result { + self.resolve_addr( + omdb, + log, + self.clickhouse_url.as_deref(), + ServiceName::Clickhouse, + ) + .await + } + + async fn resolve_addr( &self, omdb: &Omdb, log: &Logger, + maybe_url: Option<&str>, + srv: ServiceName, ) -> anyhow::Result { - match &self.clickhouse_url { + match maybe_url { Some(cli_or_env_url) => Url::parse(&cli_or_env_url) .context( "failed parsing URL from command-line or environment variable", @@ -87,7 +129,7 @@ impl OxqlArgs { Ok(SocketAddr::V6( omdb.dns_lookup_one( log.clone(), - ServiceName::Clickhouse, + srv, ) .await .context("failed looking up ClickHouse internal DNS entry")?, diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index 770b5ac61b..8203cf4d48 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -12,7 +12,7 @@ use internal_dns_types::names::ServiceName; use nexus_db_queries::context::OpContext; use nexus_db_queries::db; use nexus_db_queries::db::DataStore; -use omicron_common::address::CLICKHOUSE_HTTP_PORT; +use omicron_common::address::CLICKHOUSE_TCP_PORT; use omicron_common::api::external::{DataPageParams, Error, ListResultVec}; use omicron_common::api::internal::nexus::{self, ProducerEndpoint}; use oximeter_client::Client as OximeterClient; @@ -60,15 +60,26 @@ impl LazyTimeseriesClient { pub(crate) async fn get( &self, ) -> Result { - let address = match &self.source { - ClientSource::FromIp { address } => *address, - ClientSource::FromDns { resolver } => SocketAddr::new( - resolver.lookup_ip(ServiceName::Clickhouse).await?, - CLICKHOUSE_HTTP_PORT, - ), + let (http_address, native_address) = match &self.source { + ClientSource::FromIp { address } => { + let native_address = + SocketAddr::new(address.ip(), CLICKHOUSE_TCP_PORT); + (*address, native_address) + } + ClientSource::FromDns { resolver } => { + let http_address = SocketAddr::from( + resolver.lookup_socket_v6(ServiceName::Clickhouse).await?, + ); + let native_address = SocketAddr::from( + resolver + .lookup_socket_v6(ServiceName::ClickhouseNative) + .await?, + ); + (http_address, native_address) + } }; - Ok(oximeter_db::Client::new(address, &self.log)) + Ok(oximeter_db::Client::new(http_address, native_address, &self.log)) } } diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index 5bd63765c4..29c6d634a9 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -624,6 +624,7 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> { log.new(o!("component" => "oximeter")), nexus_internal_addr, clickhouse.http_address().port(), + clickhouse.native_address().port(), collector_id, ) .await @@ -1449,11 +1450,16 @@ pub async fn start_sled_agent( pub async fn start_oximeter( log: Logger, nexus_address: SocketAddr, - db_port: u16, + http_port: u16, + native_port: u16, id: Uuid, ) -> Result { let db = oximeter_collector::DbConfig { - address: Some(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db_port)), + address: Some(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), http_port)), + native_address: Some(SocketAddr::new( + Ipv6Addr::LOCALHOST.into(), + native_port, + )), batch_size: 10, batch_interval: 1, replicated: false, diff --git a/nexus/tests/integration_tests/oximeter.rs b/nexus/tests/integration_tests/oximeter.rs index 6a63799790..d842a2cc4a 100644 --- a/nexus/tests/integration_tests/oximeter.rs +++ b/nexus/tests/integration_tests/oximeter.rs @@ -118,7 +118,12 @@ async fn test_oximeter_reregistration() { // ClickHouse 
client for verifying collection. let ch_address = context.clickhouse.http_address().into(); - let client = oximeter_db::Client::new(ch_address, &context.logctx.log); + let native_address = context.clickhouse.native_address().into(); + let client = oximeter_db::Client::new( + ch_address, + native_address, + &context.logctx.log, + ); client .init_single_node_db() .await @@ -302,6 +307,7 @@ async fn test_oximeter_reregistration() { context.logctx.log.new(o!("component" => "oximeter")), context.server.get_http_server_internal_address().await, context.clickhouse.http_address().port(), + context.clickhouse.native_address().port(), oximeter_id, ) .await diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index ce0ab78e61..bcfa3b4f4d 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -16,6 +16,7 @@ use chrono::Utc; use futures::TryStreamExt; use nexus_client::types::IdSortMode; use nexus_client::Client as NexusClient; +use omicron_common::address::CLICKHOUSE_TCP_PORT; use omicron_common::backoff; use omicron_common::backoff::BackoffError; use oximeter::types::ProducerResults; @@ -34,6 +35,7 @@ use slog::warn; use slog::Logger; use std::collections::btree_map::Entry; use std::collections::BTreeMap; +use std::net::SocketAddr; use std::net::SocketAddrV6; use std::ops::Bound; use std::sync::Arc; @@ -383,12 +385,15 @@ pub struct OximeterAgent { impl OximeterAgent { /// Construct a new agent with the given ID and logger. + // TODO(cleanup): Remove this lint when we have only a native resolver. + #[allow(clippy::too_many_arguments)] pub async fn with_id( id: Uuid, address: SocketAddrV6, refresh_interval: Duration, db_config: DbConfig, - resolver: BoxedResolver, + http_resolver: BoxedResolver, + native_resolver: BoxedResolver, log: &Logger, replicated: bool, ) -> Result { @@ -414,7 +419,8 @@ impl OximeterAgent { // - The DB doesn't exist at all. This reports a version number of 0. We // need to create the DB here, at the latest version. This is used in // fresh installations and tests. - let client = Client::new_with_pool(resolver, &log); + let client = + Client::new_with_pool(http_resolver, native_resolver, &log); match client.check_db_is_at_expected_version().await { Ok(_) => {} Err(oximeter_db::Error::DatabaseVersionMismatch { @@ -506,12 +512,18 @@ impl OximeterAgent { // prints the results as they're received. let insertion_log = log.new(o!("component" => "results-sink")); if let Some(db_config) = db_config { - let Some(address) = db_config.address else { + let Some(http_address) = db_config.address else { return Err(Error::Standalone(anyhow!( "Must provide explicit IP address in standalone mode" ))); }; - let client = Client::new(address, &log); + + // Grab the native TCP address, or construct one from the defaults. 
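For orientation, the fallback above amounts to this small rule: reuse the HTTP address's IP with the default native port. A minimal, self-contained sketch (the literal port here is an assumption standing in for `omicron_common::address::CLICKHOUSE_TCP_PORT`; 9000 is ClickHouse's default native-protocol port):

```rust
use std::net::{Ipv6Addr, SocketAddr};

// Stand-in for omicron_common::address::CLICKHOUSE_TCP_PORT.
const CLICKHOUSE_TCP_PORT: u16 = 9000;

/// Derive a native TCP address from an explicit HTTP address: same IP,
/// default native-protocol port.
fn default_native_address(http_address: SocketAddr) -> SocketAddr {
    SocketAddr::new(http_address.ip(), CLICKHOUSE_TCP_PORT)
}

fn main() {
    let http = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 8123);
    assert_eq!(default_native_address(http).port(), CLICKHOUSE_TCP_PORT);
}
```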
+ let native_address = + db_config.native_address.unwrap_or_else(|| { + SocketAddr::new(http_address.ip(), CLICKHOUSE_TCP_PORT) + }); + let client = Client::new(http_address, native_address, &log); let replicated = client.is_oximeter_cluster().await?; if !replicated { client.init_single_node_db().await?; diff --git a/oximeter/collector/src/bin/clickhouse-schema-updater.rs b/oximeter/collector/src/bin/clickhouse-schema-updater.rs index 8e432e87c6..3b3ac78f8a 100644 --- a/oximeter/collector/src/bin/clickhouse-schema-updater.rs +++ b/oximeter/collector/src/bin/clickhouse-schema-updater.rs @@ -12,6 +12,7 @@ use camino::Utf8PathBuf; use clap::Parser; use clap::Subcommand; use omicron_common::address::CLICKHOUSE_HTTP_PORT; +use omicron_common::address::CLICKHOUSE_TCP_PORT; use oximeter_db::model::OXIMETER_VERSION; use oximeter_db::Client; use slog::Drain; @@ -22,13 +23,20 @@ use std::net::Ipv6Addr; use std::net::SocketAddr; use std::net::SocketAddrV6; -const DEFAULT_HOST: SocketAddr = SocketAddr::V6(SocketAddrV6::new( +const DEFAULT_HTTP_HOST: SocketAddr = SocketAddr::V6(SocketAddrV6::new( Ipv6Addr::LOCALHOST, CLICKHOUSE_HTTP_PORT, 0, 0, )); +const DEFAULT_NATIVE_HOST: SocketAddr = SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::LOCALHOST, + CLICKHOUSE_TCP_PORT, + 0, + 0, +)); + fn parse_log_level(s: &str) -> anyhow::Result { s.parse().map_err(|_| anyhow!("Invalid log level")) } @@ -37,9 +45,14 @@ fn parse_log_level(s: &str) -> anyhow::Result { #[derive(Clone, Debug, Parser)] struct Args { /// IP address and port at which to access ClickHouse. - #[arg(long, default_value_t = DEFAULT_HOST, env = "CLICKHOUSE_HOST")] + #[arg(long, default_value_t = DEFAULT_HTTP_HOST, env = "CLICKHOUSE_HOST")] host: SocketAddr, + /// IP address and port at which to access ClickHouse via the native TCP + /// protocol. + #[arg(long, default_value_t = DEFAULT_NATIVE_HOST, env = "CLICKHOUSE_NATIVE_HOST")] + native_host: SocketAddr, + /// Directory from which to read schema files for each version. 
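The dual-host CLI pattern used by the updater can be exercised in isolation. This is a minimal, hypothetical clap program (not this binary) showing how `default_value_t` picks up `SocketAddr` constants, since `SocketAddr` implements both `Display` and `FromStr`:

```rust
// Requires clap with the "derive" feature enabled.
use clap::Parser;
use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};

const DEFAULT_HTTP_HOST: SocketAddr =
    SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 8123, 0, 0));
const DEFAULT_NATIVE_HOST: SocketAddr =
    SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, 9000, 0, 0));

#[derive(Debug, Parser)]
struct Args {
    /// HTTP interface address.
    #[arg(long, default_value_t = DEFAULT_HTTP_HOST)]
    host: SocketAddr,

    /// Native TCP interface address.
    #[arg(long, default_value_t = DEFAULT_NATIVE_HOST)]
    native_host: SocketAddr,
}

fn main() {
    let args = Args::parse();
    println!("http: {}, native: {}", args.host, args.native_host);
}
```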
#[arg( short = 's', @@ -87,7 +100,7 @@ fn build_logger(level: Level) -> Logger { async fn main() -> anyhow::Result<()> { let args = Args::parse(); let log = build_logger(args.log_level); - let client = Client::new(args.host, &log); + let client = Client::new(args.host, args.native_host, &log); let is_replicated = client.is_oximeter_cluster().await?; match args.cmd { Cmd::List => { diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 01770c9540..96af7d41c8 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -13,16 +13,16 @@ use dropshot::HttpServer; use dropshot::HttpServerStarter; use internal_dns_types::names::ServiceName; use omicron_common::address::get_internal_dns_server_addresses; +use omicron_common::address::CLICKHOUSE_TCP_PORT; use omicron_common::address::DNS_PORT; use omicron_common::api::internal::nexus::ProducerEndpoint; use omicron_common::backoff; use omicron_common::FileKv; use qorb::backend; -use qorb::resolver::AllBackends; use qorb::resolver::BoxedResolver; -use qorb::resolver::Resolver; use qorb::resolvers::dns::DnsResolver; use qorb::resolvers::dns::DnsResolverConfig; +use qorb::resolvers::single_host::SingleHostResolver; use qorb::service; use serde::Deserialize; use serde::Serialize; @@ -33,14 +33,12 @@ use slog::o; use slog::warn; use slog::Drain; use slog::Logger; -use std::collections::BTreeMap; use std::net::SocketAddr; use std::net::SocketAddrV6; use std::path::Path; use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use tokio::sync::watch; use uuid::Uuid; mod agent; @@ -78,12 +76,18 @@ impl From for HttpError { /// Configuration for interacting with the metric database. #[derive(Debug, Clone, Copy, Deserialize, Serialize)] pub struct DbConfig { - /// Optional address of the ClickHouse server. + /// Optional address of the ClickHouse server's HTTP interface. /// /// If "None", will be inferred from DNS. #[serde(default, skip_serializing_if = "Option::is_none")] pub address: Option, + /// Optional address of the ClickHouse server's native TCP interface. + /// + /// If None, will be inferred from DNS. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub native_address: Option, + /// Batch size of samples at which to insert. pub batch_size: usize, @@ -114,6 +118,7 @@ impl DbConfig { fn with_address(address: SocketAddr) -> Self { Self { address: Some(address), + native_address: None, batch_size: Self::DEFAULT_BATCH_SIZE, batch_interval: Self::DEFAULT_BATCH_INTERVAL, replicated: Self::DEFAULT_REPLICATED, @@ -163,29 +168,6 @@ pub struct OximeterArguments { pub address: SocketAddrV6, } -// Provides an alternative to the DNS resolver for cases where we want to -// contact a backend without performing resolution. -struct SingleHostResolver { - tx: watch::Sender, -} - -impl SingleHostResolver { - fn new(address: SocketAddr) -> Self { - let backends = Arc::new(BTreeMap::from([( - backend::Name::new("singleton"), - backend::Backend { address }, - )])); - let (tx, _rx) = watch::channel(backends.clone()); - Self { tx } - } -} - -impl Resolver for SingleHostResolver { - fn monitor(&mut self) -> watch::Receiver { - self.tx.subscribe() - } -} - // A "qorb connector" which converts a SocketAddr into a nexus_client::Client. 
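Since the hand-rolled resolver is deleted just above in favor of `qorb::resolvers::single_host::SingleHostResolver`, it is worth recording the qorb `Resolver` contract it satisfied: publish a fixed backend set on a `watch` channel and hand out a new receiver from `monitor()`. This is the removed implementation, lightly tidied, with the type parameters that the flattened diff dropped restored:

```rust
use qorb::backend;
use qorb::resolver::{AllBackends, Resolver};
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::watch;

// Provides an alternative to the DNS resolver for cases where we want to
// contact a backend without performing resolution.
struct SingleHostResolver {
    tx: watch::Sender<AllBackends>,
}

impl SingleHostResolver {
    fn new(address: SocketAddr) -> Self {
        let backends = Arc::new(BTreeMap::from([(
            backend::Name::new("singleton"),
            backend::Backend { address },
        )]));
        // The initial receiver is dropped; `monitor()` creates new ones.
        let (tx, _rx) = watch::channel(backends);
        Self { tx }
    }
}

impl Resolver for SingleHostResolver {
    fn monitor(&mut self) -> watch::Receiver<AllBackends> {
        self.tx.subscribe()
    }
}
```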
struct NexusConnector { log: Logger, @@ -258,35 +240,55 @@ impl Oximeter { .map(|ip| SocketAddr::new(ip, DNS_PORT)) .collect(); - let make_clickhouse_resolver = || -> BoxedResolver { + // Closure to create _two_ resolvers, one to resolve the ClickHouse HTTP + // SRV record, and one for the native TCP record. + // + // TODO(cleanup): This should be removed if / when we entirely switch to + // the native protocol. + let make_clickhouse_resolvers = || -> (BoxedResolver, BoxedResolver) { if let Some(address) = config.db.address { - Box::new(SingleHostResolver::new(address)) + let http = Box::new(SingleHostResolver::new(address)); + let native_addr = + SocketAddr::new(address.ip(), CLICKHOUSE_TCP_PORT); + let native = Box::new(SingleHostResolver::new(native_addr)); + (http, native) } else { - let service = if config.db.replicated { + let http_service = if config.db.replicated { ServiceName::ClickhouseServer } else { ServiceName::Clickhouse }; - Box::new(DnsResolver::new( - service::Name(service.srv_name()), + let http = Box::new(DnsResolver::new( + service::Name(http_service.srv_name()), + bootstrap_dns.clone(), + DnsResolverConfig { + hardcoded_ttl: Some(tokio::time::Duration::MAX), + ..Default::default() + }, + )); + let native = Box::new(DnsResolver::new( + service::Name(ServiceName::ClickhouseNative.srv_name()), bootstrap_dns.clone(), DnsResolverConfig { hardcoded_ttl: Some(tokio::time::Duration::MAX), ..Default::default() }, - )) + )); + (http, native) } }; let make_agent = || async { debug!(log, "creating ClickHouse client"); + let (http_resolver, native_resolver) = make_clickhouse_resolvers(); Ok(Arc::new( OximeterAgent::with_id( args.id, args.address, config.refresh_interval, config.db, - make_clickhouse_resolver(), + http_resolver, + native_resolver, &log, config.db.replicated, ) diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index 38b22e83e9..b0afdfbb07 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -24,6 +24,7 @@ futures.workspace = true gethostname.workspace = true highway.workspace = true iana-time-zone.workspace = true +indexmap.workspace = true libc.workspace = true nom.workspace = true num.workspace = true @@ -56,10 +57,6 @@ optional = true workspace = true optional = true -[dependencies.indexmap] -workspace = true -optional = true - [dependencies.peg] workspace = true optional = true @@ -100,7 +97,6 @@ features = [ "rt-multi-thread", "macros" ] camino-tempfile.workspace = true criterion = { workspace = true, features = [ "async_tokio" ] } expectorate.workspace = true -indexmap.workspace = true itertools.workspace = true omicron-test-utils.workspace = true oximeter-test-utils.workspace = true @@ -111,9 +107,8 @@ strum.workspace = true tempfile.workspace = true [features] -default = [ "native-sql", "oxql", "sql" ] +default = [ "native-sql-shell", "oxql", "sql" ] sql = [ - "dep:indexmap", "dep:reedline", "dep:rustyline", "dep:sqlformat", @@ -126,10 +121,9 @@ oxql = [ "dep:reedline", "dep:tabled", ] -native-sql = [ +native-sql-shell = [ "dep:crossterm", "dep:display-error-chain", - "dep:indexmap", "dep:reedline", "dep:rustyline", "dep:sqlformat", diff --git a/oximeter/db/src/bin/oxdb/main.rs b/oximeter/db/src/bin/oxdb/main.rs index 2f80518145..ad5018eee5 100644 --- a/oximeter/db/src/bin/oxdb/main.rs +++ b/oximeter/db/src/bin/oxdb/main.rs @@ -10,6 +10,7 @@ use anyhow::{bail, Context}; use chrono::{DateTime, Utc}; use clap::{Args, Parser}; +use omicron_common::address::CLICKHOUSE_TCP_PORT; use oximeter::{ types::{Cumulative, Sample}, Metric, Target, 
@@ -59,10 +60,14 @@ struct OxDb { #[clap(short, long, default_value = "::1")] address: IpAddr, - /// Port on which to connect to the database + /// Port on which to connect to the database using the HTTP interface. #[clap(short, long, default_value = "8123", action)] port: u16, + /// Port on which to connect to the database using the native TCP interface. + #[clap(short, long, default_value_t = CLICKHOUSE_TCP_PORT)] + native_port: u16, + /// Logging level #[clap(short, long, default_value = "info", value_parser = level_from_str)] log_level: Level, @@ -155,7 +160,7 @@ enum Subcommand { }, /// Start a native SQL shell to a ClickHouse server. - #[cfg(feature = "native-sql")] + #[cfg(feature = "native-sql-shell")] NativeSql, } @@ -214,12 +219,13 @@ async fn insert_samples( async fn populate( address: IpAddr, - port: u16, + http_port: u16, + native_port: u16, log: Logger, args: PopulateArgs, ) -> Result<(), anyhow::Error> { info!(log, "populating Oximeter database"); - let client = make_client(address, port, &log).await?; + let client = make_client(address, http_port, native_port, &log).await?; let n_timeseries = args.n_projects * args.n_instances * args.n_cpus; debug!( log, @@ -268,23 +274,26 @@ async fn populate( async fn wipe_single_node_db( address: IpAddr, - port: u16, + http_port: u16, + native_port: u16, log: Logger, ) -> Result<(), anyhow::Error> { - let client = make_client(address, port, &log).await?; + let client = make_client(address, http_port, native_port, &log).await?; client.wipe_single_node_db().await.context("Failed to wipe database") } +#[allow(clippy::too_many_arguments)] async fn query( address: IpAddr, - port: u16, + http_port: u16, + native_port: u16, log: Logger, timeseries_name: String, filters: Vec, start: Option, end: Option, ) -> Result<(), anyhow::Error> { - let client = make_client(address, port, &log).await?; + let client = make_client(address, http_port, native_port, &log).await?; let filters = filters.iter().map(|s| s.as_str()).collect::>(); let timeseries = client .select_timeseries_with( @@ -316,10 +325,18 @@ async fn main() -> anyhow::Result<()> { match args.cmd { Subcommand::Describe => describe_data(), Subcommand::Populate { populate_args } => { - populate(args.address, args.port, log, populate_args).await? + populate( + args.address, + args.port, + args.native_port, + log, + populate_args, + ) + .await? } Subcommand::Wipe => { - wipe_single_node_db(args.address, args.port, log).await? + wipe_single_node_db(args.address, args.port, args.native_port, log) + .await? } Subcommand::Query { timeseries_name, @@ -342,6 +359,7 @@ async fn main() -> anyhow::Result<()> { query( args.address, args.port, + args.native_port, log, timeseries_name, filters, @@ -352,17 +370,30 @@ async fn main() -> anyhow::Result<()> { } #[cfg(feature = "sql")] Subcommand::Sql { opts } => { - oximeter_db::shells::sql::shell(args.address, args.port, log, opts) - .await? + oximeter_db::shells::sql::shell( + args.address, + args.port, + args.native_port, + log, + opts, + ) + .await? } #[cfg(feature = "oxql")] Subcommand::Oxql { opts } => { - oximeter_db::shells::oxql::shell(args.address, args.port, log, opts) - .await? + oximeter_db::shells::oxql::shell( + args.address, + args.port, + args.native_port, + log, + opts, + ) + .await? } - #[cfg(feature = "native-sql")] + #[cfg(feature = "native-sql-shell")] Subcommand::NativeSql => { - oximeter_db::shells::native::shell(args.address).await? + oximeter_db::shells::native::shell(args.address, args.native_port) + .await? 
        }
    }
    Ok(())
diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs
index 22caaa7312..39fb0526e3 100644
--- a/oximeter/db/src/client/mod.rs
+++ b/oximeter/db/src/client/mod.rs
@@ -17,6 +17,8 @@ pub use self::dbwrite::DbWrite;
 pub use self::dbwrite::TestDbWrite;
 use crate::client::query_summary::QuerySummary;
 use crate::model;
+use crate::native;
+use crate::native::connection::default_pool_policy;
 use crate::query;
 use crate::Error;
 use crate::Metric;
@@ -39,6 +41,7 @@ use qorb::backend;
 use qorb::backend::Error as QorbError;
 use qorb::pool::Pool;
 use qorb::resolver::BoxedResolver;
+use qorb::resolvers::single_host::SingleHostResolver;
 use regex::Regex;
 use regex::RegexBuilder;
 use slog::debug;
@@ -170,14 +173,21 @@ impl ClientVariant {
 pub struct Client {
     _id: Uuid,
     log: Logger,
+    // Source for creating HTTP connections.
     source: ClientSource,
+    // qorb pool for native TCP connections.
+    native_pool: DebugIgnore<Pool<native::Connection>>,
     schema: Mutex<BTreeMap<TimeseriesName, TimeseriesSchema>>,
     request_timeout: Duration,
 }
 
 impl Client {
     /// Construct a Clickhouse client of the database with a connection pool.
-    pub fn new_with_pool(resolver: BoxedResolver, log: &Logger) -> Self {
+    pub fn new_with_pool(
+        http_resolver: BoxedResolver,
+        native_resolver: BoxedResolver,
+        log: &Logger,
+    ) -> Self {
         let id = Uuid::new_v4();
         let log = log.new(slog::o!(
             "component" => "clickhouse-client",
@@ -190,25 +200,40 @@ impl Client {
             log,
             source: ClientSource::Pool {
                 pool: DebugIgnore(Pool::new(
-                    resolver,
+                    http_resolver,
                     Arc::new(ReqwestConnector {}),
                     qorb::policy::Policy::default(),
                 )),
             },
+            native_pool: DebugIgnore(Pool::new(
+                native_resolver,
+                Arc::new(native::connection::Connector),
+                default_pool_policy(),
+            )),
             schema,
             request_timeout,
         }
     }
 
     /// Construct a new ClickHouse client of the database at `address`.
-    pub fn new(address: SocketAddr, log: &Logger) -> Self {
-        Self::new_with_request_timeout(address, log, DEFAULT_REQUEST_TIMEOUT)
+    pub fn new(
+        http_address: SocketAddr,
+        native_address: SocketAddr,
+        log: &Logger,
+    ) -> Self {
+        Self::new_with_request_timeout(
+            http_address,
+            native_address,
+            log,
+            DEFAULT_REQUEST_TIMEOUT,
+        )
     }
 
     /// Construct a new ClickHouse client of the database at `address`, and a
     /// custom request timeout.
     pub fn new_with_request_timeout(
-        address: SocketAddr,
+        http_address: SocketAddr,
+        native_address: SocketAddr,
         log: &Logger,
         request_timeout: Duration,
     ) -> Self {
@@ -218,12 +243,17 @@ impl Client {
             "id" => id.to_string(),
         ));
         let client = reqwest::Client::new();
-        let url = format!("http://{}", address);
+        let url = format!("http://{}", http_address);
         let schema = Mutex::new(BTreeMap::new());
         Self {
             _id: id,
             log,
             source: ClientSource::Static(ReqwestClient { url, client }),
+            native_pool: DebugIgnore(Pool::new(
+                Box::new(SingleHostResolver::new(native_address)),
+                Arc::new(native::connection::Connector),
+                default_pool_policy(),
+            )),
             schema,
             request_timeout,
         }
@@ -242,18 +272,8 @@ impl Client {
 
     /// Ping the ClickHouse server to verify connectivity.
pub async fn ping(&self) -> Result<(), Error> { - let client = ClientVariant::new(&self.source).await?; - - handle_db_response( - client - .reqwest() - .get(format!("{}/ping", client.url())) - .send() - .await - .map_err(|err| Error::DatabaseUnavailable(err.to_string()))?, - ) - .await?; - debug!(self.log, "successful ping of ClickHouse server"); + self.native_pool.claim().await?.ping().await.map_err(Error::Native)?; + trace!(self.log, "successful ping of ClickHouse server"); Ok(()) } @@ -1162,7 +1182,7 @@ impl Client { self.expunge_timeseries_by_name_once(replicated, to_delete) .await .map_err(|err| match err { - Error::DatabaseUnavailable(_) => { + Error::DatabaseUnavailable(_) | Error::Connection(_) => { backoff::BackoffError::transient(err) } _ => backoff::BackoffError::permanent(err), @@ -1500,7 +1520,8 @@ mod tests { let logctx = test_setup_log("test_replicated"); let mut cluster = create_cluster(&logctx).await; let address = cluster.http_address().into(); - let client = Client::new(address, &logctx.log); + let native_address = cluster.native_address().into(); + let client = Client::new(address, native_address, &logctx.log); let futures: Vec<(&'static str, AsyncTest)> = vec![ ( "test_is_oximeter_cluster_replicated", @@ -1656,7 +1677,8 @@ mod tests { for (test_name, mut test) in futures { let testctx = test_setup_log(test_name); init_db(&cluster, &client).await; - test(&cluster, Client::new(address, &logctx.log)).await; + test(&cluster, Client::new(address, native_address, &logctx.log)) + .await; wipe_db(&cluster, &client).await; testctx.cleanup_successful(); } @@ -1665,14 +1687,34 @@ mod tests { } #[tokio::test] - async fn bad_db_connection_test() { - let logctx = test_setup_log("test_bad_db_connection"); + async fn cannot_ping_nonexistent_server() { + let logctx = test_setup_log("cannot_ping_nonexistent_server"); let log = &logctx.log; - let client = Client::new("127.0.0.1:443".parse().unwrap(), &log); - assert!(matches!( - client.ping().await, - Err(Error::DatabaseUnavailable(_)) - )); + let dont_care = "127.0.0.1:443".parse().unwrap(); + let bad_addr = "[::1]:80".parse().unwrap(); + let client = Client::new(dont_care, bad_addr, &log); + let e = client + .ping() + .await + .expect_err("Should fail to ping non-existent server"); + let Error::Connection(qorb::pool::Error::TimedOut) = &e else { + panic!("Expected connection error, found {e:?}"); + }; + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn can_ping_clickhouse() { + let logctx = test_setup_log("can_ping_clickhouse"); + let mut db = + ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); + client.ping().await.expect("Should be able to ping existing server"); + db.cleanup().await.unwrap(); logctx.cleanup_successful(); } @@ -1681,7 +1723,11 @@ mod tests { let logctx = test_setup_log("test_is_oximeter_cluster"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_is_oximeter_cluster_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -1704,7 +1750,11 @@ mod tests { let logctx = test_setup_log("test_insert_samples"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = 
Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_insert_samples_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -1753,7 +1803,11 @@ mod tests { let logctx = test_setup_log("test_schema_mismatch"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_schema_mismatch_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -1786,7 +1840,11 @@ mod tests { let logctx = test_setup_log("test_schema_update"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_schema_updated_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -1865,7 +1923,11 @@ mod tests { let logctx = test_setup_log("test_client_select_timeseries_one"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_client_select_timeseries_one_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -1953,7 +2015,11 @@ mod tests { let logctx = test_setup_log("test_field_record_cont"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_field_record_count_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2011,7 +2077,11 @@ mod tests { let logctx = test_setup_log("test_unquoted_64bit_integers"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_unquoted_64bit_integers_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2045,7 +2115,11 @@ mod tests { let logctx = test_setup_log("test_differentiate_by_timeseries_name"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_differentiate_by_timeseries_name_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2115,7 +2189,11 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_select_one"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_select_one_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2180,7 +2258,11 @@ mod tests { ); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = 
Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_select_one_field_with_multiple_values_impl( &db, client, @@ -2253,7 +2335,11 @@ mod tests { test_setup_log("test_select_timeseries_with_select_multiple_fields_with_multiple_values"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_select_multiple_fields_with_multiple_values_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2329,7 +2415,11 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_all"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_all_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2390,7 +2480,11 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_start_time"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_start_time_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2441,7 +2535,11 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_limit"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_limit_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2560,7 +2658,11 @@ mod tests { let logctx = test_setup_log("test_select_timeseries_with_order"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_timeseries_with_order_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2662,7 +2764,11 @@ mod tests { let logctx = test_setup_log("test_get_schema_no_new_values"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_get_schema_no_new_values_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -2690,7 +2796,11 @@ mod tests { let logctx = test_setup_log("test_timeseries_schema_list"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_timeseries_schema_list_impl(&db, 
client).await; db.cleanup().await.unwrap(); @@ -2729,7 +2839,11 @@ mod tests { let logctx = test_setup_log("test_list_timeseries"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_list_timeseries_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3304,7 +3418,11 @@ mod tests { let logctx = test_setup_log("test_recall_of_all_fields"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_recall_of_all_fields_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3360,7 +3478,11 @@ mod tests { test_setup_log("test_database_version_update_is_idempotent"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); // NOTE: We don't init the DB, because the test explicitly tests that. test_database_version_update_is_idempotent_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3401,7 +3523,11 @@ mod tests { let logctx = test_setup_log("test_database_version_will_not_downgrade"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); // NOTE: We don't init the DB, because the test explicitly tests that. test_database_version_will_not_downgrade_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3440,7 +3566,11 @@ mod tests { let logctx = test_setup_log("test_database_version_wipes_old_version"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); // NOTE: We don't init the DB, because the test explicitly tests that. 
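An aside on the test churn in this file: every test now repeats the two-address constructor verbatim. A tiny helper along these lines (hypothetical, not part of this change) would cut that repetition:

```rust
use omicron_test_utils::dev::clickhouse::ClickHouseDeployment;
use oximeter_db::Client;
use slog::Logger;

// Hypothetical convenience for the tests: build a Client from a running
// deployment's HTTP and native addresses in one call.
fn test_client(db: &ClickHouseDeployment, log: &Logger) -> Client {
    Client::new(db.http_address().into(), db.native_address().into(), log)
}
```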
test_database_version_wipes_old_version_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3480,7 +3610,11 @@ mod tests { let logctx = test_setup_log("test_update_schema_cache_on_new_sample"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_update_schema_cache_on_new_sample_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3530,7 +3664,11 @@ mod tests { let logctx = test_setup_log("test_select_all_datum_types"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_select_all_datum_types_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3569,7 +3707,11 @@ mod tests { test_setup_log("test_new_schema_removed_when_not_inserted"); let mut db = ClickHouseDeployment::new_single_node(&logctx).await.unwrap(); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); init_db(&db, &client).await; test_new_schema_removed_when_not_inserted_impl(&db, client).await; db.cleanup().await.unwrap(); @@ -3787,14 +3929,15 @@ mod tests { async fn test_apply_one_schema_upgrade_impl( log: &Logger, - address: SocketAddr, + http_address: SocketAddr, + native_address: SocketAddr, replicated: bool, ) { let test_name = format!( "test_apply_one_schema_upgrade_{}", if replicated { "replicated" } else { "single_node" } ); - let client = Client::new(address, &log); + let client = Client::new(http_address, native_address, &log); // We'll test moving from version 1, which just creates a database and // table, to version 2, which adds two columns to that table in @@ -3871,7 +4014,13 @@ mod tests { let log = &logctx.log; let mut cluster = create_cluster(&logctx).await; let address = cluster.http_address().into(); - test_apply_one_schema_upgrade_impl(log, address, true).await; + test_apply_one_schema_upgrade_impl( + log, + address, + cluster.native_address().into(), + true, + ) + .await; cluster.cleanup().await.expect("Failed to cleanup ClickHouse cluster"); logctx.cleanup_successful(); } @@ -3885,7 +4034,13 @@ mod tests { .await .expect("Failed to start ClickHouse"); let address = db.http_address().into(); - test_apply_one_schema_upgrade_impl(log, address, false).await; + test_apply_one_schema_upgrade_impl( + log, + address, + db.native_address().into(), + false, + ) + .await; db.cleanup().await.expect("Failed to cleanup ClickHouse server"); logctx.cleanup_successful(); } @@ -3899,7 +4054,7 @@ mod tests { .await .expect("Failed to start ClickHouse"); let address = db.http_address().into(); - let client = Client::new(address, &log); + let client = Client::new(address, db.native_address().into(), &log); const REPLICATED: bool = false; client .initialize_db_with_version( @@ -3942,7 +4097,7 @@ mod tests { .await .expect("Failed to start ClickHouse"); let address = db.http_address().into(); - let client = Client::new(address, &log); + let client = Client::new(address, db.native_address().into(), &log); const REPLICATED: bool = false; client .initialize_db_with_version( @@ -3976,14 +4131,15 @@ mod tests { async fn 
test_ensure_schema_walks_through_multiple_steps_impl( log: &Logger, - address: SocketAddr, + http_address: SocketAddr, + native_address: SocketAddr, replicated: bool, ) { let test_name = format!( "test_ensure_schema_walks_through_multiple_steps_{}", if replicated { "replicated" } else { "single_node" } ); - let client = Client::new(address, &log); + let client = Client::new(http_address, native_address, &log); // We need to actually have the oximeter DB here, and the version table, // since `ensure_schema()` writes out versions to the DB as they're @@ -4076,7 +4232,10 @@ mod tests { .expect("Failed to start ClickHouse"); let address = db.http_address().into(); test_ensure_schema_walks_through_multiple_steps_impl( - log, address, false, + log, + address, + db.native_address().into(), + false, ) .await; db.cleanup().await.expect("Failed to cleanup ClickHouse server"); @@ -4092,7 +4251,10 @@ mod tests { let mut cluster = create_cluster(&logctx).await; let address = cluster.http_address().into(); test_ensure_schema_walks_through_multiple_steps_impl( - log, address, true, + log, + address, + cluster.native_address().into(), + true, ) .await; cluster.cleanup().await.expect("Failed to clean up ClickHouse cluster"); @@ -4172,7 +4334,7 @@ mod tests { .await .expect("Failed to start ClickHouse"); let address = db.http_address().into(); - let client = Client::new(address, &log); + let client = Client::new(address, db.native_address().into(), &log); client .init_single_node_db() .await @@ -4204,7 +4366,7 @@ mod tests { .await .expect("Failed to start ClickHouse"); let address = db.http_address().into(); - let client = Client::new(address, &log); + let client = Client::new(address, db.native_address().into(), &log); client .initialize_db_with_version(false, OXIMETER_VERSION) .await @@ -4353,7 +4515,11 @@ mod tests { .await .expect("Failed to start ClickHouse") }; - let client = Client::new(db.http_address().into(), &log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &log, + ); // Let's start with version 2, which is the first tracked and contains // the full SQL files we need to populate the DB. @@ -4546,6 +4712,7 @@ mod tests { test_expunge_timeseries_by_name_impl( log, db.http_address().into(), + db.native_address().into(), false, ) .await; @@ -4559,7 +4726,13 @@ mod tests { let logctx = test_setup_log(TEST_NAME); let mut cluster = create_cluster(&logctx).await; let address = cluster.http_address().into(); - test_expunge_timeseries_by_name_impl(&logctx.log, address, true).await; + test_expunge_timeseries_by_name_impl( + &logctx.log, + address, + cluster.native_address().into(), + true, + ) + .await; cluster.cleanup().await.expect("Failed to cleanup ClickHouse cluster"); logctx.cleanup_successful(); } @@ -4568,11 +4741,12 @@ mod tests { // upgrade. 
async fn test_expunge_timeseries_by_name_impl( log: &Logger, - address: SocketAddr, + http_address: SocketAddr, + native_address: SocketAddr, replicated: bool, ) { usdt::register_probes().unwrap(); - let client = Client::new(address, &log); + let client = Client::new(http_address, native_address, &log); const STARTING_VERSION: u64 = 1; const NEXT_VERSION: u64 = 2; diff --git a/oximeter/db/src/client/oxql.rs b/oximeter/db/src/client/oxql.rs index 4fdfc71b76..d9f3295375 100644 --- a/oximeter/db/src/client/oxql.rs +++ b/oximeter/db/src/client/oxql.rs @@ -1259,7 +1259,11 @@ mod tests { let db = ClickHouseDeployment::new_single_node(&logctx) .await .expect("Failed to start ClickHouse"); - let client = Client::new(db.http_address().into(), &logctx.log); + let client = Client::new( + db.http_address().into(), + db.native_address().into(), + &logctx.log, + ); client .init_single_node_db() .await diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index d7d3c3e730..9e13cd64e0 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -34,12 +34,16 @@ use thiserror::Error; mod client; pub mod model; -#[cfg(feature = "native-sql")] pub mod native; #[cfg(any(feature = "oxql", test))] pub mod oxql; pub mod query; -#[cfg(any(feature = "oxql", feature = "sql", feature = "native-sql", test))] +#[cfg(any( + feature = "oxql", + feature = "sql", + feature = "native-sql-shell", + test +))] pub mod shells; #[cfg(any(feature = "sql", test))] pub mod sql; @@ -163,6 +167,9 @@ pub enum Error { #[cfg(any(feature = "oxql", test))] #[error(transparent)] Oxql(oxql::Error), + + #[error("Native protocol error")] + Native(#[source] crate::native::Error), } #[cfg(any(feature = "oxql", test))] @@ -258,11 +265,13 @@ pub struct TimeseriesPageSelector { /// Create a client to the timeseries database, and ensure the database exists. pub async fn make_client( address: IpAddr, - port: u16, + http_port: u16, + native_port: u16, log: &Logger, ) -> Result { - let address = SocketAddr::new(address, port); - let client = Client::new(address, &log); + let http_address = SocketAddr::new(address, http_port); + let native_address = SocketAddr::new(address, native_port); + let client = Client::new(http_address, native_address, &log); client .init_single_node_db() .await diff --git a/oximeter/db/src/native/block.rs b/oximeter/db/src/native/block.rs index a51b27352f..34fbaa4c25 100644 --- a/oximeter/db/src/native/block.rs +++ b/oximeter/db/src/native/block.rs @@ -7,21 +7,25 @@ //! Types for working with actual blocks and columns of data. use super::Error; -use chrono::{DateTime, NaiveDate}; +use chrono::DateTime; +use chrono::NaiveDate; use chrono_tz::Tz; use indexmap::IndexMap; -use nom::{ - bytes::complete::{tag, take_while1}, - character::complete::u8 as nom_u8, - combinator::{eof, map, map_opt, opt}, - sequence::{delimited, preceded, tuple}, - IResult, -}; -use std::{ - fmt, - net::{Ipv4Addr, Ipv6Addr}, - sync::LazyLock, -}; +use nom::bytes::complete::tag; +use nom::bytes::complete::take_while1; +use nom::character::complete::u8 as nom_u8; +use nom::combinator::eof; +use nom::combinator::map; +use nom::combinator::map_opt; +use nom::combinator::opt; +use nom::sequence::delimited; +use nom::sequence::preceded; +use nom::sequence::tuple; +use nom::IResult; +use std::fmt; +use std::net::Ipv4Addr; +use std::net::Ipv6Addr; +use std::sync::LazyLock; use uuid::Uuid; /// A set of rows and columns. 
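Before the next hunk, a note on the data model this module works with: a ClickHouse block is column-major, so each named column carries all values for one field. This simplified stand-in (not the crate's actual definition, which uses `IndexMap` to preserve column order and many more variants) shows the shape:

```rust
use std::collections::BTreeMap;

// Simplified columnar value storage, for orientation only.
#[derive(Debug)]
enum ValueArray {
    UInt64(Vec<u64>),
    String(Vec<String>),
}

// Column name -> column data; every column must have the same length.
#[derive(Debug)]
struct Block {
    columns: BTreeMap<String, ValueArray>,
}

fn main() {
    let mut columns = BTreeMap::new();
    columns.insert(
        "timeseries_name".to_string(),
        ValueArray::String(vec!["a:b".into(), "c:d".into()]),
    );
    columns.insert("datum".to_string(), ValueArray::UInt64(vec![1, 2]));
    let block = Block { columns };
    println!("{block:?}");
}
```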
@@ -740,12 +744,17 @@ impl std::str::FromStr for DataType {
 
 #[cfg(test)]
 mod tests {
-    use super::{
-        Block, BlockInfo, Column, DataType, Precision, ValueArray,
-        DEFAULT_TIMEZONE,
-    };
-    use crate::native::block::{datetime, datetime64};
-    use chrono::{SubsecRound as _, Utc};
+    use super::Block;
+    use super::BlockInfo;
+    use super::Column;
+    use super::DataType;
+    use super::Precision;
+    use super::ValueArray;
+    use super::DEFAULT_TIMEZONE;
+    use crate::native::block::datetime;
+    use crate::native::block::datetime64;
+    use chrono::SubsecRound as _;
+    use chrono::Utc;
     use chrono_tz::Tz;
     use indexmap::IndexMap;
 
diff --git a/oximeter/db/src/native/connection.rs b/oximeter/db/src/native/connection.rs
index f6367ca126..c8ee91ecaf 100644
--- a/oximeter/db/src/native/connection.rs
+++ b/oximeter/db/src/native/connection.rs
@@ -6,29 +6,80 @@
 //! A connection and pool for talking to the ClickHouse server.
 
+use super::io::packet::client::Encoder;
+use super::io::packet::server::Decoder;
+use super::packets::client::Packet as ClientPacket;
+use super::packets::client::Query;
+use super::packets::client::QueryResult;
+use super::packets::client::OXIMETER_HELLO;
+use super::packets::client::VERSION_MAJOR;
+use super::packets::client::VERSION_MINOR;
+use super::packets::client::VERSION_PATCH;
+use super::packets::server::Hello as ServerHello;
 use super::packets::server::Packet as ServerPacket;
-use super::packets::{
-    client::{Packet as ClientPacket, Query, QueryResult},
-    server::Progress,
-};
-use super::{
-    io::packet::{client::Encoder, server::Decoder},
-    packets::{
-        client::{OXIMETER_HELLO, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH},
-        server::{Hello as ServerHello, REVISION},
-    },
-    Error,
-};
+use super::packets::server::Progress;
+use super::packets::server::REVISION;
+use super::Error;
 use crate::native::probes;
-use futures::{SinkExt as _, StreamExt as _};
+use futures::SinkExt as _;
+use futures::StreamExt as _;
+use qorb::backend;
+use qorb::backend::Error as QorbError;
 use std::net::SocketAddr;
-use tokio::net::{
-    tcp::{OwnedReadHalf, OwnedWriteHalf},
-    TcpStream,
-};
-use tokio_util::codec::{FramedRead, FramedWrite};
+use tokio::net::tcp::OwnedReadHalf;
+use tokio::net::tcp::OwnedWriteHalf;
+use tokio::net::TcpStream;
+use tokio_util::codec::FramedRead;
+use tokio_util::codec::FramedWrite;
 use uuid::Uuid;
 
+/// A pool of connections to a ClickHouse server over the native protocol.
+pub type Pool = qorb::pool::Pool<Connection>;
+
+/// The default connection pooling policy for native TCP connections.
+pub fn default_pool_policy() -> qorb::policy::Policy {
+    qorb::policy::Policy {
+        claim_timeout: std::time::Duration::from_secs(10),
+        ..Default::default()
+    }
+}
+
+/// A type for making connections to a ClickHouse server.
+#[derive(Clone, Copy, Debug)]
+pub struct Connector;
+
+impl From<Error> for QorbError {
+    fn from(e: Error) -> Self {
+        QorbError::Other(anyhow::anyhow!(e))
+    }
+}
+
+#[async_trait::async_trait]
+impl backend::Connector for Connector {
+    type Connection = Connection;
+
+    async fn connect(
+        &self,
+        backend: &backend::Backend,
+    ) -> Result<Self::Connection, QorbError> {
+        Connection::new(backend.address).await.map_err(QorbError::from)
+    }
+
+    async fn is_valid(
+        &self,
+        conn: &mut Self::Connection,
+    ) -> Result<(), QorbError> {
+        conn.ping().await.map_err(QorbError::from)
+    }
+
+    async fn on_recycle(
+        &self,
+        conn: &mut Self::Connection,
+    ) -> Result<(), QorbError> {
+        conn.cancel().await.map_err(QorbError::from)
+    }
+}
+
 /// A connection to a ClickHouse server.
/// /// This connection object can be used to talk to a ClickHouse server through @@ -227,16 +278,14 @@ impl Connection { #[cfg(test)] mod tests { + use crate::native::block::DataType; + use crate::native::block::ValueArray; + use crate::native::connection::Connection; + use omicron_test_utils::dev::clickhouse::ClickHouseDeployment; + use omicron_test_utils::dev::test_setup_log; use std::sync::Arc; - - use crate::native::{ - block::{DataType, ValueArray}, - connection::Connection, - }; - use omicron_test_utils::dev::{ - clickhouse::ClickHouseDeployment, test_setup_log, - }; - use tokio::sync::{oneshot, Mutex}; + use tokio::sync::oneshot; + use tokio::sync::Mutex; #[tokio::test] async fn test_exchange_hello() { diff --git a/oximeter/db/src/native/io/block.rs b/oximeter/db/src/native/io/block.rs index e6ab6c8ba1..2a3645a150 100644 --- a/oximeter/db/src/native/io/block.rs +++ b/oximeter/db/src/native/io/block.rs @@ -6,10 +6,13 @@ //! Encoding and decoding data blocks. -use crate::native::block::{Block, BlockInfo}; +use crate::native::block::Block; +use crate::native::block::BlockInfo; use crate::native::io; use crate::native::Error; -use bytes::{Buf as _, BufMut as _, BytesMut}; +use bytes::Buf as _; +use bytes::BufMut as _; +use bytes::BytesMut; use indexmap::IndexMap; /// Encode a data packet to the server. @@ -102,7 +105,8 @@ fn encode_block_info(info: BlockInfo, mut dst: &mut BytesMut) { #[cfg(test)] mod tests { use super::*; - use crate::native::block::{Column, ValueArray}; + use crate::native::block::Column; + use crate::native::block::ValueArray; // Expected data block. // diff --git a/oximeter/db/src/native/io/column.rs b/oximeter/db/src/native/io/column.rs index 649d6a044d..2c47029ba7 100644 --- a/oximeter/db/src/native/io/column.rs +++ b/oximeter/db/src/native/io/column.rs @@ -6,13 +6,19 @@ //! Encode / decode a column. -use crate::native::{ - block::{Column, DataType, ValueArray}, - io, Error, -}; -use bytes::{Buf as _, BufMut as _, BytesMut}; -use chrono::{NaiveDate, TimeDelta, TimeZone}; -use std::net::{Ipv4Addr, Ipv6Addr}; +use crate::native::block::Column; +use crate::native::block::DataType; +use crate::native::block::ValueArray; +use crate::native::io; +use crate::native::Error; +use bytes::Buf as _; +use bytes::BufMut as _; +use bytes::BytesMut; +use chrono::NaiveDate; +use chrono::TimeDelta; +use chrono::TimeZone; +use std::net::Ipv4Addr; +use std::net::Ipv6Addr; use uuid::Uuid; // ClickHouse `Date`s are represented as an unsigned 16-bit number of days from diff --git a/oximeter/db/src/native/io/exception.rs b/oximeter/db/src/native/io/exception.rs index 6d35e9d429..31c6ddd23c 100644 --- a/oximeter/db/src/native/io/exception.rs +++ b/oximeter/db/src/native/io/exception.rs @@ -6,7 +6,9 @@ //! Decode server exception packets. -use crate::native::{io, packets::server::Exception, Error}; +use crate::native::io; +use crate::native::packets::server::Exception; +use crate::native::Error; use bytes::Buf as _; /// Decode a list of Exception packets from the server, if possible. diff --git a/oximeter/db/src/native/io/packet/client.rs b/oximeter/db/src/native/io/packet/client.rs index 31880348ae..c8397d68a2 100644 --- a/oximeter/db/src/native/io/packet/client.rs +++ b/oximeter/db/src/native/io/packet/client.rs @@ -7,16 +7,18 @@ //! Encode client packets destined for the server. 
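The packet codecs that follow lean on ClickHouse's wire primitives, the most basic of which is the variable-length unsigned integer used for string lengths and packet tags: little-endian base-128, seven bits per byte, continuation bit set on all but the final byte. A standalone sketch of that scheme (mirroring what `native/io/varuint.rs` implements, not its exact code):

```rust
/// Encode a u64 as a ClickHouse-style varuint, returning the byte count.
fn encode_varuint(mut x: u64, buf: &mut Vec<u8>) -> usize {
    let mut n = 0;
    loop {
        let byte = (x & 0x7f) as u8;
        x >>= 7;
        if x == 0 {
            buf.push(byte);
            return n + 1;
        }
        // More bits remain: set the continuation bit.
        buf.push(byte | 0x80);
        n += 1;
    }
}

/// Decode a varuint, returning the value and bytes consumed, if complete.
fn decode_varuint(buf: &[u8]) -> Option<(u64, usize)> {
    let mut x = 0u64;
    for (i, &byte) in buf.iter().enumerate().take(10) {
        x |= u64::from(byte & 0x7f) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((x, i + 1));
        }
    }
    // Ran out of bytes, or the value would not fit in a u64.
    None
}

fn main() {
    let mut buf = Vec::new();
    encode_varuint(300, &mut buf);
    assert_eq!(buf, vec![0xac, 0x02]);
    assert_eq!(decode_varuint(&buf), Some((300, 2)));
}
```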
 use crate::native::block::Block;
-use crate::native::packets::client::{
-    ClientInfo, Query, QueryKind, Settings, Stage,
-};
+use crate::native::io;
+use crate::native::packets::client::ClientInfo;
+use crate::native::packets::client::Hello;
+use crate::native::packets::client::Packet;
+use crate::native::packets::client::Query;
+use crate::native::packets::client::QueryKind;
+use crate::native::packets::client::Settings;
+use crate::native::packets::client::Stage;
 use crate::native::probes;
-use crate::native::{
-    io,
-    packets::client::{Hello, Packet},
-    Error,
-};
-use bytes::{BufMut as _, BytesMut};
+use crate::native::Error;
+use bytes::BufMut as _;
+use bytes::BytesMut;
 
 /// Encoder for client packets.
 #[derive(Clone, Copy, Debug)]
diff --git a/oximeter/db/src/native/io/packet/server.rs b/oximeter/db/src/native/io/packet/server.rs
index 00d0352857..0ef6d96d4b 100644
--- a/oximeter/db/src/native/io/packet/server.rs
+++ b/oximeter/db/src/native/io/packet/server.rs
@@ -6,12 +6,14 @@
 
 //! Decode packets from the ClickHouse server.
 
-use crate::native::{
-    io,
-    packets::server::{Hello, Packet, PasswordComplexityRule},
-    probes, Error,
-};
-use bytes::{Buf as _, BytesMut};
+use crate::native::io;
+use crate::native::packets::server::Hello;
+use crate::native::packets::server::Packet;
+use crate::native::packets::server::PasswordComplexityRule;
+use crate::native::probes;
+use crate::native::Error;
+use bytes::Buf as _;
+use bytes::BytesMut;
 
 /// A decoder for packets from the ClickHouse server.
 #[derive(Debug)]
@@ -199,7 +201,9 @@ mod tests {
     use std::time::Duration;
 
     use super::*;
-    use crate::native::packets::server::{Exception, Progress, REVISION};
+    use crate::native::packets::server::Exception;
+    use crate::native::packets::server::Progress;
+    use crate::native::packets::server::REVISION;
     use bytes::BufMut as _;
     use tokio_util::codec::Decoder as _;
 
diff --git a/oximeter/db/src/native/io/progress.rs b/oximeter/db/src/native/io/progress.rs
index c60b50cb75..e6e8586c68 100644
--- a/oximeter/db/src/native/io/progress.rs
+++ b/oximeter/db/src/native/io/progress.rs
@@ -6,7 +6,8 @@
 
 //! Decode progress packets from the server.
 
-use crate::native::{io, packets::server::Progress};
+use crate::native::io;
+use crate::native::packets::server::Progress;
 use std::time::Duration;
 
 /// Decode a progress packet from the server, if possible.
diff --git a/oximeter/db/src/native/io/string.rs b/oximeter/db/src/native/io/string.rs
index e93ba67c2d..c8b838601a 100644
--- a/oximeter/db/src/native/io/string.rs
+++ b/oximeter/db/src/native/io/string.rs
@@ -12,7 +12,8 @@
 
 use super::varuint;
 use crate::native::Error;
-use bytes::{Buf, BufMut};
+use bytes::Buf;
+use bytes::BufMut;
 
 /// Encode a string into the ClickHouse format.
 pub fn encode(s: impl AsRef<str>, mut buf: impl BufMut) {
diff --git a/oximeter/db/src/native/io/varuint.rs b/oximeter/db/src/native/io/varuint.rs
index 0476c83aff..1b9e561c3f 100644
--- a/oximeter/db/src/native/io/varuint.rs
+++ b/oximeter/db/src/native/io/varuint.rs
@@ -15,7 +15,8 @@
 //! iterating over the values (especially since ClickHouse does not append a
 //! fixed-size length header to its messages).
 
-use bytes::{Buf, BufMut};
+use bytes::Buf;
+use bytes::BufMut;
 
 /// Encode a u64 as a variable-length integer, returning the number of bytes
 /// written.
diff --git a/oximeter/db/src/native/mod.rs b/oximeter/db/src/native/mod.rs
index 15d013fa35..ce7efcc8d3 100644
--- a/oximeter/db/src/native/mod.rs
+++ b/oximeter/db/src/native/mod.rs
@@ -120,6 +120,7 @@
 //! actually sent if we believe we have an outstanding query.
 
 pub use connection::Connection;
+pub use connection::Pool;
 pub use packets::client::QueryResult;
 pub use packets::server::Exception;
 
diff --git a/oximeter/db/src/native/packets/client.rs b/oximeter/db/src/native/packets/client.rs
index 759aaabe6e..7d32ba11d8 100644
--- a/oximeter/db/src/native/packets/client.rs
+++ b/oximeter/db/src/native/packets/client.rs
@@ -6,11 +6,13 @@
 
 //! Packets sent from client to server.
 
-use super::server::{ProfileInfo, Progress};
+use super::server::ProfileInfo;
+use super::server::Progress;
 use crate::native::block::Block;
-use std::{
-    borrow::Cow, collections::BTreeMap, net::SocketAddr, sync::LazyLock,
-};
+use std::borrow::Cow;
+use std::collections::BTreeMap;
+use std::net::SocketAddr;
+use std::sync::LazyLock;
 use uuid::Uuid;
 
 /// A packet sent from client to server in the native protocol.
diff --git a/oximeter/db/src/native/packets/server.rs b/oximeter/db/src/native/packets/server.rs
index 098e120cfa..864d9397cf 100644
--- a/oximeter/db/src/native/packets/server.rs
+++ b/oximeter/db/src/native/packets/server.rs
@@ -6,7 +6,8 @@
 
 //! Packets sent from the server.
 
-use std::{fmt, time::Duration};
+use std::fmt;
+use std::time::Duration;
 
 use crate::native::block::Block;
 
diff --git a/oximeter/db/src/shells/mod.rs b/oximeter/db/src/shells/mod.rs
index eb9a9bd39a..ccad0010aa 100644
--- a/oximeter/db/src/shells/mod.rs
+++ b/oximeter/db/src/shells/mod.rs
@@ -11,7 +11,7 @@ use dropshot::EmptyScanParams;
 use dropshot::WhichPage;
 use oximeter::TimeseriesSchema;
 
-#[cfg(any(feature = "native-sql", test))]
+#[cfg(any(feature = "native-sql-shell", test))]
 pub mod native;
 #[cfg(any(feature = "oxql", test))]
 pub mod oxql;
diff --git a/oximeter/db/src/shells/native.rs b/oximeter/db/src/shells/native.rs
index c0e86367da..f513435275 100644
--- a/oximeter/db/src/shells/native.rs
+++ b/oximeter/db/src/shells/native.rs
@@ -10,15 +10,14 @@ use crate::native::{self, block::ValueArray, QueryResult};
 use anyhow::Context as _;
 use crossterm::style::Stylize;
 use display_error_chain::DisplayErrorChain;
-use omicron_common::address::CLICKHOUSE_TCP_PORT;
 use reedline::{DefaultPrompt, DefaultPromptSegment, Reedline, Signal};
 use std::net::{IpAddr, SocketAddr};
 use tabled::{builder::Builder, settings::Style};
 
 /// Run the native SQL shell.
-pub async fn shell(addr: IpAddr) -> anyhow::Result<()> {
+pub async fn shell(addr: IpAddr, port: u16) -> anyhow::Result<()> {
     usdt::register_probes()?;
-    let addr = SocketAddr::new(addr, CLICKHOUSE_TCP_PORT);
+    let addr = SocketAddr::new(addr, port);
     let mut conn = native::Connection::new(addr)
         .await
         .context("Trying to connect to ClickHouse server")?;
diff --git a/oximeter/db/src/shells/oxql.rs b/oximeter/db/src/shells/oxql.rs
index f46d08c0cf..909b4916ac 100644
--- a/oximeter/db/src/shells/oxql.rs
+++ b/oximeter/db/src/shells/oxql.rs
@@ -32,12 +32,13 @@ pub struct ShellOptions {
 /// Run/execute the OxQL shell.
 pub async fn shell(
     address: IpAddr,
-    port: u16,
+    http_port: u16,
+    native_port: u16,
     log: Logger,
     opts: ShellOptions,
 ) -> anyhow::Result<()> {
     // Create the client.
-    let client = make_client(address, port, &log).await?;
+    let client = make_client(address, http_port, native_port, &log).await?;
 
     // A workaround to ensure the client has all available timeseries when the
     // shell starts.
diff --git a/oximeter/db/src/shells/sql.rs b/oximeter/db/src/shells/sql.rs index f75713da3b..4d8c332aaf 100644 --- a/oximeter/db/src/shells/sql.rs +++ b/oximeter/db/src/shells/sql.rs @@ -50,11 +50,12 @@ impl Default for ShellOptions { /// Run/execute the SQL shell. pub async fn shell( address: IpAddr, - port: u16, + http_port: u16, + native_port: u16, log: Logger, opts: ShellOptions, ) -> anyhow::Result<()> { - let client = make_client(address, port, &log).await?; + let client = make_client(address, http_port, native_port, &log).await?; // A workaround to ensure the client has all available timeseries when the // shell starts. diff --git a/oximeter/db/tests/integration_test.rs b/oximeter/db/tests/integration_test.rs index 35f96dfd50..d845ce5a3e 100644 --- a/oximeter/db/tests/integration_test.rs +++ b/oximeter/db/tests/integration_test.rs @@ -63,7 +63,8 @@ async fn test_schemas_disjoint() -> anyhow::Result<()> { deployment.deploy().context("failed to deploy")?; let client1 = Client::new_with_request_timeout( - deployment.http_addr(1.into())?, + deployment.http_addr(1.into()), + deployment.native_addr(1.into()), log, request_timeout, ); @@ -158,12 +159,14 @@ async fn test_cluster() -> anyhow::Result<()> { deployment.deploy().context("failed to deploy")?; let client1 = Client::new_with_request_timeout( - deployment.http_addr(1.into())?, + deployment.http_addr(1.into()), + deployment.native_addr(1.into()), log, request_timeout, ); let client2 = Client::new_with_request_timeout( - deployment.http_addr(2.into())?, + deployment.http_addr(2.into()), + deployment.native_addr(2.into()), log, request_timeout, ); @@ -228,7 +231,8 @@ async fn test_cluster() -> anyhow::Result<()> { // Add a 3rd clickhouse server and wait for it to come up deployment.add_server().expect("failed to launch a 3rd clickhouse server"); let client3 = Client::new_with_request_timeout( - deployment.http_addr(3.into())?, + deployment.http_addr(3.into()), + deployment.native_addr(3.into()), log, request_timeout, ); @@ -329,7 +333,8 @@ async fn test_cluster() -> anyhow::Result<()> { // few hundred milliseconds. To shorten the length of our test, we create a // new client with a shorter timeout. 
     let client1_short_timeout = Client::new_with_request_timeout(
-        deployment.http_addr(1.into())?,
+        deployment.http_addr(1.into()),
+        deployment.native_addr(1.into()),
         log,
         Duration::from_secs(2),
     );
diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml
index b3ab1d5ed5..fb854728c5 100644
--- a/workspace-hack/Cargo.toml
+++ b/workspace-hack/Cargo.toml
@@ -90,7 +90,7 @@ pkcs8 = { version = "0.10.2", default-features = false, features = ["encryption"
 postgres-types = { version = "0.2.8", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
 predicates = { version = "3.1.2" }
 proc-macro2 = { version = "1.0.87" }
-qorb = { version = "0.0.2", features = ["qtop"] }
+qorb = { version = "0.1.1", features = ["qtop"] }
 quote = { version = "1.0.37" }
 rand = { version = "0.8.5", features = ["small_rng"] }
 regex = { version = "1.11.0" }
@@ -206,7 +206,7 @@ pkcs8 = { version = "0.10.2", default-features = false, features = ["encryption"
 postgres-types = { version = "0.2.8", default-features = false, features = ["with-chrono-0_4", "with-serde_json-1", "with-uuid-1"] }
 predicates = { version = "3.1.2" }
 proc-macro2 = { version = "1.0.87" }
-qorb = { version = "0.0.2", features = ["qtop"] }
+qorb = { version = "0.1.1", features = ["qtop"] }
 quote = { version = "1.0.37" }
 rand = { version = "0.8.5", features = ["small_rng"] }
 regex = { version = "1.11.0" }

From 5e96ce06202924d757775a0bb2011be3759cb589 Mon Sep 17 00:00:00 2001
From: Benjamin Naecker
Date: Thu, 17 Oct 2024 08:28:05 -0700
Subject: [PATCH 2/4] Review feedback

- Fix EXPECTORATE test output
- Use both HTTP and native addresses from oximeter collector config
---
 dev-tools/omdb/tests/usage_errors.out |  7 +++-
 oximeter/collector/src/lib.rs         | 55 +++++++++++++--------------
 2 files changed, 32 insertions(+), 30 deletions(-)

diff --git a/dev-tools/omdb/tests/usage_errors.out b/dev-tools/omdb/tests/usage_errors.out
index 56fa624771..de632819d6 100644
--- a/dev-tools/omdb/tests/usage_errors.out
+++ b/dev-tools/omdb/tests/usage_errors.out
@@ -783,13 +783,16 @@ Usage: omdb oxql [OPTIONS]
 
 Options:
       --log-level <LOG_LEVEL>  log level filter [env: LOG_LEVEL=] [default: warn]
       --summaries              Print summaries of each SQL query run against the database
-      --elapsed                Print the total elapsed query duration
       --color <COLOR>          Color output [default: auto] [possible values: auto, always, never]
+      --elapsed                Print the total elapsed query duration
   -h, --help                   Print help
 
 Connection Options:
       --clickhouse-url <CLICKHOUSE_URL>
           URL of the ClickHouse server to connect to [env: OMDB_CLICKHOUSE_URL=]
+      --clickhouse-native-url <CLICKHOUSE_NATIVE_URL>
+          URL of the ClickHouse server to connect to for the native protcol [env:
+          OMDB_CLICKHOUSE_NATIVE_URL=]
       --dns-server <DNS_SERVER>
           [env: OMDB_DNS_SERVER=]
@@ -808,7 +811,7 @@ error: unexpected argument '--summarizes' found
 
   tip: a similar argument exists: '--summaries'
 
-Usage: omdb oxql <--clickhouse-url <CLICKHOUSE_URL>|--summaries|--elapsed>
+Usage: omdb oxql <--clickhouse-url <CLICKHOUSE_URL>|--clickhouse-native-url <CLICKHOUSE_NATIVE_URL>|--summaries|--elapsed>
 
 For more information, try '--help'.
=============================================
diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs
index 96af7d41c8..b2bd191feb 100644
--- a/oximeter/collector/src/lib.rs
+++ b/oximeter/collector/src/lib.rs
@@ -13,7 +13,6 @@ use dropshot::HttpServer;
 use dropshot::HttpServerStarter;
 use internal_dns_types::names::ServiceName;
 use omicron_common::address::get_internal_dns_server_addresses;
-use omicron_common::address::CLICKHOUSE_TCP_PORT;
 use omicron_common::address::DNS_PORT;
 use omicron_common::api::internal::nexus::ProducerEndpoint;
 use omicron_common::backoff;
@@ -240,42 +239,42 @@ impl Oximeter {
             .map(|ip| SocketAddr::new(ip, DNS_PORT))
             .collect();
 
+        // Closure to create a single resolver.
+        let make_resolver =
+            |maybe_address, srv_name: ServiceName| -> BoxedResolver {
+                if let Some(address) = maybe_address {
+                    Box::new(SingleHostResolver::new(address))
+                } else {
+                    Box::new(DnsResolver::new(
+                        service::Name(srv_name.srv_name()),
+                        bootstrap_dns.clone(),
+                        DnsResolverConfig {
+                            hardcoded_ttl: Some(tokio::time::Duration::MAX),
+                            ..Default::default()
+                        },
+                    ))
+                }
+            };
+
         // Closure to create _two_ resolvers, one to resolve the ClickHouse HTTP
         // SRV record, and one for the native TCP record.
         //
         // TODO(cleanup): This should be removed if / when we entirely switch to
         // the native protocol.
         let make_clickhouse_resolvers = || -> (BoxedResolver, BoxedResolver) {
-            if let Some(address) = config.db.address {
-                let http = Box::new(SingleHostResolver::new(address));
-                let native_addr =
-                    SocketAddr::new(address.ip(), CLICKHOUSE_TCP_PORT);
-                let native = Box::new(SingleHostResolver::new(native_addr));
-                (http, native)
-            } else {
-                let http_service = if config.db.replicated {
+            let http_resolver = make_resolver(
+                config.db.address,
+                if config.db.replicated {
                     ServiceName::ClickhouseServer
                 } else {
                     ServiceName::Clickhouse
-                };
-                let http = Box::new(DnsResolver::new(
-                    service::Name(http_service.srv_name()),
-                    bootstrap_dns.clone(),
-                    DnsResolverConfig {
-                        hardcoded_ttl: Some(tokio::time::Duration::MAX),
-                        ..Default::default()
-                    },
-                ));
-                let native = Box::new(DnsResolver::new(
-                    service::Name(ServiceName::ClickhouseNative.srv_name()),
-                    bootstrap_dns.clone(),
-                    DnsResolverConfig {
-                        hardcoded_ttl: Some(tokio::time::Duration::MAX),
-                        ..Default::default()
-                    },
-                ));
-                (http, native)
-            }
+                },
+            );
+            let native_resolver = make_resolver(
+                config.db.native_address,
+                ServiceName::ClickhouseNative,
+            );
+            (http_resolver, native_resolver)
         };
 
         let make_agent = || async {

From 2f30751ab15adfa68f0f9cf060ace9641e3e9241 Mon Sep 17 00:00:00 2001
From: Benjamin Naecker
Date: Thu, 17 Oct 2024 13:18:14 -0700
Subject: [PATCH 3/4] Check connection health on recycle

Adds a call to ping the server in the `Connector::on_recycle()` method,
if we haven't already validated the connection by cancelling a query.
This means that qorb learns almost immediately that a connection is
broken, and tries to reestablish it more quickly. This obviates some of
the mucking about with timeouts in earlier commits.
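
As a rough sketch (condensed from the `Connector` impl in the diff
below), the recycle path now behaves like:

    async fn on_recycle(&self, conn: &mut Connection) -> Result<(), QorbError> {
        // `cancel()` now reports whether it actually canceled a query.
        if conn.cancel().await.map_err(QorbError::from)? {
            // Cancelling round-trips to the server, so the connection is
            // already known to be healthy.
            Ok(())
        } else {
            // Nothing to cancel; fall back to the ping-based validity check.
            self.is_valid(conn).await
        }
    }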
---
 oximeter/db/src/client/mod.rs         | 11 +++++----
 oximeter/db/src/native/connection.rs  | 34 +++++++++++++++++----------
 oximeter/db/src/native/mod.rs         |  3 +++
 oximeter/db/tests/integration_test.rs |  6 ++---
 4 files changed, 33 insertions(+), 21 deletions(-)

diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs
index 39fb0526e3..0a09da8abe 100644
--- a/oximeter/db/src/client/mod.rs
+++ b/oximeter/db/src/client/mod.rs
@@ -18,7 +18,6 @@ pub use self::dbwrite::TestDbWrite;
 use crate::client::query_summary::QuerySummary;
 use crate::model;
 use crate::native;
-use crate::native::connection::default_pool_policy;
 use crate::query;
 use crate::Error;
 use crate::Metric;
@@ -208,7 +207,7 @@ impl Client {
             native_pool: DebugIgnore(Pool::new(
                 native_resolver,
                 Arc::new(native::connection::Connector),
-                default_pool_policy(),
+                Default::default(),
             )),
             schema,
             request_timeout,
@@ -252,7 +251,7 @@ impl Client {
             native_pool: DebugIgnore(Pool::new(
                 Box::new(SingleHostResolver::new(native_address)),
                 Arc::new(native::connection::Connector),
-                default_pool_policy(),
+                Default::default(),
             )),
             schema,
             request_timeout,
@@ -270,9 +269,11 @@ impl Client {
         }
     }
 
-    /// Ping the ClickHouse server to verify connectivitiy.
+    /// Ping the ClickHouse server to verify connectivity.
     pub async fn ping(&self) -> Result<(), Error> {
-        self.native_pool.claim().await?.ping().await.map_err(Error::Native)?;
+        let mut handle = self.native_pool.claim().await?;
+        trace!(self.log, "acquired native pool claim");
+        handle.ping().await.map_err(Error::Native)?;
         trace!(self.log, "successful ping of ClickHouse server");
         Ok(())
     }
diff --git a/oximeter/db/src/native/connection.rs b/oximeter/db/src/native/connection.rs
index c8ee91ecaf..911788a91f 100644
--- a/oximeter/db/src/native/connection.rs
+++ b/oximeter/db/src/native/connection.rs
@@ -36,14 +36,6 @@ use uuid::Uuid;
 /// A pool of connections to a ClickHouse server over the native protocol.
 pub type Pool = qorb::pool::Pool<Connection>;
 
-/// The default connection pooling policy for native TCP connections.
-pub fn default_pool_policy() -> qorb::policy::Policy {
-    qorb::policy::Policy {
-        claim_timeout: std::time::Duration::from_secs(10),
-        ..Default::default()
-    }
-}
-
 /// A type for making connections to a ClickHouse server.
 #[derive(Clone, Copy, Debug)]
 pub struct Connector;
@@ -76,7 +68,16 @@ impl backend::Connector for Connector {
         &self,
         conn: &mut Self::Connection,
     ) -> Result<(), QorbError> {
-        conn.cancel().await.map_err(QorbError::from)
+        // We try to cancel an outstanding query. But if there is _no_
+        // outstanding query, we still want to run the validation check of
+        // pinging the server. That way `qorb` still learns whether the
+        // server is alive when there was no query to cancel.
+        if conn.cancel().await.map_err(QorbError::from)? {
+            Ok(())
+        } else {
+            // No query, so let's run the validation check.
+            self.is_valid(conn).await
+        }
     }
 }
 
@@ -173,18 +174,25 @@ impl Connection {
                 Err(Error::UnexpectedPacket(packet.kind()))
             }
             Some(Err(e)) => Err(e),
-            None => Err(Error::Disconnected),
+            None => {
+                probes::disconnected!(|| ());
+                Err(Error::Disconnected)
+            }
         }
     }
 
     // Cancel a running query, if one exists.
-    async fn cancel(&mut self) -> Result<(), Error> {
+    //
+    // This returns an error if there is a query and we could not cancel it for
+    // some reason. It returns `Ok(true)` if we successfully canceled the query,
+    // or `Ok(false)` if there was no query to cancel at all.
+    async fn cancel(&mut self) -> Result<bool, Error> {
         if self.outstanding_query {
             self.writer.send(ClientPacket::Cancel).await?;
             // Await EOS, throwing everything else away except errors.
             let res = loop {
                 match self.reader.next().await {
-                    Some(Ok(ServerPacket::EndOfStream)) => break Ok(()),
+                    Some(Ok(ServerPacket::EndOfStream)) => break Ok(true),
                     Some(Ok(other_packet)) => {
                         probes::unexpected__server__packet!(
                             || other_packet.kind()
@@ -197,7 +205,7 @@ impl Connection {
             self.outstanding_query = false;
             return res;
         }
-        Ok(())
+        Ok(false)
     }
 
     /// Send a SQL query, possibly with data.
diff --git a/oximeter/db/src/native/mod.rs b/oximeter/db/src/native/mod.rs
index ce7efcc8d3..9bddd6ad5c 100644
--- a/oximeter/db/src/native/mod.rs
+++ b/oximeter/db/src/native/mod.rs
@@ -137,6 +137,9 @@ mod probes {
     /// Emitted when we receive a packet from the server, with its kind.
     fn packet__received(kind: &str) {}
 
+    /// Emitted when we learn we've been disconnected from the server.
+    fn disconnected() {}
+
     /// Emitted when we receive a data packet, with details about the size and
     /// data types for each column.
     fn data__packet__received(
diff --git a/oximeter/db/tests/integration_test.rs b/oximeter/db/tests/integration_test.rs
index d845ce5a3e..3a1649959e 100644
--- a/oximeter/db/tests/integration_test.rs
+++ b/oximeter/db/tests/integration_test.rs
@@ -455,7 +455,7 @@ async fn wait_for_num_points(
     Ok(())
 }
 
-/// Try to ping the server until it is responds.
+/// Try to ping the server until it responds.
 async fn wait_for_ping(log: &Logger, client: &Client) -> anyhow::Result<()> {
     poll::wait_for_condition(
         || async {
@@ -464,8 +464,8 @@ async fn wait_for_ping(log: &Logger, client: &Client) -> anyhow::Result<()> {
             .await
             .map_err(|_| poll::CondCheckError::<oximeter_db::Error>::NotYet)
         },
-        &Duration::from_millis(1),
-        &Duration::from_secs(10),
+        &Duration::from_millis(100),
+        &Duration::from_secs(30),
     )
     .await
     .with_context(|| {

From d69155eb692c56e24756eaa1b18239813771d04d Mon Sep 17 00:00:00 2001
From: Benjamin Naecker
Date: Fri, 18 Oct 2024 10:11:02 -0700
Subject: [PATCH 4/4] Bump disk import timeout in helios / deploy CI job

---
 .github/buildomat/jobs/deploy.sh | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/.github/buildomat/jobs/deploy.sh b/.github/buildomat/jobs/deploy.sh
index 018b532107..a9f1bf8a9c 100755
--- a/.github/buildomat/jobs/deploy.sh
+++ b/.github/buildomat/jobs/deploy.sh
@@ -418,7 +418,11 @@ done
 /usr/oxide/oxide --resolve "$OXIDE_RESOLVE" --cacert "$E2E_TLS_CERT" \
     project create --name images --description "some images"
 
-/usr/oxide/oxide --resolve "$OXIDE_RESOLVE" --cacert "$E2E_TLS_CERT" \
+# NOTE: Use a relatively large timeout on this call, to avoid #6771
+/usr/oxide/oxide \
+    --resolve "$OXIDE_RESOLVE" \
+    --cacert "$E2E_TLS_CERT" \
+    --timeout 60 \
     disk import \
         --path debian-11-genericcloud-amd64.raw \
         --disk debian11-boot \
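
Inside `oximeter_db`, a claim from the new native pool round-trips like
this minimal sketch (condensed from `Client::ping` in patch 3; the error
conversions are the ones the crate already defines for pool errors):

    async fn ping_clickhouse(pool: &native::Pool) -> Result<(), Error> {
        // Claim a connection from qorb; broken connections are detected on
        // recycle and re-established before they are handed out again.
        let mut handle = pool.claim().await?;
        // Round-trip a ping packet over the native protocol.
        handle.ping().await.map_err(Error::Native)
    }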