Skip to content

Commit

Permalink
feat: tests starting copro with existing local DB
Browse files Browse the repository at this point in the history
Introduce the `COPROCESSOR_TEST_LOCAL_DB` environment variable that uses
an existing local DB, but starts coprocessor service instances.
  • Loading branch information
dartdart26 committed Sep 12, 2024
1 parent f4fa623 commit bdb25cb
Showing 1 changed file with 59 additions and 34 deletions.
93 changes: 59 additions & 34 deletions fhevm-engine/coprocessor/src/tests/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::cli::Args;
use std::sync::atomic::{AtomicU16, Ordering};
use testcontainers::{core::WaitFor, runners::AsyncRunner, GenericImage, ImageExt};
use tokio::sync::watch::Receiver;

pub struct TestInstance {
// just to destroy container
Expand Down Expand Up @@ -41,28 +42,75 @@ pub fn default_tenant_id() -> i32 {
pub async fn setup_test_app() -> Result<TestInstance, Box<dyn std::error::Error>> {
if std::env::var("COPROCESSOR_TEST_LOCALHOST").is_ok() {
setup_test_app_existing_localhost().await
} else if std::env::var("COPROCESSOR_TEST_LOCAL_DB").is_ok() {
setup_test_app_existing_db().await
} else {
setup_test_app_custom_docker().await
}
}

pub async fn setup_test_app_existing_localhost() -> Result<TestInstance, Box<dyn std::error::Error>> {
const LOCAL_DB_URL: &str = "postgresql://postgres:[email protected]:5432/coprocessor";

async fn setup_test_app_existing_localhost() -> Result<TestInstance, Box<dyn std::error::Error>> {
Ok(TestInstance {
_container: None,
app_close_channel: None,
app_url: "http://127.0.0.1:50051".to_string(),
db_url: "postgresql://postgres:[email protected]:5432/coprocessor".to_string(),
db_url: LOCAL_DB_URL.to_string(),
})
}

pub async fn setup_test_app_custom_docker() -> Result<TestInstance, Box<dyn std::error::Error>> {
pub async fn setup_test_app_existing_db() -> Result<TestInstance, Box<dyn std::error::Error>> {
let app_port = get_app_port();
let (app_close_channel, rx) = tokio::sync::watch::channel(false);
start_coprocessor(rx, app_port, &LOCAL_DB_URL).await;
Ok(TestInstance {
_container: None,
app_close_channel: Some(app_close_channel),
app_url: format!("http://127.0.0.1:{app_port}"),
db_url: LOCAL_DB_URL.to_string(),
})
}

async fn start_coprocessor(rx: Receiver<bool>, app_port: u16, db_url: &str) {
let args: Args = Args {
run_bg_worker: true,
run_server: true,
generate_fhe_keys: false,
server_maximum_ciphertexts_to_schedule: 5000,
work_items_batch_size: 40,
tenant_key_cache_size: 4,
coprocessor_fhe_threads: 4,
maximum_handles_per_input: 255,
tokio_threads: 2,
pg_pool_max_connections: 2,
server_addr: format!("127.0.0.1:{app_port}"),
database_url: Some(db_url.to_string()),
maximimum_compact_inputs_upload: 10,
coprocessor_private_key: "./coprocessor.key".to_string(),
};

std::thread::spawn(move || {
crate::start_runtime(args, Some(rx));
});

// wait until app port is opened
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}

fn get_app_port() -> u16 {
static PORT_COUNTER: AtomicU16 = AtomicU16::new(10000);

let app_port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
// wrap around, if we ever have that many tests?
if app_port >= 50000 {
PORT_COUNTER.store(10000, Ordering::SeqCst);
}
app_port
}

async fn setup_test_app_custom_docker() -> Result<TestInstance, Box<dyn std::error::Error>> {
let app_port = get_app_port();

let container = GenericImage::new("postgres", "15.7")
.with_wait_for(WaitFor::message_on_stderr(
Expand Down Expand Up @@ -98,30 +146,7 @@ pub async fn setup_test_app_custom_docker() -> Result<TestInstance, Box<dyn std:
println!("DB prepared");

let (app_close_channel, rx) = tokio::sync::watch::channel(false);
let args: Args = Args {
run_bg_worker: true,
run_server: true,
generate_fhe_keys: false,
server_maximum_ciphertexts_to_schedule: 5000,
work_items_batch_size: 40,
tenant_key_cache_size: 4,
coprocessor_fhe_threads: 4,
maximum_handles_per_input: 255,
tokio_threads: 2,
pg_pool_max_connections: 2,
server_addr: format!("127.0.0.1:{app_port}"),
database_url: Some(db_url.clone()),
maximimum_compact_inputs_upload: 10,
coprocessor_private_key: "./coprocessor.key".to_string(),
};

std::thread::spawn(move || {
crate::start_runtime(args, Some(rx));
});

// wait until app port is opened
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

start_coprocessor(rx, app_port, &db_url).await;
Ok(TestInstance {
_container: Some(container),
app_close_channel: Some(app_close_channel),
Expand All @@ -130,19 +155,19 @@ pub async fn setup_test_app_custom_docker() -> Result<TestInstance, Box<dyn std:
})
}

pub async fn wait_until_all_ciphertexts_computed(test_instance: &TestInstance) -> Result<(), Box<dyn std::error::Error>> {
pub async fn wait_until_all_ciphertexts_computed(
test_instance: &TestInstance,
) -> Result<(), Box<dyn std::error::Error>> {
let pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(2)
.connect(test_instance.db_url())
.await?;

loop {
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let count = sqlx::query!(
"SELECT count(*) FROM computations WHERE NOT is_completed"
)
.fetch_one(&pool)
.await?;
let count = sqlx::query!("SELECT count(*) FROM computations WHERE NOT is_completed")
.fetch_one(&pool)
.await?;
let current_count = count.count.unwrap();
if current_count == 0 {
println!("All computations completed");
Expand All @@ -153,4 +178,4 @@ pub async fn wait_until_all_ciphertexts_computed(test_instance: &TestInstance) -
}

Ok(())
}
}

0 comments on commit bdb25cb

Please sign in to comment.