Fix test
andrewjstone committed Jul 24, 2024
1 parent 52ed6eb commit b9c7a13
Showing 2 changed files with 66 additions and 8 deletions.
31 changes: 30 additions & 1 deletion oximeter/db/src/client/mod.rs
@@ -51,11 +51,13 @@ use std::ops::Bound;
 use std::path::Path;
 use std::path::PathBuf;
 use std::sync::OnceLock;
+use std::time::Duration;
 use std::time::Instant;
 use tokio::fs;
 use tokio::sync::Mutex;
 use uuid::Uuid;
 
+const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
 const CLICKHOUSE_DB_MISSING: &'static str = "Database oximeter does not exist";
 const CLICKHOUSE_DB_VERSION_MISSING: &'static str =
     "Table oximeter.version does not exist";
@@ -77,6 +79,7 @@ pub struct Client {
     url: String,
     client: reqwest::Client,
     schema: Mutex<BTreeMap<TimeseriesName, TimeseriesSchema>>,
+    request_timeout: Duration,
 }
 
 impl Client {
@@ -90,7 +93,32 @@ impl Client {
         let client = reqwest::Client::new();
         let url = format!("http://{}", address);
         let schema = Mutex::new(BTreeMap::new());
-        Self { _id: id, log, url, client, schema }
+        Self {
+            _id: id,
+            log,
+            url,
+            client,
+            schema,
+            request_timeout: DEFAULT_REQUEST_TIMEOUT,
+        }
+    }
+
+    /// Construct a new ClickHouse client to the database at `address`, with a
+    /// custom request timeout.
+    pub fn new_with_request_timeout(
+        address: SocketAddr,
+        log: &Logger,
+        request_timeout: Duration,
+    ) -> Self {
+        let id = Uuid::new_v4();
+        let log = log.new(slog::o!(
+            "component" => "clickhouse-client",
+            "id" => id.to_string(),
+        ));
+        let client = reqwest::Client::new();
+        let url = format!("http://{}", address);
+        let schema = Mutex::new(BTreeMap::new());
+        Self { _id: id, log, url, client, schema, request_timeout }
     }
 
     /// Ping the ClickHouse server to verify connectivity.
@@ -896,6 +924,7 @@ impl Client {
         let response = self
             .client
             .post(&self.url)
+            .timeout(self.request_timeout)
             .query(&[
                 ("output_format_json_quote_64bit_integers", "0"),
                 // TODO-performance: This is needed to get the correct counts of
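
Taken together, the library change gives callers two constructors. A minimal usage sketch (hypothetical function and addresses; assumes `oximeter_db::Client`, a `slog::Logger`, and a reachable server, none of which are shown in the diff):

use std::net::SocketAddr;
use std::time::Duration;

use oximeter_db::Client;
use slog::Logger;

fn make_clients(address: SocketAddr, log: &Logger) {
    // Default constructor: requests are capped at
    // DEFAULT_REQUEST_TIMEOUT (60 seconds).
    let _default = Client::new(address, log);

    // New constructor: a hung ClickHouse server surfaces as a request
    // error after 5 seconds instead of stalling the caller for a minute.
    let _fast_fail =
        Client::new_with_request_timeout(address, log, Duration::from_secs(5));
}

Since `reqwest::RequestBuilder::timeout` applies to a single request and overrides any client-wide timeout, threading `self.request_timeout` through the one place the client builds its POST is sufficient; the shared `reqwest::Client` itself does not need to be rebuilt.
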
43 changes: 36 additions & 7 deletions oximeter/db/tests/integration_test.rs
@@ -37,6 +37,7 @@ impl TestInput
 
 #[tokio::test]
 async fn test_cluster() -> anyhow::Result<()> {
+    let request_timeout = Duration::from_secs(5);
     usdt::register_probes().unwrap();
     let start = tokio::time::Instant::now();
     let logctx = test_setup_log("test_cluster");
@@ -58,8 +59,16 @@ async fn test_cluster() -> anyhow::Result<()> {
         .context("failed to generate config")?;
     deployment.deploy().context("failed to deploy")?;
 
-    let client1 = Client::new(deployment.http_addr(1)?, log);
-    let client2 = Client::new(deployment.http_addr(2)?, log);
+    let client1 = Client::new_with_request_timeout(
+        deployment.http_addr(1)?,
+        log,
+        request_timeout,
+    );
+    let client2 = Client::new_with_request_timeout(
+        deployment.http_addr(2)?,
+        log,
+        request_timeout,
+    );
     wait_for_ping(&client1).await?;
     wait_for_ping(&client2).await?;
     wait_for_keepers(&deployment, (1..=3).collect()).await?;
@@ -122,7 +131,11 @@ async fn test_cluster() -> anyhow::Result<()> {
 
     // Add a 3rd clickhouse server and wait for it to come up
     deployment.add_server().expect("failed to launch a 3rd clickhouse server");
-    let client3 = Client::new(deployment.http_addr(3)?, log);
+    let client3 = Client::new_with_request_timeout(
+        deployment.http_addr(3)?,
+        log,
+        request_timeout,
+    );
     wait_for_ping(&client3).await?;
 
     // We need to initiate copying from existing replicated tables by creating
@@ -198,22 +211,33 @@ async fn test_cluster() -> anyhow::Result<()> {
         .expect("failed to get samples from client1");
 
     println!("Attempting to insert samples without keeper quorum");
+    let samples = test_util::generate_test_samples(
+        input.n_projects,
+        input.n_instances,
+        input.n_cpus,
+        input.n_samples,
+    );
     // We have lost quorum and should not be able to insert
     client1
         .insert_samples(&samples)
         .await
-        .expect_err("Insert succeeded without keeper quorum");
+        .expect_err("insert succeeded without keeper quorum");
 
     // Bringing the keeper back up should allow us to insert again
-    /* deployment.start_keeper(1).expect("failed to restart keeper");
+    deployment.start_keeper(1).expect("failed to restart keeper");
     wait_for_keepers(&deployment, vec![1, 3])
         .await
         .expect("failed to sync keepers");
+    let samples = test_util::generate_test_samples(
+        input.n_projects,
+        input.n_instances,
+        input.n_cpus,
+        input.n_samples,
+    );
     client1.insert_samples(&samples).await.expect("failed to insert samples");
     wait_for_num_points(&client2, samples.len() * 4)
         .await
         .expect("failed to get samples from client1");
-    */
 
     println!("Cleaning up test");
     deployment.teardown()?;
@@ -262,6 +286,11 @@ async fn wait_for_num_points(
                    poll::CondCheckError::<oximeter_db::Error>::NotYet
                })?;
            if total_points(&oxql_res) != n_samples {
+               println!(
+                   "received {}, expected {}",
+                   total_points(&oxql_res),
+                   n_samples
+               );
                Err(poll::CondCheckError::<oximeter_db::Error>::NotYet)
            } else {
                Ok(())
@@ -271,7 +300,7 @@ async fn wait_for_num_points(
         &Duration::from_secs(10),
     )
     .await
-    .context("failed to ping clickhouse server")?;
+    .context("failed to get all samples from clickhouse server")?;
     Ok(())
 }
 
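The 5-second request timeout is what makes the quorum-loss assertion practical: before this change the underlying `reqwest::Client` had no timeout at all, so the POST underneath `insert_samples` could presumably block indefinitely against a cluster that had lost keeper quorum, which is also presumably why the keeper-restart block had been commented out. A standalone sketch of the fail-fast behavior using plain `reqwest` (hypothetical address and request body; not the oximeter API):

use std::time::Duration;

#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    let result = client
        .post("http://127.0.0.1:8123") // hypothetical ClickHouse HTTP address
        .timeout(Duration::from_secs(5)) // per-request cap, as in the fix
        .body("INSERT INTO data ...") // elided; illustration only
        .send()
        .await;

    // Against a wedged server this returns Err within about five seconds;
    // reqwest::Error::is_timeout() distinguishes a timeout from other failures.
    if let Err(err) = result {
        println!("request failed (timeout: {})", err.is_timeout());
    }
}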
