Skip to content

Commit

Permalink
feat: add structured logging for coprocessor
Browse files Browse the repository at this point in the history
+ remove all panics from tfhe worker to allow graceful error handling
  • Loading branch information
david-zk committed Sep 26, 2024
1 parent d799643 commit d91af2d
Show file tree
Hide file tree
Showing 9 changed files with 407 additions and 59 deletions.
158 changes: 158 additions & 0 deletions fhevm-engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fhevm-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ anyhow = "1.0.86"
daggy = "0.8.0"
serde = "1.0.210"
prometheus = "0.13.4"
log = { version = "0.4.22", features = ["kv"] }

[profile.dev.package.tfhe]
overflow-checks = false
Expand Down
2 changes: 2 additions & 0 deletions fhevm-engine/coprocessor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ strum = { version = "0.26", features = ["derive"] }
bincode.workspace = true
sha3.workspace = true
prometheus.workspace = true
log.workspace = true
structured-logger = "1.0.3"
actix-web = "4.9.0"

[dev-dependencies]
Expand Down
16 changes: 10 additions & 6 deletions fhevm-engine/coprocessor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ pub fn start_runtime(
tokio::select! {
main = async_main(args) => {
if let Err(e) = main {
eprintln!("Runtime error: {:?}", e);
log::error!(target: "main_wchannel", error = e.to_string(); "Runtime error");
}
}
_ = close_recv.changed() => {
eprintln!("Service stopped voluntarily");
log::info!(target: "main_wchannel", "Service stopped voluntarily");
}
}
} else {
if let Err(e) = async_main(args).await {
eprintln!("Runtime error: {:?}", e);
log::error!(target: "main", error = e.to_string(); "Runtime error");
}
}
})
Expand All @@ -60,19 +60,23 @@ pub fn start_runtime(
async fn async_main(
args: crate::cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
structured_logger::Builder::new()
.with_default_writer(structured_logger::async_json::new_writer(tokio::io::stdout()))
.init();

let mut set = JoinSet::new();
if args.run_server {
println!("Initializing api server");
log::info!(target: "async_main", "Initializing api server");
set.spawn(crate::server::run_server(args.clone()));
}

if args.run_bg_worker {
println!("Initializing background worker");
log::info!(target: "async_main", "Initializing background worker");
set.spawn(crate::tfhe_worker::run_tfhe_worker(args.clone()));
}

if !args.metrics_addr.is_empty() {
println!("Initializing metrics server");
log::info!(target: "async_main", "Initializing metrics server");
set.spawn(crate::metrics::run_metrics_server(args.clone()));
}

Expand Down
2 changes: 1 addition & 1 deletion fhevm-engine/coprocessor/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async fn healthcheck() -> impl actix_web::Responder {
pub async fn run_metrics_server(
args: crate::cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("metrics server listening at {}", args.metrics_addr);
log::info!("metrics server listening at {}", args.metrics_addr);
let _ = actix_web::HttpServer::new(|| {
actix_web::App::new()
.route("/metrics", actix_web::web::to(metrics))
Expand Down
6 changes: 3 additions & 3 deletions fhevm-engine/coprocessor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub async fn run_server(
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
loop {
if let Err(e) = run_server_iteration(args.clone()).await {
println!("Error running server, retrying shortly: {:?}", e);
log::error!(target: "grpc_server", error = e.to_string(); "Error running server, retrying shortly");
}

tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await;
Expand All @@ -86,9 +86,9 @@ pub async fn run_server_iteration(
let coprocessor_key_file = tokio::fs::read_to_string(&args.coprocessor_private_key).await?;

let signer = PrivateKeySigner::from_str(coprocessor_key_file.trim())?;
println!("Coprocessor signer address: {}", signer.address());
log::info!(target: "grpc_server", address = signer.address().to_string(); "Coprocessor signer initiated");

println!("Coprocessor listening on {}", addr);
log::info!("Coprocessor listening on {}", addr);
let pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(args.pg_pool_max_connections)
.connect(&db_url)
Expand Down
17 changes: 10 additions & 7 deletions fhevm-engine/coprocessor/src/tfhe_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ pub async fn run_tfhe_worker(
// here we log the errors and make sure we retry
if let Err(cycle_error) = tfhe_worker_cycle(&args).await {
WORKER_ERRORS_COUNTER.inc();
eprintln!(
"Error in background worker, retrying shortly: {:?}",
cycle_error
);
log::error!(target: "tfhe_worker", error = cycle_error.to_string(); "Error in background worker, retrying shortly");
}
tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await;
}
Expand Down Expand Up @@ -68,11 +65,11 @@ async fn tfhe_worker_cycle(
tokio::select! {
_ = listener.try_recv() => {
WORK_ITEMS_NOTIFICATIONS_COUNTER.inc();
println!("Received work_available notification from postgres");
log::info!(target: "tfhe_worker", "Received work_available notification from postgres");
},
_ = tokio::time::sleep(tokio::time::Duration::from_millis(5000)) => {
WORK_ITEMS_POLL_COUNTER.inc();
println!("Polling the database for more work on timer");
log::info!(target: "tfhe_worker", "Polling the database for more work on timer");
},
};
}
Expand Down Expand Up @@ -114,7 +111,7 @@ async fn tfhe_worker_cycle(
}

WORK_ITEMS_FOUND_COUNTER.inc_by(the_work.len() as u64);
println!("Processing {} work items", the_work.len());
log::info!(target: "tfhe_worker", count = the_work.len(); "Processing work items");

// make sure we process each tenant sequentially not to
// load different keys in cache by different tenants
Expand Down Expand Up @@ -273,6 +270,12 @@ async fn tfhe_worker_cycle(
}
Err((err, tenant_id, output_handle)) => {
WORKER_ERRORS_COUNTER.inc();
log::error!(target: "tfhe_worker",
tenant_id,
output_handle = format!("0x{}", hex::encode(&output_handle)),
error = err.to_string();
"error while processing work item"
);
let _ = query!(
"
UPDATE computations
Expand Down
Loading

0 comments on commit d91af2d

Please sign in to comment.