Skip to content

Commit

Permalink
[reconfigurator] Retrieve keeper lgif information (#6549)
Browse files Browse the repository at this point in the history
## Overview

This commit introduces a new `clickhouse-admin` API endpoint:
`/keeper/lgif`.

This endpoint uses the ClickHouse CLI internally to retrieve and parse
the logically grouped information file from the ClickHouse keepers.

## Purpose

Reconfigurator will need this information to reliably manage and operate
a ClickHouse replicated cluster. Additional endpoints to retrieve other
information from ClickHouse servers or keepers will be added in
follow-up PRs.

## Testing

In addition to the unit tests, I have manually tested with the following
results:

```console
$ cargo run --bin=clickhouse-admin -- run -c ./smf/clickhouse-admin/config.toml -a [::1]:8888 -l [::1]:20001 -b /Users/karcar/src/omicron/out/clickhouse/clickhouse
   Compiling omicron-clickhouse-admin v0.1.0 (/Users/karcar/src/omicron/clickhouse-admin)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 2.46s
     Running `target/debug/clickhouse-admin run -c ./smf/clickhouse-admin/config.toml -a '[::1]:8888' -l '[::1]:20001' -b /Users/karcar/src/omicron/out/clickhouse/clickhouse`
note: configured to log to "/dev/stdout"
{"msg":"listening","v":0,"name":"clickhouse-admin","level":30,"time":"2024-09-12T02:37:19.383597Z","hostname":"ixchel","pid":3115,"local_addr":"[::1]:8888","component":"dropshot","file":"/Users/karcar/.cargo/git/checkouts/dropshot-a4a923d29dccc492/06c8dab/dropshot/src/server.rs:205"}
{"msg":"accepted connection","v":0,"name":"clickhouse-admin","level":30,"time":"2024-09-12T02:37:23.843325Z","hostname":"ixchel","pid":3115,"local_addr":"[::1]:8888","component":"dropshot","file":"/Users/karcar/.cargo/git/checkouts/dropshot-a4a923d29dccc492/06c8dab/dropshot/src/server.rs:775","remote_addr":"[::1]:54455"}
{"msg":"request completed","v":0,"name":"clickhouse-admin","level":30,"time":"2024-09-12T02:37:24.302588Z","hostname":"ixchel","pid":3115,"uri":"/keeper/lgif","method":"GET","req_id":"64b232d0-d6ac-4cae-8f0a-f14cf6d1dfba","remote_addr":"[::1]:54455","local_addr":"[::1]:8888","component":"dropshot","file":"/Users/karcar/.cargo/git/checkouts/dropshot-a4a923d29dccc492/06c8dab/dropshot/src/server.rs:914","latency_us":458301,"response_code":"200"}
```

```console
$ curl http://[::1]:8888/keeper/lgif
{"first_log_idx":1,"first_log_term":1,"last_log_idx":11717,"last_log_term":20,"last_committed_log_idx":11717,"leader_committed_log_idx":11717,"target_committed_log_idx":11717,"last_snapshot_idx":9465}
```

Related: #5999
  • Loading branch information
karencfv authored Sep 19, 2024
1 parent bac635f commit c7f5a11
Show file tree
Hide file tree
Showing 17 changed files with 725 additions and 95 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions clickhouse-admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,13 @@ toml.workspace = true
omicron-workspace-hack.workspace = true

[dev-dependencies]
clickward.workspace = true
dropshot.workspace = true
expectorate.workspace = true
nexus-test-utils.workspace = true
omicron-test-utils.workspace = true
oximeter-db.workspace = true
oximeter-test-utils.workspace = true
openapi-lint.workspace = true
openapiv3.workspace = true
serde_json.workspace = true
Expand Down
17 changes: 15 additions & 2 deletions clickhouse-admin/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use clickhouse_admin_types::config::{KeeperConfig, ReplicaConfig};
use clickhouse_admin_types::{KeeperSettings, ServerSettings};
use dropshot::{HttpError, HttpResponseCreated, RequestContext, TypedBody};
use clickhouse_admin_types::{KeeperSettings, Lgif, ServerSettings};
use dropshot::{
HttpError, HttpResponseCreated, HttpResponseOk, RequestContext, TypedBody,
};
use omicron_common::api::external::Generation;
use schemars::JsonSchema;
use serde::Deserialize;
Expand Down Expand Up @@ -50,4 +52,15 @@ pub trait ClickhouseAdminApi {
rqctx: RequestContext<Self::Context>,
body: TypedBody<KeeperConfigurableSettings>,
) -> Result<HttpResponseCreated<KeeperConfig>, HttpError>;

/// Retrieve a logically grouped information file from a keeper node.
/// This information is used internally by ZooKeeper to manage snapshots
/// and logs for consistency and recovery.
#[endpoint {
method = GET,
path = "/keeper/lgif",
}]
async fn lgif(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<Lgif>, HttpError>;
}
26 changes: 19 additions & 7 deletions clickhouse-admin/src/bin/clickhouse-admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use anyhow::anyhow;
use camino::Utf8PathBuf;
use clap::Parser;
use omicron_clickhouse_admin::{Clickward, Config};
use omicron_clickhouse_admin::{ClickhouseCli, Clickward, Config};
use omicron_common::cmd::fatal;
use omicron_common::cmd::CmdError;
use std::net::{SocketAddr, SocketAddrV6};
Expand All @@ -27,6 +27,14 @@ enum Args {
/// Path to the server configuration file
#[clap(long, short, action)]
config: Utf8PathBuf,

/// Address on which the clickhouse server or keeper is listening on
#[clap(long, short = 'l', action)]
listen_address: SocketAddrV6,

/// Path to the clickhouse binary
#[clap(long, short, action)]
binary_path: Utf8PathBuf,
},
}

Expand All @@ -41,17 +49,21 @@ async fn main_impl() -> Result<(), CmdError> {
let args = Args::parse();

match args {
Args::Run { http_address, config } => {
Args::Run { http_address, config, listen_address, binary_path } => {
let mut config = Config::from_file(&config)
.map_err(|err| CmdError::Failure(anyhow!(err)))?;
config.dropshot.bind_address = SocketAddr::V6(http_address);

let clickward = Clickward::new();
let clickhouse_cli =
ClickhouseCli::new(binary_path, listen_address);

let server =
omicron_clickhouse_admin::start_server(clickward, config)
.await
.map_err(|err| CmdError::Failure(anyhow!(err)))?;
let server = omicron_clickhouse_admin::start_server(
clickward,
clickhouse_cli,
config,
)
.await
.map_err(|err| CmdError::Failure(anyhow!(err)))?;
server.await.map_err(|err| {
CmdError::Failure(anyhow!(
"server failed after starting: {err}"
Expand Down
130 changes: 130 additions & 0 deletions clickhouse-admin/src/clickhouse_cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use anyhow::Result;
use camino::Utf8PathBuf;
use clickhouse_admin_types::Lgif;
use dropshot::HttpError;
use illumos_utils::{output_to_exec_error, ExecutionError};
use slog::Logger;
use slog_error_chain::{InlineErrorChain, SlogInlineError};
use std::ffi::OsStr;
use std::io;
use std::net::SocketAddrV6;
use tokio::process::Command;

/// Errors produced while invoking the `clickhouse` binary and interpreting
/// its output.
#[derive(Debug, thiserror::Error, SlogInlineError)]
pub enum ClickhouseCliError {
/// Waiting on the command's output returned an I/O error.
#[error("failed to run `clickhouse {subcommand}`")]
Run {
/// Short human-readable description of what the subcommand is for.
description: &'static str,
/// The arguments passed to the binary, joined with single spaces.
subcommand: String,
/// The underlying I/O error.
#[source]
err: io::Error,
},
/// The command ran but exited with a non-success status.
#[error(transparent)]
ExecutionError(#[from] ExecutionError),
/// The command succeeded but its output could not be parsed.
#[error("failed to parse command output")]
Parse {
/// Short human-readable description of what the subcommand is for.
description: &'static str,
/// Lossy UTF-8 rendering of the command's standard output.
stdout: String,
/// Diagnostic text meant to carry the command's standard error.
stderr: String,
/// The error returned by the parser.
#[source]
err: anyhow::Error,
},
}

impl From<ClickhouseCliError> for HttpError {
    /// Maps every CLI error to a 500 response whose internal and external
    /// messages both carry the full inline error chain.
    fn from(err: ClickhouseCliError) -> Self {
        // Deliberately exhaustive with no wildcard: adding a variant to
        // `ClickhouseCliError` must force a decision about its status code.
        match &err {
            ClickhouseCliError::Run { .. }
            | ClickhouseCliError::Parse { .. }
            | ClickhouseCliError::ExecutionError(_) => (),
        }

        let chain = InlineErrorChain::new(&err).to_string();
        HttpError {
            status_code: http::StatusCode::INTERNAL_SERVER_ERROR,
            error_code: Some("Internal".to_string()),
            external_message: chain.clone(),
            internal_message: chain,
        }
    }
}

/// Handle used to run `clickhouse` CLI subcommands against a keeper.
#[derive(Debug)]
pub struct ClickhouseCli {
/// Path to the clickhouse binary
pub binary_path: Utf8PathBuf,
/// Address on which the clickhouse keeper is listening
pub listen_address: SocketAddrV6,
/// Logger passed to the output parsers; `None` until `with_log` is called
pub log: Option<Logger>,
}

impl ClickhouseCli {
pub fn new(binary_path: Utf8PathBuf, listen_address: SocketAddrV6) -> Self {
Self { binary_path, listen_address, log: None }
}

pub fn with_log(mut self, log: Logger) -> Self {
self.log = Some(log);
self
}

pub async fn lgif(&self) -> Result<Lgif, ClickhouseCliError> {
self.keeper_client_non_interactive(
"lgif",
"Retrieve logically grouped information file",
Lgif::parse,
self.log.clone().unwrap(),
)
.await
}

async fn keeper_client_non_interactive<F, T>(
&self,
query: &str,
subcommand_description: &'static str,
parse: F,
log: Logger,
) -> Result<T, ClickhouseCliError>
where
F: FnOnce(&Logger, &[u8]) -> Result<T>,
{
let mut command = Command::new(&self.binary_path);
command
.arg("keeper-client")
.arg("--host")
.arg(&format!("[{}]", self.listen_address.ip()))
.arg("--port")
.arg(&format!("{}", self.listen_address.port()))
.arg("--query")
.arg(query);

let output = command.output().await.map_err(|err| {
let err_args: Vec<&OsStr> = command.as_std().get_args().collect();
let err_args_parsed: Vec<String> = err_args
.iter()
.map(|&os_str| os_str.to_string_lossy().into_owned())
.collect();
let err_args_str = err_args_parsed.join(" ");
ClickhouseCliError::Run {
description: subcommand_description,
subcommand: err_args_str,
err,
}
})?;

if !output.status.success() {
return Err(output_to_exec_error(command.as_std(), &output).into());
}

parse(&log, &output.stdout).map_err(|err| ClickhouseCliError::Parse {
description: subcommand_description,
stdout: String::from_utf8_lossy(&output.stdout).to_string(),
stderr: String::from_utf8_lossy(&output.stdout).to_string(),
err,
})
}
}
15 changes: 12 additions & 3 deletions clickhouse-admin/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,29 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use crate::Clickward;
use crate::{ClickhouseCli, Clickward};
use slog::Logger;

pub struct ServerContext {
clickward: Clickward,
clickhouse_cli: ClickhouseCli,
_log: Logger,
}

impl ServerContext {
pub fn new(clickward: Clickward, _log: Logger) -> Self {
Self { clickward, _log }
pub fn new(
clickward: Clickward,
clickhouse_cli: ClickhouseCli,
_log: Logger,
) -> Self {
Self { clickward, clickhouse_cli, _log }
}

pub fn clickward(&self) -> &Clickward {
&self.clickward
}

pub fn clickhouse_cli(&self) -> &ClickhouseCli {
&self.clickhouse_cli
}
}
13 changes: 12 additions & 1 deletion clickhouse-admin/src/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
use crate::context::ServerContext;
use clickhouse_admin_api::*;
use clickhouse_admin_types::config::{KeeperConfig, ReplicaConfig};
use dropshot::{HttpError, HttpResponseCreated, RequestContext, TypedBody};
use clickhouse_admin_types::Lgif;
use dropshot::{
HttpError, HttpResponseCreated, HttpResponseOk, RequestContext, TypedBody,
};
use std::sync::Arc;

type ClickhouseApiDescription = dropshot::ApiDescription<Arc<ServerContext>>;
Expand Down Expand Up @@ -44,4 +47,12 @@ impl ClickhouseAdminApi for ClickhouseAdminImpl {
let output = ctx.clickward().generate_keeper_config(keeper.settings)?;
Ok(HttpResponseCreated(output))
}

async fn lgif(
    rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<Lgif>, HttpError> {
    // Ask the keeper for its log information through the ClickHouse CLI
    // wrapper held in the server context; `?` converts CLI errors into
    // `HttpError`.
    let lgif = rqctx.context().clickhouse_cli().lgif().await?;
    Ok(HttpResponseOk(lgif))
}
}
5 changes: 5 additions & 0 deletions clickhouse-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ use std::error::Error;
use std::io;
use std::sync::Arc;

mod clickhouse_cli;
mod clickward;
mod config;
mod context;
mod http_entrypoints;

pub use clickhouse_cli::ClickhouseCli;
pub use clickward::Clickward;
pub use config::Config;

Expand All @@ -34,6 +36,7 @@ pub type Server = dropshot::HttpServer<Arc<ServerContext>>;
/// Start the dropshot server
pub async fn start_server(
clickward: Clickward,
clickhouse_cli: ClickhouseCli,
server_config: Config,
) -> Result<Server, StartError> {
let (drain, registration) = slog_dtrace::with_drain(
Expand All @@ -56,6 +59,8 @@ pub async fn start_server(

let context = ServerContext::new(
clickward,
clickhouse_cli
.with_log(log.new(slog::o!("component" => "ClickhouseCli"))),
log.new(slog::o!("component" => "ServerContext")),
);
let http_server_starter = dropshot::HttpServerStarter::new(
Expand Down
Loading

0 comments on commit c7f5a11

Please sign in to comment.