From 91656dc77f20bd77422fe7c51e9f37f48ba7cf9b Mon Sep 17 00:00:00 2001 From: ethanoroshiba Date: Tue, 26 Nov 2024 17:39:13 -0600 Subject: [PATCH 1/4] propagate errors, wait for collectors and executor concurrently --- crates/astria-composer/src/composer.rs | 79 ++++++++++++++++++++------ 1 file changed, 61 insertions(+), 18 deletions(-) diff --git a/crates/astria-composer/src/composer.rs b/crates/astria-composer/src/composer.rs index 4a6e5b774d..f258149034 100644 --- a/crates/astria-composer/src/composer.rs +++ b/crates/astria-composer/src/composer.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, net::SocketAddr, + sync::Arc, time::Duration, }; @@ -12,11 +13,16 @@ use astria_eyre::eyre::{ use itertools::Itertools as _; use tokio::{ io, + join, signal::unix::{ signal, SignalKind, }, - sync::watch, + sync::{ + watch, + Mutex, + OnceCell, + }, task::{ JoinError, JoinHandle, @@ -226,7 +232,7 @@ impl Composer { pub async fn run_until_stopped(self) -> eyre::Result<()> { let Self { api_server, - mut composer_status_sender, + composer_status_sender, executor, executor_handle, mut geth_collector_tasks, @@ -239,6 +245,8 @@ impl Composer { fee_asset, } = self; + let mut exit_err: OnceCell = OnceCell::new(); + // we need the API server to shutdown at the end, since it is used by k8s // to report the liveness of the service let api_server_shutdown_token = CancellationToken::new(); @@ -259,12 +267,37 @@ impl Composer { let mut executor_task = tokio::spawn(executor.run_until_stopped()); // wait for collectors and executor to come online - wait_for_collectors(&geth_collector_statuses, &mut composer_status_sender) - .await - .wrap_err("geth collectors failed to become ready")?; - wait_for_executor(executor_status, &mut composer_status_sender) - .await - .wrap_err("executor failed to become ready")?; + let composer_status_sender = Arc::new(Mutex::new(composer_status_sender)); + let collectors_startup_fut = + wait_for_collectors(&geth_collector_statuses, composer_status_sender.clone()); + let executor_startup_fut = wait_for_executor(executor_status, composer_status_sender); + + let startup_success = match join!(collectors_startup_fut, executor_startup_fut) { + (Ok(()), Ok(())) => true, + (Err(e), _) => { + error!(%e, "geth collectors failed to become ready"); + let _ = exit_err.set(e); + false + } + (_, Err(e)) => { + error!(%e, "executor failed to become ready"); + let _ = exit_err.set(e); + false + } + }; + + if !startup_success { + return ShutdownInfo { + api_server_shutdown_token, + composer_shutdown_token: shutdown_token, + api_server_task_handle: Some(api_task), + executor_task_handle: Some(executor_task), + grpc_server_task_handle: None, + geth_collector_tasks, + } + .run() + .await; + } // run the grpc server let mut grpc_server_handle = tokio::spawn(async move { @@ -293,7 +326,7 @@ impl Composer { }; }, o = &mut api_task => { - report_exit("api server unexpectedly ended", o); + report_exit("api server unexpectedly ended", o, &exit_err); break ShutdownInfo { api_server_shutdown_token, composer_shutdown_token: shutdown_token, @@ -304,7 +337,7 @@ impl Composer { }; }, o = &mut executor_task => { - report_exit("executor unexpectedly ended", o); + report_exit("executor unexpectedly ended", o, &exit_err); break ShutdownInfo { api_server_shutdown_token, composer_shutdown_token: shutdown_token, @@ -315,7 +348,7 @@ impl Composer { }; }, o = &mut grpc_server_handle => { - report_exit("grpc server unexpectedly ended", o); + report_exit("grpc server unexpectedly ended", o, &exit_err); break ShutdownInfo { api_server_shutdown_token, composer_shutdown_token: shutdown_token, @@ -326,7 +359,7 @@ impl Composer { }; }, Some((rollup, collector_exit)) = geth_collector_tasks.join_next() => { - report_exit("collector", collector_exit); + report_exit("collector", collector_exit, &exit_err); if let Some(url) = rollups.get(&rollup) { let collector = geth::Builder { chain_name: rollup.clone(), @@ -348,7 +381,11 @@ impl Composer { }); }; - shutdown_info.run().await + let shutdown_res = shutdown_info.run().await; + if let Some(exit_err) = exit_err.take() { + return Err(exit_err); + } + shutdown_res } } @@ -467,14 +504,14 @@ fn spawn_geth_collectors( #[instrument(skip_all, err)] async fn wait_for_executor( mut executor_status: watch::Receiver, - composer_status_sender: &mut watch::Sender, + composer_status_sender: Arc>>, ) -> eyre::Result<()> { executor_status .wait_for(executor::Status::is_connected) .await .wrap_err("executor failed while waiting for it to become ready")?; - composer_status_sender.send_modify(|status| { + composer_status_sender.lock().await.send_modify(|status| { status.set_executor_connected(true); }); @@ -485,7 +522,7 @@ async fn wait_for_executor( #[instrument(skip_all, err)] async fn wait_for_collectors( collector_statuses: &HashMap>, - composer_status_sender: &mut watch::Sender, + composer_status_sender: Arc>>, ) -> eyre::Result<()> { use futures::{ future::FutureExt as _, @@ -525,21 +562,27 @@ async fn wait_for_collectors( } } - composer_status_sender.send_modify(|status| { + composer_status_sender.lock().await.send_modify(|status| { status.set_all_collectors_connected(true); }); Ok(()) } -fn report_exit(task_name: &str, outcome: Result, JoinError>) { +fn report_exit( + task_name: &str, + outcome: Result, JoinError>, + exit_err: &OnceCell, +) { match outcome { Ok(Ok(())) => info!(task = task_name, "task exited successfully"), Ok(Err(error)) => { error!(%error, task = task_name, "task returned with error"); + let _ = exit_err.set(error); } Err(error) => { error!(%error, task = task_name, "task failed to complete"); + let _ = exit_err.set(error.into()); } } } From 3bba951eeeb1bd313b86dd40b4c58a9084a3696c Mon Sep 17 00:00:00 2001 From: ethanoroshiba Date: Tue, 26 Nov 2024 17:52:06 -0600 Subject: [PATCH 2/4] update changelog --- crates/astria-composer/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/astria-composer/CHANGELOG.md b/crates/astria-composer/CHANGELOG.md index e430c3d066..03f84b697e 100644 --- a/crates/astria-composer/CHANGELOG.md +++ b/crates/astria-composer/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Bump penumbra dependencies [#1740](https://github.com/astriaorg/astria/pull/1740). +- Propagate errors [#1838](https://github.com/astriaorg/astria/pull/1838). ## [1.0.0-rc.2] - 2024-10-23 From 08016876871f3faea2be281964ce4e0e5fa1245f Mon Sep 17 00:00:00 2001 From: ethanoroshiba Date: Wed, 27 Nov 2024 08:52:23 -0600 Subject: [PATCH 3/4] don't error out waiting for collectors or executor --- crates/astria-composer/src/composer.rs | 30 ++++--------------- .../tests/blackbox/executor.rs | 15 ++++------ 2 files changed, 11 insertions(+), 34 deletions(-) diff --git a/crates/astria-composer/src/composer.rs b/crates/astria-composer/src/composer.rs index f258149034..ff5a0ee224 100644 --- a/crates/astria-composer/src/composer.rs +++ b/crates/astria-composer/src/composer.rs @@ -272,33 +272,15 @@ impl Composer { wait_for_collectors(&geth_collector_statuses, composer_status_sender.clone()); let executor_startup_fut = wait_for_executor(executor_status, composer_status_sender); - let startup_success = match join!(collectors_startup_fut, executor_startup_fut) { - (Ok(()), Ok(())) => true, - (Err(e), _) => { - error!(%e, "geth collectors failed to become ready"); - let _ = exit_err.set(e); - false - } - (_, Err(e)) => { - error!(%e, "executor failed to become ready"); - let _ = exit_err.set(e); - false + match join!(collectors_startup_fut, executor_startup_fut) { + (Ok(()), Ok(())) => {} + (Err(e), Ok(())) => error!(%e, "geth collectors failed to become ready"), + (Ok(()), Err(e)) => error!(%e, "executor failed to become ready"), + (Err(collector_err), Err(executor_err)) => { + error!(%collector_err, %executor_err, "geth collectors and executor failed to become ready"); } }; - if !startup_success { - return ShutdownInfo { - api_server_shutdown_token, - composer_shutdown_token: shutdown_token, - api_server_task_handle: Some(api_task), - executor_task_handle: Some(executor_task), - grpc_server_task_handle: None, - geth_collector_tasks, - } - .run() - .await; - } - // run the grpc server let mut grpc_server_handle = tokio::spawn(async move { grpc_server diff --git a/crates/astria-composer/tests/blackbox/executor.rs b/crates/astria-composer/tests/blackbox/executor.rs index 4abf7dd733..32b5ce321a 100644 --- a/crates/astria-composer/tests/blackbox/executor.rs +++ b/crates/astria-composer/tests/blackbox/executor.rs @@ -17,6 +17,7 @@ use crate::helper::{ mount_broadcast_tx_sync_rollup_data_submissions_mock, signed_tx_from_request, spawn_composer, + TEST_CHAIN_ID, }; /// Test to check that the executor sends a signed transaction to the sequencer after its @@ -203,20 +204,14 @@ async fn two_rollup_data_submissions_single_bundle() { /// ID #[tokio::test] async fn chain_id_mismatch_returns_error() { - // TODO(https://github.com/astriaorg/astria/issues/1833): this test will currently succeed if - // the executor fails for any reason on startup, not just if the chain ID is incorrect. This is - // a symptom of the current implementation of executor, though, which should be propagating - // errors. As such, I think it is out of the scope for the following test-only changes and - // should be fixed in a followup. - let bad_chain_id = "bad_id"; let test_composer = spawn_composer(&["test1"], Some(bad_chain_id), false).await; let err = test_composer.composer.await.unwrap().unwrap_err(); for cause in err.chain() { - if cause - .to_string() - .contains("executor failed while waiting for it to become ready") - { + println!("{cause}"); + if cause.to_string().contains(&format!( + "expected chain ID `{TEST_CHAIN_ID}`, but received `{bad_chain_id}`" + )) { return; } } From 9439dd850ad0a6d41cb4413dcd7ea0b74dbd50d1 Mon Sep 17 00:00:00 2001 From: ethanoroshiba Date: Wed, 27 Nov 2024 09:05:41 -0600 Subject: [PATCH 4/4] remove debugging line --- crates/astria-composer/tests/blackbox/executor.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/astria-composer/tests/blackbox/executor.rs b/crates/astria-composer/tests/blackbox/executor.rs index 32b5ce321a..7c69492c86 100644 --- a/crates/astria-composer/tests/blackbox/executor.rs +++ b/crates/astria-composer/tests/blackbox/executor.rs @@ -208,7 +208,6 @@ async fn chain_id_mismatch_returns_error() { let test_composer = spawn_composer(&["test1"], Some(bad_chain_id), false).await; let err = test_composer.composer.await.unwrap().unwrap_err(); for cause in err.chain() { - println!("{cause}"); if cause.to_string().contains(&format!( "expected chain ID `{TEST_CHAIN_ID}`, but received `{bad_chain_id}`" )) {