Skip to content

Commit

Permalink
chore(composer): propagate errors (#1838)
Browse files Browse the repository at this point in the history
## Summary
Propagate errors which occur while composer is starting up and/or
running so that they will be returned by the Composer's handle.

## Background
Previously, composer would only exit with an error if the collectors' or
executor's status channels were closed, and then the error message did
not provide detailed information about the error that occurred.
Additionally, if either of the `wait_for_*` loops failed, the composer
would not shut down gracefully. This change is meant to expose the first
eyre report which causes the composer to shut down, and gracefully shut
down in all circumstances.

## Changes
- Start collector and executor `wait_for_ready` loops concurrently,
and continue with graceful shutdown even if these fail.
- Store the first error composer encounters, and return it after
graceful shutdown. If waiting for collectors or executor fails, Composer
continues so that it can ascertain the underlying error from the task
which caused it.

## Testing
Passing all tests

## Changelogs
Changelog updated

## Related Issues
closes #1833
  • Loading branch information
ethanoroshiba authored Jan 23, 2025
1 parent c6ca388 commit 9553576
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 28 deletions.
1 change: 1 addition & 0 deletions crates/astria-composer/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Bump MSRV to 1.83.0 [#1857](https://github.com/astriaorg/astria/pull/1857).
- Bump penumbra dependencies [#1740](https://github.com/astriaorg/astria/pull/1740).
- Propagate errors [#1838](https://github.com/astriaorg/astria/pull/1838).

## [1.0.0-rc.2] - 2024-10-23

Expand Down
54 changes: 38 additions & 16 deletions crates/astria-composer/src/composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ use astria_eyre::eyre::{
use itertools::Itertools as _;
use tokio::{
io,
join,
signal::unix::{
signal,
SignalKind,
},
sync::watch,
sync::{
watch,
OnceCell,
},
task::{
JoinError,
JoinHandle,
Expand Down Expand Up @@ -226,7 +230,7 @@ impl Composer {
pub async fn run_until_stopped(self) -> eyre::Result<()> {
let Self {
api_server,
mut composer_status_sender,
composer_status_sender,
executor,
executor_handle,
mut geth_collector_tasks,
Expand All @@ -239,6 +243,8 @@ impl Composer {
fee_asset,
} = self;

let mut exit_err: OnceCell<eyre::Report> = OnceCell::new();

// we need the API server to shutdown at the end, since it is used by k8s
// to report the liveness of the service
let api_server_shutdown_token = CancellationToken::new();
Expand All @@ -259,12 +265,18 @@ impl Composer {
let mut executor_task = tokio::spawn(executor.run_until_stopped());

// wait for collectors and executor to come online
wait_for_collectors(&geth_collector_statuses, &mut composer_status_sender)
.await
.wrap_err("geth collectors failed to become ready")?;
wait_for_executor(executor_status, &mut composer_status_sender)
.await
.wrap_err("executor failed to become ready")?;
let collectors_startup_fut =
wait_for_collectors(&geth_collector_statuses, composer_status_sender.clone());
let executor_startup_fut = wait_for_executor(executor_status, composer_status_sender);

match join!(collectors_startup_fut, executor_startup_fut) {
(Ok(()), Ok(())) => {}
(Err(e), Ok(())) => error!(%e, "geth collectors failed to become ready"),
(Ok(()), Err(e)) => error!(%e, "executor failed to become ready"),
(Err(collector_err), Err(executor_err)) => {
error!(%collector_err, %executor_err, "geth collectors and executor failed to become ready");
}
};

// run the grpc server
let mut grpc_server_handle = tokio::spawn(async move {
Expand Down Expand Up @@ -293,7 +305,7 @@ impl Composer {
};
},
o = &mut api_task => {
report_exit("api server unexpectedly ended", o);
report_exit("api server unexpectedly ended", o, &exit_err);
break ShutdownInfo {
api_server_shutdown_token,
composer_shutdown_token: shutdown_token,
Expand All @@ -304,7 +316,7 @@ impl Composer {
};
},
o = &mut executor_task => {
report_exit("executor unexpectedly ended", o);
report_exit("executor unexpectedly ended", o, &exit_err);
break ShutdownInfo {
api_server_shutdown_token,
composer_shutdown_token: shutdown_token,
Expand All @@ -315,7 +327,7 @@ impl Composer {
};
},
o = &mut grpc_server_handle => {
report_exit("grpc server unexpectedly ended", o);
report_exit("grpc server unexpectedly ended", o, &exit_err);
break ShutdownInfo {
api_server_shutdown_token,
composer_shutdown_token: shutdown_token,
Expand All @@ -326,7 +338,7 @@ impl Composer {
};
},
Some((rollup, collector_exit)) = geth_collector_tasks.join_next() => {
report_exit("collector", collector_exit);
report_exit("collector", collector_exit, &exit_err);
if let Some(url) = rollups.get(&rollup) {
let collector = geth::Builder {
chain_name: rollup.clone(),
Expand All @@ -348,7 +360,11 @@ impl Composer {
});
};

shutdown_info.run().await
let shutdown_res = shutdown_info.run().await;
if let Some(exit_err) = exit_err.take() {
return Err(exit_err);
}
shutdown_res
}
}

Expand Down Expand Up @@ -467,7 +483,7 @@ fn spawn_geth_collectors(
#[instrument(skip_all, err)]
async fn wait_for_executor(
mut executor_status: watch::Receiver<executor::Status>,
composer_status_sender: &mut watch::Sender<composer::Status>,
composer_status_sender: watch::Sender<composer::Status>,
) -> eyre::Result<()> {
executor_status
.wait_for(executor::Status::is_connected)
Expand All @@ -485,7 +501,7 @@ async fn wait_for_executor(
#[instrument(skip_all, err)]
async fn wait_for_collectors(
collector_statuses: &HashMap<String, watch::Receiver<collectors::geth::Status>>,
composer_status_sender: &mut watch::Sender<composer::Status>,
composer_status_sender: watch::Sender<composer::Status>,
) -> eyre::Result<()> {
use futures::{
future::FutureExt as _,
Expand Down Expand Up @@ -532,14 +548,20 @@ async fn wait_for_collectors(
Ok(())
}

fn report_exit(task_name: &str, outcome: Result<eyre::Result<()>, JoinError>) {
/// Logs how a task finished and records the failure, if any, in `exit_err`.
///
/// * `task_name` - label attached to the emitted log event.
/// * `outcome` - the joined task result: the outer `Err` is a `JoinError`
///   (the task did not run to completion), the inner `Err` is the report
///   the task itself returned.
/// * `exit_err` - cell holding the first error seen; a value that is
///   already present is never overwritten.
fn report_exit(
task_name: &str,
outcome: Result<eyre::Result<()>, JoinError>,
exit_err: &OnceCell<eyre::Report>,
) {
let report: eyre::Report = match outcome {
Ok(Ok(())) => {
info!(task = task_name, "task exited successfully");
// Nothing to record on a clean exit.
return;
}
Ok(Err(error)) => {
error!(%error, task = task_name, "task returned with error");
error
}
Err(error) => {
error!(%error, task = task_name, "task failed to complete");
error.into()
}
};
// `set` returns an error when the cell is already populated; discarding it
// is deliberate so that only the first recorded failure is kept.
let _ = exit_err.set(report);
}
Expand Down
29 changes: 17 additions & 12 deletions crates/astria-composer/tests/blackbox/executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::time::Duration;
use std::{
fmt::Write as _,
time::Duration,
};

use astria_core::{
generated::astria::composer::v1::{
Expand All @@ -17,6 +20,7 @@ use crate::helper::{
mount_broadcast_tx_sync_rollup_data_submissions_mock,
signed_tx_from_request,
spawn_composer,
TEST_CHAIN_ID,
};

/// Test to check that the executor sends a signed transaction to the sequencer after its
Expand Down Expand Up @@ -207,22 +211,23 @@ async fn two_rollup_data_submissions_single_bundle() {
/// ID
#[tokio::test]
async fn chain_id_mismatch_returns_error() {
// TODO(https://github.com/astriaorg/astria/issues/1833): this test will currently succeed if
// the executor fails for any reason on startup, not just if the chain ID is incorrect. This is
// a symptom of the current implementation of executor, though, which should be propagating
// errors. As such, I think it is out of the scope for the following test-only changes and
// should be fixed in a followup.

let bad_chain_id = "bad_id";
let test_composer = spawn_composer(&["test1"], Some(bad_chain_id), false).await;
let expected_err_msg =
format!("expected chain ID `{TEST_CHAIN_ID}`, but received `{bad_chain_id}`");
let err = test_composer.composer.await.unwrap().unwrap_err();
for cause in err.chain() {
if cause
.to_string()
.contains("executor failed while waiting for it to become ready")
{
if cause.to_string().contains(&expected_err_msg) {
return;
}
}
panic!("did not find expected executor error message")
let mut panic_msg = String::new();
writeln!(
&mut panic_msg,
"did not find expected executor error message"
)
.unwrap();
writeln!(&mut panic_msg, "expected cause:\n\t{expected_err_msg}").unwrap();
writeln!(&mut panic_msg, "actual cause chain:\n\t{err:?}").unwrap();
panic!("{panic_msg}");
}

0 comments on commit 9553576

Please sign in to comment.