From a6dd46d3eee7cd7f3e938e4acaf6a6066e714584 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 12 Oct 2024 12:41:38 -0700 Subject: [PATCH] [ereport] a bunch more machinery --- Cargo.lock | 4 + ereporter/Cargo.toml | 8 +- ereporter/src/server.rs | 203 +++++++++++++++++++-- nexus/internal-api/src/lib.rs | 29 ++- nexus/src/internal_api/http_entrypoints.rs | 20 ++ nexus/types/src/internal_api/params.rs | 11 ++ openapi/ereporter.json | 5 +- openapi/nexus-internal.json | 71 +++++++ 8 files changed, 326 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ad3e5e723e..b5c9616095 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2725,9 +2725,13 @@ dependencies = [ "chrono", "dropshot", "ereporter-api", + "internal-dns", + "nexus-client", "omicron-common", "omicron-workspace-hack", + "serde", "slog", + "slog-dtrace", "tokio", "uuid", ] diff --git a/ereporter/Cargo.toml b/ereporter/Cargo.toml index de065e48da..09b7bb4a91 100644 --- a/ereporter/Cargo.toml +++ b/ereporter/Cargo.toml @@ -7,12 +7,16 @@ edition = "2021" anyhow.workspace = true chrono.workspace = true dropshot.workspace = true -ereporter-api = { workspace = true } -tokio = { workspace = true, features = ["sync"] } +ereporter-api.workspace = true +internal-dns.workspace = true +nexus-client.workspace = true omicron-common.workspace = true omicron-workspace-hack.workspace = true uuid.workspace = true +serde.workspace = true slog.workspace = true +slog-dtrace.workspace = true +tokio = { workspace = true, features = ["sync"] } [lints] workspace = true diff --git a/ereporter/src/server.rs b/ereporter/src/server.rs index d43623273a..25213f530e 100644 --- a/ereporter/src/server.rs +++ b/ereporter/src/server.rs @@ -1,23 +1,40 @@ use crate::buffer; use crate::EreportData; use dropshot::{ - ConfigLogging, EmptyScanParams, HttpError, HttpResponseDeleted, - HttpResponseOk, PaginationParams, Query, RequestContext, ResultsPage, - WhichPage, + EmptyScanParams, HttpError, HttpResponseDeleted, HttpResponseOk, + PaginationParams, Query, RequestContext, ResultsPage, WhichPage, }; +use internal_dns::resolver::Resolver; +use internal_dns::ServiceName; use omicron_common::api::external::Error; use omicron_common::api::external::Generation; +use omicron_common::backoff; +use omicron_common::backoff::BackoffError; +use omicron_common::FileKv; +use slog::debug; +use slog::error; +use slog::warn; +use slog::Drain; use slog::Logger; use std::collections::VecDeque; +use std::net::IpAddr; use std::net::SocketAddr; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; +// Our public interface depends directly or indirectly on these types; we +// export them so that consumers need not depend on dropshot themselves and +// to simplify how we stage incompatible upgrades. +pub use dropshot::ConfigLogging; +pub use dropshot::ConfigLoggingIfExists; +pub use dropshot::ConfigLoggingLevel; + pub struct ServerStarter { config: Config, ctx: ServerContext, ereports: mpsc::Receiver, requests: mpsc::Receiver, + dns_resolver: Option, } pub struct RunningServer { @@ -26,20 +43,17 @@ pub struct RunningServer { // TODO(eliza): hang onto the running dropshot stuff. } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] pub struct ReporterIdentity { - /// The UUID of the reporter endpoint. + /// UUID of the reporter. pub id: Uuid, - /// The socket address on which to bind the reporter endpoint. - pub addr: SocketAddr, + /// The address to listen for ereport collection requests on. + pub address: SocketAddr, } pub struct Config { pub reporter: ReporterIdentity, - /// The address at which we attempt to register as a producer. - /// - /// If the address is not provided, the address of Nexus will be resolved - /// using internal DNS, based on the local address of the server being - /// configured. + /// How to discover the Nexus API to register the reporter. pub registration_address: Option, /// The maximum size of Dropshot requests. pub request_body_max_bytes: usize, @@ -63,6 +77,15 @@ pub enum LogConfig { Logger(Logger), } +/// How to discover Nexus' IP for registration. +#[derive(Clone)] +enum NexusDiscovery { + /// Use the provided socket address for the Nexus API. + Addr(SocketAddr), + /// Discover Nexus from internal DNS + Dns(Resolver), +} + #[derive(Clone)] struct ServerContext { tx: mpsc::Sender, @@ -74,18 +97,93 @@ impl ServerStarter { pub fn new(config: Config) -> (crate::Reporter, Self) { let (ereport_tx, ereports) = mpsc::channel(config.buffer_capacity); let (tx, requests) = mpsc::channel(128); - let this = - Self { config, ereports, ctx: ServerContext { tx }, requests }; + let this = Self { + config, + ereports, + ctx: ServerContext { tx }, + requests, + dns_resolver: None, + }; (crate::Reporter(ereport_tx), this) } + /// Use the provided internal DNS resolver rather than creating a new one. + pub fn with_resolver(self, resolver: Resolver) -> Self { + Self { dns_resolver: Some(resolver), ..self } + } + pub async fn start(self) -> anyhow::Result { - let Self { config, ctx, ereports, requests } = self; - let log = todo!("eliza: log config"); - // TODO: + let Self { config, ctx, ereports, requests, dns_resolver } = self; + let log = { + let base_logger = match config.log { + LogConfig::Config(conf) => conf.to_logger("ereporter")?, + LogConfig::Logger(log) => log.clone(), + }; + let (drain, registration) = slog_dtrace::with_drain(base_logger); + let log = Logger::root(drain.fuse(), slog::o!(FileKv)); + if let slog_dtrace::ProbeRegistration::Failed(e) = registration { + error!(log, "failed to register DTrace probes: {e}",); + } else { + debug!(log, "registered DTrace probes"); + } + log + }; + // 1. discover nexus + + // Create a resolver if needed, or use Nexus's address directly. + let discovery = match (config.registration_address, dns_resolver) { + (Some(addr), _) => { + if addr.port() == 0 { + anyhow::bail!( + "Nexus registration address must have a real port" + ); + } + debug!( + log, + "Nexus IP provided explicitly, registering with it"; + "addr" => %addr, + ); + NexusDiscovery::Addr(addr) + } + (None, None) => { + // Ensure that we've been provided with an IPv6 address if we're + // using DNS to resolve Nexus. That's required because we need + // to use the /48 to find our DNS server itself. + let IpAddr::V6(our_addr) = config.reporter.address.ip() else { + anyhow::bail!("server address must be IPv6 in order to resolve Nexus from DNS") + }; + debug!( + log, + "Nexus IP not provided, will create an internal \ + DNS resolver to resolve it" + ); + + let resolver = Resolver::new_from_ip( + log.new(slog::o!("component" => "internal-dns-resolver")), + our_addr, + )?; + NexusDiscovery::Dns(resolver) + } + (None, Some(resolver)) => { + debug!( + log, + "Nexus IP not provided, will use DNS to resolve it" + ); + NexusDiscovery::Dns(resolver) + } + }; + let nexus_addr = discovery.nexus_addr(&log).await; + let nexus_client = nexus_client::Client::new( + &format!("http://{nexus_addr}"), + log.clone(), + ); + // 2. register server and recover sequence number - let seq = todo!("eliza: discover sequence number"); + // TODO(eliza): perhaps registrations should be periodically refreshed? + let nexus_client::types::EreporterRegistered { seq } = + config.reporter.register(&log, &nexus_client).await; + // 3. spawn buffer task let buffer_task = tokio::spawn( crate::buffer::Buffer { @@ -98,13 +196,82 @@ impl ServerStarter { } .run(), ); + // 4. spawn dropshot server - todo!("eliza: dropshot server"); + // TODO(eliza): actually do that Ok(RunningServer { buffer_task }) } } +impl NexusDiscovery { + async fn nexus_addr(&self, log: &Logger) -> SocketAddr { + match self { + Self::Addr(addr) => *addr, + Self::Dns(resolver) => { + let log_failure = |error, delay| { + warn!( + log, + "failed to lookup Nexus IP, will retry"; + "delay" => ?delay, + "error" => ?error, + ); + }; + let do_lookup = || async { + resolver + .lookup_socket_v6(ServiceName::Nexus) + .await + .map_err(|e| BackoffError::transient(e.to_string())) + .map(Into::into) + }; + backoff::retry_notify( + backoff::retry_policy_internal_service(), + do_lookup, + log_failure, + ) + .await + .expect("Expected infinite retry loop resolving Nexus address") + } + } + } +} + +impl ReporterIdentity { + async fn register( + &self, + log: &Logger, + client: &nexus_client::Client, + ) -> nexus_client::types::EreporterRegistered { + let log_failure = |error, delay| { + warn!( + log, + "failed to register ereporter with Nexus, will retry"; + "delay" => ?delay, + "error" => ?error, + ); + }; + let info = nexus_client::types::EreporterInfo { + address: self.address.to_string(), + reporter_id: self.id, + }; + + let do_register = || async { + client + .cpapi_ereporters_post(&info) + .await + .map(|response| response.into_inner()) + .map_err(|e| BackoffError::transient(e)) + }; + backoff::retry_notify( + backoff::retry_policy_internal_service(), + do_register, + log_failure, + ) + .await + .expect("Expected infinite retry loop registering ereporter") + } +} + impl ereporter_api::EreporterApi for EreporterApiImpl { type Context = ServerContext; diff --git a/nexus/internal-api/src/lib.rs b/nexus/internal-api/src/lib.rs index 044537946b..e80fb80f45 100644 --- a/nexus/internal-api/src/lib.rs +++ b/nexus/internal-api/src/lib.rs @@ -20,15 +20,16 @@ use nexus_types::{ }, internal_api::{ params::{ - InstanceMigrateRequest, OximeterInfo, RackInitializationRequest, - SledAgentInfo, SwitchPutRequest, SwitchPutResponse, + EreporterInfo, InstanceMigrateRequest, OximeterInfo, + RackInitializationRequest, SledAgentInfo, SwitchPutRequest, + SwitchPutResponse, }, views::{BackgroundTask, DemoSaga, Ipv4NatEntryView, Saga}, }, }; use omicron_common::{ api::{ - external::{http_pagination::PaginatedById, Instance}, + external::{http_pagination::PaginatedById, Generation, Instance}, internal::nexus::{ DiskRuntimeState, DownstairsClientStopRequest, DownstairsClientStopped, ProducerEndpoint, @@ -530,6 +531,19 @@ pub trait NexusInternalApi { path_params: Path, query_params: Query, ) -> Result>, HttpError>; + + // Error reports + + /// Register an error reporter with Nexus, returning the next sequence + /// number for an error report from that reporter. + #[endpoint { + method = POST, + path = "/ereport/reporters", + }] + async fn cpapi_ereporters_post( + request_context: RequestContext, + identity: TypedBody, + ) -> Result, HttpError>; } /// Path parameters for Sled Agent requests (internal API) @@ -649,3 +663,12 @@ pub struct SledId { pub struct ProbePathParam { pub sled: Uuid, } + +/// Response to error reporter registration requests. +#[derive(Clone, Debug, Serialize, JsonSchema)] +pub struct EreporterRegistered { + /// The starting sequence number of the next error report from this + /// reporter. If the reporter has not been seen by Nexus previously, this + /// may be 0. + pub seq: Generation, +} diff --git a/nexus/src/internal_api/http_entrypoints.rs b/nexus/src/internal_api/http_entrypoints.rs index 2829fdb2a6..5a2f0c040d 100644 --- a/nexus/src/internal_api/http_entrypoints.rs +++ b/nexus/src/internal_api/http_entrypoints.rs @@ -30,6 +30,7 @@ use nexus_types::external_api::params::UninitializedSledId; use nexus_types::external_api::shared::ProbeInfo; use nexus_types::external_api::shared::UninitializedSled; use nexus_types::external_api::views::SledPolicy; +use nexus_types::internal_api::params::EreporterInfo; use nexus_types::internal_api::params::InstanceMigrateRequest; use nexus_types::internal_api::params::SledAgentInfo; use nexus_types::internal_api::params::SwitchPutRequest; @@ -939,4 +940,23 @@ impl NexusInternalApi for NexusInternalApiImpl { .instrument_dropshot_handler(&rqctx, handler) .await } + + async fn cpapi_ereporters_post( + rqctx: RequestContext, + _identity: TypedBody, + ) -> Result, HttpError> { + let apictx = &rqctx.context().context; + let handler = async { + // TODO(eliza): ACTUALLY IMPLEMENT THIS PART LOL + Err(HttpError::for_unavail( + Some("NOT YET IMPLEMENTED LMAO".to_string()), + "TODO eliza actually implement this part lol lmao :)" + .to_string(), + )) + }; + apictx + .internal_latencies + .instrument_dropshot_handler(&rqctx, handler) + .await + } } diff --git a/nexus/types/src/internal_api/params.rs b/nexus/types/src/internal_api/params.rs index c803f003f1..9eba13ca00 100644 --- a/nexus/types/src/internal_api/params.rs +++ b/nexus/types/src/internal_api/params.rs @@ -214,3 +214,14 @@ pub struct InstanceMigrateRequest { /// The ID of the sled to which to migrate the target instance. pub dst_sled_id: Uuid, } + +/// Message sent by an error reporter to register itself with Nexus. +#[derive(Debug, Clone, Copy, JsonSchema, Serialize, Deserialize)] +pub struct EreporterInfo { + /// The UUID of the reporting entity. + pub reporter_id: Uuid, + + /// The address on which the error reporter server listens for ingestion + /// requests. + pub address: SocketAddr, +} diff --git a/openapi/ereporter.json b/openapi/ereporter.json index b160d577fa..81c50dc308 100644 --- a/openapi/ereporter.json +++ b/openapi/ereporter.json @@ -12,7 +12,7 @@ "paths": { "/ereports": { "get": { - "summary": "Get a list of ereports , paginated by sequence number.", + "summary": "Get a list of ereports, paginated by sequence number.", "operationId": "ereports_list", "parameters": [ { @@ -61,7 +61,8 @@ }, "/ereports/{seq}": { "delete": { - "summary": "Truncate ereports with sequence numbers less than or equal to `seq`.", + "summary": "Informs the reporter that it may freely discard ereports with sequence", + "description": "numbers less than or equal to `seq`.", "operationId": "ereports_truncate", "parameters": [ { diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json index 9226b9d319..0156e60224 100644 --- a/openapi/nexus-internal.json +++ b/openapi/nexus-internal.json @@ -746,6 +746,41 @@ } } }, + "/ereport/reporters": { + "post": { + "summary": "Register an error reporter with Nexus, returning the next sequence", + "description": "number for an error report from that reporter.", + "operationId": "cpapi_ereporters_post", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/EreporterInfo" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/EreporterRegistered" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/instances/{instance_id}/migrate": { "post": { "operationId": "instance_migrate", @@ -3264,6 +3299,42 @@ "secs" ] }, + "EreporterInfo": { + "description": "Message sent by an error reporter to register itself with Nexus.", + "type": "object", + "properties": { + "address": { + "description": "The address on which the error reporter server listens for ingestion requests.", + "type": "string" + }, + "reporter_id": { + "description": "The UUID of the reporting entity.", + "type": "string", + "format": "uuid" + } + }, + "required": [ + "address", + "reporter_id" + ] + }, + "EreporterRegistered": { + "description": "Response to error reporter registration requests.", + "type": "object", + "properties": { + "seq": { + "description": "The starting sequence number of the next error report from this reporter. If the reporter has not been seen by Nexus previously, this may be 0.", + "allOf": [ + { + "$ref": "#/components/schemas/Generation" + } + ] + } + }, + "required": [ + "seq" + ] + }, "Error": { "description": "Error information from a response.", "type": "object",