Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init timeseries database on connection acquisition #6878

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion nexus/tests/integration_tests/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::integration_tests::metrics::wait_for_producer;
use nexus_test_interface::NexusServer;
use nexus_test_utils_macros::nexus_test;
use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError};
use oximeter_db::DbWrite;
use oximeter_db::DbInit;
use std::time::Duration;
use uuid::Uuid;

Expand Down
2 changes: 2 additions & 0 deletions oximeter/collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ workspace = true

[dependencies]
anyhow.workspace = true
async-trait.workspace = true
camino.workspace = true
chrono.workspace = true
clap.workspace = true
Expand All @@ -22,6 +23,7 @@ oximeter.workspace = true
oximeter-api.workspace = true
oximeter-client.workspace = true
oximeter-db.workspace = true
qorb.workspace = true
rand.workspace = true
reqwest = { workspace = true, features = [ "json" ] }
schemars.workspace = true
Expand Down
57 changes: 23 additions & 34 deletions oximeter/collector/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ use anyhow::anyhow;
use chrono::DateTime;
use chrono::Utc;
use futures::TryStreamExt;
use internal_dns::resolver::Resolver;
use internal_dns::ServiceName;
use nexus_client::types::IdSortMode;
use nexus_client::Client as NexusClient;
use omicron_common::backoff;
use omicron_common::backoff::BackoffError;
use oximeter::types::ProducerResults;
use oximeter::types::ProducerResultsItem;
use oximeter_db::Client;
use oximeter_db::DbWrite;
use oximeter_db::{DbInit, DbWrite};
use qorb::claim::Handle;
use qorb::pool::Pool;
use qorb::resolver::BoxedResolver;
use slog::debug;
use slog::error;
use slog::info;
Expand All @@ -32,7 +34,6 @@ use slog::warn;
use slog::Logger;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::ops::Bound;
use std::sync::Arc;
Expand Down Expand Up @@ -387,7 +388,7 @@ impl OximeterAgent {
address: SocketAddrV6,
refresh_interval: Duration,
db_config: DbConfig,
resolver: &Resolver,
resolver: BoxedResolver,
log: &Logger,
replicated: bool,
) -> Result<Self, Error> {
Expand All @@ -399,22 +400,6 @@ impl OximeterAgent {
));
let insertion_log = log.new(o!("component" => "results-sink"));

// Construct the ClickHouse client first, propagate an error if we can't reach the
// database.
let db_address = if let Some(address) = db_config.address {
address
} else if replicated {
SocketAddr::V6(
resolver
.lookup_socket_v6(ServiceName::ClickhouseServer)
.await?,
)
} else {
SocketAddr::V6(
resolver.lookup_socket_v6(ServiceName::Clickhouse).await?,
)
};

// Determine the version of the database.
//
// There are three cases
Expand All @@ -429,7 +414,7 @@ impl OximeterAgent {
// - The DB doesn't exist at all. This reports a version number of 0. We
// need to create the DB here, at the latest version. This is used in
// fresh installations and tests.
let client = Client::new(db_address, &log);
let client = Client::new_with_pool(resolver, &log);
match client.check_db_is_at_expected_version().await {
Ok(_) => {}
Err(oximeter_db::Error::DatabaseVersionMismatch {
Expand Down Expand Up @@ -482,11 +467,14 @@ impl OximeterAgent {

/// Ensure the background task that polls Nexus periodically for our list of
/// assigned producers is running.
pub(crate) fn ensure_producer_refresh_task(&self, resolver: Resolver) {
pub(crate) fn ensure_producer_refresh_task(
&self,
nexus_pool: Pool<NexusClient>,
) {
let mut task = self.refresh_task.lock().unwrap();
if task.is_none() {
let refresh_task =
tokio::spawn(refresh_producer_list(self.clone(), resolver));
tokio::spawn(refresh_producer_list(self.clone(), nexus_pool));
*task = Some(refresh_task);
}
}
Expand Down Expand Up @@ -763,15 +751,16 @@ impl OximeterAgent {
}

// A task which periodically updates our list of producers from Nexus.
async fn refresh_producer_list(agent: OximeterAgent, resolver: Resolver) {
async fn refresh_producer_list(
agent: OximeterAgent,
nexus_pool: Pool<NexusClient>,
) {
let mut interval = tokio::time::interval(agent.refresh_interval);
loop {
interval.tick().await;
info!(agent.log, "refreshing list of producers from Nexus");
let nexus_addr =
resolve_nexus_with_backoff(&agent.log, &resolver).await;
let url = format!("http://{}", nexus_addr);
let client = nexus_client::Client::new(&url, agent.log.clone());

let client = claim_nexus_with_backoff(&agent.log, &nexus_pool).await;
let mut stream = client.cpapi_assigned_producers_list_stream(
&agent.id,
// This is a _total_ limit, not a page size, so `None` means "get
Expand Down Expand Up @@ -826,10 +815,10 @@ async fn refresh_producer_list(agent: OximeterAgent, resolver: Resolver) {
}
}

async fn resolve_nexus_with_backoff(
async fn claim_nexus_with_backoff(
log: &Logger,
resolver: &Resolver,
) -> SocketAddrV6 {
nexus_pool: &Pool<NexusClient>,
) -> Handle<NexusClient> {
let log_failure = |error, delay| {
warn!(
log,
Expand All @@ -839,8 +828,8 @@ async fn resolve_nexus_with_backoff(
);
};
let do_lookup = || async {
resolver
.lookup_socket_v6(ServiceName::Nexus)
nexus_pool
.claim()
.await
.map_err(|e| BackoffError::transient(e.to_string()))
};
Expand Down
133 changes: 106 additions & 27 deletions oximeter/collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,19 @@ use dropshot::ConfigLogging;
use dropshot::HttpError;
use dropshot::HttpServer;
use dropshot::HttpServerStarter;
use internal_dns::resolver::ResolveError;
use internal_dns::resolver::Resolver;
use internal_dns::ServiceName;
use omicron_common::address::get_internal_dns_server_addresses;
use omicron_common::address::DNS_PORT;
use omicron_common::api::internal::nexus::ProducerEndpoint;
use omicron_common::backoff;
use omicron_common::FileKv;
use qorb::backend;
use qorb::resolver::AllBackends;
use qorb::resolver::BoxedResolver;
use qorb::resolver::Resolver;
use qorb::resolvers::dns::DnsResolver;
use qorb::resolvers::dns::DnsResolverConfig;
use qorb::service;
use serde::Deserialize;
use serde::Serialize;
use slog::debug;
Expand All @@ -26,12 +33,14 @@ use slog::o;
use slog::warn;
use slog::Drain;
use slog::Logger;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::watch;
use uuid::Uuid;

mod agent;
Expand All @@ -56,9 +65,6 @@ pub enum Error {
#[error(transparent)]
Database(#[from] oximeter_db::Error),

#[error(transparent)]
ResolveError(#[from] ResolveError),

#[error("Error running standalone")]
Standalone(#[from] anyhow::Error),
}
Expand Down Expand Up @@ -157,6 +163,49 @@ pub struct OximeterArguments {
pub address: SocketAddrV6,
}

// Provides an alternative to the DNS resolver for cases where we want to
// contact a backend without performing resolution.
//
// The backend set is published through a `tokio::sync::watch` channel. Only
// the sending half is kept here; receivers are created on demand from it.
struct SingleHostResolver {
    // Sending half of the watch channel that carries the backend set handed
    // out to callers of `monitor()`.
    tx: watch::Sender<AllBackends>,
}

impl SingleHostResolver {
    // Builds a resolver whose backend set contains exactly one entry:
    // `address`, registered under the fixed backend name "singleton".
    fn new(address: SocketAddr) -> Self {
        let backends = Arc::new(BTreeMap::from([(
            backend::Name::new("singleton"),
            backend::Backend { address },
        )]));
        // `backends` is not used again, so move it into the channel rather
        // than cloning the `Arc`. The initial receiver is intentionally
        // dropped: each consumer gets its own receiver from the stored sender
        // via `subscribe()`.
        let (tx, _rx) = watch::channel(backends);
        Self { tx }
    }
}

impl Resolver for SingleHostResolver {
    // Returns a fresh receiver subscribed to this resolver's watch channel.
    //
    // In the code visible here nothing sends on `tx` after construction, so
    // the receiver is expected to keep observing the backend set that was
    // installed by `new` (NOTE(review): confirm no other sender use exists).
    fn monitor(&mut self) -> watch::Receiver<AllBackends> {
        self.tx.subscribe()
    }
}

// A "qorb connector" which converts a SocketAddr into a nexus_client::Client.
struct NexusConnector {
    // Logger that is cloned into every `nexus_client::Client` created by
    // `connect`.
    log: Logger,
}

#[async_trait::async_trait]
impl backend::Connector for NexusConnector {
    type Connection = nexus_client::Client;

    // Creates a Nexus client pointed at `backend.address` over plain HTTP.
    //
    // This implementation always returns `Ok`: it only formats the base URL
    // and constructs the client with a clone of this connector's logger.
    async fn connect(
        &self,
        backend: &backend::Backend,
    ) -> Result<Self::Connection, backend::Error> {
        let base_url = format!("http://{}", backend.address);
        let client_log = self.log.clone();
        let nexus = nexus_client::Client::new(&base_url, client_log);
        Ok(nexus)
    }
}

/// A server used to collect metrics from components in the control plane.
pub struct Oximeter {
agent: Arc<OximeterAgent>,
Expand Down Expand Up @@ -202,10 +251,32 @@ impl Oximeter {
}
info!(log, "starting oximeter server");

let resolver = Resolver::new_from_ip(
log.new(o!("component" => "DnsResolver")),
*args.address.ip(),
)?;
// Use the address for Oximeter to infer the bootstrap DNS address
let bootstrap_dns: Vec<SocketAddr> =
get_internal_dns_server_addresses(*args.address.ip())
.into_iter()
.map(|ip| SocketAddr::new(ip, DNS_PORT))
.collect();

let make_clickhouse_resolver = || -> BoxedResolver {
if let Some(address) = config.db.address {
Box::new(SingleHostResolver::new(address))
} else {
let service = if config.db.replicated {
ServiceName::ClickhouseServer
} else {
ServiceName::Clickhouse
};
Box::new(DnsResolver::new(
service::Name(service.srv_name()),
bootstrap_dns.clone(),
DnsResolverConfig {
hardcoded_ttl: Some(tokio::time::Duration::MAX),
..Default::default()
},
))
}
};

let make_agent = || async {
debug!(log, "creating ClickHouse client");
Expand All @@ -215,7 +286,7 @@ impl Oximeter {
args.address,
config.refresh_interval,
config.db,
&resolver,
make_clickhouse_resolver(),
&log,
config.db.replicated,
)
Expand Down Expand Up @@ -256,24 +327,32 @@ impl Oximeter {
address: server.local_addr().to_string(),
collector_id: agent.id,
};

let nexus_pool = {
let nexus_resolver: BoxedResolver =
if let Some(address) = config.nexus_address {
Box::new(SingleHostResolver::new(address))
} else {
Box::new(DnsResolver::new(
service::Name(ServiceName::Nexus.srv_name()),
bootstrap_dns,
DnsResolverConfig {
hardcoded_ttl: Some(tokio::time::Duration::MAX),
..Default::default()
},
))
};

qorb::pool::Pool::new(
nexus_resolver,
Arc::new(NexusConnector { log: log.clone() }),
qorb::policy::Policy::default(),
)
};

let notify_nexus = || async {
debug!(log, "contacting nexus");
let nexus_address = if let Some(address) = config.nexus_address {
address
} else {
SocketAddr::V6(
resolver
.lookup_socket_v6(ServiceName::Nexus)
.await
.map_err(|e| {
backoff::BackoffError::transient(e.to_string())
})?,
)
};
let client = nexus_client::Client::new(
&format!("http://{nexus_address}"),
log.clone(),
);
let client = nexus_pool.claim().await.map_err(|e| e.to_string())?;
client.cpapi_collectors_post(&our_info).await.map_err(|e| {
match &e {
// Failures to reach nexus, or server errors on its side
Expand Down Expand Up @@ -307,7 +386,7 @@ impl Oximeter {

// Now that we've successfully registered, we'll start periodically
// polling for our list of producers from Nexus.
agent.ensure_producer_refresh_task(resolver);
agent.ensure_producer_refresh_task(nexus_pool);

info!(log, "oximeter registered with nexus"; "id" => ?agent.id);
Ok(Self { agent, server })
Expand Down
Loading