Skip to content

Commit

Permalink
Add ClickHouse native connection pool
Browse files Browse the repository at this point in the history
- Implement a connector and connection pool for talking to ClickHouse
  over the native TCP protocol
- Add the native TCP address or a `BoxedResolver` for resolving it to
  all code that connects to ClickHouse. This is a pretty noisy change,
  since we now need two addresses / objects everywhere. We'll take it
  back down to one when we completely make the switch to the native
  protocol and remove the HTTP interface.
- Remove the feature flag gating the native code, keeping that just for
  the prototype SQL client shell.
- NFC formatting change to bring the `oximeter_db::native` import style
  in line with the rest of the crate.
  • Loading branch information
bnaecker committed Oct 17, 2024
1 parent e0b3052 commit 516ee34
Show file tree
Hide file tree
Showing 33 changed files with 686 additions and 262 deletions.
39 changes: 35 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ clickhouse-admin-api = { path = "clickhouse-admin/api" }
clickhouse-admin-keeper-client = { path = "clients/clickhouse-admin-keeper-client" }
clickhouse-admin-server-client = { path = "clients/clickhouse-admin-server-client" }
clickhouse-admin-types = { path = "clickhouse-admin/types" }
clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "ceec762e6a87d2a22bf56792a3025e145caa095e" }
clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "a1b342c2558e835d09e6e39a40d3de798a29c2f" }
cockroach-admin-api = { path = "cockroach-admin/api" }
cockroach-admin-client = { path = "clients/cockroach-admin-client" }
cockroach-admin-types = { path = "cockroach-admin/types" }
Expand Down Expand Up @@ -520,7 +520,7 @@ propolis_api_types = { git = "https://github.com/oxidecomputer/propolis", rev =
propolis-client = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" }
propolis-mock-server = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" }
proptest = "1.5.0"
qorb = "0.0.2"
qorb = "0.1.1"
quote = "1.0"
rand = "0.8.5"
rand_core = "0.6.4"
Expand Down
56 changes: 49 additions & 7 deletions dev-tools/omdb/src/bin/omdb/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ pub struct OxqlArgs {
)]
clickhouse_url: Option<String>,

/// URL of the ClickHouse server to connect to for the native protcol.
#[arg(
long,
env = "OMDB_CLICKHOUSE_NATIVE_URL",
global = true,
help_heading = CONNECTION_OPTIONS_HEADING,
)]
clickhouse_native_url: Option<String>,

/// Print summaries of each SQL query run against the database.
#[clap(long = "summaries")]
print_summaries: bool,
Expand All @@ -47,29 +56,62 @@ impl OxqlArgs {
omdb: &Omdb,
log: &Logger,
) -> anyhow::Result<()> {
let addr = self.addr(omdb, log).await?;
let http_addr = self.resolve_http_addr(omdb, log).await?;
let native_addr = self.resolve_native_addr(omdb, log).await?;

let opts = ShellOptions {
print_summaries: self.print_summaries,
print_elapsed: self.print_elapsed,
};

oxql::shell(
addr.ip(),
addr.port(),
http_addr.ip(),
http_addr.port(),
native_addr.port(),
log.new(slog::o!("component" => "clickhouse-client")),
opts,
)
.await
}

/// Resolve the ClickHouse URL to a socket address.
async fn addr(
/// Resolve the ClickHouse native TCP socket address.
async fn resolve_native_addr(
&self,
omdb: &Omdb,
log: &Logger,
) -> anyhow::Result<SocketAddr> {
self.resolve_addr(
omdb,
log,
self.clickhouse_native_url.as_deref(),
ServiceName::ClickhouseNative,
)
.await
}

/// Resolve the ClickHouse HTTP URL to a socket address.
async fn resolve_http_addr(
&self,
omdb: &Omdb,
log: &Logger,
) -> anyhow::Result<SocketAddr> {
self.resolve_addr(
omdb,
log,
self.clickhouse_url.as_deref(),
ServiceName::Clickhouse,
)
.await
}

async fn resolve_addr(
&self,
omdb: &Omdb,
log: &Logger,
maybe_url: Option<&str>,
srv: ServiceName,
) -> anyhow::Result<SocketAddr> {
match &self.clickhouse_url {
match maybe_url {
Some(cli_or_env_url) => Url::parse(&cli_or_env_url)
.context(
"failed parsing URL from command-line or environment variable",
Expand All @@ -87,7 +129,7 @@ impl OxqlArgs {
Ok(SocketAddr::V6(
omdb.dns_lookup_one(
log.clone(),
ServiceName::Clickhouse,
srv,
)
.await
.context("failed looking up ClickHouse internal DNS entry")?,
Expand Down
27 changes: 19 additions & 8 deletions nexus/src/app/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use internal_dns_types::names::ServiceName;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db;
use nexus_db_queries::db::DataStore;
use omicron_common::address::CLICKHOUSE_HTTP_PORT;
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use omicron_common::api::external::{DataPageParams, Error, ListResultVec};
use omicron_common::api::internal::nexus::{self, ProducerEndpoint};
use oximeter_client::Client as OximeterClient;
Expand Down Expand Up @@ -60,15 +60,26 @@ impl LazyTimeseriesClient {
pub(crate) async fn get(
&self,
) -> Result<oximeter_db::Client, ResolveError> {
let address = match &self.source {
ClientSource::FromIp { address } => *address,
ClientSource::FromDns { resolver } => SocketAddr::new(
resolver.lookup_ip(ServiceName::Clickhouse).await?,
CLICKHOUSE_HTTP_PORT,
),
let (http_address, native_address) = match &self.source {
ClientSource::FromIp { address } => {
let native_address =
SocketAddr::new(address.ip(), CLICKHOUSE_TCP_PORT);
(*address, native_address)
}
ClientSource::FromDns { resolver } => {
let http_address = SocketAddr::from(
resolver.lookup_socket_v6(ServiceName::Clickhouse).await?,
);
let native_address = SocketAddr::from(
resolver
.lookup_socket_v6(ServiceName::ClickhouseNative)
.await?,
);
(http_address, native_address)
}
};

Ok(oximeter_db::Client::new(address, &self.log))
Ok(oximeter_db::Client::new(http_address, native_address, &self.log))
}
}

Expand Down
10 changes: 8 additions & 2 deletions nexus/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> {
log.new(o!("component" => "oximeter")),
nexus_internal_addr,
clickhouse.http_address().port(),
clickhouse.native_address().port(),
collector_id,
)
.await
Expand Down Expand Up @@ -1449,11 +1450,16 @@ pub async fn start_sled_agent(
pub async fn start_oximeter(
log: Logger,
nexus_address: SocketAddr,
db_port: u16,
http_port: u16,
native_port: u16,
id: Uuid,
) -> Result<Oximeter, String> {
let db = oximeter_collector::DbConfig {
address: Some(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db_port)),
address: Some(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), http_port)),
native_address: Some(SocketAddr::new(
Ipv6Addr::LOCALHOST.into(),
native_port,
)),
batch_size: 10,
batch_interval: 1,
replicated: false,
Expand Down
8 changes: 7 additions & 1 deletion nexus/tests/integration_tests/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,12 @@ async fn test_oximeter_reregistration() {

// ClickHouse client for verifying collection.
let ch_address = context.clickhouse.http_address().into();
let client = oximeter_db::Client::new(ch_address, &context.logctx.log);
let native_address = context.clickhouse.native_address().into();
let client = oximeter_db::Client::new(
ch_address,
native_address,
&context.logctx.log,
);
client
.init_single_node_db()
.await
Expand Down Expand Up @@ -302,6 +307,7 @@ async fn test_oximeter_reregistration() {
context.logctx.log.new(o!("component" => "oximeter")),
context.server.get_http_server_internal_address().await,
context.clickhouse.http_address().port(),
context.clickhouse.native_address().port(),
oximeter_id,
)
.await
Expand Down
20 changes: 16 additions & 4 deletions oximeter/collector/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use chrono::Utc;
use futures::TryStreamExt;
use nexus_client::types::IdSortMode;
use nexus_client::Client as NexusClient;
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use omicron_common::backoff;
use omicron_common::backoff::BackoffError;
use oximeter::types::ProducerResults;
Expand All @@ -34,6 +35,7 @@ use slog::warn;
use slog::Logger;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::ops::Bound;
use std::sync::Arc;
Expand Down Expand Up @@ -383,12 +385,15 @@ pub struct OximeterAgent {

impl OximeterAgent {
/// Construct a new agent with the given ID and logger.
// TODO(cleanup): Remove this lint when we have only a native resolver.
#[allow(clippy::too_many_arguments)]
pub async fn with_id(
id: Uuid,
address: SocketAddrV6,
refresh_interval: Duration,
db_config: DbConfig,
resolver: BoxedResolver,
http_resolver: BoxedResolver,
native_resolver: BoxedResolver,
log: &Logger,
replicated: bool,
) -> Result<Self, Error> {
Expand All @@ -414,7 +419,8 @@ impl OximeterAgent {
// - The DB doesn't exist at all. This reports a version number of 0. We
// need to create the DB here, at the latest version. This is used in
// fresh installations and tests.
let client = Client::new_with_pool(resolver, &log);
let client =
Client::new_with_pool(http_resolver, native_resolver, &log);
match client.check_db_is_at_expected_version().await {
Ok(_) => {}
Err(oximeter_db::Error::DatabaseVersionMismatch {
Expand Down Expand Up @@ -506,12 +512,18 @@ impl OximeterAgent {
// prints the results as they're received.
let insertion_log = log.new(o!("component" => "results-sink"));
if let Some(db_config) = db_config {
let Some(address) = db_config.address else {
let Some(http_address) = db_config.address else {
return Err(Error::Standalone(anyhow!(
"Must provide explicit IP address in standalone mode"
)));
};
let client = Client::new(address, &log);

// Grab the native TCP address, or construct one from the defaults.
let native_address =
db_config.native_address.unwrap_or_else(|| {
SocketAddr::new(http_address.ip(), CLICKHOUSE_TCP_PORT)
});
let client = Client::new(http_address, native_address, &log);
let replicated = client.is_oximeter_cluster().await?;
if !replicated {
client.init_single_node_db().await?;
Expand Down
Loading

0 comments on commit 516ee34

Please sign in to comment.