From e3a40bfe76e80eb6ef4293fa062292c491c0e688 Mon Sep 17 00:00:00 2001 From: karencfv Date: Tue, 10 Sep 2024 16:19:32 +1200 Subject: [PATCH 01/20] Basic lgif endpoint --- clickhouse-admin/api/src/lib.rs | 14 ++- clickhouse-admin/src/bin/clickhouse-admin.rs | 25 +++-- clickhouse-admin/src/clickhouse_cli.rs | 108 +++++++++++++++++++ clickhouse-admin/src/context.rs | 15 ++- clickhouse-admin/src/http_entrypoints.rs | 10 +- clickhouse-admin/src/lib.rs | 4 + openapi/clickhouse-admin.json | 26 +++++ sled-agent/src/services.rs | 63 ++++++++--- smf/clickhouse-admin/manifest.xml | 4 +- 9 files changed, 241 insertions(+), 28 deletions(-) create mode 100644 clickhouse-admin/src/clickhouse_cli.rs diff --git a/clickhouse-admin/api/src/lib.rs b/clickhouse-admin/api/src/lib.rs index a63499a7c5..2fd6eaf397 100644 --- a/clickhouse-admin/api/src/lib.rs +++ b/clickhouse-admin/api/src/lib.rs @@ -4,7 +4,7 @@ use clickhouse_admin_types::config::{KeeperConfig, ReplicaConfig}; use clickhouse_admin_types::{KeeperSettings, ServerSettings}; -use dropshot::{HttpError, HttpResponseCreated, RequestContext, TypedBody}; +use dropshot::{HttpError, HttpResponseCreated, HttpResponseOk, RequestContext, TypedBody}; use omicron_common::api::external::Generation; use schemars::JsonSchema; use serde::Deserialize; @@ -50,4 +50,16 @@ pub trait ClickhouseAdminApi { rqctx: RequestContext, body: TypedBody, ) -> Result, HttpError>; + + /// Retrieve a logically grouped information file from a keeper node. + /// This information is used internally by ZooKeeper to manage snapshots + /// and logs for consistency and recovery. + #[endpoint { + method = GET, + path = "/keeper/lgif", + }] + async fn lgif( + rqctx: RequestContext, + // TODO: Actually return something useful + ) -> Result, HttpError>; } diff --git a/clickhouse-admin/src/bin/clickhouse-admin.rs b/clickhouse-admin/src/bin/clickhouse-admin.rs index 734cd91e8d..b2e451488c 100644 --- a/clickhouse-admin/src/bin/clickhouse-admin.rs +++ b/clickhouse-admin/src/bin/clickhouse-admin.rs @@ -7,7 +7,7 @@ use anyhow::anyhow; use camino::Utf8PathBuf; use clap::Parser; -use omicron_clickhouse_admin::{Clickward, Config}; +use omicron_clickhouse_admin::{ClickhouseCli, Clickward, Config}; use omicron_common::cmd::fatal; use omicron_common::cmd::CmdError; use std::net::{SocketAddr, SocketAddrV6}; @@ -27,6 +27,14 @@ enum Args { /// Path to the server configuration file #[clap(long, short, action)] config: Utf8PathBuf, + + /// Address on which the clickhouse server or keeper is listening on + #[clap(long, short = 'l', action)] + listen_address: SocketAddrV6, + + /// Path to the clickhouse binary + #[clap(long, short, action)] + binary_path: Utf8PathBuf, }, } @@ -41,17 +49,20 @@ async fn main_impl() -> Result<(), CmdError> { let args = Args::parse(); match args { - Args::Run { http_address, config } => { + Args::Run { http_address, config, listen_address, binary_path } => { let mut config = Config::from_file(&config) .map_err(|err| CmdError::Failure(anyhow!(err)))?; config.dropshot.bind_address = SocketAddr::V6(http_address); - let clickward = Clickward::new(); + let keeper_client = ClickhouseCli::new(binary_path, listen_address); - let server = - omicron_clickhouse_admin::start_server(clickward, config) - .await - .map_err(|err| CmdError::Failure(anyhow!(err)))?; + let server = omicron_clickhouse_admin::start_server( + clickward, + keeper_client, + config, + ) + .await + .map_err(|err| CmdError::Failure(anyhow!(err)))?; server.await.map_err(|err| { CmdError::Failure(anyhow!( "server failed after starting: {err}" diff --git a/clickhouse-admin/src/clickhouse_cli.rs b/clickhouse-admin/src/clickhouse_cli.rs new file mode 100644 index 0000000000..967b0412e5 --- /dev/null +++ b/clickhouse-admin/src/clickhouse_cli.rs @@ -0,0 +1,108 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use anyhow::Result; +use camino::Utf8PathBuf; +use dropshot::HttpError; +use illumos_utils::{ExecutionError, output_to_exec_error}; +use slog_error_chain::{InlineErrorChain, SlogInlineError}; +use std::ffi::OsStr; +use std::io; +use std::net::SocketAddrV6; +use tokio::process::Command; + +#[derive(Debug, thiserror::Error, SlogInlineError)] +pub enum ClickhouseCliError { + #[error("failed to run `clickhouse {subcommand}`")] + Run { + description: &'static str, + subcommand: String, + #[source] + err: io::Error, + }, + #[error(transparent)] + ExecutionError(#[from] ExecutionError), +} + +impl From for HttpError { + fn from(err: ClickhouseCliError) -> Self { + match err { + ClickhouseCliError::Run { .. } + | ClickhouseCliError::ExecutionError(_) => { + let message = InlineErrorChain::new(&err).to_string(); + HttpError { + status_code: http::StatusCode::INTERNAL_SERVER_ERROR, + error_code: Some(String::from("Internal")), + external_message: message.clone(), + internal_message: message, + } + } + } + } +} + +#[derive(Debug)] +pub struct ClickhouseCli { + /// Path to where the clickhouse binary is located + pub binary_path: Utf8PathBuf, + /// Address on where the clickhouse keeper is listening on + pub listen_address: SocketAddrV6, +} + +impl ClickhouseCli { + pub fn new (binary_path: Utf8PathBuf, listen_address: SocketAddrV6) -> Self { + Self {binary_path, listen_address} + } + + pub async fn lgif(&self) -> Result { + self.keeper_client_non_interactive( + ["lgif"].into_iter(), + "Retrieve logically grouped information file", + ) + .await + } + + async fn keeper_client_non_interactive<'a, I>( + &self, + subcommand_args: I, + subcommand_description: &'static str, + ) -> Result + where + I: Iterator, + { + let mut command = Command::new(&self.binary_path); + command + .arg("keeper-client") + .arg("--host") + .arg(&format!("[{}]", self.listen_address.ip())) + .arg("--port") + .arg(&format!("{}", self.listen_address.port())) + .arg("--query"); + + let args: Vec<&'a str> = subcommand_args.collect(); + let query = args.join(" "); + command.arg(query); + + let output = command.output().await.map_err(|err| { + let args: Vec<&OsStr> = command.as_std().get_args().collect(); + let args_parsed: Vec = args + .iter() + .map(|&os_str| os_str.to_string_lossy().into_owned()) + .collect(); + let args_str = args_parsed.join(" "); + ClickhouseCliError::Run { + description: subcommand_description, + subcommand: args_str, + err, + } + })?; + + if !output.status.success() { + return Err(output_to_exec_error(command.as_std(), &output).into()); + } + + // TODO: Actually parse this + Ok(String::from_utf8_lossy(&output.stdout).to_string()) + } +} diff --git a/clickhouse-admin/src/context.rs b/clickhouse-admin/src/context.rs index cab875fe1d..665d19528a 100644 --- a/clickhouse-admin/src/context.rs +++ b/clickhouse-admin/src/context.rs @@ -2,20 +2,29 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use crate::Clickward; +use crate::{ClickhouseCli, Clickward}; use slog::Logger; pub struct ServerContext { clickward: Clickward, + clickhouse_cli: ClickhouseCli, _log: Logger, } impl ServerContext { - pub fn new(clickward: Clickward, _log: Logger) -> Self { - Self { clickward, _log } + pub fn new( + clickward: Clickward, + clickhouse_cli: ClickhouseCli, + _log: Logger, + ) -> Self { + Self { clickward, clickhouse_cli, _log } } pub fn clickward(&self) -> &Clickward { &self.clickward } + + pub fn clickhouse_cli(&self) -> &ClickhouseCli { + &self.clickhouse_cli + } } diff --git a/clickhouse-admin/src/http_entrypoints.rs b/clickhouse-admin/src/http_entrypoints.rs index ddeb35916f..b9d1405207 100644 --- a/clickhouse-admin/src/http_entrypoints.rs +++ b/clickhouse-admin/src/http_entrypoints.rs @@ -5,7 +5,7 @@ use crate::context::ServerContext; use clickhouse_admin_api::*; use clickhouse_admin_types::config::{KeeperConfig, ReplicaConfig}; -use dropshot::{HttpError, HttpResponseCreated, RequestContext, TypedBody}; +use dropshot::{HttpError, HttpResponseCreated, HttpResponseOk, RequestContext, TypedBody}; use std::sync::Arc; type ClickhouseApiDescription = dropshot::ApiDescription>; @@ -44,4 +44,12 @@ impl ClickhouseAdminApi for ClickhouseAdminImpl { let output = ctx.clickward().generate_keeper_config(keeper.settings)?; Ok(HttpResponseCreated(output)) } + + async fn lgif( + rqctx: RequestContext, + ) -> Result, HttpError> { + let ctx = rqctx.context(); + let output = ctx.clickhouse_cli().lgif().await?; + Ok(HttpResponseOk(output)) + } } diff --git a/clickhouse-admin/src/lib.rs b/clickhouse-admin/src/lib.rs index a48588c544..6539fe2105 100644 --- a/clickhouse-admin/src/lib.rs +++ b/clickhouse-admin/src/lib.rs @@ -11,11 +11,13 @@ use std::error::Error; use std::io; use std::sync::Arc; +mod clickhouse_cli; mod clickward; mod config; mod context; mod http_entrypoints; +pub use clickhouse_cli::ClickhouseCli; pub use clickward::Clickward; pub use config::Config; @@ -34,6 +36,7 @@ pub type Server = dropshot::HttpServer>; /// Start the dropshot server pub async fn start_server( clickward: Clickward, + keeper_client: ClickhouseCli, server_config: Config, ) -> Result { let (drain, registration) = slog_dtrace::with_drain( @@ -56,6 +59,7 @@ pub async fn start_server( let context = ServerContext::new( clickward, + keeper_client, log.new(slog::o!("component" => "ServerContext")), ); let http_server_starter = dropshot::HttpServerStarter::new( diff --git a/openapi/clickhouse-admin.json b/openapi/clickhouse-admin.json index 820c8e8243..8f91d4de62 100644 --- a/openapi/clickhouse-admin.json +++ b/openapi/clickhouse-admin.json @@ -45,6 +45,32 @@ } } }, + "/keeper/lgif": { + "get": { + "summary": "Retrieve a logically grouped information file from a keeper node.", + "description": "This information is used internally by ZooKeeper to manage snapshots and logs for consistency and recovery.", + "operationId": "lgif", + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "title": "String", + "type": "string" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/server/config": { "put": { "summary": "Generate a ClickHouse configuration file for a server node on a specified", diff --git a/sled-agent/src/services.rs b/sled-agent/src/services.rs index f386fd1d0f..f9f919284b 100644 --- a/sled-agent/src/services.rs +++ b/sled-agent/src/services.rs @@ -128,6 +128,12 @@ const IPV6_UNSPECIFIED: IpAddr = IpAddr::V6(Ipv6Addr::UNSPECIFIED); const COCKROACH: &str = "/opt/oxide/cockroachdb/bin/cockroach"; +const CLICKHOUSE_SERVER_BINARY: &str = + "/opt/oxide//opt/oxide/clickhouse_server/clickhouse"; +const CLICKHOUSE_KEEPER_BINARY: &str = + "/opt/oxide//opt/oxide/clickhouse_keeper/clickhouse"; +const CLICKHOUSE_BINARY: &str = "/opt/oxide//opt/oxide/clickhouse/clickhouse"; + pub const SWITCH_ZONE_BASEBOARD_FILE: &str = "/opt/oxide/baseboard.json"; #[derive(thiserror::Error, Debug, slog_error_chain::SlogInlineError)] @@ -1579,12 +1585,21 @@ impl ServiceManager { ) .to_string(); + let ch_address = SocketAddr::new( + IpAddr::V6(listen_addr), + CLICKHOUSE_HTTP_PORT, + ) + .to_string(); + let clickhouse_admin_config = - PropertyGroupBuilder::new("config").add_property( - "http_address", - "astring", - admin_address, - ); + PropertyGroupBuilder::new("config") + .add_property("http_address", "astring", admin_address) + .add_property("ch_address", "astring", ch_address) + .add_property( + "ch_binary", + "astring", + CLICKHOUSE_BINARY, + ); let clickhouse_admin_service = ServiceBuilder::new("oxide/clickhouse-admin").add_instance( ServiceInstanceBuilder::new("default") @@ -1652,12 +1667,21 @@ impl ServiceManager { ) .to_string(); + let ch_address = SocketAddr::new( + IpAddr::V6(listen_addr), + CLICKHOUSE_HTTP_PORT, + ) + .to_string(); + let clickhouse_admin_config = - PropertyGroupBuilder::new("config").add_property( - "http_address", - "astring", - admin_address, - ); + PropertyGroupBuilder::new("config") + .add_property("http_address", "astring", admin_address) + .add_property("ch_address", "astring", ch_address) + .add_property( + "ch_binary", + "astring", + CLICKHOUSE_SERVER_BINARY, + ); let clickhouse_admin_service = ServiceBuilder::new("oxide/clickhouse-admin").add_instance( ServiceInstanceBuilder::new("default") @@ -1728,12 +1752,21 @@ impl ServiceManager { ) .to_string(); + let ch_address = SocketAddr::new( + IpAddr::V6(listen_addr), + CLICKHOUSE_KEEPER_TCP_PORT, + ) + .to_string(); + let clickhouse_admin_config = - PropertyGroupBuilder::new("config").add_property( - "http_address", - "astring", - admin_address, - ); + PropertyGroupBuilder::new("config") + .add_property("http_address", "astring", admin_address) + .add_property("ch_address", "astring", ch_address) + .add_property( + "ch_binary", + "astring", + CLICKHOUSE_KEEPER_BINARY, + ); let clickhouse_admin_service = ServiceBuilder::new("oxide/clickhouse-admin").add_instance( ServiceInstanceBuilder::new("default") diff --git a/smf/clickhouse-admin/manifest.xml b/smf/clickhouse-admin/manifest.xml index 84d61e4caa..379c738f5d 100644 --- a/smf/clickhouse-admin/manifest.xml +++ b/smf/clickhouse-admin/manifest.xml @@ -17,12 +17,14 @@ + + From 78520f547a0fceea29750d3dec7be38ac652c6ea Mon Sep 17 00:00:00 2001 From: karencfv Date: Tue, 10 Sep 2024 18:38:30 +1200 Subject: [PATCH 02/20] Return Lgif --- clickhouse-admin/api/src/lib.rs | 9 ++++--- clickhouse-admin/src/clickhouse_cli.rs | 34 ++++++++++++++++++------ clickhouse-admin/src/http_entrypoints.rs | 7 +++-- clickhouse-admin/types/src/lib.rs | 34 ++++++++++++++++++++++++ 4 files changed, 70 insertions(+), 14 deletions(-) diff --git a/clickhouse-admin/api/src/lib.rs b/clickhouse-admin/api/src/lib.rs index 2fd6eaf397..af0aba14b6 100644 --- a/clickhouse-admin/api/src/lib.rs +++ b/clickhouse-admin/api/src/lib.rs @@ -3,8 +3,10 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use clickhouse_admin_types::config::{KeeperConfig, ReplicaConfig}; -use clickhouse_admin_types::{KeeperSettings, ServerSettings}; -use dropshot::{HttpError, HttpResponseCreated, HttpResponseOk, RequestContext, TypedBody}; +use clickhouse_admin_types::{KeeperSettings, Lgif, ServerSettings}; +use dropshot::{ + HttpError, HttpResponseCreated, HttpResponseOk, RequestContext, TypedBody, +}; use omicron_common::api::external::Generation; use schemars::JsonSchema; use serde::Deserialize; @@ -60,6 +62,5 @@ pub trait ClickhouseAdminApi { }] async fn lgif( rqctx: RequestContext, - // TODO: Actually return something useful - ) -> Result, HttpError>; + ) -> Result, HttpError>; } diff --git a/clickhouse-admin/src/clickhouse_cli.rs b/clickhouse-admin/src/clickhouse_cli.rs index 967b0412e5..507f603e07 100644 --- a/clickhouse-admin/src/clickhouse_cli.rs +++ b/clickhouse-admin/src/clickhouse_cli.rs @@ -4,8 +4,9 @@ use anyhow::Result; use camino::Utf8PathBuf; +use clickhouse_admin_types::Lgif; use dropshot::HttpError; -use illumos_utils::{ExecutionError, output_to_exec_error}; +use illumos_utils::{output_to_exec_error, ExecutionError}; use slog_error_chain::{InlineErrorChain, SlogInlineError}; use std::ffi::OsStr; use std::io; @@ -23,12 +24,22 @@ pub enum ClickhouseCliError { }, #[error(transparent)] ExecutionError(#[from] ExecutionError), + #[error("failed to parse command output")] + Parse { + description: &'static str, + stdout: String, + stderr: String, + #[source] + err: anyhow::Error, + }, } impl From for HttpError { fn from(err: ClickhouseCliError) -> Self { match err { ClickhouseCliError::Run { .. } + // TODO: Can I make this message better? + | ClickhouseCliError::Parse { .. } | ClickhouseCliError::ExecutionError(_) => { let message = InlineErrorChain::new(&err).to_string(); HttpError { @@ -51,25 +62,28 @@ pub struct ClickhouseCli { } impl ClickhouseCli { - pub fn new (binary_path: Utf8PathBuf, listen_address: SocketAddrV6) -> Self { - Self {binary_path, listen_address} + pub fn new(binary_path: Utf8PathBuf, listen_address: SocketAddrV6) -> Self { + Self { binary_path, listen_address } } - pub async fn lgif(&self) -> Result { + pub async fn lgif(&self) -> Result { self.keeper_client_non_interactive( ["lgif"].into_iter(), "Retrieve logically grouped information file", + Lgif::parse, ) .await } - async fn keeper_client_non_interactive<'a, I>( + async fn keeper_client_non_interactive<'a, I, F, T>( &self, subcommand_args: I, subcommand_description: &'static str, - ) -> Result + parse: F, + ) -> Result where I: Iterator, + F: FnOnce(&[u8]) -> Result, { let mut command = Command::new(&self.binary_path); command @@ -102,7 +116,11 @@ impl ClickhouseCli { return Err(output_to_exec_error(command.as_std(), &output).into()); } - // TODO: Actually parse this - Ok(String::from_utf8_lossy(&output.stdout).to_string()) + parse(&output.stdout).map_err(|err| ClickhouseCliError::Parse { + description: subcommand_description, + stdout: String::from_utf8_lossy(&output.stdout).to_string(), + stderr: String::from_utf8_lossy(&output.stdout).to_string(), + err, + }) } } diff --git a/clickhouse-admin/src/http_entrypoints.rs b/clickhouse-admin/src/http_entrypoints.rs index b9d1405207..976b3e3513 100644 --- a/clickhouse-admin/src/http_entrypoints.rs +++ b/clickhouse-admin/src/http_entrypoints.rs @@ -5,7 +5,10 @@ use crate::context::ServerContext; use clickhouse_admin_api::*; use clickhouse_admin_types::config::{KeeperConfig, ReplicaConfig}; -use dropshot::{HttpError, HttpResponseCreated, HttpResponseOk, RequestContext, TypedBody}; +use clickhouse_admin_types::Lgif; +use dropshot::{ + HttpError, HttpResponseCreated, HttpResponseOk, RequestContext, TypedBody, +}; use std::sync::Arc; type ClickhouseApiDescription = dropshot::ApiDescription>; @@ -47,7 +50,7 @@ impl ClickhouseAdminApi for ClickhouseAdminImpl { async fn lgif( rqctx: RequestContext, - ) -> Result, HttpError> { + ) -> Result, HttpError> { let ctx = rqctx.context(); let output = ctx.clickhouse_cli().lgif().await?; Ok(HttpResponseOk(output)) diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index 90779c8933..b8b666c98c 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize}; use std::fs::create_dir; use std::io::{ErrorKind, Write}; use std::net::Ipv6Addr; +use std::str::FromStr; pub mod config; use config::*; @@ -206,6 +207,39 @@ impl KeeperSettings { } } +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub struct Lgif { + first_log_idx: u64, + first_log_term: u64, + last_log_idx: u64, + last_log_term: u64, + last_committed_log_idx: u64, + leader_committed_log_idx: u64, + target_committed_log_idx: u64, + last_snapshot_idx: u64, +} + +impl Lgif { + pub fn parse(data: &[u8]) -> Result { + let binding = String::from_utf8_lossy(data); + let output = binding.as_ref(); + let p: Vec<&str> = output.split(|c| c == '\t' || c == '\n').collect(); + + // TODO: Make this not suck + Ok(Self { + first_log_idx: u64::from_str(p[1]).unwrap(), + first_log_term: u64::from_str(p[3]).unwrap(), + last_log_idx: u64::from_str(p[5]).unwrap(), + last_log_term: u64::from_str(p[7]).unwrap(), + last_committed_log_idx: u64::from_str(p[9]).unwrap(), + leader_committed_log_idx: u64::from_str(p[11]).unwrap(), + target_committed_log_idx: u64::from_str(p[13]).unwrap(), + last_snapshot_idx: u64::from_str(p[15]).unwrap(), + }) + } +} + #[cfg(test)] mod tests { use std::{ From 40ae8f2c4eba0b1be82cd06d2ac4395310cbfac3 Mon Sep 17 00:00:00 2001 From: karencfv Date: Tue, 10 Sep 2024 18:39:35 +1200 Subject: [PATCH 03/20] generate opeapi spec --- openapi/clickhouse-admin.json | 58 +++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/openapi/clickhouse-admin.json b/openapi/clickhouse-admin.json index 8f91d4de62..0c89cca1e2 100644 --- a/openapi/clickhouse-admin.json +++ b/openapi/clickhouse-admin.json @@ -56,8 +56,7 @@ "content": { "application/json": { "schema": { - "title": "String", - "type": "string" + "$ref": "#/components/schemas/Lgif" } } } @@ -379,6 +378,61 @@ "raft_servers" ] }, + "Lgif": { + "type": "object", + "properties": { + "first_log_idx": { + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "first_log_term": { + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "last_committed_log_idx": { + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "last_log_idx": { + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "last_log_term": { + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "last_snapshot_idx": { + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "leader_committed_log_idx": { + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "target_committed_log_idx": { + "type": "integer", + "format": "uint64", + "minimum": 0 + } + }, + "required": [ + "first_log_idx", + "first_log_term", + "last_committed_log_idx", + "last_log_idx", + "last_log_term", + "last_snapshot_idx", + "leader_committed_log_idx", + "target_committed_log_idx" + ] + }, "LogConfig": { "type": "object", "properties": { From 9959fd75b9cff587f67be52db5c0a1f4f1722f8b Mon Sep 17 00:00:00 2001 From: karencfv Date: Wed, 11 Sep 2024 14:01:38 +1200 Subject: [PATCH 04/20] improve the parsing a bit --- clickhouse-admin/types/src/lib.rs | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index b8b666c98c..689b4450ec 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -8,6 +8,7 @@ use camino::Utf8PathBuf; use derive_more::{Add, AddAssign, Display, From}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::fs::create_dir; use std::io::{ErrorKind, Write}; use std::net::Ipv6Addr; @@ -224,18 +225,30 @@ impl Lgif { pub fn parse(data: &[u8]) -> Result { let binding = String::from_utf8_lossy(data); let output = binding.as_ref(); - let p: Vec<&str> = output.split(|c| c == '\t' || c == '\n').collect(); + let mut p: Vec<&str> = output.split(|c| c == '\t' || c == '\n').collect(); + + let mut lgif: HashMap<&str, &str> = HashMap::new(); + + // TODO: First I need to make sure it's an even number? + let items_to_add = p.len()/2; + // Create a hashmap with the vec + for _ in 0..items_to_add { + let key = p.remove(0); + let value = p.remove(0); + + lgif.insert(key, value); + } // TODO: Make this not suck Ok(Self { - first_log_idx: u64::from_str(p[1]).unwrap(), - first_log_term: u64::from_str(p[3]).unwrap(), - last_log_idx: u64::from_str(p[5]).unwrap(), - last_log_term: u64::from_str(p[7]).unwrap(), - last_committed_log_idx: u64::from_str(p[9]).unwrap(), - leader_committed_log_idx: u64::from_str(p[11]).unwrap(), - target_committed_log_idx: u64::from_str(p[13]).unwrap(), - last_snapshot_idx: u64::from_str(p[15]).unwrap(), + first_log_idx: u64::from_str(lgif.get("first_log_idx").unwrap()).unwrap(), + first_log_term: u64::from_str(lgif.get("first_log_term").unwrap()).unwrap(), + last_log_idx: u64::from_str(lgif.get("last_log_idx").unwrap()).unwrap(), + last_log_term: u64::from_str(lgif.get("last_log_term").unwrap()).unwrap(), + last_committed_log_idx: u64::from_str(lgif.get("last_committed_log_idx").unwrap()).unwrap(), + leader_committed_log_idx: u64::from_str(lgif.get("leader_committed_log_idx").unwrap()).unwrap(), + target_committed_log_idx: u64::from_str(lgif.get("target_committed_log_idx").unwrap()).unwrap(), + last_snapshot_idx: u64::from_str(lgif.get("last_snapshot_idx").unwrap()).unwrap(), }) } } From bc1705a33fc67ba26a5aa753b0b13fd1c7a5c41b Mon Sep 17 00:00:00 2001 From: karencfv Date: Wed, 11 Sep 2024 15:12:02 +1200 Subject: [PATCH 05/20] better... --- clickhouse-admin/types/src/lib.rs | 55 ++++++++++++++++++------------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index 689b4450ec..e7a72d5d6b 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -224,31 +224,40 @@ pub struct Lgif { impl Lgif { pub fn parse(data: &[u8]) -> Result { let binding = String::from_utf8_lossy(data); - let output = binding.as_ref(); - let mut p: Vec<&str> = output.split(|c| c == '\t' || c == '\n').collect(); + let lines = binding.lines(); + let mut lgif: HashMap = HashMap::new(); + + for line in lines { + let line = line.trim(); + if !line.is_empty() { + let l: Vec<&str> = line.split('\t').collect(); + + if l.len() != 2 { + // TODO: Log that there was an empty line + continue; + } + + // println!("Vec: {:#?}", l); + let key = l[0].to_string(); + let value = match u64::from_str(l[1]) { + Ok(v) => v, + // TODO: Log error + Err(_) => continue, + }; + + lgif.insert(key, value); + } + }; - let mut lgif: HashMap<&str, &str> = HashMap::new(); - - // TODO: First I need to make sure it's an even number? - let items_to_add = p.len()/2; - // Create a hashmap with the vec - for _ in 0..items_to_add { - let key = p.remove(0); - let value = p.remove(0); - - lgif.insert(key, value); - } - - // TODO: Make this not suck Ok(Self { - first_log_idx: u64::from_str(lgif.get("first_log_idx").unwrap()).unwrap(), - first_log_term: u64::from_str(lgif.get("first_log_term").unwrap()).unwrap(), - last_log_idx: u64::from_str(lgif.get("last_log_idx").unwrap()).unwrap(), - last_log_term: u64::from_str(lgif.get("last_log_term").unwrap()).unwrap(), - last_committed_log_idx: u64::from_str(lgif.get("last_committed_log_idx").unwrap()).unwrap(), - leader_committed_log_idx: u64::from_str(lgif.get("leader_committed_log_idx").unwrap()).unwrap(), - target_committed_log_idx: u64::from_str(lgif.get("target_committed_log_idx").unwrap()).unwrap(), - last_snapshot_idx: u64::from_str(lgif.get("last_snapshot_idx").unwrap()).unwrap(), + first_log_idx: lgif.remove("first_log_idx").unwrap(), + first_log_term: lgif.remove("first_log_term").unwrap(), + last_log_idx: lgif.remove("last_log_idx").unwrap(), + last_log_term: lgif.remove("last_log_term").unwrap(), + last_committed_log_idx: lgif.remove("last_committed_log_idx").unwrap(), + leader_committed_log_idx: lgif.remove("leader_committed_log_idx").unwrap(), + target_committed_log_idx: lgif.remove("target_committed_log_idx").unwrap(), + last_snapshot_idx: lgif.remove("last_snapshot_idx").unwrap(), }) } } From bb3a272d4ecec2acb23834b570fdecf0e02c2289 Mon Sep 17 00:00:00 2001 From: karencfv Date: Wed, 11 Sep 2024 16:28:01 +1200 Subject: [PATCH 06/20] Improvement... --- clickhouse-admin/types/src/lib.rs | 110 +++++++++++++++++++++++------- 1 file changed, 86 insertions(+), 24 deletions(-) diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index e7a72d5d6b..799d8446a8 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -208,17 +208,59 @@ impl KeeperSettings { } } -#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] -#[serde(rename_all = "snake_case")] -pub struct Lgif { - first_log_idx: u64, - first_log_term: u64, - last_log_idx: u64, - last_log_term: u64, - last_committed_log_idx: u64, - leader_committed_log_idx: u64, - target_committed_log_idx: u64, - last_snapshot_idx: u64, +macro_rules! define_struct_and_get_field_names { + ( + struct $name:ident { + $($field_name:ident: $field_type:ty),* $(,)? + } + ) => { + #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] + #[serde(rename_all = "snake_case")] + pub struct $name { + $($field_name: $field_type),* + } + + // Implement a method to check if a field name matches a given string + impl $name { + pub fn field_exists(field_name: &str) -> bool { + match field_name { + $( + stringify!($field_name) => true, + )* + _ => false, + } + } + + // Method to set a field's value based on the field name + // TODO: Improve on the result + pub fn set_field_value(&mut self, field_name: &str, value: Option) -> Result<(), &'static str> { + match field_name { + $( + stringify!($field_name) => { + self.$field_name = value; + Ok(()) + }, + )* + _ => Err("Field name not found."), + } + } + } + }; +} + +//#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] +//#[serde(rename_all = "snake_case")] +define_struct_and_get_field_names! { + struct Lgif { + first_log_idx: Option, + first_log_term: Option, + last_log_idx: Option, + last_log_term: Option, + last_committed_log_idx: Option, + leader_committed_log_idx: Option, + target_committed_log_idx: Option, + last_snapshot_idx: Option, +} } impl Lgif { @@ -237,7 +279,6 @@ impl Lgif { continue; } - // println!("Vec: {:#?}", l); let key = l[0].to_string(); let value = match u64::from_str(l[1]) { Ok(v) => v, @@ -247,18 +288,39 @@ impl Lgif { lgif.insert(key, value); } - }; - - Ok(Self { - first_log_idx: lgif.remove("first_log_idx").unwrap(), - first_log_term: lgif.remove("first_log_term").unwrap(), - last_log_idx: lgif.remove("last_log_idx").unwrap(), - last_log_term: lgif.remove("last_log_term").unwrap(), - last_committed_log_idx: lgif.remove("last_committed_log_idx").unwrap(), - leader_committed_log_idx: lgif.remove("leader_committed_log_idx").unwrap(), - target_committed_log_idx: lgif.remove("target_committed_log_idx").unwrap(), - last_snapshot_idx: lgif.remove("last_snapshot_idx").unwrap(), - }) + }; + + let mut parsed_data = Self { + first_log_idx: None, + first_log_term: None, + last_log_idx: None, + last_log_term: None, + last_committed_log_idx: None, + leader_committed_log_idx: None, + target_committed_log_idx: None, + last_snapshot_idx: None, + }; + + for (key, value) in lgif { + if Lgif::field_exists(&key) { + let _ = parsed_data.set_field_value(&key, Some(value)); + } else { + continue; + }; + } + + + Ok(parsed_data) + // Ok(Self { + // first_log_idx: lgif.remove("first_log_idx"), + // first_log_term: lgif.remove("first_log_term"), + // last_log_idx: lgif.remove("last_log_idx"), + // last_log_term: lgif.remove("last_log_term"), + // last_committed_log_idx: lgif.remove("last_committed_log_idx"), + // leader_committed_log_idx: lgif.remove("leader_committed_log_idx"), + // target_committed_log_idx: lgif.remove("target_committed_log_idx"), + // last_snapshot_idx: lgif.remove("last_snapshot_idx"), + // }) } } From 3c8ada05ab24ada575aef81494db6c5c88207f74 Mon Sep 17 00:00:00 2001 From: karencfv Date: Wed, 11 Sep 2024 20:08:16 +1200 Subject: [PATCH 07/20] clean up --- clickhouse-admin/types/src/lib.rs | 98 +++++++++++++------------------ 1 file changed, 40 insertions(+), 58 deletions(-) diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index 799d8446a8..323cfcb6d6 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use atomicwrites::AtomicFile; use camino::Utf8PathBuf; use derive_more::{Add, AddAssign, Display, From}; @@ -208,7 +208,7 @@ impl KeeperSettings { } } -macro_rules! define_struct_and_get_field_names { +macro_rules! define_struct_and_set_values { ( struct $name:ident { $($field_name:ident: $field_type:ty),* $(,)? @@ -220,54 +220,54 @@ macro_rules! define_struct_and_get_field_names { $($field_name: $field_type),* } - // Implement a method to check if a field name matches a given string impl $name { - pub fn field_exists(field_name: &str) -> bool { - match field_name { - $( - stringify!($field_name) => true, - )* - _ => false, - } - } - - // Method to set a field's value based on the field name - // TODO: Improve on the result - pub fn set_field_value(&mut self, field_name: &str, value: Option) -> Result<(), &'static str> { - match field_name { + // Check if a field name matches a given string, and set its value + pub fn set_field_value(&mut self, key: &str, value: Option) -> Result<()> { + match key { $( stringify!($field_name) => { self.$field_name = value; Ok(()) }, )* - _ => Err("Field name not found."), + _ => bail!("Field name '{}' not found.", key), } } } }; } -//#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] -//#[serde(rename_all = "snake_case")] -define_struct_and_get_field_names! { - struct Lgif { - first_log_idx: Option, - first_log_term: Option, - last_log_idx: Option, - last_log_term: Option, - last_committed_log_idx: Option, - leader_committed_log_idx: Option, - target_committed_log_idx: Option, - last_snapshot_idx: Option, -} +define_struct_and_set_values! { + struct Lgif { + first_log_idx: Option, + first_log_term: Option, + last_log_idx: Option, + last_log_term: Option, + last_committed_log_idx: Option, + leader_committed_log_idx: Option, + target_committed_log_idx: Option, + last_snapshot_idx: Option, + } } impl Lgif { + pub fn new() -> Self { + Self { + first_log_idx: None, + first_log_term: None, + last_log_idx: None, + last_log_term: None, + last_committed_log_idx: None, + leader_committed_log_idx: None, + target_committed_log_idx: None, + last_snapshot_idx: None, + } + } + pub fn parse(data: &[u8]) -> Result { let binding = String::from_utf8_lossy(data); let lines = binding.lines(); - let mut lgif: HashMap = HashMap::new(); + let mut lgif: HashMap = HashMap::new(); for line in lines { let line = line.trim(); @@ -280,7 +280,7 @@ impl Lgif { } let key = l[0].to_string(); - let value = match u64::from_str(l[1]) { + let value = match u128::from_str(l[1]) { Ok(v) => v, // TODO: Log error Err(_) => continue, @@ -290,37 +290,19 @@ impl Lgif { } }; - let mut parsed_data = Self { - first_log_idx: None, - first_log_term: None, - last_log_idx: None, - last_log_term: None, - last_committed_log_idx: None, - leader_committed_log_idx: None, - target_committed_log_idx: None, - last_snapshot_idx: None, - }; - + let mut parsed_data = Lgif::new(); for (key, value) in lgif { - if Lgif::field_exists(&key) { - let _ = parsed_data.set_field_value(&key, Some(value)); - } else { - continue; + match parsed_data.set_field_value(&key, Some(value)) { + Ok(()) => (), + Err(e) => { + // TODO: Log the error + println!("{e}"); + (); + } }; } - Ok(parsed_data) - // Ok(Self { - // first_log_idx: lgif.remove("first_log_idx"), - // first_log_term: lgif.remove("first_log_term"), - // last_log_idx: lgif.remove("last_log_idx"), - // last_log_term: lgif.remove("last_log_term"), - // last_committed_log_idx: lgif.remove("last_committed_log_idx"), - // leader_committed_log_idx: lgif.remove("leader_committed_log_idx"), - // target_committed_log_idx: lgif.remove("target_committed_log_idx"), - // last_snapshot_idx: lgif.remove("last_snapshot_idx"), - // }) } } From 83c786944812bf52417e7f9065cd8ecf7193b6b5 Mon Sep 17 00:00:00 2001 From: karencfv Date: Wed, 11 Sep 2024 20:09:41 +1200 Subject: [PATCH 08/20] fmt --- clickhouse-admin/types/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index 323cfcb6d6..3357a69cd0 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -288,7 +288,7 @@ impl Lgif { lgif.insert(key, value); } - }; + } let mut parsed_data = Lgif::new(); for (key, value) in lgif { @@ -297,7 +297,6 @@ impl Lgif { Err(e) => { // TODO: Log the error println!("{e}"); - (); } }; } From 481aad81bd52d9df440b16f30c05ee9937c60009 Mon Sep 17 00:00:00 2001 From: karencfv Date: Wed, 11 Sep 2024 20:11:04 +1200 Subject: [PATCH 09/20] openapi spec --- openapi/clickhouse-admin.json | 36 +++++++++++++++++------------------ 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/openapi/clickhouse-admin.json b/openapi/clickhouse-admin.json index 0c89cca1e2..b93e62832d 100644 --- a/openapi/clickhouse-admin.json +++ b/openapi/clickhouse-admin.json @@ -382,56 +382,54 @@ "type": "object", "properties": { "first_log_idx": { + "nullable": true, "type": "integer", - "format": "uint64", + "format": "uint128", "minimum": 0 }, "first_log_term": { + "nullable": true, "type": "integer", - "format": "uint64", + "format": "uint128", "minimum": 0 }, "last_committed_log_idx": { + "nullable": true, "type": "integer", - "format": "uint64", + "format": "uint128", "minimum": 0 }, "last_log_idx": { + "nullable": true, "type": "integer", - "format": "uint64", + "format": "uint128", "minimum": 0 }, "last_log_term": { + "nullable": true, "type": "integer", - "format": "uint64", + "format": "uint128", "minimum": 0 }, "last_snapshot_idx": { + "nullable": true, "type": "integer", - "format": "uint64", + "format": "uint128", "minimum": 0 }, "leader_committed_log_idx": { + "nullable": true, "type": "integer", - "format": "uint64", + "format": "uint128", "minimum": 0 }, "target_committed_log_idx": { + "nullable": true, "type": "integer", - "format": "uint64", + "format": "uint128", "minimum": 0 } - }, - "required": [ - "first_log_idx", - "first_log_term", - "last_committed_log_idx", - "last_log_idx", - "last_log_term", - "last_snapshot_idx", - "leader_committed_log_idx", - "target_committed_log_idx" - ] + } }, "LogConfig": { "type": "object", From 0866bbe4bf0fbfcdbe53fdad111eb4fd4d66eab4 Mon Sep 17 00:00:00 2001 From: karencfv Date: Thu, 12 Sep 2024 10:57:46 +1200 Subject: [PATCH 10/20] Add some tests --- clickhouse-admin/types/src/lib.rs | 77 ++++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 2 deletions(-) diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index 3357a69cd0..7350bd333f 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -316,8 +316,8 @@ mod tests { use camino_tempfile::Builder; use crate::{ - ClickhouseHost, KeeperId, KeeperSettings, RaftServerSettings, ServerId, - ServerSettings, + ClickhouseHost, KeeperId, KeeperSettings, Lgif, RaftServerSettings, + ServerId, ServerSettings, }; #[test] @@ -409,4 +409,77 @@ mod tests { expectorate::assert_contents(expected_file, &generated_content); } + + #[test] + fn test_full_lgif_parse_success() { + let data = + "first_log_idx\t1\nfirst_log_term\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 + \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + .as_bytes(); + let lgif = Lgif::parse(data).unwrap(); + + assert!(lgif.first_log_idx == Some(1)); + assert!(lgif.first_log_term == Some(1)); + assert!(lgif.last_log_idx == Some(4386)); + assert!(lgif.last_log_term == Some(1)); + assert!(lgif.last_committed_log_idx == Some(4386)); + assert!(lgif.leader_committed_log_idx == Some(4386)); + assert!(lgif.target_committed_log_idx == Some(4386)); + assert!(lgif.last_snapshot_idx == Some(0)); + } + + #[test] + fn test_partial_lgif_parse_success() { + let data = + "first_log_idx\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 + \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + .as_bytes(); + let lgif = Lgif::parse(data).unwrap(); + + assert!(lgif.first_log_idx == Some(1)); + assert!(lgif.first_log_term == None); + assert!(lgif.last_log_idx == Some(4386)); + assert!(lgif.last_log_term == Some(1)); + assert!(lgif.last_committed_log_idx == Some(4386)); + assert!(lgif.leader_committed_log_idx == Some(4386)); + assert!(lgif.target_committed_log_idx == Some(4386)); + assert!(lgif.last_snapshot_idx == Some(0)); + } + + #[test] + fn test_empty_value_lgif_parse_success() { + let data = + "first_log_idx\nfirst_log_term\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 + \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + .as_bytes(); + let lgif = Lgif::parse(data).unwrap(); + + assert!(lgif.first_log_idx == None); + assert!(lgif.first_log_term == Some(1)); + assert!(lgif.last_log_idx == Some(4386)); + assert!(lgif.last_log_term == Some(1)); + assert!(lgif.last_committed_log_idx == Some(4386)); + assert!(lgif.leader_committed_log_idx == Some(4386)); + assert!(lgif.target_committed_log_idx == Some(4386)); + assert!(lgif.last_snapshot_idx == Some(0)); + } + + #[test] + fn test_nonsense_lgif_parse_success() { + let data = + "Mmmbop,\tba\tduba\tdop\tba\nDu\tbop,\tba\tduba\tdop\tba\nDu\tbop,\tba\tduba\tdop\tba\tdu + \nYeah, yeah\nMmmbop, ba duba dop ba\nDu bop, ba du dop ba\nDu bop, ba du dop ba du + \nYeah, yeah" + .as_bytes(); + let lgif = Lgif::parse(data).unwrap(); + + assert!(lgif.first_log_idx == None); + assert!(lgif.first_log_term == None); + assert!(lgif.last_log_idx == None); + assert!(lgif.last_log_term == None); + assert!(lgif.last_committed_log_idx == None); + assert!(lgif.leader_committed_log_idx == None); + assert!(lgif.target_committed_log_idx == None); + assert!(lgif.last_snapshot_idx == None); + } } From 6c442d05b77c9dc65cad874273c1e220d9c622b5 Mon Sep 17 00:00:00 2001 From: karencfv Date: Thu, 12 Sep 2024 11:34:09 +1200 Subject: [PATCH 11/20] Add some docs --- clickhouse-admin/types/src/lib.rs | 41 ++++++++++++++++++++++++++++--- openapi/clickhouse-admin.json | 9 +++++++ 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index 7350bd333f..16fb41aeba 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -210,18 +210,26 @@ impl KeeperSettings { macro_rules! define_struct_and_set_values { ( + #[$doc_struct:meta] struct $name:ident { - $($field_name:ident: $field_type:ty),* $(,)? + $( + #[$doc_field:meta] + $field_name:ident: $field_type:ty + ),* $(,)? } ) => { #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] #[serde(rename_all = "snake_case")] + #[$doc_struct] pub struct $name { - $($field_name: $field_type),* + $( + #[$doc_field] + $field_name: $field_type + ),* } impl $name { - // Check if a field name matches a given string, and set its value + // Check if a field name matches a given key and set its value pub fn set_field_value(&mut self, key: &str, value: Option) -> Result<()> { match key { $( @@ -238,14 +246,23 @@ macro_rules! define_struct_and_set_values { } define_struct_and_set_values! { + /// Logically grouped information file from a keeper node struct Lgif { + /// Index of the first log entry in the current log segment first_log_idx: Option, + /// Term of the leader when the first log entry was created first_log_term: Option, + /// Index of the last log entry in the current log segment last_log_idx: Option, + /// Term of the leader when the last log entry was created last_log_term: Option, + /// Index of the last committed log entry last_committed_log_idx: Option, + /// Index of the last committed log entry from the leader's perspective leader_committed_log_idx: Option, + /// Target index for log commitment during replication or recovery target_committed_log_idx: Option, + /// Index of the most recent snapshot taken last_snapshot_idx: Option, } } @@ -464,6 +481,24 @@ mod tests { assert!(lgif.last_snapshot_idx == Some(0)); } + #[test] + fn test_non_u128_value_lgif_parse_success() { + let data = + "first_log_idx\t1\nfirst_log_term\tBOB\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 + \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + .as_bytes(); + let lgif = Lgif::parse(data).unwrap(); + + assert!(lgif.first_log_idx == Some(1)); + assert!(lgif.first_log_term == None); + assert!(lgif.last_log_idx == Some(4386)); + assert!(lgif.last_log_term == Some(1)); + assert!(lgif.last_committed_log_idx == Some(4386)); + assert!(lgif.leader_committed_log_idx == Some(4386)); + assert!(lgif.target_committed_log_idx == Some(4386)); + assert!(lgif.last_snapshot_idx == Some(0)); + } + #[test] fn test_nonsense_lgif_parse_success() { let data = diff --git a/openapi/clickhouse-admin.json b/openapi/clickhouse-admin.json index b93e62832d..2c10955e05 100644 --- a/openapi/clickhouse-admin.json +++ b/openapi/clickhouse-admin.json @@ -379,52 +379,61 @@ ] }, "Lgif": { + "description": "Logically grouped information file from a keeper node", "type": "object", "properties": { "first_log_idx": { "nullable": true, + "description": "Index of the first log entry in the current log segment", "type": "integer", "format": "uint128", "minimum": 0 }, "first_log_term": { "nullable": true, + "description": "Term of the leader when the first log entry was created", "type": "integer", "format": "uint128", "minimum": 0 }, "last_committed_log_idx": { "nullable": true, + "description": "Index of the last committed log entry", "type": "integer", "format": "uint128", "minimum": 0 }, "last_log_idx": { "nullable": true, + "description": "Index of the last log entry in the current log segment", "type": "integer", "format": "uint128", "minimum": 0 }, "last_log_term": { "nullable": true, + "description": "Term of the leader when the last log entry was created", "type": "integer", "format": "uint128", "minimum": 0 }, "last_snapshot_idx": { "nullable": true, + "description": "Index of the most recent snapshot taken", "type": "integer", "format": "uint128", "minimum": 0 }, "leader_committed_log_idx": { "nullable": true, + "description": "Index of the last committed log entry from the leader's perspective", "type": "integer", "format": "uint128", "minimum": 0 }, "target_committed_log_idx": { "nullable": true, + "description": "Target index for log commitment during replication or recovery", "type": "integer", "format": "uint128", "minimum": 0 From c8a1f8999728667cb143b455a3264612196bedd3 Mon Sep 17 00:00:00 2001 From: karencfv Date: Thu, 12 Sep 2024 12:55:55 +1200 Subject: [PATCH 12/20] Add some logging --- Cargo.lock | 3 + clickhouse-admin/src/bin/clickhouse-admin.rs | 5 +- clickhouse-admin/src/clickhouse_cli.rs | 15 +++- clickhouse-admin/src/lib.rs | 5 +- clickhouse-admin/types/Cargo.toml | 5 ++ clickhouse-admin/types/src/lib.rs | 75 ++++++++++++++++---- 6 files changed, 89 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f2844ad1cf..b0f7b6cb33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1155,6 +1155,9 @@ dependencies = [ "schemars", "serde", "serde_json", + "slog", + "slog-async", + "slog-term", ] [[package]] diff --git a/clickhouse-admin/src/bin/clickhouse-admin.rs b/clickhouse-admin/src/bin/clickhouse-admin.rs index b2e451488c..3391a3459a 100644 --- a/clickhouse-admin/src/bin/clickhouse-admin.rs +++ b/clickhouse-admin/src/bin/clickhouse-admin.rs @@ -54,11 +54,12 @@ async fn main_impl() -> Result<(), CmdError> { .map_err(|err| CmdError::Failure(anyhow!(err)))?; config.dropshot.bind_address = SocketAddr::V6(http_address); let clickward = Clickward::new(); - let keeper_client = ClickhouseCli::new(binary_path, listen_address); + let clickhouse_cli = + ClickhouseCli::new(binary_path, listen_address); let server = omicron_clickhouse_admin::start_server( clickward, - keeper_client, + clickhouse_cli, config, ) .await diff --git a/clickhouse-admin/src/clickhouse_cli.rs b/clickhouse-admin/src/clickhouse_cli.rs index 507f603e07..599ae34403 100644 --- a/clickhouse-admin/src/clickhouse_cli.rs +++ b/clickhouse-admin/src/clickhouse_cli.rs @@ -7,6 +7,7 @@ use camino::Utf8PathBuf; use clickhouse_admin_types::Lgif; use dropshot::HttpError; use illumos_utils::{output_to_exec_error, ExecutionError}; +use slog::Logger; use slog_error_chain::{InlineErrorChain, SlogInlineError}; use std::ffi::OsStr; use std::io; @@ -59,11 +60,17 @@ pub struct ClickhouseCli { pub binary_path: Utf8PathBuf, /// Address on where the clickhouse keeper is listening on pub listen_address: SocketAddrV6, + pub log: Option, } impl ClickhouseCli { pub fn new(binary_path: Utf8PathBuf, listen_address: SocketAddrV6) -> Self { - Self { binary_path, listen_address } + Self { binary_path, listen_address, log: None } + } + + pub fn with_log(mut self, log: Logger) -> Self { + self.log = Some(log); + self } pub async fn lgif(&self) -> Result { @@ -71,6 +78,7 @@ impl ClickhouseCli { ["lgif"].into_iter(), "Retrieve logically grouped information file", Lgif::parse, + self.log.clone().unwrap(), ) .await } @@ -80,10 +88,11 @@ impl ClickhouseCli { subcommand_args: I, subcommand_description: &'static str, parse: F, + log: Logger, ) -> Result where I: Iterator, - F: FnOnce(&[u8]) -> Result, + F: FnOnce(&Logger, &[u8]) -> Result, { let mut command = Command::new(&self.binary_path); command @@ -116,7 +125,7 @@ impl ClickhouseCli { return Err(output_to_exec_error(command.as_std(), &output).into()); } - parse(&output.stdout).map_err(|err| ClickhouseCliError::Parse { + parse(&log, &output.stdout).map_err(|err| ClickhouseCliError::Parse { description: subcommand_description, stdout: String::from_utf8_lossy(&output.stdout).to_string(), stderr: String::from_utf8_lossy(&output.stdout).to_string(), diff --git a/clickhouse-admin/src/lib.rs b/clickhouse-admin/src/lib.rs index 6539fe2105..511a32dd50 100644 --- a/clickhouse-admin/src/lib.rs +++ b/clickhouse-admin/src/lib.rs @@ -36,7 +36,7 @@ pub type Server = dropshot::HttpServer>; /// Start the dropshot server pub async fn start_server( clickward: Clickward, - keeper_client: ClickhouseCli, + clickhouse_cli: ClickhouseCli, server_config: Config, ) -> Result { let (drain, registration) = slog_dtrace::with_drain( @@ -59,7 +59,8 @@ pub async fn start_server( let context = ServerContext::new( clickward, - keeper_client, + clickhouse_cli + .with_log(log.new(slog::o!("component" => "ClickhouseCli"))), log.new(slog::o!("component" => "ServerContext")), ); let http_server_starter = dropshot::HttpServerStarter::new( diff --git a/clickhouse-admin/types/Cargo.toml b/clickhouse-admin/types/Cargo.toml index 68004303e0..5b2f532e74 100644 --- a/clickhouse-admin/types/Cargo.toml +++ b/clickhouse-admin/types/Cargo.toml @@ -18,4 +18,9 @@ omicron-workspace-hack.workspace = true schemars.workspace = true serde.workspace = true serde_json.workspace = true +slog.workspace = true expectorate.workspace = true + +[dev-dependencies] +slog-async.workspace = true +slog-term.workspace = true \ No newline at end of file diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index 16fb41aeba..4a909866f9 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -8,6 +8,7 @@ use camino::Utf8PathBuf; use derive_more::{Add, AddAssign, Display, From}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use slog::{error, info, Logger}; use std::collections::HashMap; use std::fs::create_dir; use std::io::{ErrorKind, Write}; @@ -281,7 +282,7 @@ impl Lgif { } } - pub fn parse(data: &[u8]) -> Result { + pub fn parse(log: &Logger, data: &[u8]) -> Result { let binding = String::from_utf8_lossy(data); let lines = binding.lines(); let mut lgif: HashMap = HashMap::new(); @@ -292,15 +293,27 @@ impl Lgif { let l: Vec<&str> = line.split('\t').collect(); if l.len() != 2 { - // TODO: Log that there was an empty line + info!( + log, + "Command output has a line that does not contain two items"; + "output line" => ?l, + ); continue; } let key = l[0].to_string(); - let value = match u128::from_str(l[1]) { + let raw_value = l[1]; + let value = match u128::from_str(raw_value) { Ok(v) => v, - // TODO: Log error - Err(_) => continue, + Err(e) => { + error!( + log, + "Unable to convert value into u128"; + "value" => ?raw_value, + "error" => ?e, + ); + continue; + } }; lgif.insert(key, value); @@ -312,8 +325,13 @@ impl Lgif { match parsed_data.set_field_value(&key, Some(value)) { Ok(()) => (), Err(e) => { - // TODO: Log the error - println!("{e}"); + error!( + log, + "Unable to set Lgif struct field with key value pair"; + "key" => ?key, + "value" => ?value, + "error" => ?e, + ); } }; } @@ -331,12 +349,21 @@ mod tests { use camino::Utf8PathBuf; use camino_tempfile::Builder; + use slog::{o, Drain}; + use slog_term::{FullFormat, PlainDecorator, TestStdoutWriter}; use crate::{ ClickhouseHost, KeeperId, KeeperSettings, Lgif, RaftServerSettings, ServerId, ServerSettings, }; + fn log() -> slog::Logger { + let decorator = PlainDecorator::new(TestStdoutWriter); + let drain = FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + slog::Logger::root(drain, o!()) + } + #[test] fn test_generate_keeper_config() { let config_dir = Builder::new() @@ -429,11 +456,12 @@ mod tests { #[test] fn test_full_lgif_parse_success() { + let log = log(); let data = "first_log_idx\t1\nfirst_log_term\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" .as_bytes(); - let lgif = Lgif::parse(data).unwrap(); + let lgif = Lgif::parse(&log, data).unwrap(); assert!(lgif.first_log_idx == Some(1)); assert!(lgif.first_log_term == Some(1)); @@ -447,11 +475,12 @@ mod tests { #[test] fn test_partial_lgif_parse_success() { + let log = log(); let data = "first_log_idx\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" .as_bytes(); - let lgif = Lgif::parse(data).unwrap(); + let lgif = Lgif::parse(&log, data).unwrap(); assert!(lgif.first_log_idx == Some(1)); assert!(lgif.first_log_term == None); @@ -465,11 +494,12 @@ mod tests { #[test] fn test_empty_value_lgif_parse_success() { + let log = log(); let data = "first_log_idx\nfirst_log_term\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" .as_bytes(); - let lgif = Lgif::parse(data).unwrap(); + let lgif = Lgif::parse(&log, data).unwrap(); assert!(lgif.first_log_idx == None); assert!(lgif.first_log_term == Some(1)); @@ -483,11 +513,31 @@ mod tests { #[test] fn test_non_u128_value_lgif_parse_success() { + let log = log(); let data = "first_log_idx\t1\nfirst_log_term\tBOB\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" .as_bytes(); - let lgif = Lgif::parse(data).unwrap(); + let lgif = Lgif::parse(&log, data).unwrap(); + + assert!(lgif.first_log_idx == Some(1)); + assert!(lgif.first_log_term == None); + assert!(lgif.last_log_idx == Some(4386)); + assert!(lgif.last_log_term == Some(1)); + assert!(lgif.last_committed_log_idx == Some(4386)); + assert!(lgif.leader_committed_log_idx == Some(4386)); + assert!(lgif.target_committed_log_idx == Some(4386)); + assert!(lgif.last_snapshot_idx == Some(0)); + } + + #[test] + fn test_non_existent_key_with_correct_value_lgif_parse_success() { + let log = log(); + let data = + "first_log_idx\t1\nfirst_log\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 + \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + .as_bytes(); + let lgif = Lgif::parse(&log, data).unwrap(); assert!(lgif.first_log_idx == Some(1)); assert!(lgif.first_log_term == None); @@ -501,12 +551,13 @@ mod tests { #[test] fn test_nonsense_lgif_parse_success() { + let log = log(); let data = "Mmmbop,\tba\tduba\tdop\tba\nDu\tbop,\tba\tduba\tdop\tba\nDu\tbop,\tba\tduba\tdop\tba\tdu \nYeah, yeah\nMmmbop, ba duba dop ba\nDu bop, ba du dop ba\nDu bop, ba du dop ba du \nYeah, yeah" .as_bytes(); - let lgif = Lgif::parse(data).unwrap(); + let lgif = Lgif::parse(&log, data).unwrap(); assert!(lgif.first_log_idx == None); assert!(lgif.first_log_term == None); From 380139faf1c81c24aaa17e1832cf30fcc559345b Mon Sep 17 00:00:00 2001 From: karencfv Date: Thu, 12 Sep 2024 14:35:07 +1200 Subject: [PATCH 13/20] Clean up and more tests --- clickhouse-admin/src/clickhouse_cli.rs | 1 - clickhouse-admin/types/src/lib.rs | 67 +++++++++++++++++++++++--- 2 files changed, 59 insertions(+), 9 deletions(-) diff --git a/clickhouse-admin/src/clickhouse_cli.rs b/clickhouse-admin/src/clickhouse_cli.rs index 599ae34403..24d89a1424 100644 --- a/clickhouse-admin/src/clickhouse_cli.rs +++ b/clickhouse-admin/src/clickhouse_cli.rs @@ -39,7 +39,6 @@ impl From for HttpError { fn from(err: ClickhouseCliError) -> Self { match err { ClickhouseCliError::Run { .. } - // TODO: Can I make this message better? | ClickhouseCliError::Parse { .. } | ClickhouseCliError::ExecutionError(_) => { let message = InlineErrorChain::new(&err).to_string(); diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index 4a909866f9..a511fbda40 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -242,6 +242,13 @@ macro_rules! define_struct_and_set_values { _ => bail!("Field name '{}' not found.", key), } } + + fn are_all_fields_none(&self) -> bool { + $( + self.$field_name.is_none() && + )* + true + } } }; } @@ -283,6 +290,35 @@ impl Lgif { } pub fn parse(log: &Logger, data: &[u8]) -> Result { + // The reponse we get from running `clickhouse keeper-client -h {HOST} --q lgif` + // isn't in any known format (e.g. JSON), but rather a series of lines with key-value + // pairs separated by a tab: + // + // ```console + // $ clickhouse keeper-client -h localhost -p 20001 --q lgif + // first_log_idx 1 + // first_log_term 1 + // last_log_idx 10889 + // last_log_term 20 + // last_committed_log_idx 10889 + // leader_committed_log_idx 10889 + // target_committed_log_idx 10889 + // last_snapshot_idx 9465 + // ``` + // + // To parse the data we follow these steps: + // + // 1. Create an iterator over the lines of a string. These are split at newlines. + // 2. Each line is split by the tab, and we make sure that the key and value are + // valid. If a value is not valid, we ignore it and mode on. We want to keep + // other key-value pairs if they are valid. + // 3. Once we have a HashMap of valid key-value pairs, we set the fields of the + // Lgif struct with the retrieved data. To do this we make sure that the name + // of each HasMap key matches one of the field names and then we set the + // corresponding value. If a key does not match any of the struct's fields we + // log an error, but continue populating all valid key-value pairs. + // 4. We return an error only if the response had no valid key-value pairs. + // let binding = String::from_utf8_lossy(data); let lines = binding.lines(); let mut lgif: HashMap = HashMap::new(); @@ -336,21 +372,22 @@ impl Lgif { }; } + if parsed_data.are_all_fields_none() { + bail!("Unable to parse `clickhouse keeper-client -q lgif` response: {}", binding); + } + Ok(parsed_data) } } #[cfg(test)] mod tests { - use std::{ - net::{Ipv4Addr, Ipv6Addr}, - str::FromStr, - }; - use camino::Utf8PathBuf; use camino_tempfile::Builder; use slog::{o, Drain}; use slog_term::{FullFormat, PlainDecorator, TestStdoutWriter}; + use std::net::{Ipv4Addr, Ipv6Addr}; + use std::str::FromStr; use crate::{ ClickhouseHost, KeeperId, KeeperSettings, Lgif, RaftServerSettings, @@ -554,8 +591,7 @@ mod tests { let log = log(); let data = "Mmmbop,\tba\tduba\tdop\tba\nDu\tbop,\tba\tduba\tdop\tba\nDu\tbop,\tba\tduba\tdop\tba\tdu - \nYeah, yeah\nMmmbop, ba duba dop ba\nDu bop, ba du dop ba\nDu bop, ba du dop ba du - \nYeah, yeah" + \ntarget_committed_log_idx\t4386\n\n" .as_bytes(); let lgif = Lgif::parse(&log, data).unwrap(); @@ -565,7 +601,22 @@ mod tests { assert!(lgif.last_log_term == None); assert!(lgif.last_committed_log_idx == None); assert!(lgif.leader_committed_log_idx == None); - assert!(lgif.target_committed_log_idx == None); + assert!(lgif.target_committed_log_idx == Some(4386)); assert!(lgif.last_snapshot_idx == None); } + + #[test] + fn test_all_nonsense_lgif_parse_fail() { + let log = log(); + let data = "Mmmbop, ba duba dop ba\nDu bop, ba du dop ba\nYeah, yeah" + .as_bytes(); + let result = Lgif::parse(&log, data); + let error = result.unwrap_err(); + let root_cause = error.root_cause(); + + assert_eq!( + format!("{}", root_cause), + "Unable to parse `clickhouse keeper-client -q lgif` response: Mmmbop, ba duba dop ba\nDu bop, ba du dop ba\nYeah, yeah", + ); + } } From 3b5bb4539513d7bc3b06c727ba80bde511224d90 Mon Sep 17 00:00:00 2001 From: karencfv Date: Fri, 13 Sep 2024 12:19:07 +1200 Subject: [PATCH 14/20] address comments --- clickhouse-admin/src/clickhouse_cli.rs | 2 +- clickhouse-admin/types/src/lib.rs | 30 +++++++++++++------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/clickhouse-admin/src/clickhouse_cli.rs b/clickhouse-admin/src/clickhouse_cli.rs index 24d89a1424..f7376439f7 100644 --- a/clickhouse-admin/src/clickhouse_cli.rs +++ b/clickhouse-admin/src/clickhouse_cli.rs @@ -110,7 +110,7 @@ impl ClickhouseCli { let args: Vec<&OsStr> = command.as_std().get_args().collect(); let args_parsed: Vec = args .iter() - .map(|&os_str| os_str.to_string_lossy().into_owned()) + .map(|&os_str| os_str.to_str().unwrap().to_owned()) .collect(); let args_str = args_parsed.join(" "); ClickhouseCliError::Run { diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index a511fbda40..570fff0e16 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -231,7 +231,7 @@ macro_rules! define_struct_and_set_values { impl $name { // Check if a field name matches a given key and set its value - pub fn set_field_value(&mut self, key: &str, value: Option) -> Result<()> { + pub fn set_field_value(&mut self, key: &str, value: Option) -> Result<()> { match key { $( stringify!($field_name) => { @@ -257,21 +257,21 @@ define_struct_and_set_values! { /// Logically grouped information file from a keeper node struct Lgif { /// Index of the first log entry in the current log segment - first_log_idx: Option, + first_log_idx: Option, /// Term of the leader when the first log entry was created - first_log_term: Option, + first_log_term: Option, /// Index of the last log entry in the current log segment - last_log_idx: Option, + last_log_idx: Option, /// Term of the leader when the last log entry was created - last_log_term: Option, + last_log_term: Option, /// Index of the last committed log entry - last_committed_log_idx: Option, + last_committed_log_idx: Option, /// Index of the last committed log entry from the leader's perspective - leader_committed_log_idx: Option, + leader_committed_log_idx: Option, /// Target index for log commitment during replication or recovery - target_committed_log_idx: Option, + target_committed_log_idx: Option, /// Index of the most recent snapshot taken - last_snapshot_idx: Option, + last_snapshot_idx: Option, } } @@ -310,18 +310,18 @@ impl Lgif { // // 1. Create an iterator over the lines of a string. These are split at newlines. // 2. Each line is split by the tab, and we make sure that the key and value are - // valid. If a value is not valid, we ignore it and mode on. We want to keep + // valid. If a value is not valid, we ignore it and move on. We want to keep // other key-value pairs if they are valid. // 3. Once we have a HashMap of valid key-value pairs, we set the fields of the // Lgif struct with the retrieved data. To do this we make sure that the name - // of each HasMap key matches one of the field names and then we set the + // of each HashMap key matches one of the field names and then we set the // corresponding value. If a key does not match any of the struct's fields we // log an error, but continue populating all valid key-value pairs. // 4. We return an error only if the response had no valid key-value pairs. // let binding = String::from_utf8_lossy(data); let lines = binding.lines(); - let mut lgif: HashMap = HashMap::new(); + let mut lgif: HashMap = HashMap::new(); for line in lines { let line = line.trim(); @@ -339,12 +339,12 @@ impl Lgif { let key = l[0].to_string(); let raw_value = l[1]; - let value = match u128::from_str(raw_value) { + let value = match u64::from_str(raw_value) { Ok(v) => v, Err(e) => { error!( log, - "Unable to convert value into u128"; + "Unable to convert value into u64"; "value" => ?raw_value, "error" => ?e, ); @@ -549,7 +549,7 @@ mod tests { } #[test] - fn test_non_u128_value_lgif_parse_success() { + fn test_non_u64_value_lgif_parse_success() { let log = log(); let data = "first_log_idx\t1\nfirst_log_term\tBOB\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 From 7a6619b32c57f68beb59c4782631b1a2fc04a6d7 Mon Sep 17 00:00:00 2001 From: karencfv Date: Fri, 13 Sep 2024 12:33:00 +1200 Subject: [PATCH 15/20] clean up --- clickhouse-admin/src/clickhouse_cli.rs | 14 +++++--------- openapi/clickhouse-admin.json | 16 ++++++++-------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/clickhouse-admin/src/clickhouse_cli.rs b/clickhouse-admin/src/clickhouse_cli.rs index f7376439f7..c387502240 100644 --- a/clickhouse-admin/src/clickhouse_cli.rs +++ b/clickhouse-admin/src/clickhouse_cli.rs @@ -74,7 +74,7 @@ impl ClickhouseCli { pub async fn lgif(&self) -> Result { self.keeper_client_non_interactive( - ["lgif"].into_iter(), + "lgif", "Retrieve logically grouped information file", Lgif::parse, self.log.clone().unwrap(), @@ -82,15 +82,14 @@ impl ClickhouseCli { .await } - async fn keeper_client_non_interactive<'a, I, F, T>( + async fn keeper_client_non_interactive<'a, F, T>( &self, - subcommand_args: I, + query: &str, subcommand_description: &'static str, parse: F, log: Logger, ) -> Result where - I: Iterator, F: FnOnce(&Logger, &[u8]) -> Result, { let mut command = Command::new(&self.binary_path); @@ -100,11 +99,8 @@ impl ClickhouseCli { .arg(&format!("[{}]", self.listen_address.ip())) .arg("--port") .arg(&format!("{}", self.listen_address.port())) - .arg("--query"); - - let args: Vec<&'a str> = subcommand_args.collect(); - let query = args.join(" "); - command.arg(query); + .arg("--query") + .arg(query); let output = command.output().await.map_err(|err| { let args: Vec<&OsStr> = command.as_std().get_args().collect(); diff --git a/openapi/clickhouse-admin.json b/openapi/clickhouse-admin.json index 2c10955e05..bee324a6d9 100644 --- a/openapi/clickhouse-admin.json +++ b/openapi/clickhouse-admin.json @@ -386,56 +386,56 @@ "nullable": true, "description": "Index of the first log entry in the current log segment", "type": "integer", - "format": "uint128", + "format": "uint64", "minimum": 0 }, "first_log_term": { "nullable": true, "description": "Term of the leader when the first log entry was created", "type": "integer", - "format": "uint128", + "format": "uint64", "minimum": 0 }, "last_committed_log_idx": { "nullable": true, "description": "Index of the last committed log entry", "type": "integer", - "format": "uint128", + "format": "uint64", "minimum": 0 }, "last_log_idx": { "nullable": true, "description": "Index of the last log entry in the current log segment", "type": "integer", - "format": "uint128", + "format": "uint64", "minimum": 0 }, "last_log_term": { "nullable": true, "description": "Term of the leader when the last log entry was created", "type": "integer", - "format": "uint128", + "format": "uint64", "minimum": 0 }, "last_snapshot_idx": { "nullable": true, "description": "Index of the most recent snapshot taken", "type": "integer", - "format": "uint128", + "format": "uint64", "minimum": 0 }, "leader_committed_log_idx": { "nullable": true, "description": "Index of the last committed log entry from the leader's perspective", "type": "integer", - "format": "uint128", + "format": "uint64", "minimum": 0 }, "target_committed_log_idx": { "nullable": true, "description": "Target index for log commitment during replication or recovery", "type": "integer", - "format": "uint128", + "format": "uint64", "minimum": 0 } } From 8d1104df7004b241bd05abb5792a7e7fe417ea88 Mon Sep 17 00:00:00 2001 From: karencfv Date: Tue, 17 Sep 2024 18:00:01 +1200 Subject: [PATCH 16/20] address comments --- clickhouse-admin/src/clickhouse_cli.rs | 12 +- clickhouse-admin/types/src/lib.rs | 228 ++++++++++++------------- 2 files changed, 111 insertions(+), 129 deletions(-) diff --git a/clickhouse-admin/src/clickhouse_cli.rs b/clickhouse-admin/src/clickhouse_cli.rs index c387502240..edaa2c3488 100644 --- a/clickhouse-admin/src/clickhouse_cli.rs +++ b/clickhouse-admin/src/clickhouse_cli.rs @@ -82,7 +82,7 @@ impl ClickhouseCli { .await } - async fn keeper_client_non_interactive<'a, F, T>( + async fn keeper_client_non_interactive( &self, query: &str, subcommand_description: &'static str, @@ -103,15 +103,15 @@ impl ClickhouseCli { .arg(query); let output = command.output().await.map_err(|err| { - let args: Vec<&OsStr> = command.as_std().get_args().collect(); - let args_parsed: Vec = args + let err_args: Vec<&OsStr> = command.as_std().get_args().collect(); + let err_args_parsed: Vec = err_args .iter() - .map(|&os_str| os_str.to_str().unwrap().to_owned()) + .map(|&os_str| os_str.to_string_lossy().into_owned()) .collect(); - let args_str = args_parsed.join(" "); + let err_args_str = err_args_parsed.join(" "); ClickhouseCliError::Run { description: subcommand_description, - subcommand: args_str, + subcommand: err_args_str, err, } })?; diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index 570fff0e16..cf280af7f9 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -8,7 +8,7 @@ use camino::Utf8PathBuf; use derive_more::{Add, AddAssign, Display, From}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use slog::{error, info, Logger}; +use slog::{info, Logger}; use std::collections::HashMap; use std::fs::create_dir; use std::io::{ErrorKind, Write}; @@ -230,8 +230,10 @@ macro_rules! define_struct_and_set_values { } impl $name { - // Check if a field name matches a given key and set its value - pub fn set_field_value(&mut self, key: &str, value: Option) -> Result<()> { + // Check if a field name matches a given key and set its value. + // Since the struct will be serialised into JSON for the reponse. + // we want to make sure the keys are exactly the same. + pub fn set_field_value(&mut self, key: &str, value: u64) -> Result<()> { match key { $( stringify!($field_name) => { @@ -239,16 +241,9 @@ macro_rules! define_struct_and_set_values { Ok(()) }, )* - _ => bail!("Field name '{}' not found.", key), + _ => bail!("Key '{}' is not part of the expected keys.", key), } } - - fn are_all_fields_none(&self) -> bool { - $( - self.$field_name.is_none() && - )* - true - } } }; } @@ -257,35 +252,35 @@ define_struct_and_set_values! { /// Logically grouped information file from a keeper node struct Lgif { /// Index of the first log entry in the current log segment - first_log_idx: Option, + first_log_idx: u64, /// Term of the leader when the first log entry was created - first_log_term: Option, + first_log_term: u64, /// Index of the last log entry in the current log segment - last_log_idx: Option, + last_log_idx: u64, /// Term of the leader when the last log entry was created - last_log_term: Option, + last_log_term: u64, /// Index of the last committed log entry - last_committed_log_idx: Option, + last_committed_log_idx: u64, /// Index of the last committed log entry from the leader's perspective - leader_committed_log_idx: Option, + leader_committed_log_idx: u64, /// Target index for log commitment during replication or recovery - target_committed_log_idx: Option, + target_committed_log_idx: u64, /// Index of the most recent snapshot taken - last_snapshot_idx: Option, + last_snapshot_idx: u64, } } impl Lgif { pub fn new() -> Self { Self { - first_log_idx: None, - first_log_term: None, - last_log_idx: None, - last_log_term: None, - last_committed_log_idx: None, - leader_committed_log_idx: None, - target_committed_log_idx: None, - last_snapshot_idx: None, + first_log_idx: 0, + first_log_term: 0, + last_log_idx: 0, + last_log_term: 0, + last_committed_log_idx: 0, + leader_committed_log_idx: 0, + target_committed_log_idx: 0, + last_snapshot_idx: 0, } } @@ -310,16 +305,17 @@ impl Lgif { // // 1. Create an iterator over the lines of a string. These are split at newlines. // 2. Each line is split by the tab, and we make sure that the key and value are - // valid. If a value is not valid, we ignore it and move on. We want to keep - // other key-value pairs if they are valid. + // valid. // 3. Once we have a HashMap of valid key-value pairs, we set the fields of the // Lgif struct with the retrieved data. To do this we make sure that the name // of each HashMap key matches one of the field names and then we set the - // corresponding value. If a key does not match any of the struct's fields we - // log an error, but continue populating all valid key-value pairs. - // 4. We return an error only if the response had no valid key-value pairs. - // + // corresponding value. let binding = String::from_utf8_lossy(data); + info!( + log, + "Retrieved data from `clickhouse keeper-config lgif`"; + "output" => ?binding + ); let lines = binding.lines(); let mut lgif: HashMap = HashMap::new(); @@ -329,12 +325,9 @@ impl Lgif { let l: Vec<&str> = line.split('\t').collect(); if l.len() != 2 { - info!( - log, - "Command output has a line that does not contain two items"; - "output line" => ?l, + bail!( + "Command output has a line that does not contain a key-value pair: {l:?}" ); - continue; } let key = l[0].to_string(); @@ -342,13 +335,9 @@ impl Lgif { let value = match u64::from_str(raw_value) { Ok(v) => v, Err(e) => { - error!( - log, - "Unable to convert value into u64"; - "value" => ?raw_value, - "error" => ?e, + bail!( + "Unable to convert value {raw_value:?} into u64 for key {key}: {e}" ); - continue; } }; @@ -356,28 +345,44 @@ impl Lgif { } } + let mut keys: Vec = lgif.keys().cloned().collect(); + let mut expected_keys = Lgif::expected_keys(); + keys.sort(); + expected_keys.sort(); + if !keys.eq(&expected_keys) { + bail!( + "Command output contains different keys to those expected. \ + Keys: {keys:?} Expected keys: {expected_keys:?}" + ); + } + let mut parsed_data = Lgif::new(); for (key, value) in lgif { - match parsed_data.set_field_value(&key, Some(value)) { + match parsed_data.set_field_value(&key, value) { Ok(()) => (), Err(e) => { - error!( - log, - "Unable to set Lgif struct field with key value pair"; - "key" => ?key, - "value" => ?value, - "error" => ?e, + bail!( + "Unable to set Lgif struct field with key value pair: {e}" ); } }; } - if parsed_data.are_all_fields_none() { - bail!("Unable to parse `clickhouse keeper-client -q lgif` response: {}", binding); - } - Ok(parsed_data) } + + fn expected_keys() -> Vec { + vec![ + "first_log_idx".to_string(), + "first_log_term".to_string(), + "last_log_idx".to_string(), + "last_log_term".to_string(), + "last_committed_log_idx".to_string(), + "leader_committed_log_idx".to_string(), + "target_committed_log_idx".to_string(), + "last_snapshot_idx".to_string(), + ] + } } #[cfg(test)] @@ -500,109 +505,86 @@ mod tests { .as_bytes(); let lgif = Lgif::parse(&log, data).unwrap(); - assert!(lgif.first_log_idx == Some(1)); - assert!(lgif.first_log_term == Some(1)); - assert!(lgif.last_log_idx == Some(4386)); - assert!(lgif.last_log_term == Some(1)); - assert!(lgif.last_committed_log_idx == Some(4386)); - assert!(lgif.leader_committed_log_idx == Some(4386)); - assert!(lgif.target_committed_log_idx == Some(4386)); - assert!(lgif.last_snapshot_idx == Some(0)); + assert!(lgif.first_log_idx == 1); + assert!(lgif.first_log_term == 1); + assert!(lgif.last_log_idx == 4386); + assert!(lgif.last_log_term == 1); + assert!(lgif.last_committed_log_idx == 4386); + assert!(lgif.leader_committed_log_idx == 4386); + assert!(lgif.target_committed_log_idx == 4386); + assert!(lgif.last_snapshot_idx == 0); } #[test] - fn test_partial_lgif_parse_success() { + fn test_missing_keys_lgif_parse_fail() { let log = log(); let data = "first_log_idx\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" .as_bytes(); - let lgif = Lgif::parse(&log, data).unwrap(); + let result = Lgif::parse(&log, data); + let error = result.unwrap_err(); + let root_cause = error.root_cause(); - assert!(lgif.first_log_idx == Some(1)); - assert!(lgif.first_log_term == None); - assert!(lgif.last_log_idx == Some(4386)); - assert!(lgif.last_log_term == Some(1)); - assert!(lgif.last_committed_log_idx == Some(4386)); - assert!(lgif.leader_committed_log_idx == Some(4386)); - assert!(lgif.target_committed_log_idx == Some(4386)); - assert!(lgif.last_snapshot_idx == Some(0)); + assert_eq!( + format!("{}", root_cause), + "Command output contains different keys to those expected. \ + Keys: [\"first_log_idx\", \"last_committed_log_idx\", \"last_log_idx\", \"last_log_term\", \"last_snapshot_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\"] \ + Expected keys: [\"first_log_idx\", \"first_log_term\", \"last_committed_log_idx\", \"last_log_idx\", \"last_log_term\", \"last_snapshot_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\"]" + ); } #[test] - fn test_empty_value_lgif_parse_success() { + fn test_empty_value_lgif_parse_fail() { let log = log(); let data = "first_log_idx\nfirst_log_term\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" .as_bytes(); - let lgif = Lgif::parse(&log, data).unwrap(); + let result = Lgif::parse(&log, data); + let error = result.unwrap_err(); + let root_cause = error.root_cause(); - assert!(lgif.first_log_idx == None); - assert!(lgif.first_log_term == Some(1)); - assert!(lgif.last_log_idx == Some(4386)); - assert!(lgif.last_log_term == Some(1)); - assert!(lgif.last_committed_log_idx == Some(4386)); - assert!(lgif.leader_committed_log_idx == Some(4386)); - assert!(lgif.target_committed_log_idx == Some(4386)); - assert!(lgif.last_snapshot_idx == Some(0)); + assert_eq!( + format!("{}", root_cause), + "Command output has a line that does not contain a key-value pair: [\"first_log_idx\"]" + ); } #[test] - fn test_non_u64_value_lgif_parse_success() { + fn test_non_u64_value_lgif_parse_fail() { let log = log(); let data = "first_log_idx\t1\nfirst_log_term\tBOB\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" .as_bytes(); - let lgif = Lgif::parse(&log, data).unwrap(); + let result = Lgif::parse(&log, data); + let error = result.unwrap_err(); + let root_cause = error.root_cause(); - assert!(lgif.first_log_idx == Some(1)); - assert!(lgif.first_log_term == None); - assert!(lgif.last_log_idx == Some(4386)); - assert!(lgif.last_log_term == Some(1)); - assert!(lgif.last_committed_log_idx == Some(4386)); - assert!(lgif.leader_committed_log_idx == Some(4386)); - assert!(lgif.target_committed_log_idx == Some(4386)); - assert!(lgif.last_snapshot_idx == Some(0)); + assert_eq!( + format!("{}", root_cause), + "Unable to convert value \"BOB\" into u64 for key first_log_term: invalid digit found in string" + ); } #[test] - fn test_non_existent_key_with_correct_value_lgif_parse_success() { + fn test_non_existent_key_with_correct_value_lgif_parse_fail() { let log = log(); let data = "first_log_idx\t1\nfirst_log\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" .as_bytes(); - let lgif = Lgif::parse(&log, data).unwrap(); - - assert!(lgif.first_log_idx == Some(1)); - assert!(lgif.first_log_term == None); - assert!(lgif.last_log_idx == Some(4386)); - assert!(lgif.last_log_term == Some(1)); - assert!(lgif.last_committed_log_idx == Some(4386)); - assert!(lgif.leader_committed_log_idx == Some(4386)); - assert!(lgif.target_committed_log_idx == Some(4386)); - assert!(lgif.last_snapshot_idx == Some(0)); - } - - #[test] - fn test_nonsense_lgif_parse_success() { - let log = log(); - let data = - "Mmmbop,\tba\tduba\tdop\tba\nDu\tbop,\tba\tduba\tdop\tba\nDu\tbop,\tba\tduba\tdop\tba\tdu - \ntarget_committed_log_idx\t4386\n\n" - .as_bytes(); - let lgif = Lgif::parse(&log, data).unwrap(); + let result = Lgif::parse(&log, data); + let error = result.unwrap_err(); + let root_cause = error.root_cause(); - assert!(lgif.first_log_idx == None); - assert!(lgif.first_log_term == None); - assert!(lgif.last_log_idx == None); - assert!(lgif.last_log_term == None); - assert!(lgif.last_committed_log_idx == None); - assert!(lgif.leader_committed_log_idx == None); - assert!(lgif.target_committed_log_idx == Some(4386)); - assert!(lgif.last_snapshot_idx == None); + assert_eq!( + format!("{}", root_cause), + "Command output contains different keys to those expected. \ + Keys: [\"first_log\", \"first_log_idx\", \"last_committed_log_idx\", \"last_log_idx\", \"last_log_term\", \"last_snapshot_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\"] \ + Expected keys: [\"first_log_idx\", \"first_log_term\", \"last_committed_log_idx\", \"last_log_idx\", \"last_log_term\", \"last_snapshot_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\"]" + ); } #[test] @@ -616,7 +598,7 @@ mod tests { assert_eq!( format!("{}", root_cause), - "Unable to parse `clickhouse keeper-client -q lgif` response: Mmmbop, ba duba dop ba\nDu bop, ba du dop ba\nYeah, yeah", + "Command output has a line that does not contain a key-value pair: [\"Mmmbop, ba duba dop ba\"]", ); } } From 63719545de3cc39347eac10e2cc55dced75d7974 Mon Sep 17 00:00:00 2001 From: karencfv Date: Tue, 17 Sep 2024 18:02:32 +1200 Subject: [PATCH 17/20] generate openapi spec --- openapi/clickhouse-admin.json | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/openapi/clickhouse-admin.json b/openapi/clickhouse-admin.json index bee324a6d9..6862654124 100644 --- a/openapi/clickhouse-admin.json +++ b/openapi/clickhouse-admin.json @@ -383,62 +383,64 @@ "type": "object", "properties": { "first_log_idx": { - "nullable": true, "description": "Index of the first log entry in the current log segment", "type": "integer", "format": "uint64", "minimum": 0 }, "first_log_term": { - "nullable": true, "description": "Term of the leader when the first log entry was created", "type": "integer", "format": "uint64", "minimum": 0 }, "last_committed_log_idx": { - "nullable": true, "description": "Index of the last committed log entry", "type": "integer", "format": "uint64", "minimum": 0 }, "last_log_idx": { - "nullable": true, "description": "Index of the last log entry in the current log segment", "type": "integer", "format": "uint64", "minimum": 0 }, "last_log_term": { - "nullable": true, "description": "Term of the leader when the last log entry was created", "type": "integer", "format": "uint64", "minimum": 0 }, "last_snapshot_idx": { - "nullable": true, "description": "Index of the most recent snapshot taken", "type": "integer", "format": "uint64", "minimum": 0 }, "leader_committed_log_idx": { - "nullable": true, "description": "Index of the last committed log entry from the leader's perspective", "type": "integer", "format": "uint64", "minimum": 0 }, "target_committed_log_idx": { - "nullable": true, "description": "Target index for log commitment during replication or recovery", "type": "integer", "format": "uint64", "minimum": 0 } - } + }, + "required": [ + "first_log_idx", + "first_log_term", + "last_committed_log_idx", + "last_log_idx", + "last_log_term", + "last_snapshot_idx", + "leader_committed_log_idx", + "target_committed_log_idx" + ] }, "LogConfig": { "type": "object", From 55f59bd226b0d6b5716cd1e0f2ede28187a3b595 Mon Sep 17 00:00:00 2001 From: karencfv Date: Wed, 18 Sep 2024 13:07:54 +1200 Subject: [PATCH 18/20] add integration test --- Cargo.lock | 7 +++ clickhouse-admin/Cargo.toml | 4 ++ clickhouse-admin/types/src/lib.rs | 2 +- oximeter/db/tests/integration_test.rs | 61 ++------------------------- oximeter/test-utils/Cargo.toml | 4 ++ oximeter/test-utils/src/lib.rs | 58 +++++++++++++++++++++++++ 6 files changed, 77 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b0f7b6cb33..3f5a781c2d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5929,6 +5929,7 @@ dependencies = [ "clap", "clickhouse-admin-api", "clickhouse-admin-types", + "clickward", "dropshot 0.10.2-dev", "expectorate", "http 0.2.12", @@ -5940,6 +5941,8 @@ dependencies = [ "omicron-workspace-hack", "openapi-lint", "openapiv3", + "oximeter-db", + "oximeter-test-utils", "schemars", "serde", "serde_json", @@ -7241,10 +7244,14 @@ dependencies = [ name = "oximeter-test-utils" version = "0.1.0" dependencies = [ + "anyhow", "chrono", + "clickward", + "omicron-test-utils", "omicron-workspace-hack", "oximeter-macro-impl", "oximeter-types", + "slog", "uuid", ] diff --git a/clickhouse-admin/Cargo.toml b/clickhouse-admin/Cargo.toml index 270f779d7e..84b04f6caa 100644 --- a/clickhouse-admin/Cargo.toml +++ b/clickhouse-admin/Cargo.toml @@ -30,9 +30,13 @@ toml.workspace = true omicron-workspace-hack.workspace = true [dev-dependencies] +clickward.workspace = true +dropshot.workspace = true expectorate.workspace = true nexus-test-utils.workspace = true omicron-test-utils.workspace = true +oximeter-db.workspace = true +oximeter-test-utils.workspace = true openapi-lint.workspace = true openapiv3.workspace = true serde_json.workspace = true diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index cf280af7f9..56e71d66fe 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -225,7 +225,7 @@ macro_rules! define_struct_and_set_values { pub struct $name { $( #[$doc_field] - $field_name: $field_type + pub $field_name: $field_type ),* } diff --git a/oximeter/db/tests/integration_test.rs b/oximeter/db/tests/integration_test.rs index 05f41b3872..35f96dfd50 100644 --- a/oximeter/db/tests/integration_test.rs +++ b/oximeter/db/tests/integration_test.rs @@ -3,15 +3,13 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use anyhow::Context; -use clickward::{ - BasePorts, Deployment, DeploymentConfig, KeeperClient, KeeperError, - KeeperId, -}; +use clickward::{BasePorts, Deployment, DeploymentConfig, KeeperId}; use dropshot::test_util::log_prefix_for_test; use omicron_test_utils::dev::poll; use omicron_test_utils::dev::test_setup_log; use oximeter_db::{Client, DbWrite, OxqlResult, Sample, TestDbWrite}; -use slog::{debug, info, Logger}; +use oximeter_test_utils::wait_for_keepers; +use slog::{info, Logger}; use std::collections::BTreeSet; use std::default::Default; use std::time::Duration; @@ -452,59 +450,6 @@ async fn wait_for_num_points( Ok(()) } -/// Wait for all keeper servers to be capable of handling commands -async fn wait_for_keepers( - log: &Logger, - deployment: &Deployment, - ids: Vec, -) -> anyhow::Result<()> { - let mut keepers = vec![]; - for id in &ids { - keepers.push(KeeperClient::new(deployment.keeper_addr(*id)?)); - } - - poll::wait_for_condition( - || async { - let mut done = true; - for keeper in &keepers { - match keeper.config().await { - Ok(config) => { - // The node isn't really up yet - if config.len() != keepers.len() { - done = false; - debug!(log, "Keeper config not set"; - "addr" => keeper.addr(), - "expected" => keepers.len(), - "got" => config.len() - ); - break; - } - } - Err(e) => { - done = false; - debug!(log, "Keeper connection error: {}", e; - "addr" => keeper.addr() - ); - break; - } - } - } - if !done { - Err(poll::CondCheckError::::NotYet) - } else { - Ok(()) - } - }, - &Duration::from_millis(1), - &Duration::from_secs(30), - ) - .await - .with_context(|| format!("failed to contact all keepers: {ids:?}"))?; - - info!(log, "Keepers ready: {ids:?}"); - Ok(()) -} - /// Try to ping the server until it is responds. async fn wait_for_ping(log: &Logger, client: &Client) -> anyhow::Result<()> { poll::wait_for_condition( diff --git a/oximeter/test-utils/Cargo.toml b/oximeter/test-utils/Cargo.toml index f463e74aca..0bff56583e 100644 --- a/oximeter/test-utils/Cargo.toml +++ b/oximeter/test-utils/Cargo.toml @@ -8,8 +8,12 @@ license = "MPL-2.0" workspace = true [dependencies] +anyhow.workspace = true chrono.workspace = true +clickward.workspace = true omicron-workspace-hack.workspace = true +omicron-test-utils.workspace = true oximeter-macro-impl.workspace = true oximeter-types.workspace = true +slog.workspace =true uuid.workspace = true diff --git a/oximeter/test-utils/src/lib.rs b/oximeter/test-utils/src/lib.rs index 04c49add65..02f928abc0 100644 --- a/oximeter/test-utils/src/lib.rs +++ b/oximeter/test-utils/src/lib.rs @@ -14,6 +14,9 @@ // lots of related issues and discussion. extern crate self as oximeter; +use anyhow::Context; +use clickward::{Deployment, KeeperClient, KeeperError, KeeperId}; +use omicron_test_utils::dev::poll; use oximeter_macro_impl::{Metric, Target}; use oximeter_types::histogram; use oximeter_types::histogram::{Histogram, Record}; @@ -22,6 +25,8 @@ use oximeter_types::types::{ Cumulative, Datum, DatumType, FieldType, FieldValue, Measurement, Sample, }; use oximeter_types::{Metric, Target}; +use slog::{debug, info, Logger}; +use std::time::Duration; use uuid::Uuid; #[derive(Target)] @@ -127,6 +132,59 @@ pub fn generate_test_samples( samples } +/// Wait for all keeper servers to be capable of handling commands +pub async fn wait_for_keepers( + log: &Logger, + deployment: &Deployment, + ids: Vec, +) -> anyhow::Result<()> { + let mut keepers = vec![]; + for id in &ids { + keepers.push(KeeperClient::new(deployment.keeper_addr(*id)?)); + } + + poll::wait_for_condition( + || async { + let mut done = true; + for keeper in &keepers { + match keeper.config().await { + Ok(config) => { + // The node isn't really up yet + if config.len() != keepers.len() { + done = false; + debug!(log, "Keeper config not set"; + "addr" => keeper.addr(), + "expected" => keepers.len(), + "got" => config.len() + ); + break; + } + } + Err(e) => { + done = false; + debug!(log, "Keeper connection error: {}", e; + "addr" => keeper.addr() + ); + break; + } + } + } + if !done { + Err(poll::CondCheckError::::NotYet) + } else { + Ok(()) + } + }, + &Duration::from_millis(1), + &Duration::from_secs(30), + ) + .await + .with_context(|| format!("failed to contact all keepers: {ids:?}"))?; + + info!(log, "Keepers ready: {ids:?}"); + Ok(()) +} + #[cfg(test)] mod tests { use chrono::Utc; From 5cbbae26ebe6780cf57d253c4dd793a7e65e132f Mon Sep 17 00:00:00 2001 From: karencfv Date: Wed, 18 Sep 2024 13:08:14 +1200 Subject: [PATCH 19/20] actually add the new integration test --- clickhouse-admin/tests/integration_test.rs | 69 ++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 clickhouse-admin/tests/integration_test.rs diff --git a/clickhouse-admin/tests/integration_test.rs b/clickhouse-admin/tests/integration_test.rs new file mode 100644 index 0000000000..9967988958 --- /dev/null +++ b/clickhouse-admin/tests/integration_test.rs @@ -0,0 +1,69 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use anyhow::Context; +use camino::Utf8PathBuf; +use clickward::{BasePorts, Deployment, DeploymentConfig, KeeperId}; +use dropshot::test_util::log_prefix_for_test; +use omicron_clickhouse_admin::ClickhouseCli; +use omicron_test_utils::dev::test_setup_log; +use oximeter_test_utils::wait_for_keepers; +use slog::info; +use std::net::{Ipv6Addr, SocketAddrV6}; +use std::str::FromStr; + +#[tokio::test] +async fn test_lgif_parsing() -> anyhow::Result<()> { + let logctx = test_setup_log("test_lgif_parsing"); + let log = logctx.log.clone(); + + let (parent_dir, prefix) = log_prefix_for_test(logctx.test_name()); + let path = parent_dir.join(format!("{prefix}-oximeter-clickward-test")); + std::fs::create_dir(&path)?; + + // We use the default ports in `test_schemas_disjoint` and must use a + // separate set here in case the two tests run concurrently. + let base_ports = BasePorts { + keeper: 29000, + raft: 29100, + clickhouse_tcp: 29200, + clickhouse_http: 29300, + clickhouse_interserver_http: 29400, + }; + + let config = DeploymentConfig { + path: path.clone(), + base_ports, + cluster_name: "oximeter_cluster".to_string(), + }; + + let mut deployment = Deployment::new(config); + + // We only need a single keeper to test the lgif command + let num_keepers = 1; + let num_replicas = 1; + deployment + .generate_config(num_keepers, num_replicas) + .context("failed to generate config")?; + deployment.deploy().context("failed to deploy")?; + + wait_for_keepers(&log, &deployment, vec![KeeperId(1)]).await?; + + let clickhouse_cli = ClickhouseCli::new( + Utf8PathBuf::from_str("clickhouse").unwrap(), + SocketAddrV6::new(Ipv6Addr::LOCALHOST, 29001, 0, 0), + ) + .with_log(log.clone()); + + let lgif = clickhouse_cli.lgif().await.unwrap(); + + // The first log index from a newly created cluster should always be 1 + assert_eq!(lgif.first_log_idx, 1); + + info!(&log, "Cleaning up test"); + deployment.teardown()?; + std::fs::remove_dir_all(path)?; + logctx.cleanup_successful(); + Ok(()) +} From bd3465d6e529a06b7ae496523b6e0867e3b606b1 Mon Sep 17 00:00:00 2001 From: karencfv Date: Thu, 19 Sep 2024 13:33:35 +1200 Subject: [PATCH 20/20] Goodbye macro --- clickhouse-admin/types/src/lib.rs | 258 ++++++++++++------------------ 1 file changed, 99 insertions(+), 159 deletions(-) diff --git a/clickhouse-admin/types/src/lib.rs b/clickhouse-admin/types/src/lib.rs index 56e71d66fe..7fe3b99fd9 100644 --- a/clickhouse-admin/types/src/lib.rs +++ b/clickhouse-admin/types/src/lib.rs @@ -9,7 +9,6 @@ use derive_more::{Add, AddAssign, Display, From}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use slog::{info, Logger}; -use std::collections::HashMap; use std::fs::create_dir; use std::io::{ErrorKind, Write}; use std::net::Ipv6Addr; @@ -209,81 +208,29 @@ impl KeeperSettings { } } -macro_rules! define_struct_and_set_values { - ( - #[$doc_struct:meta] - struct $name:ident { - $( - #[$doc_field:meta] - $field_name:ident: $field_type:ty - ),* $(,)? - } - ) => { - #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] - #[serde(rename_all = "snake_case")] - #[$doc_struct] - pub struct $name { - $( - #[$doc_field] - pub $field_name: $field_type - ),* - } - - impl $name { - // Check if a field name matches a given key and set its value. - // Since the struct will be serialised into JSON for the reponse. - // we want to make sure the keys are exactly the same. - pub fn set_field_value(&mut self, key: &str, value: u64) -> Result<()> { - match key { - $( - stringify!($field_name) => { - self.$field_name = value; - Ok(()) - }, - )* - _ => bail!("Key '{}' is not part of the expected keys.", key), - } - } - } - }; -} - -define_struct_and_set_values! { - /// Logically grouped information file from a keeper node - struct Lgif { - /// Index of the first log entry in the current log segment - first_log_idx: u64, - /// Term of the leader when the first log entry was created - first_log_term: u64, - /// Index of the last log entry in the current log segment - last_log_idx: u64, - /// Term of the leader when the last log entry was created - last_log_term: u64, - /// Index of the last committed log entry - last_committed_log_idx: u64, - /// Index of the last committed log entry from the leader's perspective - leader_committed_log_idx: u64, - /// Target index for log commitment during replication or recovery - target_committed_log_idx: u64, - /// Index of the most recent snapshot taken - last_snapshot_idx: u64, - } +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +/// Logically grouped information file from a keeper node +pub struct Lgif { + /// Index of the first log entry in the current log segment + pub first_log_idx: u64, + /// Term of the leader when the first log entry was created + pub first_log_term: u64, + /// Index of the last log entry in the current log segment + pub last_log_idx: u64, + /// Term of the leader when the last log entry was created + pub last_log_term: u64, + /// Index of the last committed log entry + pub last_committed_log_idx: u64, + /// Index of the last committed log entry from the leader's perspective + pub leader_committed_log_idx: u64, + /// Target index for log commitment during replication or recovery + pub target_committed_log_idx: u64, + /// Index of the most recent snapshot taken + pub last_snapshot_idx: u64, } impl Lgif { - pub fn new() -> Self { - Self { - first_log_idx: 0, - first_log_term: 0, - last_log_idx: 0, - last_log_term: 0, - last_committed_log_idx: 0, - leader_committed_log_idx: 0, - target_committed_log_idx: 0, - last_snapshot_idx: 0, - } - } - pub fn parse(log: &Logger, data: &[u8]) -> Result { // The reponse we get from running `clickhouse keeper-client -h {HOST} --q lgif` // isn't in any known format (e.g. JSON), but rather a series of lines with key-value @@ -300,87 +247,68 @@ impl Lgif { // target_committed_log_idx 10889 // last_snapshot_idx 9465 // ``` - // - // To parse the data we follow these steps: - // - // 1. Create an iterator over the lines of a string. These are split at newlines. - // 2. Each line is split by the tab, and we make sure that the key and value are - // valid. - // 3. Once we have a HashMap of valid key-value pairs, we set the fields of the - // Lgif struct with the retrieved data. To do this we make sure that the name - // of each HashMap key matches one of the field names and then we set the - // corresponding value. - let binding = String::from_utf8_lossy(data); + let s = String::from_utf8_lossy(data); info!( log, "Retrieved data from `clickhouse keeper-config lgif`"; - "output" => ?binding + "output" => ?s ); - let lines = binding.lines(); - let mut lgif: HashMap = HashMap::new(); - - for line in lines { - let line = line.trim(); - if !line.is_empty() { - let l: Vec<&str> = line.split('\t').collect(); - - if l.len() != 2 { - bail!( - "Command output has a line that does not contain a key-value pair: {l:?}" - ); - } - - let key = l[0].to_string(); - let raw_value = l[1]; - let value = match u64::from_str(raw_value) { - Ok(v) => v, - Err(e) => { - bail!( - "Unable to convert value {raw_value:?} into u64 for key {key}: {e}" - ); - } - }; - - lgif.insert(key, value); - } - } - let mut keys: Vec = lgif.keys().cloned().collect(); - let mut expected_keys = Lgif::expected_keys(); - keys.sort(); - expected_keys.sort(); - if !keys.eq(&expected_keys) { + let expected = Lgif::expected_keys(); + + // Verify the output contains the same amount of lines as the expected keys. + // This will ensure we catch any new key-value pairs appended to the lgif output. + let lines = s.trim().lines(); + if expected.len() != lines.count() { bail!( - "Command output contains different keys to those expected. \ - Keys: {keys:?} Expected keys: {expected_keys:?}" + "Output from the Keeper differs to the expected output keys \ + Output: {s:?} \ + Expected output keys: {expected:?}" ); } - let mut parsed_data = Lgif::new(); - for (key, value) in lgif { - match parsed_data.set_field_value(&key, value) { - Ok(()) => (), - Err(e) => { - bail!( - "Unable to set Lgif struct field with key value pair: {e}" - ); - } + let mut vals: Vec = Vec::new(); + for (line, expected_key) in s.lines().zip(expected.clone()) { + let mut split = line.split('\t'); + let Some(key) = split.next() else { + bail!("Returned None while attempting to retrieve key"); + }; + if key != expected_key { + bail!("Extracted key `{key:?}` from output differs from expected key `{expected_key}`"); + } + let Some(val) = split.next() else { + bail!("Command output has a line that does not contain a key-value pair: {key:?}"); + }; + let val = match u64::from_str(val) { + Ok(v) => v, + Err(e) => bail!("Unable to convert value {val:?} into u64 for key {key}: {e}"), }; + vals.push(val); } - Ok(parsed_data) + let mut iter = vals.into_iter(); + Ok(Lgif { + first_log_idx: iter.next().unwrap(), + first_log_term: iter.next().unwrap(), + last_log_idx: iter.next().unwrap(), + last_log_term: iter.next().unwrap(), + last_committed_log_idx: iter.next().unwrap(), + leader_committed_log_idx: iter.next().unwrap(), + target_committed_log_idx: iter.next().unwrap(), + last_snapshot_idx: iter.next().unwrap(), + }) } - fn expected_keys() -> Vec { + fn expected_keys() -> Vec<&'static str> { vec![ - "first_log_idx".to_string(), - "first_log_term".to_string(), - "last_log_idx".to_string(), - "last_log_term".to_string(), - "last_committed_log_idx".to_string(), - "leader_committed_log_idx".to_string(), - "target_committed_log_idx".to_string(), - "last_snapshot_idx".to_string(), + "first_log_idx", + "first_log_term", + "last_log_idx", + "last_log_term", + "last_committed_log_idx", + "leader_committed_log_idx", + "target_committed_log_idx", + "last_snapshot_idx", ] } } @@ -500,8 +428,7 @@ mod tests { fn test_full_lgif_parse_success() { let log = log(); let data = - "first_log_idx\t1\nfirst_log_term\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 - \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + "first_log_idx\t1\nfirst_log_term\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386\nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" .as_bytes(); let lgif = Lgif::parse(&log, data).unwrap(); @@ -519,8 +446,7 @@ mod tests { fn test_missing_keys_lgif_parse_fail() { let log = log(); let data = - "first_log_idx\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 - \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + "first_log_idx\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386\nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" .as_bytes(); let result = Lgif::parse(&log, data); let error = result.unwrap_err(); @@ -528,9 +454,9 @@ mod tests { assert_eq!( format!("{}", root_cause), - "Command output contains different keys to those expected. \ - Keys: [\"first_log_idx\", \"last_committed_log_idx\", \"last_log_idx\", \"last_log_term\", \"last_snapshot_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\"] \ - Expected keys: [\"first_log_idx\", \"first_log_term\", \"last_committed_log_idx\", \"last_log_idx\", \"last_log_term\", \"last_snapshot_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\"]" + "Output from the Keeper differs to the expected output keys \ + Output: \"first_log_idx\\t1\\nlast_log_idx\\t4386\\nlast_log_term\\t1\\nlast_committed_log_idx\\t4386\\nleader_committed_log_idx\\t4386\\ntarget_committed_log_idx\\t4386\\nlast_snapshot_idx\\t0\\n\\n\" \ + Expected output keys: [\"first_log_idx\", \"first_log_term\", \"last_log_idx\", \"last_log_term\", \"last_committed_log_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\", \"last_snapshot_idx\"]" ); } @@ -538,8 +464,7 @@ mod tests { fn test_empty_value_lgif_parse_fail() { let log = log(); let data = - "first_log_idx\nfirst_log_term\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 - \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + "first_log_idx\nfirst_log_term\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386\nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" .as_bytes(); let result = Lgif::parse(&log, data); let error = result.unwrap_err(); @@ -547,7 +472,7 @@ mod tests { assert_eq!( format!("{}", root_cause), - "Command output has a line that does not contain a key-value pair: [\"first_log_idx\"]" + "Command output has a line that does not contain a key-value pair: \"first_log_idx\"" ); } @@ -555,8 +480,7 @@ mod tests { fn test_non_u64_value_lgif_parse_fail() { let log = log(); let data = - "first_log_idx\t1\nfirst_log_term\tBOB\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 - \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + "first_log_idx\t1\nfirst_log_term\tBOB\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386\nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" .as_bytes(); let result = Lgif::parse(&log, data); let error = result.unwrap_err(); @@ -572,8 +496,7 @@ mod tests { fn test_non_existent_key_with_correct_value_lgif_parse_fail() { let log = log(); let data = - "first_log_idx\t1\nfirst_log\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386 - \nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" + "first_log_idx\t1\nfirst_log\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386\nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\n\n" .as_bytes(); let result = Lgif::parse(&log, data); let error = result.unwrap_err(); @@ -581,24 +504,41 @@ mod tests { assert_eq!( format!("{}", root_cause), - "Command output contains different keys to those expected. \ - Keys: [\"first_log\", \"first_log_idx\", \"last_committed_log_idx\", \"last_log_idx\", \"last_log_term\", \"last_snapshot_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\"] \ - Expected keys: [\"first_log_idx\", \"first_log_term\", \"last_committed_log_idx\", \"last_log_idx\", \"last_log_term\", \"last_snapshot_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\"]" + "Extracted key `\"first_log\"` from output differs from expected key `first_log_term`" ); } #[test] - fn test_all_nonsense_lgif_parse_fail() { + fn test_additional_key_value_pairs_in_output_parse_fail() { let log = log(); - let data = "Mmmbop, ba duba dop ba\nDu bop, ba du dop ba\nYeah, yeah" + let data = "first_log_idx\t1\nfirst_log_term\t1\nlast_log_idx\t4386\nlast_log_term\t1\nlast_committed_log_idx\t4386\nleader_committed_log_idx\t4386\ntarget_committed_log_idx\t4386\nlast_snapshot_idx\t0\nlast_snapshot_idx\t3\n\n" .as_bytes(); + + let result = Lgif::parse(&log, data); + let error = result.unwrap_err(); + let root_cause = error.root_cause(); + + assert_eq!( + format!("{}", root_cause), + "Output from the Keeper differs to the expected output keys \ + Output: \"first_log_idx\\t1\\nfirst_log_term\\t1\\nlast_log_idx\\t4386\\nlast_log_term\\t1\\nlast_committed_log_idx\\t4386\\nleader_committed_log_idx\\t4386\\ntarget_committed_log_idx\\t4386\\nlast_snapshot_idx\\t0\\nlast_snapshot_idx\\t3\\n\\n\" \ + Expected output keys: [\"first_log_idx\", \"first_log_term\", \"last_log_idx\", \"last_log_term\", \"last_committed_log_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\", \"last_snapshot_idx\"]", + ); + } + + #[test] + fn test_empty_output_parse_fail() { + let log = log(); + let data = "".as_bytes(); let result = Lgif::parse(&log, data); let error = result.unwrap_err(); let root_cause = error.root_cause(); assert_eq!( format!("{}", root_cause), - "Command output has a line that does not contain a key-value pair: [\"Mmmbop, ba duba dop ba\"]", + "Output from the Keeper differs to the expected output keys \ + Output: \"\" \ + Expected output keys: [\"first_log_idx\", \"first_log_term\", \"last_log_idx\", \"last_log_term\", \"last_committed_log_idx\", \"leader_committed_log_idx\", \"target_committed_log_idx\", \"last_snapshot_idx\"]", ); } }