chore(composer): propagate errors #1838

Open · wants to merge 4 commits into base: ENG-914/composer_blackbox_tests
1 change: 1 addition & 0 deletions crates/astria-composer/CHANGELOG.md
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0)
 ### Changed
 
 - Bump penumbra dependencies [#1740](https://github.com/astriaorg/astria/pull/1740).
+- Propagate errors [#1838](https://github.com/astriaorg/astria/pull/1838).
 
 ## [1.0.0-rc.2] - 2024-10-23
 
61 changes: 43 additions & 18 deletions crates/astria-composer/src/composer.rs
@@ -1,6 +1,7 @@
 use std::{
     collections::HashMap,
     net::SocketAddr,
+    sync::Arc,
     time::Duration,
 };

@@ -12,11 +13,16 @@ use astria_eyre::eyre::{
 use itertools::Itertools as _;
 use tokio::{
     io,
+    join,
     signal::unix::{
         signal,
         SignalKind,
     },
-    sync::watch,
+    sync::{
+        watch,
+        Mutex,
+        OnceCell,
+    },
     task::{
         JoinError,
         JoinHandle,
@@ -226,7 +232,7 @@ impl Composer {
     pub async fn run_until_stopped(self) -> eyre::Result<()> {
         let Self {
             api_server,
-            mut composer_status_sender,
+            composer_status_sender,
             executor,
             executor_handle,
             mut geth_collector_tasks,
@@ -239,6 +245,8 @@
             fee_asset,
         } = self;
 
+        let mut exit_err: OnceCell<eyre::Report> = OnceCell::new();
+
         // we need the API server to shutdown at the end, since it is used by k8s
         // to report the liveness of the service
         let api_server_shutdown_token = CancellationToken::new();
@@ -259,12 +267,19 @@
         let mut executor_task = tokio::spawn(executor.run_until_stopped());
 
         // wait for collectors and executor to come online
-        wait_for_collectors(&geth_collector_statuses, &mut composer_status_sender)
-            .await
-            .wrap_err("geth collectors failed to become ready")?;
-        wait_for_executor(executor_status, &mut composer_status_sender)
-            .await
-            .wrap_err("executor failed to become ready")?;
+        let composer_status_sender = Arc::new(Mutex::new(composer_status_sender));
+        let collectors_startup_fut =
+            wait_for_collectors(&geth_collector_statuses, composer_status_sender.clone());
+        let executor_startup_fut = wait_for_executor(executor_status, composer_status_sender);
+
+        match join!(collectors_startup_fut, executor_startup_fut) {
+            (Ok(()), Ok(())) => {}
+            (Err(e), Ok(())) => error!(%e, "geth collectors failed to become ready"),
+            (Ok(()), Err(e)) => error!(%e, "executor failed to become ready"),
+            (Err(collector_err), Err(executor_err)) => {
+                error!(%collector_err, %executor_err, "geth collectors and executor failed to become ready");
+            }
+        };
 
         // run the grpc server
         let mut grpc_server_handle = tokio::spawn(async move {
@@ -293,7 +308,7 @@
                 };
             },
             o = &mut api_task => {
-                report_exit("api server unexpectedly ended", o);
+                report_exit("api server unexpectedly ended", o, &exit_err);
                 break ShutdownInfo {
                     api_server_shutdown_token,
                     composer_shutdown_token: shutdown_token,
@@ -304,7 +319,7 @@
                 };
             },
             o = &mut executor_task => {
-                report_exit("executor unexpectedly ended", o);
+                report_exit("executor unexpectedly ended", o, &exit_err);
                 break ShutdownInfo {
                     api_server_shutdown_token,
                     composer_shutdown_token: shutdown_token,
@@ -315,7 +330,7 @@
                 };
             },
             o = &mut grpc_server_handle => {
-                report_exit("grpc server unexpectedly ended", o);
+                report_exit("grpc server unexpectedly ended", o, &exit_err);
                 break ShutdownInfo {
                     api_server_shutdown_token,
                     composer_shutdown_token: shutdown_token,
@@ -326,7 +341,7 @@
                 };
             },
             Some((rollup, collector_exit)) = geth_collector_tasks.join_next() => {
-                report_exit("collector", collector_exit);
+                report_exit("collector", collector_exit, &exit_err);
                 if let Some(url) = rollups.get(&rollup) {
                     let collector = geth::Builder {
                         chain_name: rollup.clone(),
@@ -348,7 +363,11 @@
             });
         };
 
-        shutdown_info.run().await
+        let shutdown_res = shutdown_info.run().await;
+        if let Some(exit_err) = exit_err.take() {
+            return Err(exit_err);
+        }
+        shutdown_res
     }
 }
 
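A note on the startup hunks above: the `watch::Sender` now lives behind `Arc<Mutex<..>>` so the two readiness futures can share it while running concurrently under `join!`, and a subsystem that fails to come up is logged instead of aborting startup through `?`. A minimal, self-contained sketch of that pattern (the `Status` struct and both `wait_for_*` stand-ins are invented for illustration, not taken from the crate):

```rust
use std::sync::Arc;

use tokio::{
    join,
    sync::{
        watch,
        Mutex,
    },
};

#[derive(Default)]
struct Status {
    collectors_ready: bool,
    executor_ready: bool,
}

// Stand-in readiness checks; each one shares the single status sender.
async fn wait_for_collectors(tx: Arc<Mutex<watch::Sender<Status>>>) -> Result<(), String> {
    tx.lock().await.send_modify(|s| s.collectors_ready = true);
    Ok(())
}

async fn wait_for_executor(tx: Arc<Mutex<watch::Sender<Status>>>) -> Result<(), String> {
    tx.lock().await.send_modify(|s| s.executor_ready = true);
    Ok(())
}

#[tokio::main]
async fn main() {
    let (tx, _rx) = watch::channel(Status::default());
    let tx = Arc::new(Mutex::new(tx));
    // `join!` drives both futures to completion, so one subsystem failing
    // to become ready no longer short-circuits the other.
    match join!(wait_for_collectors(tx.clone()), wait_for_executor(tx)) {
        (Ok(()), Ok(())) => {}
        (a, b) => eprintln!("startup problems: {a:?} / {b:?}"),
    }
}
```

Since `send_modify` only takes `&self`, the `Mutex` mainly serializes the status updates; the `Arc` is what lets two futures hold the non-`Clone` sender at once.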
@@ -467,14 +486,14 @@ fn spawn_geth_collectors(
 #[instrument(skip_all, err)]
 async fn wait_for_executor(
     mut executor_status: watch::Receiver<executor::Status>,
-    composer_status_sender: &mut watch::Sender<composer::Status>,
+    composer_status_sender: Arc<Mutex<watch::Sender<composer::Status>>>,
 ) -> eyre::Result<()> {
     executor_status
         .wait_for(executor::Status::is_connected)
         .await
         .wrap_err("executor failed while waiting for it to become ready")?;
 
-    composer_status_sender.send_modify(|status| {
+    composer_status_sender.lock().await.send_modify(|status| {
         status.set_executor_connected(true);
     });
 
@@ -485,7 +504,7 @@ async fn wait_for_executor(
 #[instrument(skip_all, err)]
 async fn wait_for_collectors(
     collector_statuses: &HashMap<String, watch::Receiver<collectors::geth::Status>>,
-    composer_status_sender: &mut watch::Sender<composer::Status>,
+    composer_status_sender: Arc<Mutex<watch::Sender<composer::Status>>>,
 ) -> eyre::Result<()> {
     use futures::{
         future::FutureExt as _,
@@ -525,21 +544,27 @@
         }
     }
 
-    composer_status_sender.send_modify(|status| {
+    composer_status_sender.lock().await.send_modify(|status| {
         status.set_all_collectors_connected(true);
     });
 
     Ok(())
 }
 
-fn report_exit(task_name: &str, outcome: Result<eyre::Result<()>, JoinError>) {
+fn report_exit(
+    task_name: &str,
+    outcome: Result<eyre::Result<()>, JoinError>,
+    exit_err: &OnceCell<eyre::Report>,
+) {
     match outcome {
         Ok(Ok(())) => info!(task = task_name, "task exited successfully"),
         Ok(Err(error)) => {
             error!(%error, task = task_name, "task returned with error");
+            let _ = exit_err.set(error);
         }
         Err(error) => {
             error!(%error, task = task_name, "task failed to complete");
+            let _ = exit_err.set(error.into());
         }
     }
 }
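The error propagation itself hinges on `OnceCell`'s write-once semantics: the first task error is stored, later `set` calls are no-ops, and `take()` surfaces the stored error once shutdown has finished. A small sketch of just that flow, with `String` standing in for `eyre::Report` and invented task names:

```rust
use tokio::sync::OnceCell;

fn record_exit(task: &str, outcome: Result<(), String>, exit_err: &OnceCell<String>) {
    if let Err(error) = outcome {
        eprintln!("task {task} exited with error: {error}");
        // `set` fails once a value is already stored, so the first error
        // wins; later errors are logged but not propagated.
        let _ = exit_err.set(error);
    }
}

fn main() {
    let mut exit_err: OnceCell<String> = OnceCell::new();
    record_exit("executor", Err("connection reset".into()), &exit_err);
    record_exit("grpc server", Err("port in use".into()), &exit_err);
    // After shutdown completes, whichever error arrived first propagates.
    if let Some(error) = exit_err.take() {
        eprintln!("composer exits with: {error}"); // prints "connection reset"
    }
}
```

In the PR, the same cell is threaded by reference into every `report_exit` call in the select loop, so the first task to die determines the composer's exit error.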
14 changes: 4 additions & 10 deletions crates/astria-composer/tests/blackbox/executor.rs
@@ -17,6 +17,7 @@ use crate::helper::{
     mount_broadcast_tx_sync_rollup_data_submissions_mock,
     signed_tx_from_request,
     spawn_composer,
+    TEST_CHAIN_ID,
 };
 
 /// Test to check that the executor sends a signed transaction to the sequencer after its
@@ -203,20 +204,13 @@
 /// ID
 #[tokio::test]
 async fn chain_id_mismatch_returns_error() {
-    // TODO(https://github.com/astriaorg/astria/issues/1833): this test will currently succeed if
-    // the executor fails for any reason on startup, not just if the chain ID is incorrect. This is
-    // a symptom of the current implementation of executor, though, which should be propagating
-    // errors. As such, I think it is out of the scope for the following test-only changes and
-    // should be fixed in a followup.
-
     let bad_chain_id = "bad_id";
     let test_composer = spawn_composer(&["test1"], Some(bad_chain_id), false).await;
     let err = test_composer.composer.await.unwrap().unwrap_err();
     for cause in err.chain() {
-        if cause
-            .to_string()
-            .contains("executor failed while waiting for it to become ready")
-        {
+        if cause.to_string().contains(&format!(
+            "expected chain ID `{TEST_CHAIN_ID}`, but received `{bad_chain_id}`"
+        )) {
             return;
         }
     }
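The reworked test asserts on the specific chain-ID mismatch instead of the generic readiness wrapper, which closes exactly the loophole the removed TODO described. A sketch of that assertion style against an `eyre` error chain (the messages and the `TEST_CHAIN_ID` value `"test"` here are placeholders, not the composer's real strings):

```rust
use eyre::WrapErr as _;

const TEST_CHAIN_ID: &str = "test";

fn connect(expected: &str, received: &str) -> eyre::Result<()> {
    Err(eyre::eyre!(
        "expected chain ID `{expected}`, but received `{received}`"
    ))
    .wrap_err("executor failed while waiting for it to become ready")
}

fn main() {
    let err = connect(TEST_CHAIN_ID, "bad_id").unwrap_err();
    // `chain()` walks from the outermost context down to the root cause,
    // so the assertion can target the precise mismatch message rather
    // than the wrapper that any startup failure would produce.
    assert!(err.chain().any(|cause| cause
        .to_string()
        .contains("expected chain ID `test`, but received `bad_id`")));
}
```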