Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add structured logging for coprocessor #38

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions fhevm-engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions fhevm-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ anyhow = "1.0.86"
daggy = "0.8.0"
serde = "1.0.210"
prometheus = "0.13.4"
log = { version = "0.4.22", features = ["kv"] }

[profile.dev.package.tfhe]
overflow-checks = false
Expand Down
2 changes: 2 additions & 0 deletions fhevm-engine/coprocessor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ strum = { version = "0.26", features = ["derive"] }
bincode.workspace = true
sha3.workspace = true
prometheus.workspace = true
log.workspace = true
structured-logger = "1.0.3"
actix-web = "4.9.0"

[dev-dependencies]
Expand Down
16 changes: 10 additions & 6 deletions fhevm-engine/coprocessor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ pub fn start_runtime(
tokio::select! {
main = async_main(args) => {
if let Err(e) = main {
eprintln!("Runtime error: {:?}", e);
log::error!(target: "main_wchannel", error = e.to_string(); "Runtime error");
}
}
_ = close_recv.changed() => {
eprintln!("Service stopped voluntarily");
log::info!(target: "main_wchannel", "Service stopped voluntarily");
}
}
} else {
if let Err(e) = async_main(args).await {
eprintln!("Runtime error: {:?}", e);
log::error!(target: "main", error = e.to_string(); "Runtime error");
}
}
})
Expand All @@ -60,19 +60,23 @@ pub fn start_runtime(
async fn async_main(
args: crate::cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
structured_logger::Builder::new()
.with_default_writer(structured_logger::async_json::new_writer(tokio::io::stdout()))
.init();

let mut set = JoinSet::new();
if args.run_server {
println!("Initializing api server");
log::info!(target: "async_main", "Initializing api server");
set.spawn(crate::server::run_server(args.clone()));
}

if args.run_bg_worker {
println!("Initializing background worker");
log::info!(target: "async_main", "Initializing background worker");
set.spawn(crate::tfhe_worker::run_tfhe_worker(args.clone()));
}

if !args.metrics_addr.is_empty() {
println!("Initializing metrics server");
log::info!(target: "async_main", "Initializing metrics server");
set.spawn(crate::metrics::run_metrics_server(args.clone()));
}

Expand Down
2 changes: 1 addition & 1 deletion fhevm-engine/coprocessor/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async fn healthcheck() -> impl actix_web::Responder {
pub async fn run_metrics_server(
args: crate::cli::Args,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("metrics server listening at {}", args.metrics_addr);
log::info!("metrics server listening at {}", args.metrics_addr);
let _ = actix_web::HttpServer::new(|| {
actix_web::App::new()
.route("/metrics", actix_web::web::to(metrics))
Expand Down
6 changes: 3 additions & 3 deletions fhevm-engine/coprocessor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub async fn run_server(
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
loop {
if let Err(e) = run_server_iteration(args.clone()).await {
println!("Error running server, retrying shortly: {:?}", e);
log::error!(target: "grpc_server", error = e.to_string(); "Error running server, retrying shortly");
}

tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await;
Expand All @@ -86,9 +86,9 @@ pub async fn run_server_iteration(
let coprocessor_key_file = tokio::fs::read_to_string(&args.coprocessor_private_key).await?;

let signer = PrivateKeySigner::from_str(coprocessor_key_file.trim())?;
println!("Coprocessor signer address: {}", signer.address());
log::info!(target: "grpc_server", address = signer.address().to_string(); "Coprocessor signer initiated");

println!("Coprocessor listening on {}", addr);
log::info!("Coprocessor listening on {}", addr);
let pool = sqlx::postgres::PgPoolOptions::new()
.max_connections(args.pg_pool_max_connections)
.connect(&db_url)
Expand Down
17 changes: 10 additions & 7 deletions fhevm-engine/coprocessor/src/tfhe_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ pub async fn run_tfhe_worker(
// here we log the errors and make sure we retry
if let Err(cycle_error) = tfhe_worker_cycle(&args).await {
WORKER_ERRORS_COUNTER.inc();
eprintln!(
"Error in background worker, retrying shortly: {:?}",
cycle_error
);
log::error!(target: "tfhe_worker", error = cycle_error.to_string(); "Error in background worker, retrying shortly");
}
tokio::time::sleep(tokio::time::Duration::from_millis(5000)).await;
}
Expand Down Expand Up @@ -68,11 +65,11 @@ async fn tfhe_worker_cycle(
tokio::select! {
_ = listener.try_recv() => {
WORK_ITEMS_NOTIFICATIONS_COUNTER.inc();
println!("Received work_available notification from postgres");
log::info!(target: "tfhe_worker", "Received work_available notification from postgres");
},
_ = tokio::time::sleep(tokio::time::Duration::from_millis(5000)) => {
WORK_ITEMS_POLL_COUNTER.inc();
println!("Polling the database for more work on timer");
log::info!(target: "tfhe_worker", "Polling the database for more work on timer");
},
};
}
Expand Down Expand Up @@ -114,7 +111,7 @@ async fn tfhe_worker_cycle(
}

WORK_ITEMS_FOUND_COUNTER.inc_by(the_work.len() as u64);
println!("Processing {} work items", the_work.len());
log::info!(target: "tfhe_worker", count = the_work.len(); "Processing work items");

// make sure we process each tenant sequentially not to
// load different keys in cache by different tenants
Expand Down Expand Up @@ -273,6 +270,12 @@ async fn tfhe_worker_cycle(
}
Err((err, tenant_id, output_handle)) => {
WORKER_ERRORS_COUNTER.inc();
log::error!(target: "tfhe_worker",
tenant_id,
output_handle = format!("0x{}", hex::encode(&output_handle)),
error = err.to_string();
"error while processing work item"
);
let _ = query!(
"
UPDATE computations
Expand Down
Loading