Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ClickHouse native connection pool #6889

Merged
merged 5 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ clickhouse-admin-api = { path = "clickhouse-admin/api" }
clickhouse-admin-keeper-client = { path = "clients/clickhouse-admin-keeper-client" }
clickhouse-admin-server-client = { path = "clients/clickhouse-admin-server-client" }
clickhouse-admin-types = { path = "clickhouse-admin/types" }
clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "ceec762e6a87d2a22bf56792a3025e145caa095e" }
clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "a1b342c2558e835d09e6e39a40d3de798a29c2f" }
cockroach-admin-api = { path = "cockroach-admin/api" }
cockroach-admin-client = { path = "clients/cockroach-admin-client" }
cockroach-admin-types = { path = "cockroach-admin/types" }
Expand Down Expand Up @@ -520,7 +520,7 @@ propolis_api_types = { git = "https://github.com/oxidecomputer/propolis", rev =
propolis-client = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" }
propolis-mock-server = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" }
proptest = "1.5.0"
qorb = "0.0.2"
qorb = "0.1.1"
quote = "1.0"
rand = "0.8.5"
rand_core = "0.6.4"
Expand Down
56 changes: 49 additions & 7 deletions dev-tools/omdb/src/bin/omdb/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ pub struct OxqlArgs {
)]
clickhouse_url: Option<String>,

/// URL of the ClickHouse server to connect to for the native protcol.
#[arg(
long,
env = "OMDB_CLICKHOUSE_NATIVE_URL",
global = true,
help_heading = CONNECTION_OPTIONS_HEADING,
)]
clickhouse_native_url: Option<String>,

/// Print summaries of each SQL query run against the database.
#[clap(long = "summaries")]
print_summaries: bool,
Expand All @@ -47,29 +56,62 @@ impl OxqlArgs {
omdb: &Omdb,
log: &Logger,
) -> anyhow::Result<()> {
let addr = self.addr(omdb, log).await?;
let http_addr = self.resolve_http_addr(omdb, log).await?;
let native_addr = self.resolve_native_addr(omdb, log).await?;

let opts = ShellOptions {
print_summaries: self.print_summaries,
print_elapsed: self.print_elapsed,
};

oxql::shell(
addr.ip(),
addr.port(),
http_addr.ip(),
http_addr.port(),
native_addr.port(),
log.new(slog::o!("component" => "clickhouse-client")),
opts,
)
.await
}

/// Resolve the ClickHouse URL to a socket address.
async fn addr(
/// Resolve the ClickHouse native TCP socket address.
async fn resolve_native_addr(
&self,
omdb: &Omdb,
log: &Logger,
) -> anyhow::Result<SocketAddr> {
self.resolve_addr(
omdb,
log,
self.clickhouse_native_url.as_deref(),
ServiceName::ClickhouseNative,
)
.await
}

/// Resolve the ClickHouse HTTP URL to a socket address.
async fn resolve_http_addr(
&self,
omdb: &Omdb,
log: &Logger,
) -> anyhow::Result<SocketAddr> {
self.resolve_addr(
omdb,
log,
self.clickhouse_url.as_deref(),
ServiceName::Clickhouse,
)
.await
}

async fn resolve_addr(
&self,
omdb: &Omdb,
log: &Logger,
maybe_url: Option<&str>,
srv: ServiceName,
) -> anyhow::Result<SocketAddr> {
match &self.clickhouse_url {
match maybe_url {
Some(cli_or_env_url) => Url::parse(&cli_or_env_url)
.context(
"failed parsing URL from command-line or environment variable",
Expand All @@ -87,7 +129,7 @@ impl OxqlArgs {
Ok(SocketAddr::V6(
omdb.dns_lookup_one(
log.clone(),
ServiceName::Clickhouse,
srv,
)
.await
.context("failed looking up ClickHouse internal DNS entry")?,
Expand Down
7 changes: 5 additions & 2 deletions dev-tools/omdb/tests/usage_errors.out
Original file line number Diff line number Diff line change
Expand Up @@ -783,13 +783,16 @@ Usage: omdb oxql [OPTIONS]
Options:
--log-level <LOG_LEVEL> log level filter [env: LOG_LEVEL=] [default: warn]
--summaries Print summaries of each SQL query run against the database
--elapsed Print the total elapsed query duration
--color <COLOR> Color output [default: auto] [possible values: auto, always, never]
--elapsed Print the total elapsed query duration
-h, --help Print help

Connection Options:
--clickhouse-url <CLICKHOUSE_URL>
URL of the ClickHouse server to connect to [env: OMDB_CLICKHOUSE_URL=]
--clickhouse-native-url <CLICKHOUSE_NATIVE_URL>
URL of the ClickHouse server to connect to for the native protcol [env:
OMDB_CLICKHOUSE_NATIVE_URL=]
--dns-server <DNS_SERVER>
[env: OMDB_DNS_SERVER=]

Expand All @@ -808,7 +811,7 @@ error: unexpected argument '--summarizes' found

tip: a similar argument exists: '--summaries'

Usage: omdb oxql <--clickhouse-url <CLICKHOUSE_URL>|--summaries|--elapsed>
Usage: omdb oxql <--clickhouse-url <CLICKHOUSE_URL>|--clickhouse-native-url <CLICKHOUSE_NATIVE_URL>|--summaries|--elapsed>

For more information, try '--help'.
=============================================
27 changes: 19 additions & 8 deletions nexus/src/app/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use internal_dns_types::names::ServiceName;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db;
use nexus_db_queries::db::DataStore;
use omicron_common::address::CLICKHOUSE_HTTP_PORT;
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use omicron_common::api::external::{DataPageParams, Error, ListResultVec};
use omicron_common::api::internal::nexus::{self, ProducerEndpoint};
use oximeter_client::Client as OximeterClient;
Expand Down Expand Up @@ -60,15 +60,26 @@ impl LazyTimeseriesClient {
pub(crate) async fn get(
&self,
) -> Result<oximeter_db::Client, ResolveError> {
let address = match &self.source {
ClientSource::FromIp { address } => *address,
ClientSource::FromDns { resolver } => SocketAddr::new(
resolver.lookup_ip(ServiceName::Clickhouse).await?,
CLICKHOUSE_HTTP_PORT,
),
let (http_address, native_address) = match &self.source {
ClientSource::FromIp { address } => {
let native_address =
SocketAddr::new(address.ip(), CLICKHOUSE_TCP_PORT);
(*address, native_address)
}
ClientSource::FromDns { resolver } => {
let http_address = SocketAddr::from(
resolver.lookup_socket_v6(ServiceName::Clickhouse).await?,
);
let native_address = SocketAddr::from(
resolver
.lookup_socket_v6(ServiceName::ClickhouseNative)
.await?,
);
(http_address, native_address)
}
};

Ok(oximeter_db::Client::new(address, &self.log))
Ok(oximeter_db::Client::new(http_address, native_address, &self.log))
}
}

Expand Down
10 changes: 8 additions & 2 deletions nexus/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> {
log.new(o!("component" => "oximeter")),
nexus_internal_addr,
clickhouse.http_address().port(),
clickhouse.native_address().port(),
collector_id,
)
.await
Expand Down Expand Up @@ -1449,11 +1450,16 @@ pub async fn start_sled_agent(
pub async fn start_oximeter(
log: Logger,
nexus_address: SocketAddr,
db_port: u16,
http_port: u16,
native_port: u16,
id: Uuid,
) -> Result<Oximeter, String> {
let db = oximeter_collector::DbConfig {
address: Some(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db_port)),
address: Some(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), http_port)),
native_address: Some(SocketAddr::new(
Ipv6Addr::LOCALHOST.into(),
native_port,
)),
batch_size: 10,
batch_interval: 1,
replicated: false,
Expand Down
8 changes: 7 additions & 1 deletion nexus/tests/integration_tests/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,12 @@ async fn test_oximeter_reregistration() {

// ClickHouse client for verifying collection.
let ch_address = context.clickhouse.http_address().into();
let client = oximeter_db::Client::new(ch_address, &context.logctx.log);
let native_address = context.clickhouse.native_address().into();
let client = oximeter_db::Client::new(
ch_address,
native_address,
&context.logctx.log,
);
client
.init_single_node_db()
.await
Expand Down Expand Up @@ -302,6 +307,7 @@ async fn test_oximeter_reregistration() {
context.logctx.log.new(o!("component" => "oximeter")),
context.server.get_http_server_internal_address().await,
context.clickhouse.http_address().port(),
context.clickhouse.native_address().port(),
oximeter_id,
)
.await
Expand Down
20 changes: 16 additions & 4 deletions oximeter/collector/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use chrono::Utc;
use futures::TryStreamExt;
use nexus_client::types::IdSortMode;
use nexus_client::Client as NexusClient;
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use omicron_common::backoff;
use omicron_common::backoff::BackoffError;
use oximeter::types::ProducerResults;
Expand All @@ -34,6 +35,7 @@ use slog::warn;
use slog::Logger;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::ops::Bound;
use std::sync::Arc;
Expand Down Expand Up @@ -383,12 +385,15 @@ pub struct OximeterAgent {

impl OximeterAgent {
/// Construct a new agent with the given ID and logger.
// TODO(cleanup): Remove this lint when we have only a native resolver.
#[allow(clippy::too_many_arguments)]
pub async fn with_id(
id: Uuid,
address: SocketAddrV6,
refresh_interval: Duration,
db_config: DbConfig,
resolver: BoxedResolver,
http_resolver: BoxedResolver,
native_resolver: BoxedResolver,
log: &Logger,
replicated: bool,
) -> Result<Self, Error> {
Expand All @@ -414,7 +419,8 @@ impl OximeterAgent {
// - The DB doesn't exist at all. This reports a version number of 0. We
// need to create the DB here, at the latest version. This is used in
// fresh installations and tests.
let client = Client::new_with_pool(resolver, &log);
let client =
Client::new_with_pool(http_resolver, native_resolver, &log);
match client.check_db_is_at_expected_version().await {
Ok(_) => {}
Err(oximeter_db::Error::DatabaseVersionMismatch {
Expand Down Expand Up @@ -506,12 +512,18 @@ impl OximeterAgent {
// prints the results as they're received.
let insertion_log = log.new(o!("component" => "results-sink"));
if let Some(db_config) = db_config {
let Some(address) = db_config.address else {
let Some(http_address) = db_config.address else {
return Err(Error::Standalone(anyhow!(
"Must provide explicit IP address in standalone mode"
)));
};
let client = Client::new(address, &log);

// Grab the native TCP address, or construct one from the defaults.
let native_address =
db_config.native_address.unwrap_or_else(|| {
SocketAddr::new(http_address.ip(), CLICKHOUSE_TCP_PORT)
});
let client = Client::new(http_address, native_address, &log);
let replicated = client.is_oximeter_cluster().await?;
if !replicated {
client.init_single_node_db().await?;
Expand Down
Loading
Loading