Skip to content

Commit

Permalink
Merge branch 'main' into qorb-terminate
Browse files Browse the repository at this point in the history
  • Loading branch information
smklein committed Oct 16, 2024
2 parents 6326cde + e0b3052 commit a8a27cc
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 73 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions oximeter/collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ workspace = true

[dependencies]
anyhow.workspace = true
async-trait.workspace = true
camino.workspace = true
chrono.workspace = true
clap.workspace = true
Expand All @@ -23,6 +24,7 @@ oximeter.workspace = true
oximeter-api.workspace = true
oximeter-client.workspace = true
oximeter-db.workspace = true
qorb.workspace = true
rand.workspace = true
reqwest = { workspace = true, features = [ "json" ] }
schemars.workspace = true
Expand Down
55 changes: 22 additions & 33 deletions oximeter/collector/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ use anyhow::anyhow;
use chrono::DateTime;
use chrono::Utc;
use futures::TryStreamExt;
use internal_dns_resolver::Resolver;
use internal_dns_types::names::ServiceName;
use nexus_client::types::IdSortMode;
use nexus_client::Client as NexusClient;
use omicron_common::backoff;
use omicron_common::backoff::BackoffError;
use oximeter::types::ProducerResults;
use oximeter::types::ProducerResultsItem;
use oximeter_db::Client;
use oximeter_db::DbWrite;
use qorb::claim::Handle;
use qorb::pool::Pool;
use qorb::resolver::BoxedResolver;
use slog::debug;
use slog::error;
use slog::info;
Expand All @@ -32,7 +34,6 @@ use slog::warn;
use slog::Logger;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::ops::Bound;
use std::sync::Arc;
Expand Down Expand Up @@ -387,7 +388,7 @@ impl OximeterAgent {
address: SocketAddrV6,
refresh_interval: Duration,
db_config: DbConfig,
resolver: &Resolver,
resolver: BoxedResolver,
log: &Logger,
replicated: bool,
) -> Result<Self, Error> {
Expand All @@ -399,22 +400,6 @@ impl OximeterAgent {
));
let insertion_log = log.new(o!("component" => "results-sink"));

// Construct the ClickHouse client first, propagate an error if we can't reach the
// database.
let db_address = if let Some(address) = db_config.address {
address
} else if replicated {
SocketAddr::V6(
resolver
.lookup_socket_v6(ServiceName::ClickhouseServer)
.await?,
)
} else {
SocketAddr::V6(
resolver.lookup_socket_v6(ServiceName::Clickhouse).await?,
)
};

// Determine the version of the database.
//
// There are three cases
Expand All @@ -429,7 +414,7 @@ impl OximeterAgent {
// - The DB doesn't exist at all. This reports a version number of 0. We
// need to create the DB here, at the latest version. This is used in
// fresh installations and tests.
let client = Client::new(db_address, &log);
let client = Client::new_with_pool(resolver, &log);
match client.check_db_is_at_expected_version().await {
Ok(_) => {}
Err(oximeter_db::Error::DatabaseVersionMismatch {
Expand Down Expand Up @@ -482,11 +467,14 @@ impl OximeterAgent {

/// Ensure the background task that polls Nexus periodically for our list of
/// assigned producers is running.
pub(crate) fn ensure_producer_refresh_task(&self, resolver: Resolver) {
pub(crate) fn ensure_producer_refresh_task(
&self,
nexus_pool: Pool<NexusClient>,
) {
let mut task = self.refresh_task.lock().unwrap();
if task.is_none() {
let refresh_task =
tokio::spawn(refresh_producer_list(self.clone(), resolver));
tokio::spawn(refresh_producer_list(self.clone(), nexus_pool));
*task = Some(refresh_task);
}
}
Expand Down Expand Up @@ -763,15 +751,16 @@ impl OximeterAgent {
}

// A task which periodically updates our list of producers from Nexus.
async fn refresh_producer_list(agent: OximeterAgent, resolver: Resolver) {
async fn refresh_producer_list(
agent: OximeterAgent,
nexus_pool: Pool<NexusClient>,
) {
let mut interval = tokio::time::interval(agent.refresh_interval);
loop {
interval.tick().await;
info!(agent.log, "refreshing list of producers from Nexus");
let nexus_addr =
resolve_nexus_with_backoff(&agent.log, &resolver).await;
let url = format!("http://{}", nexus_addr);
let client = nexus_client::Client::new(&url, agent.log.clone());

let client = claim_nexus_with_backoff(&agent.log, &nexus_pool).await;
let mut stream = client.cpapi_assigned_producers_list_stream(
&agent.id,
// This is a _total_ limit, not a page size, so `None` means "get
Expand Down Expand Up @@ -826,10 +815,10 @@ async fn refresh_producer_list(agent: OximeterAgent, resolver: Resolver) {
}
}

async fn resolve_nexus_with_backoff(
async fn claim_nexus_with_backoff(
log: &Logger,
resolver: &Resolver,
) -> SocketAddrV6 {
nexus_pool: &Pool<NexusClient>,
) -> Handle<NexusClient> {
let log_failure = |error, delay| {
warn!(
log,
Expand All @@ -839,8 +828,8 @@ async fn resolve_nexus_with_backoff(
);
};
let do_lookup = || async {
resolver
.lookup_socket_v6(ServiceName::Nexus)
nexus_pool
.claim()
.await
.map_err(|e| BackoffError::transient(e.to_string()))
};
Expand Down
133 changes: 106 additions & 27 deletions oximeter/collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,19 @@ use dropshot::ConfigLogging;
use dropshot::HttpError;
use dropshot::HttpServer;
use dropshot::HttpServerStarter;
use internal_dns_resolver::ResolveError;
use internal_dns_resolver::Resolver;
use internal_dns_types::names::ServiceName;
use omicron_common::address::get_internal_dns_server_addresses;
use omicron_common::address::DNS_PORT;
use omicron_common::api::internal::nexus::ProducerEndpoint;
use omicron_common::backoff;
use omicron_common::FileKv;
use qorb::backend;
use qorb::resolver::AllBackends;
use qorb::resolver::BoxedResolver;
use qorb::resolver::Resolver;
use qorb::resolvers::dns::DnsResolver;
use qorb::resolvers::dns::DnsResolverConfig;
use qorb::service;
use serde::Deserialize;
use serde::Serialize;
use slog::debug;
Expand All @@ -26,12 +33,14 @@ use slog::o;
use slog::warn;
use slog::Drain;
use slog::Logger;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::watch;
use uuid::Uuid;

mod agent;
Expand All @@ -56,9 +65,6 @@ pub enum Error {
#[error(transparent)]
Database(#[from] oximeter_db::Error),

#[error(transparent)]
ResolveError(#[from] ResolveError),

#[error("Error running standalone")]
Standalone(#[from] anyhow::Error),
}
Expand Down Expand Up @@ -157,6 +163,49 @@ pub struct OximeterArguments {
pub address: SocketAddrV6,
}

// Provides an alternative to the DNS resolver for cases where we want to
// contact a backend without performing resolution.
//
// The backend set is built once in `SingleHostResolver::new` and holds a
// single entry, named "singleton", for the address given there.
struct SingleHostResolver {
// Sending half of the watch channel that holds the backend set; `monitor`
// subscribes new receivers to it.
tx: watch::Sender<AllBackends>,
}

impl SingleHostResolver {
    // Builds a resolver whose backend set contains exactly one entry: the
    // given `address`, registered under the backend name "singleton".
    fn new(address: SocketAddr) -> Self {
        let backends = Arc::new(BTreeMap::from([(
            backend::Name::new("singleton"),
            backend::Backend { address },
        )]));
        // The backend set is moved into the channel; it is not needed again
        // here, so no extra `Arc` clone is made. The initial receiver is
        // discarded because `monitor` creates receivers on demand by
        // subscribing to `tx`.
        let (tx, _rx) = watch::channel(backends);
        Self { tx }
    }
}

impl Resolver for SingleHostResolver {
    // Hands out a new receiver subscribed to the watch channel that holds
    // this resolver's backend set.
    fn monitor(&mut self) -> watch::Receiver<AllBackends> {
        watch::Sender::subscribe(&self.tx)
    }
}

// A "qorb connector" which converts a SocketAddr into a nexus_client::Client.
struct NexusConnector {
// Logger cloned into each `nexus_client::Client` that `connect` creates.
log: Logger,
}

#[async_trait::async_trait]
impl backend::Connector for NexusConnector {
    type Connection = nexus_client::Client;

    // Builds a Nexus client aimed at `http://<backend address>`, giving it a
    // clone of this connector's logger. This implementation always returns
    // `Ok`.
    async fn connect(
        &self,
        backend: &backend::Backend,
    ) -> Result<Self::Connection, backend::Error> {
        let base_url = format!("http://{}", backend.address);
        let client = nexus_client::Client::new(&base_url, self.log.clone());
        Ok(client)
    }
}

/// A server used to collect metrics from components in the control plane.
pub struct Oximeter {
agent: Arc<OximeterAgent>,
Expand Down Expand Up @@ -202,10 +251,32 @@ impl Oximeter {
}
info!(log, "starting oximeter server");

let resolver = Resolver::new_from_ip(
log.new(o!("component" => "DnsResolver")),
*args.address.ip(),
)?;
// Use Oximeter's own address to infer the bootstrap DNS server addresses.
let bootstrap_dns: Vec<SocketAddr> =
get_internal_dns_server_addresses(*args.address.ip())
.into_iter()
.map(|ip| SocketAddr::new(ip, DNS_PORT))
.collect();

let make_clickhouse_resolver = || -> BoxedResolver {
if let Some(address) = config.db.address {
Box::new(SingleHostResolver::new(address))
} else {
let service = if config.db.replicated {
ServiceName::ClickhouseServer
} else {
ServiceName::Clickhouse
};
Box::new(DnsResolver::new(
service::Name(service.srv_name()),
bootstrap_dns.clone(),
DnsResolverConfig {
hardcoded_ttl: Some(tokio::time::Duration::MAX),
..Default::default()
},
))
}
};

let make_agent = || async {
debug!(log, "creating ClickHouse client");
Expand All @@ -215,7 +286,7 @@ impl Oximeter {
args.address,
config.refresh_interval,
config.db,
&resolver,
make_clickhouse_resolver(),
&log,
config.db.replicated,
)
Expand Down Expand Up @@ -256,24 +327,32 @@ impl Oximeter {
address: server.local_addr().to_string(),
collector_id: agent.id,
};

let nexus_pool = {
let nexus_resolver: BoxedResolver =
if let Some(address) = config.nexus_address {
Box::new(SingleHostResolver::new(address))
} else {
Box::new(DnsResolver::new(
service::Name(ServiceName::Nexus.srv_name()),
bootstrap_dns,
DnsResolverConfig {
hardcoded_ttl: Some(tokio::time::Duration::MAX),
..Default::default()
},
))
};

qorb::pool::Pool::new(
nexus_resolver,
Arc::new(NexusConnector { log: log.clone() }),
qorb::policy::Policy::default(),
)
};

let notify_nexus = || async {
debug!(log, "contacting nexus");
let nexus_address = if let Some(address) = config.nexus_address {
address
} else {
SocketAddr::V6(
resolver
.lookup_socket_v6(ServiceName::Nexus)
.await
.map_err(|e| {
backoff::BackoffError::transient(e.to_string())
})?,
)
};
let client = nexus_client::Client::new(
&format!("http://{nexus_address}"),
log.clone(),
);
let client = nexus_pool.claim().await.map_err(|e| e.to_string())?;
client.cpapi_collectors_post(&our_info).await.map_err(|e| {
match &e {
// Failures to reach nexus, or server errors on its side
Expand Down Expand Up @@ -307,7 +386,7 @@ impl Oximeter {

// Now that we've successfully registered, we'll start periodically
// polling for our list of producers from Nexus.
agent.ensure_producer_refresh_task(resolver);
agent.ensure_producer_refresh_task(nexus_pool);

info!(log, "oximeter registered with nexus"; "id" => ?agent.id);
Ok(Self { agent, server })
Expand Down
2 changes: 2 additions & 0 deletions oximeter/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ chrono.workspace = true
chrono-tz.workspace = true
clap.workspace = true
clickward.workspace = true
debug-ignore.workspace = true
dropshot.workspace = true
futures.workspace = true
gethostname.workspace = true
Expand All @@ -30,6 +31,7 @@ omicron-common.workspace = true
omicron-workspace-hack.workspace = true
oximeter.workspace = true
oxql-types.workspace = true
qorb.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
Loading

0 comments on commit a8a27cc

Please sign in to comment.