diff --git a/Cargo.lock b/Cargo.lock index f16e785b6b..7e6b9e4792 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1350,6 +1350,9 @@ dependencies = [ "schemars", "serde", "serde_json", + "slog", + "slog-async", + "slog-term", ] [[package]] @@ -6299,6 +6302,7 @@ dependencies = [ "clap", "clickhouse-admin-api", "clickhouse-admin-types", + "clickward", "dropshot 0.10.2-dev", "expectorate", "http 0.2.12", @@ -6310,6 +6314,8 @@ dependencies = [ "omicron-workspace-hack", "openapi-lint", "openapiv3", + "oximeter-db", + "oximeter-test-utils", "schemars", "serde", "serde_json", @@ -7608,10 +7614,14 @@ dependencies = [ name = "oximeter-test-utils" version = "0.1.0" dependencies = [ + "anyhow", "chrono", + "clickward", + "omicron-test-utils", "omicron-workspace-hack", "oximeter-macro-impl", "oximeter-types", + "slog", "uuid", ] diff --git a/clickhouse-admin/Cargo.toml b/clickhouse-admin/Cargo.toml index 270f779d7e..84b04f6caa 100644 --- a/clickhouse-admin/Cargo.toml +++ b/clickhouse-admin/Cargo.toml @@ -30,9 +30,13 @@ toml.workspace = true omicron-workspace-hack.workspace = true [dev-dependencies] +clickward.workspace = true +dropshot.workspace = true expectorate.workspace = true nexus-test-utils.workspace = true omicron-test-utils.workspace = true +oximeter-db.workspace = true +oximeter-test-utils.workspace = true openapi-lint.workspace = true openapiv3.workspace = true serde_json.workspace = true diff --git a/clickhouse-admin/api/src/lib.rs b/clickhouse-admin/api/src/lib.rs index a63499a7c5..af0aba14b6 100644 --- a/clickhouse-admin/api/src/lib.rs +++ b/clickhouse-admin/api/src/lib.rs @@ -3,8 +3,10 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use clickhouse_admin_types::config::{KeeperConfig, ReplicaConfig}; -use clickhouse_admin_types::{KeeperSettings, ServerSettings}; -use dropshot::{HttpError, HttpResponseCreated, RequestContext, TypedBody}; +use clickhouse_admin_types::{KeeperSettings, Lgif, ServerSettings}; +use dropshot::{ + HttpError, HttpResponseCreated, HttpResponseOk, RequestContext, TypedBody, +}; use omicron_common::api::external::Generation; use schemars::JsonSchema; use serde::Deserialize; @@ -50,4 +52,15 @@ pub trait ClickhouseAdminApi { rqctx: RequestContext, body: TypedBody, ) -> Result, HttpError>; + + /// Retrieve a logically grouped information file from a keeper node. + /// This information is used internally by ZooKeeper to manage snapshots + /// and logs for consistency and recovery. 
+ #[endpoint { + method = GET, + path = "/keeper/lgif", + }] + async fn lgif( + rqctx: RequestContext, + ) -> Result, HttpError>; } diff --git a/clickhouse-admin/src/bin/clickhouse-admin.rs b/clickhouse-admin/src/bin/clickhouse-admin.rs index 734cd91e8d..3391a3459a 100644 --- a/clickhouse-admin/src/bin/clickhouse-admin.rs +++ b/clickhouse-admin/src/bin/clickhouse-admin.rs @@ -7,7 +7,7 @@ use anyhow::anyhow; use camino::Utf8PathBuf; use clap::Parser; -use omicron_clickhouse_admin::{Clickward, Config}; +use omicron_clickhouse_admin::{ClickhouseCli, Clickward, Config}; use omicron_common::cmd::fatal; use omicron_common::cmd::CmdError; use std::net::{SocketAddr, SocketAddrV6}; @@ -27,6 +27,14 @@ enum Args { /// Path to the server configuration file #[clap(long, short, action)] config: Utf8PathBuf, + + /// Address on which the clickhouse server or keeper is listening on + #[clap(long, short = 'l', action)] + listen_address: SocketAddrV6, + + /// Path to the clickhouse binary + #[clap(long, short, action)] + binary_path: Utf8PathBuf, }, } @@ -41,17 +49,21 @@ async fn main_impl() -> Result<(), CmdError> { let args = Args::parse(); match args { - Args::Run { http_address, config } => { + Args::Run { http_address, config, listen_address, binary_path } => { let mut config = Config::from_file(&config) .map_err(|err| CmdError::Failure(anyhow!(err)))?; config.dropshot.bind_address = SocketAddr::V6(http_address); - let clickward = Clickward::new(); + let clickhouse_cli = + ClickhouseCli::new(binary_path, listen_address); - let server = - omicron_clickhouse_admin::start_server(clickward, config) - .await - .map_err(|err| CmdError::Failure(anyhow!(err)))?; + let server = omicron_clickhouse_admin::start_server( + clickward, + clickhouse_cli, + config, + ) + .await + .map_err(|err| CmdError::Failure(anyhow!(err)))?; server.await.map_err(|err| { CmdError::Failure(anyhow!( "server failed after starting: {err}" diff --git a/clickhouse-admin/src/clickhouse_cli.rs b/clickhouse-admin/src/clickhouse_cli.rs new file mode 100644 index 0000000000..edaa2c3488 --- /dev/null +++ b/clickhouse-admin/src/clickhouse_cli.rs @@ -0,0 +1,130 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use anyhow::Result; +use camino::Utf8PathBuf; +use clickhouse_admin_types::Lgif; +use dropshot::HttpError; +use illumos_utils::{output_to_exec_error, ExecutionError}; +use slog::Logger; +use slog_error_chain::{InlineErrorChain, SlogInlineError}; +use std::ffi::OsStr; +use std::io; +use std::net::SocketAddrV6; +use tokio::process::Command; + +#[derive(Debug, thiserror::Error, SlogInlineError)] +pub enum ClickhouseCliError { + #[error("failed to run `clickhouse {subcommand}`")] + Run { + description: &'static str, + subcommand: String, + #[source] + err: io::Error, + }, + #[error(transparent)] + ExecutionError(#[from] ExecutionError), + #[error("failed to parse command output")] + Parse { + description: &'static str, + stdout: String, + stderr: String, + #[source] + err: anyhow::Error, + }, +} + +impl From for HttpError { + fn from(err: ClickhouseCliError) -> Self { + match err { + ClickhouseCliError::Run { .. } + | ClickhouseCliError::Parse { .. 
}
+            | ClickhouseCliError::ExecutionError(_) => {
+                let message = InlineErrorChain::new(&err).to_string();
+                HttpError {
+                    status_code: http::StatusCode::INTERNAL_SERVER_ERROR,
+                    error_code: Some(String::from("Internal")),
+                    external_message: message.clone(),
+                    internal_message: message,
+                }
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct ClickhouseCli {
+    /// Path to where the clickhouse binary is located
+    pub binary_path: Utf8PathBuf,
+    /// Address on which the clickhouse server or keeper is listening
+    pub listen_address: SocketAddrV6,
+    pub log: Option<Logger>,
+}
+
+impl ClickhouseCli {
+    pub fn new(binary_path: Utf8PathBuf, listen_address: SocketAddrV6) -> Self {
+        Self { binary_path, listen_address, log: None }
+    }
+
+    pub fn with_log(mut self, log: Logger) -> Self {
+        self.log = Some(log);
+        self
+    }
+
+    pub async fn lgif(&self) -> Result<Lgif, ClickhouseCliError> {
+        self.keeper_client_non_interactive(
+            "lgif",
+            "Retrieve logically grouped information file",
+            Lgif::parse,
+            self.log.clone().unwrap(),
+        )
+        .await
+    }
+
+    async fn keeper_client_non_interactive<F, T>(
+        &self,
+        query: &str,
+        subcommand_description: &'static str,
+        parse: F,
+        log: Logger,
+    ) -> Result<T, ClickhouseCliError>
+    where
+        F: FnOnce(&Logger, &[u8]) -> Result<T>,
+    {
+        let mut command = Command::new(&self.binary_path);
+        command
+            .arg("keeper-client")
+            .arg("--host")
+            .arg(&format!("[{}]", self.listen_address.ip()))
+            .arg("--port")
+            .arg(&format!("{}", self.listen_address.port()))
+            .arg("--query")
+            .arg(query);
+
+        let output = command.output().await.map_err(|err| {
+            let err_args: Vec<&OsStr> = command.as_std().get_args().collect();
+            let err_args_parsed: Vec<String> = err_args
+                .iter()
+                .map(|&os_str| os_str.to_string_lossy().into_owned())
+                .collect();
+            let err_args_str = err_args_parsed.join(" ");
+            ClickhouseCliError::Run {
+                description: subcommand_description,
+                subcommand: err_args_str,
+                err,
+            }
+        })?;
+
+        if !output.status.success() {
+            return Err(output_to_exec_error(command.as_std(), &output).into());
+        }
+
+        parse(&log, &output.stdout).map_err(|err| ClickhouseCliError::Parse {
+            description: subcommand_description,
+            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
+            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
+            err,
+        })
+    }
+}
diff --git a/clickhouse-admin/src/context.rs b/clickhouse-admin/src/context.rs
index cab875fe1d..665d19528a 100644
--- a/clickhouse-admin/src/context.rs
+++ b/clickhouse-admin/src/context.rs
@@ -2,20 +2,29 @@
 // License, v. 2.0. If a copy of the MPL was not distributed with this
 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
-use crate::Clickward; +use crate::{ClickhouseCli, Clickward}; use slog::Logger; pub struct ServerContext { clickward: Clickward, + clickhouse_cli: ClickhouseCli, _log: Logger, } impl ServerContext { - pub fn new(clickward: Clickward, _log: Logger) -> Self { - Self { clickward, _log } + pub fn new( + clickward: Clickward, + clickhouse_cli: ClickhouseCli, + _log: Logger, + ) -> Self { + Self { clickward, clickhouse_cli, _log } } pub fn clickward(&self) -> &Clickward { &self.clickward } + + pub fn clickhouse_cli(&self) -> &ClickhouseCli { + &self.clickhouse_cli + } } diff --git a/clickhouse-admin/src/http_entrypoints.rs b/clickhouse-admin/src/http_entrypoints.rs index ddeb35916f..976b3e3513 100644 --- a/clickhouse-admin/src/http_entrypoints.rs +++ b/clickhouse-admin/src/http_entrypoints.rs @@ -5,7 +5,10 @@ use crate::context::ServerContext; use clickhouse_admin_api::*; use clickhouse_admin_types::config::{KeeperConfig, ReplicaConfig}; -use dropshot::{HttpError, HttpResponseCreated, RequestContext, TypedBody}; +use clickhouse_admin_types::Lgif; +use dropshot::{ + HttpError, HttpResponseCreated, HttpResponseOk, RequestContext, TypedBody, +}; use std::sync::Arc; type ClickhouseApiDescription = dropshot::ApiDescription>; @@ -44,4 +47,12 @@ impl ClickhouseAdminApi for ClickhouseAdminImpl { let output = ctx.clickward().generate_keeper_config(keeper.settings)?; Ok(HttpResponseCreated(output)) } + + async fn lgif( + rqctx: RequestContext, + ) -> Result, HttpError> { + let ctx = rqctx.context(); + let output = ctx.clickhouse_cli().lgif().await?; + Ok(HttpResponseOk(output)) + } } diff --git a/clickhouse-admin/src/lib.rs b/clickhouse-admin/src/lib.rs index a48588c544..511a32dd50 100644 --- a/clickhouse-admin/src/lib.rs +++ b/clickhouse-admin/src/lib.rs @@ -11,11 +11,13 @@ use std::error::Error; use std::io; use std::sync::Arc; +mod clickhouse_cli; mod clickward; mod config; mod context; mod http_entrypoints; +pub use clickhouse_cli::ClickhouseCli; pub use clickward::Clickward; pub use config::Config; @@ -34,6 +36,7 @@ pub type Server = dropshot::HttpServer>; /// Start the dropshot server pub async fn start_server( clickward: Clickward, + clickhouse_cli: ClickhouseCli, server_config: Config, ) -> Result { let (drain, registration) = slog_dtrace::with_drain( @@ -56,6 +59,8 @@ pub async fn start_server( let context = ServerContext::new( clickward, + clickhouse_cli + .with_log(log.new(slog::o!("component" => "ClickhouseCli"))), log.new(slog::o!("component" => "ServerContext")), ); let http_server_starter = dropshot::HttpServerStarter::new( diff --git a/clickhouse-admin/tests/integration_test.rs b/clickhouse-admin/tests/integration_test.rs new file mode 100644 index 0000000000..9967988958 --- /dev/null +++ b/clickhouse-admin/tests/integration_test.rs @@ -0,0 +1,69 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+ +use anyhow::Context; +use camino::Utf8PathBuf; +use clickward::{BasePorts, Deployment, DeploymentConfig, KeeperId}; +use dropshot::test_util::log_prefix_for_test; +use omicron_clickhouse_admin::ClickhouseCli; +use omicron_test_utils::dev::test_setup_log; +use oximeter_test_utils::wait_for_keepers; +use slog::info; +use std::net::{Ipv6Addr, SocketAddrV6}; +use std::str::FromStr; + +#[tokio::test] +async fn test_lgif_parsing() -> anyhow::Result<()> { + let logctx = test_setup_log("test_lgif_parsing"); + let log = logctx.log.clone(); + + let (parent_dir, prefix) = log_prefix_for_test(logctx.test_name()); + let path = parent_dir.join(format!("{prefix}-oximeter-clickward-test")); + std::fs::create_dir(&path)?; + + // We use the default ports in `test_schemas_disjoint` and must use a + // separate set here in case the two tests run concurrently. + let base_ports = BasePorts { + keeper: 29000, + raft: 29100, + clickhouse_tcp: 29200, + clickhouse_http: 29300, + clickhouse_interserver_http: 29400, + }; + + let config = DeploymentConfig { + path: path.clone(), + base_ports, + cluster_name: "oximeter_cluster".to_string(), + }; + + let mut deployment = Deployment::new(config); + + // We only need a single keeper to test the lgif command + let num_keepers = 1; + let num_replicas = 1; + deployment + .generate_config(num_keepers, num_replicas) + .context("failed to generate config")?; + deployment.deploy().context("failed to deploy")?; + + wait_for_keepers(&log, &deployment, vec![KeeperId(1)]).await?; + + let clickhouse_cli = ClickhouseCli::new( + Utf8PathBuf::from_str("clickhouse").unwrap(), + SocketAddrV6::new(Ipv6Addr::LOCALHOST, 29001, 0, 0), + ) + .with_log(log.clone()); + + let lgif = clickhouse_cli.lgif().await.unwrap(); + + // The first log index from a newly created cluster should always be 1 + assert_eq!(lgif.first_log_idx, 1); + + info!(&log, "Cleaning up test"); + deployment.teardown()?; + std::fs::remove_dir_all(path)?; + logctx.cleanup_successful(); + Ok(()) +} diff --git a/clickhouse-admin/types/Cargo.toml b/clickhouse-admin/types/Cargo.toml index 68004303e0..5b2f532e74 100644 --- a/clickhouse-admin/types/Cargo.toml +++ b/clickhouse-admin/types/Cargo.toml @@ -18,4 +18,9 @@ omicron-workspace-hack.workspace = true schemars.workspace = true serde.workspace = true serde_json.workspace = true +slog.workspace = true expectorate.workspace = true + +[dev-dependencies] +slog-async.workspace = true +slog-term.workspace = true \ No newline at end of file diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index 90779c8933..7fe3b99fd9 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -2,15 +2,17 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. 
-use anyhow::{Context, Result};
+use anyhow::{bail, Context, Result};
 use atomicwrites::AtomicFile;
 use camino::Utf8PathBuf;
 use derive_more::{Add, AddAssign, Display, From};
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
+use slog::{info, Logger};
 use std::fs::create_dir;
 use std::io::{ErrorKind, Write};
 use std::net::Ipv6Addr;
+use std::str::FromStr;
 
 pub mod config;
 use config::*;
@@ -206,21 +208,132 @@ impl KeeperSettings {
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)]
+#[serde(rename_all = "snake_case")]
+/// Logically grouped information file from a keeper node
+pub struct Lgif {
+    /// Index of the first log entry in the current log segment
+    pub first_log_idx: u64,
+    /// Term of the leader when the first log entry was created
+    pub first_log_term: u64,
+    /// Index of the last log entry in the current log segment
+    pub last_log_idx: u64,
+    /// Term of the leader when the last log entry was created
+    pub last_log_term: u64,
+    /// Index of the last committed log entry
+    pub last_committed_log_idx: u64,
+    /// Index of the last committed log entry from the leader's perspective
+    pub leader_committed_log_idx: u64,
+    /// Target index for log commitment during replication or recovery
+    pub target_committed_log_idx: u64,
+    /// Index of the most recent snapshot taken
+    pub last_snapshot_idx: u64,
+}
+
+impl Lgif {
+    pub fn parse(log: &Logger, data: &[u8]) -> Result<Self> {
+        // The response we get from running `clickhouse keeper-client -h {HOST} --q lgif`
+        // isn't in any known format (e.g. JSON), but rather a series of lines with key-value
+        // pairs separated by a tab:
+        //
+        // ```console
+        // $ clickhouse keeper-client -h localhost -p 20001 --q lgif
+        // first_log_idx 1
+        // first_log_term 1
+        // last_log_idx 10889
+        // last_log_term 20
+        // last_committed_log_idx 10889
+        // leader_committed_log_idx 10889
+        // target_committed_log_idx 10889
+        // last_snapshot_idx 9465
+        // ```
+        let s = String::from_utf8_lossy(data);
+        info!(
+            log,
+            "Retrieved data from `clickhouse keeper-client lgif`";
+            "output" => ?s
+        );
+
+        let expected = Lgif::expected_keys();
+
+        // Verify the output contains the same number of lines as the expected keys.
+        // This will ensure we catch any new key-value pairs appended to the lgif output.
+ let lines = s.trim().lines(); + if expected.len() != lines.count() { + bail!( + "Output from the Keeper differs to the expected output keys \ + Output: {s:?} \ + Expected output keys: {expected:?}" + ); + } + + let mut vals: Vec = Vec::new(); + for (line, expected_key) in s.lines().zip(expected.clone()) { + let mut split = line.split('\t'); + let Some(key) = split.next() else { + bail!("Returned None while attempting to retrieve key"); + }; + if key != expected_key { + bail!("Extracted key `{key:?}` from output differs from expected key `{expected_key}`"); + } + let Some(val) = split.next() else { + bail!("Command output has a line that does not contain a key-value pair: {key:?}"); + }; + let val = match u64::from_str(val) { + Ok(v) => v, + Err(e) => bail!("Unable to convert value {val:?} into u64 for key {key}: {e}"), + }; + vals.push(val); + } + + let mut iter = vals.into_iter(); + Ok(Lgif { + first_log_idx: iter.next().unwrap(), + first_log_term: iter.next().unwrap(), + last_log_idx: iter.next().unwrap(), + last_log_term: iter.next().unwrap(), + last_committed_log_idx: iter.next().unwrap(), + leader_committed_log_idx: iter.next().unwrap(), + target_committed_log_idx: iter.next().unwrap(), + last_snapshot_idx: iter.next().unwrap(), + }) + } + + fn expected_keys() -> Vec<&'static str> { + vec![ + "first_log_idx", + "first_log_term", + "last_log_idx", + "last_log_term", + "last_committed_log_idx", + "leader_committed_log_idx", + "target_committed_log_idx", + "last_snapshot_idx", + ] + } +} + #[cfg(test)] mod tests { - use std::{ - net::{Ipv4Addr, Ipv6Addr}, - str::FromStr, - }; - use camino::Utf8PathBuf; use camino_tempfile::Builder; + use slog::{o, Drain}; + use slog_term::{FullFormat, PlainDecorator, TestStdoutWriter}; + use std::net::{Ipv4Addr, Ipv6Addr}; + use std::str::FromStr; use crate::{ - ClickhouseHost, KeeperId, KeeperSettings, RaftServerSettings, ServerId, - ServerSettings, + ClickhouseHost, KeeperId, KeeperSettings, Lgif, RaftServerSettings, + ServerId, ServerSettings, }; + fn log() -> slog::Logger { + let decorator = PlainDecorator::new(TestStdoutWriter); + let drain = FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + slog::Logger::root(drain, o!()) + } + #[test] fn test_generate_keeper_config() { let config_dir = Builder::new() @@ -310,4 +423,122 @@ mod tests { expectorate::assert_contents(expected_file, &generated_content); } + + #[test] + fn test_full_lgif_parse_success() { + let log = log(); + let data = + "first_log_idx\t1\nfirst_log_term\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386\nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + .as_bytes(); + let lgif = Lgif::parse(&log, data).unwrap(); + + assert!(lgif.first_log_idx == 1); + assert!(lgif.first_log_term == 1); + assert!(lgif.last_log_idx == 4386); + assert!(lgif.last_log_term == 1); + assert!(lgif.last_committed_log_idx == 4386); + assert!(lgif.leader_committed_log_idx == 4386); + assert!(lgif.target_committed_log_idx == 4386); + assert!(lgif.last_snapshot_idx == 0); + } + + #[test] + fn test_missing_keys_lgif_parse_fail() { + let log = log(); + let data = + "first_log_idx\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386\nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + .as_bytes(); + let result = Lgif::parse(&log, data); + let error = result.unwrap_err(); + let root_cause = error.root_cause(); + + assert_eq!( + format!("{}", 
root_cause), + "Output from the Keeper differs to the expected output keys \ + Output: \"first_log_idx\\t1\\nlast_log_idx\\t4386\\nlast_log_term\\t1\\nlast_committed_log_idx\\t4386\\nleader_committed_log_idx\\t4386\\ntarget_committed_log_idx\\t4386\\nlast_snapshot_idx\\t0\\n\\n\" \ + Expected output keys: [\"first_log_idx\", \"first_log_term\", \"last_log_idx\", \"last_log_term\", \"last_committed_log_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\", \"last_snapshot_idx\"]" + ); + } + + #[test] + fn test_empty_value_lgif_parse_fail() { + let log = log(); + let data = + "first_log_idx\nfirst_log_term\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386\nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + .as_bytes(); + let result = Lgif::parse(&log, data); + let error = result.unwrap_err(); + let root_cause = error.root_cause(); + + assert_eq!( + format!("{}", root_cause), + "Command output has a line that does not contain a key-value pair: \"first_log_idx\"" + ); + } + + #[test] + fn test_non_u64_value_lgif_parse_fail() { + let log = log(); + let data = + "first_log_idx\t1\nfirst_log_term\tBOB\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386\nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + .as_bytes(); + let result = Lgif::parse(&log, data); + let error = result.unwrap_err(); + let root_cause = error.root_cause(); + + assert_eq!( + format!("{}", root_cause), + "Unable to convert value \"BOB\" into u64 for key first_log_term: invalid digit found in string" + ); + } + + #[test] + fn test_non_existent_key_with_correct_value_lgif_parse_fail() { + let log = log(); + let data = + "first_log_idx\t1\nfirst_log\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386\nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + .as_bytes(); + let result = Lgif::parse(&log, data); + let error = result.unwrap_err(); + let root_cause = error.root_cause(); + + assert_eq!( + format!("{}", root_cause), + "Extracted key `\"first_log\"` from output differs from expected key `first_log_term`" + ); + } + + #[test] + fn test_additional_key_value_pairs_in_output_parse_fail() { + let log = log(); + let data = "first_log_idx\t1\nfirst_log_term\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386\nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\nlast_snapshot_idx\t3\n\n" + .as_bytes(); + + let result = Lgif::parse(&log, data); + let error = result.unwrap_err(); + let root_cause = error.root_cause(); + + assert_eq!( + format!("{}", root_cause), + "Output from the Keeper differs to the expected output keys \ + Output: \"first_log_idx\\t1\\nfirst_log_term\\t1\\nlast_log_idx\\t4386\\nlast_log_term\\t1\\nlast_committed_log_idx\\t4386\\nleader_committed_log_idx\\t4386\\ntarget_committed_log_idx\\t4386\\nlast_snapshot_idx\\t0\\nlast_snapshot_idx\\t3\\n\\n\" \ + Expected output keys: [\"first_log_idx\", \"first_log_term\", \"last_log_idx\", \"last_log_term\", \"last_committed_log_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\", \"last_snapshot_idx\"]", + ); + } + + #[test] + fn test_empty_output_parse_fail() { + let log = log(); + let data = "".as_bytes(); + let result = Lgif::parse(&log, data); + let error = result.unwrap_err(); + let root_cause = error.root_cause(); + + assert_eq!( + format!("{}", root_cause), + "Output from the Keeper differs to the expected output 
keys \ + Output: \"\" \ + Expected output keys: [\"first_log_idx\", \"first_log_term\", \"last_log_idx\", \"last_log_term\", \"last_committed_log_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\", \"last_snapshot_idx\"]", + ); + } } diff --git a/openapi/clickhouse-admin.json b/openapi/clickhouse-admin.json index 820c8e8243..6862654124 100644 --- a/openapi/clickhouse-admin.json +++ b/openapi/clickhouse-admin.json @@ -45,6 +45,31 @@ } } }, + "/keeper/lgif": { + "get": { + "summary": "Retrieve a logically grouped information file from a keeper node.", + "description": "This information is used internally by ZooKeeper to manage snapshots and logs for consistency and recovery.", + "operationId": "lgif", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/Lgif" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/server/config": { "put": { "summary": "Generate a ClickHouse configuration file for a server node on a specified", @@ -353,6 +378,70 @@ "raft_servers" ] }, + "Lgif": { + "description": "Logically grouped information file from a keeper node", + "type": "object", + "properties": { + "first_log_idx": { + "description": "Index of the first log entry in the current log segment", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "first_log_term": { + "description": "Term of the leader when the first log entry was created", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "last_committed_log_idx": { + "description": "Index of the last committed log entry", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "last_log_idx": { + "description": "Index of the last log entry in the current log segment", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "last_log_term": { + "description": "Term of the leader when the last log entry was created", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "last_snapshot_idx": { + "description": "Index of the most recent snapshot taken", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "leader_committed_log_idx": { + "description": "Index of the last committed log entry from the leader's perspective", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "target_committed_log_idx": { + "description": "Target index for log commitment during replication or recovery", + "type": "integer", + "format": "uint64", + "minimum": 0 + } + }, + "required": [ + "first_log_idx", + "first_log_term", + "last_committed_log_idx", + "last_log_idx", + "last_log_term", + "last_snapshot_idx", + "leader_committed_log_idx", + "target_committed_log_idx" + ] + }, "LogConfig": { "type": "object", "properties": { diff --git a/oximeter/db/tests/integration_test.rs b/oximeter/db/tests/integration_test.rs index 05f41b3872..35f96dfd50 100644 --- a/oximeter/db/tests/integration_test.rs +++ b/oximeter/db/tests/integration_test.rs @@ -3,15 +3,13 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. 
use anyhow::Context; -use clickward::{ - BasePorts, Deployment, DeploymentConfig, KeeperClient, KeeperError, - KeeperId, -}; +use clickward::{BasePorts, Deployment, DeploymentConfig, KeeperId}; use dropshot::test_util::log_prefix_for_test; use omicron_test_utils::dev::poll; use omicron_test_utils::dev::test_setup_log; use oximeter_db::{Client, DbWrite, OxqlResult, Sample, TestDbWrite}; -use slog::{debug, info, Logger}; +use oximeter_test_utils::wait_for_keepers; +use slog::{info, Logger}; use std::collections::BTreeSet; use std::default::Default; use std::time::Duration; @@ -452,59 +450,6 @@ async fn wait_for_num_points( Ok(()) } -/// Wait for all keeper servers to be capable of handling commands -async fn wait_for_keepers( - log: &Logger, - deployment: &Deployment, - ids: Vec, -) -> anyhow::Result<()> { - let mut keepers = vec![]; - for id in &ids { - keepers.push(KeeperClient::new(deployment.keeper_addr(*id)?)); - } - - poll::wait_for_condition( - || async { - let mut done = true; - for keeper in &keepers { - match keeper.config().await { - Ok(config) => { - // The node isn't really up yet - if config.len() != keepers.len() { - done = false; - debug!(log, "Keeper config not set"; - "addr" => keeper.addr(), - "expected" => keepers.len(), - "got" => config.len() - ); - break; - } - } - Err(e) => { - done = false; - debug!(log, "Keeper connection error: {}", e; - "addr" => keeper.addr() - ); - break; - } - } - } - if !done { - Err(poll::CondCheckError::::NotYet) - } else { - Ok(()) - } - }, - &Duration::from_millis(1), - &Duration::from_secs(30), - ) - .await - .with_context(|| format!("failed to contact all keepers: {ids:?}"))?; - - info!(log, "Keepers ready: {ids:?}"); - Ok(()) -} - /// Try to ping the server until it is responds. async fn wait_for_ping(log: &Logger, client: &Client) -> anyhow::Result<()> { poll::wait_for_condition( diff --git a/oximeter/test-utils/Cargo.toml b/oximeter/test-utils/Cargo.toml index f463e74aca..0bff56583e 100644 --- a/oximeter/test-utils/Cargo.toml +++ b/oximeter/test-utils/Cargo.toml @@ -8,8 +8,12 @@ license = "MPL-2.0" workspace = true [dependencies] +anyhow.workspace = true chrono.workspace = true +clickward.workspace = true omicron-workspace-hack.workspace = true +omicron-test-utils.workspace = true oximeter-macro-impl.workspace = true oximeter-types.workspace = true +slog.workspace =true uuid.workspace = true diff --git a/oximeter/test-utils/src/lib.rs b/oximeter/test-utils/src/lib.rs index 04c49add65..02f928abc0 100644 --- a/oximeter/test-utils/src/lib.rs +++ b/oximeter/test-utils/src/lib.rs @@ -14,6 +14,9 @@ // lots of related issues and discussion. 
extern crate self as oximeter; +use anyhow::Context; +use clickward::{Deployment, KeeperClient, KeeperError, KeeperId}; +use omicron_test_utils::dev::poll; use oximeter_macro_impl::{Metric, Target}; use oximeter_types::histogram; use oximeter_types::histogram::{Histogram, Record}; @@ -22,6 +25,8 @@ use oximeter_types::types::{ Cumulative, Datum, DatumType, FieldType, FieldValue, Measurement, Sample, }; use oximeter_types::{Metric, Target}; +use slog::{debug, info, Logger}; +use std::time::Duration; use uuid::Uuid; #[derive(Target)] @@ -127,6 +132,59 @@ pub fn generate_test_samples( samples } +/// Wait for all keeper servers to be capable of handling commands +pub async fn wait_for_keepers( + log: &Logger, + deployment: &Deployment, + ids: Vec, +) -> anyhow::Result<()> { + let mut keepers = vec![]; + for id in &ids { + keepers.push(KeeperClient::new(deployment.keeper_addr(*id)?)); + } + + poll::wait_for_condition( + || async { + let mut done = true; + for keeper in &keepers { + match keeper.config().await { + Ok(config) => { + // The node isn't really up yet + if config.len() != keepers.len() { + done = false; + debug!(log, "Keeper config not set"; + "addr" => keeper.addr(), + "expected" => keepers.len(), + "got" => config.len() + ); + break; + } + } + Err(e) => { + done = false; + debug!(log, "Keeper connection error: {}", e; + "addr" => keeper.addr() + ); + break; + } + } + } + if !done { + Err(poll::CondCheckError::::NotYet) + } else { + Ok(()) + } + }, + &Duration::from_millis(1), + &Duration::from_secs(30), + ) + .await + .with_context(|| format!("failed to contact all keepers: {ids:?}"))?; + + info!(log, "Keepers ready: {ids:?}"); + Ok(()) +} + #[cfg(test)] mod tests { use chrono::Utc; diff --git a/sled-agent/src/services.rs b/sled-agent/src/services.rs index bce3c35ba3..f805267977 100644 --- a/sled-agent/src/services.rs +++ b/sled-agent/src/services.rs @@ -129,6 +129,12 @@ const IPV6_UNSPECIFIED: IpAddr = IpAddr::V6(Ipv6Addr::UNSPECIFIED); const COCKROACH: &str = "/opt/oxide/cockroachdb/bin/cockroach"; +const CLICKHOUSE_SERVER_BINARY: &str = + "/opt/oxide//opt/oxide/clickhouse_server/clickhouse"; +const CLICKHOUSE_KEEPER_BINARY: &str = + "/opt/oxide//opt/oxide/clickhouse_keeper/clickhouse"; +const CLICKHOUSE_BINARY: &str = "/opt/oxide//opt/oxide/clickhouse/clickhouse"; + pub const SWITCH_ZONE_BASEBOARD_FILE: &str = "/opt/oxide/baseboard.json"; #[derive(thiserror::Error, Debug, slog_error_chain::SlogInlineError)] @@ -1601,12 +1607,21 @@ impl ServiceManager { ) .to_string(); + let ch_address = SocketAddr::new( + IpAddr::V6(listen_addr), + CLICKHOUSE_HTTP_PORT, + ) + .to_string(); + let clickhouse_admin_config = - PropertyGroupBuilder::new("config").add_property( - "http_address", - "astring", - admin_address, - ); + PropertyGroupBuilder::new("config") + .add_property("http_address", "astring", admin_address) + .add_property("ch_address", "astring", ch_address) + .add_property( + "ch_binary", + "astring", + CLICKHOUSE_BINARY, + ); let clickhouse_admin_service = ServiceBuilder::new("oxide/clickhouse-admin").add_instance( ServiceInstanceBuilder::new("default") @@ -1674,12 +1689,21 @@ impl ServiceManager { ) .to_string(); + let ch_address = SocketAddr::new( + IpAddr::V6(listen_addr), + CLICKHOUSE_HTTP_PORT, + ) + .to_string(); + let clickhouse_admin_config = - PropertyGroupBuilder::new("config").add_property( - "http_address", - "astring", - admin_address, - ); + PropertyGroupBuilder::new("config") + .add_property("http_address", "astring", admin_address) + .add_property("ch_address", 
"astring", ch_address) + .add_property( + "ch_binary", + "astring", + CLICKHOUSE_SERVER_BINARY, + ); let clickhouse_admin_service = ServiceBuilder::new("oxide/clickhouse-admin").add_instance( ServiceInstanceBuilder::new("default") @@ -1750,12 +1774,21 @@ impl ServiceManager { ) .to_string(); + let ch_address = SocketAddr::new( + IpAddr::V6(listen_addr), + CLICKHOUSE_KEEPER_TCP_PORT, + ) + .to_string(); + let clickhouse_admin_config = - PropertyGroupBuilder::new("config").add_property( - "http_address", - "astring", - admin_address, - ); + PropertyGroupBuilder::new("config") + .add_property("http_address", "astring", admin_address) + .add_property("ch_address", "astring", ch_address) + .add_property( + "ch_binary", + "astring", + CLICKHOUSE_KEEPER_BINARY, + ); let clickhouse_admin_service = ServiceBuilder::new("oxide/clickhouse-admin").add_instance( ServiceInstanceBuilder::new("default") diff --git a/smf/clickhouse-admin/manifest.xml b/smf/clickhouse-admin/manifest.xml index 84d61e4caa..379c738f5d 100644 --- a/smf/clickhouse-admin/manifest.xml +++ b/smf/clickhouse-admin/manifest.xml @@ -17,12 +17,14 @@ + +