diff --git a/crates/astria-composer/CHANGELOG.md b/crates/astria-composer/CHANGELOG.md index e430c3d066..03f84b697e 100644 --- a/crates/astria-composer/CHANGELOG.md +++ b/crates/astria-composer/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Bump penumbra dependencies [#1740](https://github.com/astriaorg/astria/pull/1740). +- Propagate errors [#1838](https://github.com/astriaorg/astria/pull/1838). ## [1.0.0-rc.2] - 2024-10-23 diff --git a/crates/astria-composer/src/composer.rs b/crates/astria-composer/src/composer.rs index 4a6e5b774d..ff5a0ee224 100644 --- a/crates/astria-composer/src/composer.rs +++ b/crates/astria-composer/src/composer.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, net::SocketAddr, + sync::Arc, time::Duration, }; @@ -12,11 +13,16 @@ use astria_eyre::eyre::{ use itertools::Itertools as _; use tokio::{ io, + join, signal::unix::{ signal, SignalKind, }, - sync::watch, + sync::{ + watch, + Mutex, + OnceCell, + }, task::{ JoinError, JoinHandle, @@ -226,7 +232,7 @@ impl Composer { pub async fn run_until_stopped(self) -> eyre::Result<()> { let Self { api_server, - mut composer_status_sender, + composer_status_sender, executor, executor_handle, mut geth_collector_tasks, @@ -239,6 +245,8 @@ impl Composer { fee_asset, } = self; + let mut exit_err: OnceCell<eyre::Report> = OnceCell::new(); + // we need the API server to shutdown at the end, since it is used by k8s // to report the liveness of the service let api_server_shutdown_token = CancellationToken::new(); @@ -259,12 +267,19 @@ impl Composer { let mut executor_task = tokio::spawn(executor.run_until_stopped()); // wait for collectors and executor to come online - wait_for_collectors(&geth_collector_statuses, &mut composer_status_sender) - .await - .wrap_err("geth collectors failed to become ready")?; - wait_for_executor(executor_status, &mut composer_status_sender) - .await - .wrap_err("executor failed to become ready")?; + let 
composer_status_sender = Arc::new(Mutex::new(composer_status_sender)); + let collectors_startup_fut = + wait_for_collectors(&geth_collector_statuses, composer_status_sender.clone()); + let executor_startup_fut = wait_for_executor(executor_status, composer_status_sender); + + match join!(collectors_startup_fut, executor_startup_fut) { + (Ok(()), Ok(())) => {} + (Err(e), Ok(())) => error!(%e, "geth collectors failed to become ready"), + (Ok(()), Err(e)) => error!(%e, "executor failed to become ready"), + (Err(collector_err), Err(executor_err)) => { + error!(%collector_err, %executor_err, "geth collectors and executor failed to become ready"); + } + }; // run the grpc server let mut grpc_server_handle = tokio::spawn(async move { @@ -293,7 +308,7 @@ impl Composer { }; }, o = &mut api_task => { - report_exit("api server unexpectedly ended", o); + report_exit("api server unexpectedly ended", o, &exit_err); break ShutdownInfo { api_server_shutdown_token, composer_shutdown_token: shutdown_token, @@ -304,7 +319,7 @@ impl Composer { }; }, o = &mut executor_task => { - report_exit("executor unexpectedly ended", o); + report_exit("executor unexpectedly ended", o, &exit_err); break ShutdownInfo { api_server_shutdown_token, composer_shutdown_token: shutdown_token, @@ -315,7 +330,7 @@ impl Composer { }; }, o = &mut grpc_server_handle => { - report_exit("grpc server unexpectedly ended", o); + report_exit("grpc server unexpectedly ended", o, &exit_err); break ShutdownInfo { api_server_shutdown_token, composer_shutdown_token: shutdown_token, @@ -326,7 +341,7 @@ impl Composer { }; }, Some((rollup, collector_exit)) = geth_collector_tasks.join_next() => { - report_exit("collector", collector_exit); + report_exit("collector", collector_exit, &exit_err); if let Some(url) = rollups.get(&rollup) { let collector = geth::Builder { chain_name: rollup.clone(), @@ -348,7 +363,11 @@ impl Composer { }); }; - shutdown_info.run().await + let shutdown_res = shutdown_info.run().await; + if let 
Some(exit_err) = exit_err.take() { + return Err(exit_err); + } + shutdown_res + } } @@ -467,14 +486,14 @@ fn spawn_geth_collectors( #[instrument(skip_all, err)] async fn wait_for_executor( mut executor_status: watch::Receiver<executor::Status>, - composer_status_sender: &mut watch::Sender<Status>, + composer_status_sender: Arc<Mutex<watch::Sender<Status>>>, ) -> eyre::Result<()> { executor_status .wait_for(executor::Status::is_connected) .await .wrap_err("executor failed while waiting for it to become ready")?; - composer_status_sender.send_modify(|status| { + composer_status_sender.lock().await.send_modify(|status| { status.set_executor_connected(true); }); @@ -485,7 +504,7 @@ async fn wait_for_executor( #[instrument(skip_all, err)] async fn wait_for_collectors( collector_statuses: &HashMap<String, watch::Receiver<geth::Status>>, - composer_status_sender: &mut watch::Sender<Status>, + composer_status_sender: Arc<Mutex<watch::Sender<Status>>>, ) -> eyre::Result<()> { use futures::{ future::FutureExt as _, @@ -525,21 +544,27 @@ async fn wait_for_collectors( } } - composer_status_sender.send_modify(|status| { + composer_status_sender.lock().await.send_modify(|status| { status.set_all_collectors_connected(true); }); Ok(()) } -fn report_exit(task_name: &str, outcome: Result<eyre::Result<()>, JoinError>) { +fn report_exit( + task_name: &str, + outcome: Result<eyre::Result<()>, JoinError>, + exit_err: &OnceCell<eyre::Report>, +) { match outcome { Ok(Ok(())) => info!(task = task_name, "task exited successfully"), Ok(Err(error)) => { error!(%error, task = task_name, "task returned with error"); + let _ = exit_err.set(error); } Err(error) => { error!(%error, task = task_name, "task failed to complete"); + let _ = exit_err.set(error.into()); } } } diff --git a/crates/astria-composer/tests/blackbox/executor.rs b/crates/astria-composer/tests/blackbox/executor.rs index 4abf7dd733..7c69492c86 100644 --- a/crates/astria-composer/tests/blackbox/executor.rs +++ b/crates/astria-composer/tests/blackbox/executor.rs @@ -17,6 +17,7 @@ use crate::helper::{ mount_broadcast_tx_sync_rollup_data_submissions_mock, signed_tx_from_request, spawn_composer, + 
TEST_CHAIN_ID, }; /// Test to check that the executor sends a signed transaction to the sequencer after its @@ -203,20 +204,13 @@ async fn two_rollup_data_submissions_single_bundle() { /// ID #[tokio::test] async fn chain_id_mismatch_returns_error() { - // TODO(https://github.com/astriaorg/astria/issues/1833): this test will currently succeed if - // the executor fails for any reason on startup, not just if the chain ID is incorrect. This is - // a symptom of the current implementation of executor, though, which should be propagating - // errors. As such, I think it is out of the scope for the following test-only changes and - // should be fixed in a followup. - let bad_chain_id = "bad_id"; let test_composer = spawn_composer(&["test1"], Some(bad_chain_id), false).await; let err = test_composer.composer.await.unwrap().unwrap_err(); for cause in err.chain() { - if cause - .to_string() - .contains("executor failed while waiting for it to become ready") - { + if cause.to_string().contains(&format!( + "expected chain ID `{TEST_CHAIN_ID}`, but received `{bad_chain_id}`" + )) { return; } }