Skip to content

Commit

Permalink
[ereport] a bunch more machinery
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw committed Oct 12, 2024
1 parent 0a74e78 commit a6dd46d
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 25 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions ereporter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ edition = "2021"
anyhow.workspace = true
chrono.workspace = true
dropshot.workspace = true
ereporter-api = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
ereporter-api.workspace = true
internal-dns.workspace = true
nexus-client.workspace = true
omicron-common.workspace = true
omicron-workspace-hack.workspace = true
uuid.workspace = true
serde.workspace = true
slog.workspace = true
slog-dtrace.workspace = true
tokio = { workspace = true, features = ["sync"] }

[lints]
workspace = true
203 changes: 185 additions & 18 deletions ereporter/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,40 @@
use crate::buffer;
use crate::EreportData;
use dropshot::{
ConfigLogging, EmptyScanParams, HttpError, HttpResponseDeleted,
HttpResponseOk, PaginationParams, Query, RequestContext, ResultsPage,
WhichPage,
EmptyScanParams, HttpError, HttpResponseDeleted, HttpResponseOk,
PaginationParams, Query, RequestContext, ResultsPage, WhichPage,
};
use internal_dns::resolver::Resolver;
use internal_dns::ServiceName;
use omicron_common::api::external::Error;
use omicron_common::api::external::Generation;
use omicron_common::backoff;
use omicron_common::backoff::BackoffError;
use omicron_common::FileKv;
use slog::debug;
use slog::error;
use slog::warn;
use slog::Drain;
use slog::Logger;
use std::collections::VecDeque;
use std::net::IpAddr;
use std::net::SocketAddr;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

// Our public interface depends directly or indirectly on these types; we
// export them so that consumers need not depend on dropshot themselves and
// to simplify how we stage incompatible upgrades.
pub use dropshot::ConfigLogging;
pub use dropshot::ConfigLoggingIfExists;
pub use dropshot::ConfigLoggingLevel;

pub struct ServerStarter {
config: Config,
ctx: ServerContext,
ereports: mpsc::Receiver<EreportData>,
requests: mpsc::Receiver<buffer::ServerReq>,
dns_resolver: Option<Resolver>,
}

pub struct RunningServer {
Expand All @@ -26,20 +43,17 @@ pub struct RunningServer {
// TODO(eliza): hang onto the running dropshot stuff.
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct ReporterIdentity {
/// The UUID of the reporter endpoint.
/// UUID of the reporter.
pub id: Uuid,
/// The socket address on which to bind the reporter endpoint.
pub addr: SocketAddr,
/// The address to listen for ereport collection requests on.
pub address: SocketAddr,
}

pub struct Config {
pub reporter: ReporterIdentity,
/// The address at which we attempt to register as a producer.
///
/// If the address is not provided, the address of Nexus will be resolved
/// using internal DNS, based on the local address of the server being
/// configured.
/// How to discover the Nexus API to register the reporter.
pub registration_address: Option<SocketAddr>,
/// The maximum size of Dropshot requests.
pub request_body_max_bytes: usize,
Expand All @@ -63,6 +77,15 @@ pub enum LogConfig {
Logger(Logger),
}

/// How to discover Nexus' IP for registration.
#[derive(Clone)]
enum NexusDiscovery {
/// Use the provided socket address for the Nexus API.
Addr(SocketAddr),
/// Discover Nexus from internal DNS
Dns(Resolver),
}

#[derive(Clone)]
struct ServerContext {
tx: mpsc::Sender<buffer::ServerReq>,
Expand All @@ -74,18 +97,93 @@ impl ServerStarter {
pub fn new(config: Config) -> (crate::Reporter, Self) {
let (ereport_tx, ereports) = mpsc::channel(config.buffer_capacity);
let (tx, requests) = mpsc::channel(128);
let this =
Self { config, ereports, ctx: ServerContext { tx }, requests };
let this = Self {
config,
ereports,
ctx: ServerContext { tx },
requests,
dns_resolver: None,
};
(crate::Reporter(ereport_tx), this)
}

/// Use the provided internal DNS resolver rather than creating a new one.
pub fn with_resolver(self, resolver: Resolver) -> Self {
Self { dns_resolver: Some(resolver), ..self }
}

pub async fn start(self) -> anyhow::Result<RunningServer> {
let Self { config, ctx, ereports, requests } = self;
let log = todo!("eliza: log config");
// TODO:
let Self { config, ctx, ereports, requests, dns_resolver } = self;
let log = {
let base_logger = match config.log {
LogConfig::Config(conf) => conf.to_logger("ereporter")?,
LogConfig::Logger(log) => log.clone(),
};
let (drain, registration) = slog_dtrace::with_drain(base_logger);
let log = Logger::root(drain.fuse(), slog::o!(FileKv));
if let slog_dtrace::ProbeRegistration::Failed(e) = registration {
error!(log, "failed to register DTrace probes: {e}",);
} else {
debug!(log, "registered DTrace probes");
}
log
};

// 1. discover nexus

// Create a resolver if needed, or use Nexus's address directly.
let discovery = match (config.registration_address, dns_resolver) {
(Some(addr), _) => {
if addr.port() == 0 {
anyhow::bail!(
"Nexus registration address must have a real port"
);
}
debug!(
log,
"Nexus IP provided explicitly, registering with it";
"addr" => %addr,
);
NexusDiscovery::Addr(addr)
}
(None, None) => {
// Ensure that we've been provided with an IPv6 address if we're
// using DNS to resolve Nexus. That's required because we need
// to use the /48 to find our DNS server itself.
let IpAddr::V6(our_addr) = config.reporter.address.ip() else {
anyhow::bail!("server address must be IPv6 in order to resolve Nexus from DNS")
};
debug!(
log,
"Nexus IP not provided, will create an internal \
DNS resolver to resolve it"
);

let resolver = Resolver::new_from_ip(
log.new(slog::o!("component" => "internal-dns-resolver")),
our_addr,
)?;
NexusDiscovery::Dns(resolver)
}
(None, Some(resolver)) => {
debug!(
log,
"Nexus IP not provided, will use DNS to resolve it"
);
NexusDiscovery::Dns(resolver)
}
};
let nexus_addr = discovery.nexus_addr(&log).await;
let nexus_client = nexus_client::Client::new(
&format!("http://{nexus_addr}"),
log.clone(),
);

// 2. register server and recover sequence number
let seq = todo!("eliza: discover sequence number");
// TODO(eliza): perhaps registrations should be periodically refreshed?
let nexus_client::types::EreporterRegistered { seq } =
config.reporter.register(&log, &nexus_client).await;

// 3. spawn buffer task
let buffer_task = tokio::spawn(
crate::buffer::Buffer {
Expand All @@ -98,13 +196,82 @@ impl ServerStarter {
}
.run(),
);

// 4. spawn dropshot server
todo!("eliza: dropshot server");
// TODO(eliza): actually do that

Ok(RunningServer { buffer_task })
}
}

impl NexusDiscovery {
async fn nexus_addr(&self, log: &Logger) -> SocketAddr {
match self {
Self::Addr(addr) => *addr,
Self::Dns(resolver) => {
let log_failure = |error, delay| {
warn!(
log,
"failed to lookup Nexus IP, will retry";
"delay" => ?delay,
"error" => ?error,
);
};
let do_lookup = || async {
resolver
.lookup_socket_v6(ServiceName::Nexus)
.await
.map_err(|e| BackoffError::transient(e.to_string()))
.map(Into::into)
};
backoff::retry_notify(
backoff::retry_policy_internal_service(),
do_lookup,
log_failure,
)
.await
.expect("Expected infinite retry loop resolving Nexus address")
}
}
}
}

impl ReporterIdentity {
async fn register(
&self,
log: &Logger,
client: &nexus_client::Client,
) -> nexus_client::types::EreporterRegistered {
let log_failure = |error, delay| {
warn!(
log,
"failed to register ereporter with Nexus, will retry";
"delay" => ?delay,
"error" => ?error,
);
};
let info = nexus_client::types::EreporterInfo {
address: self.address.to_string(),
reporter_id: self.id,
};

let do_register = || async {
client
.cpapi_ereporters_post(&info)
.await
.map(|response| response.into_inner())
.map_err(|e| BackoffError::transient(e))
};
backoff::retry_notify(
backoff::retry_policy_internal_service(),
do_register,
log_failure,
)
.await
.expect("Expected infinite retry loop registering ereporter")
}
}

impl ereporter_api::EreporterApi for EreporterApiImpl {
type Context = ServerContext;

Expand Down
29 changes: 26 additions & 3 deletions nexus/internal-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ use nexus_types::{
},
internal_api::{
params::{
InstanceMigrateRequest, OximeterInfo, RackInitializationRequest,
SledAgentInfo, SwitchPutRequest, SwitchPutResponse,
EreporterInfo, InstanceMigrateRequest, OximeterInfo,
RackInitializationRequest, SledAgentInfo, SwitchPutRequest,
SwitchPutResponse,
},
views::{BackgroundTask, DemoSaga, Ipv4NatEntryView, Saga},
},
};
use omicron_common::{
api::{
external::{http_pagination::PaginatedById, Instance},
external::{http_pagination::PaginatedById, Generation, Instance},
internal::nexus::{
DiskRuntimeState, DownstairsClientStopRequest,
DownstairsClientStopped, ProducerEndpoint,
Expand Down Expand Up @@ -530,6 +531,19 @@ pub trait NexusInternalApi {
path_params: Path<ProbePathParam>,
query_params: Query<PaginatedById>,
) -> Result<HttpResponseOk<Vec<ProbeInfo>>, HttpError>;

// Error reports

/// Register an error reporter with Nexus, returning the next sequence
/// number for an error report from that reporter.
#[endpoint {
method = POST,
path = "/ereport/reporters",
}]
async fn cpapi_ereporters_post(
request_context: RequestContext<Self::Context>,
identity: TypedBody<EreporterInfo>,
) -> Result<HttpResponseOk<EreporterRegistered>, HttpError>;
}

/// Path parameters for Sled Agent requests (internal API)
Expand Down Expand Up @@ -649,3 +663,12 @@ pub struct SledId {
pub struct ProbePathParam {
pub sled: Uuid,
}

/// Response to error reporter registration requests.
#[derive(Clone, Debug, Serialize, JsonSchema)]
pub struct EreporterRegistered {
/// The starting sequence number of the next error report from this
/// reporter. If the reporter has not been seen by Nexus previously, this
/// may be 0.
pub seq: Generation,
}
20 changes: 20 additions & 0 deletions nexus/src/internal_api/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use nexus_types::external_api::params::UninitializedSledId;
use nexus_types::external_api::shared::ProbeInfo;
use nexus_types::external_api::shared::UninitializedSled;
use nexus_types::external_api::views::SledPolicy;
use nexus_types::internal_api::params::EreporterInfo;
use nexus_types::internal_api::params::InstanceMigrateRequest;
use nexus_types::internal_api::params::SledAgentInfo;
use nexus_types::internal_api::params::SwitchPutRequest;
Expand Down Expand Up @@ -939,4 +940,23 @@ impl NexusInternalApi for NexusInternalApiImpl {
.instrument_dropshot_handler(&rqctx, handler)
.await
}

async fn cpapi_ereporters_post(
rqctx: RequestContext<Self::Context>,
_identity: TypedBody<EreporterInfo>,
) -> Result<HttpResponseOk<EreporterRegistered>, HttpError> {
let apictx = &rqctx.context().context;
let handler = async {
// TODO(eliza): ACTUALLY IMPLEMENT THIS PART LOL
Err(HttpError::for_unavail(
Some("NOT YET IMPLEMENTED LMAO".to_string()),
"TODO eliza actually implement this part lol lmao :)"
.to_string(),
))
};
apictx
.internal_latencies
.instrument_dropshot_handler(&rqctx, handler)
.await
}
}
Loading

0 comments on commit a6dd46d

Please sign in to comment.