diff --git a/crates/node/src/abortable.rs b/crates/node/src/abortable.rs index 5b02da4cab..dca8e5a409 100644 --- a/crates/node/src/abortable.rs +++ b/crates/node/src/abortable.rs @@ -20,6 +20,8 @@ pub struct AbortableSpawner { abort_send: UnboundedSender, abort_recv: UnboundedReceiver, cleanup_jobs: Vec>>>, + pinned: Option<(AbortingTask, JoinHandle>)>, + batch: Vec<(AbortingTask, JoinHandle>)>, } /// Contains the state of an on-going [`AbortableSpawner`] task spawn. @@ -28,6 +30,7 @@ pub struct AbortableTaskBuilder<'a, A> { abortable: A, spawner: &'a mut AbortableSpawner, cleanup: Option>>>, + pin: bool, } impl Default for AbortableSpawner { @@ -47,6 +50,8 @@ impl AbortableSpawner { abort_recv, shutdown_recv, cleanup_jobs: Vec::new(), + batch: Vec::new(), + pinned: None, } } @@ -63,6 +68,7 @@ impl AbortableSpawner { /// println!("I have signaled a control task that I am no longer running!"); /// }) /// .spawn(); + /// spawner.run_to_completion().await; /// ``` /// /// The return type of this method is [`AbortableTaskBuilder`], such that a @@ -79,17 +85,22 @@ impl AbortableSpawner { abortable, spawner: self, cleanup: None, + pin: false, } } - /// This future will resolve when: + /// Wait for any of the spawned tasks to abort. + /// + /// ## Resolving this future + /// + /// This future runs to completion if: /// /// 1. A user sends a shutdown signal (e.g. SIGINT), or... /// 2. One of the child processes of the ledger terminates, which /// generates a notification upon dropping an [`Aborter`]. /// /// These two scenarios are represented by the [`AborterStatus`] enum. - pub async fn wait_for_abort(mut self) -> AborterStatus { + async fn wait_for_abort(mut self) -> AborterStatus { let status = tokio::select! { _ = self.shutdown_recv.wait_for_shutdown() => AborterStatus::UserShutdownLedger, msg = self.abort_recv.recv() => { @@ -109,6 +120,49 @@ impl AbortableSpawner { status } + /// Run all the spawned tasks to completion. + pub async fn run_to_completion(mut self) { + let pinned_task = self.pinned.take(); + + let batch = std::mem::take(&mut self.batch); + let (task_ids, task_handles): (Vec<_>, Vec<_>) = + batch.into_iter().unzip(); + + // Wait for interrupt signal or abort message + let aborted = self.wait_for_abort().await.child_terminated(); + + // Wait for all managed tasks to finish + match futures::future::try_join_all(task_handles).await { + Ok(results) => { + for (i, res) in results.into_iter().enumerate() { + match res { + Err(err) if aborted => { + let who = task_ids[i]; + tracing::error!("{who} error: {err}"); + } + _ => {} + } + } + } + Err(err) => { + // Ignore cancellation errors + if !err.is_cancelled() { + tracing::error!("Abortable spawner error: {err}"); + } + } + } + + if let Some((who, pinned_task)) = pinned_task { + match pinned_task.await { + Err(err) if err.is_panic() => { + std::panic::resume_unwind(err.into_panic()) + } + Err(err) => tracing::error!("{who} error: {err}"), + _ => {} + } + } + } + fn spawn_abortable_task( &self, who: AbortingTask, @@ -144,7 +198,7 @@ impl AbortableSpawner { impl<'a, A> AbortableTaskBuilder<'a, A> { /// Spawn the built abortable task into the runtime. #[inline] - pub fn spawn(self) -> JoinHandle> + pub fn spawn(self) where A: FnOnce(Aborter) -> F, F: Future> + Send + 'static, @@ -152,20 +206,37 @@ impl<'a, A> AbortableTaskBuilder<'a, A> { if let Some(cleanup) = self.cleanup { self.spawner.cleanup_jobs.push(cleanup); } - self.spawner.spawn_abortable_task(self.who, self.abortable) + let task = self.spawner.spawn_abortable_task(self.who, self.abortable); + if self.pin { + if let Some(pinned_task) = self.spawner.pinned.take() { + self.spawner.batch.push(pinned_task); + } + self.spawner.pinned = Some((self.who, task)); + } else { + self.spawner.batch.push((self.who, task)); + } } /// Spawn the built abortable (blocking) task into the runtime. #[inline] - pub fn spawn_blocking(self) -> JoinHandle> + pub fn spawn_blocking(self) where A: FnOnce(Aborter) -> ShellResult<()> + Send + 'static, { if let Some(cleanup) = self.cleanup { self.spawner.cleanup_jobs.push(cleanup); } - self.spawner - .spawn_abortable_task_blocking(self.who, self.abortable) + let task = self + .spawner + .spawn_abortable_task_blocking(self.who, self.abortable); + if self.pin { + if let Some(pinned_task) = self.spawner.pinned.take() { + self.spawner.batch.push(pinned_task); + } + self.spawner.pinned = Some((self.who, task)); + } else { + self.spawner.batch.push((self.who, task)); + } } /// A cleanup routine `cleanup` will be executed for the associated task. @@ -178,6 +249,14 @@ impl<'a, A> AbortableTaskBuilder<'a, A> { self.cleanup = Some(Box::pin(cleanup)); self } + + /// Pin the task to spawn. The main purpose behind this operation + /// is to resume unwinding the stack if the pinned task panics. + #[inline] + pub fn pin(mut self) -> Self { + self.pin = true; + self + } } /// A panic-proof handle for aborting a future. Will abort during stack diff --git a/crates/node/src/ethereum_oracle/mod.rs b/crates/node/src/ethereum_oracle/mod.rs index 9c340e79cb..7026295111 100644 --- a/crates/node/src/ethereum_oracle/mod.rs +++ b/crates/node/src/ethereum_oracle/mod.rs @@ -283,7 +283,7 @@ pub fn run_oracle( control: control::Receiver, last_processed_block: last_processed_block::Sender, spawner: &mut AbortableSpawner, -) -> tokio::task::JoinHandle> { +) { let url = url.as_ref().to_owned(); spawner .abortable("Ethereum Oracle", move |aborter| { @@ -317,7 +317,7 @@ pub fn run_oracle( drop(aborter); Ok(()) }) - .spawn_blocking() + .spawn_blocking(); } /// Determine what action to take after attempting to diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index c090ce08c0..ad827f6d3d 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -53,7 +53,6 @@ use namada_sdk::time::DateTimeUtc; use once_cell::unsync::Lazy; use sysinfo::{RefreshKind, System, SystemExt}; use tokio::sync::mpsc; -use tokio::task; use self::abortable::AbortableSpawner; use self::ethereum_oracle::last_processed_block; @@ -446,15 +445,13 @@ async fn run_aux( let mut spawner = AbortableSpawner::new(); // Start Tendermint node - let tendermint_node = start_tendermint(&mut spawner, &config); + start_tendermint(&mut spawner, &config); // Start oracle if necessary - let (eth_oracle_channels, eth_oracle) = + let eth_oracle_channels = match maybe_start_ethereum_oracle(&mut spawner, &config).await { - EthereumOracleTask::NotEnabled { handle } => (None, handle), - EthereumOracleTask::Enabled { handle, channels } => { - (Some(channels), handle) - } + EthereumOracleTask::NotEnabled => None, + EthereumOracleTask::Enabled { channels } => Some(channels), }; tracing::info!("Loading MASP verifying keys."); @@ -463,7 +460,7 @@ async fn run_aux( // Start ABCI server and broadcaster (the latter only if we are a validator // node) - let [abci, broadcaster, shell_handler] = start_abci_broadcaster_shell( + start_abci_broadcaster_shell( &mut spawner, eth_oracle_channels, wasm_dir, @@ -471,41 +468,7 @@ async fn run_aux( config, ); - // Wait for interrupt signal or abort message - let aborted = spawner.wait_for_abort().await.child_terminated(); - - // Wait for all managed tasks to finish. - let res = tokio::try_join!(tendermint_node, abci, eth_oracle, broadcaster); - - match res { - Ok((tendermint_res, abci_res, _, _)) => { - // we ignore errors on user-initiated shutdown - if aborted { - if let Err(err) = tendermint_res { - tracing::error!("Tendermint error: {}", err); - } - if let Err(err) = abci_res { - tracing::error!("ABCI error: {}", err); - } - } - } - Err(err) => { - // Ignore cancellation errors - if !err.is_cancelled() { - tracing::error!("Ledger error: {}", err); - } - } - } - - tracing::info!("Namada ledger node has shut down."); - - match shell_handler.await { - Err(err) if err.is_panic() => { - std::panic::resume_unwind(err.into_panic()) - } - Err(err) => tracing::error!("Shell error: {err}"), - _ => {} - } + spawner.run_to_completion().await; } /// A [`RunAuxSetup`] stores some variables used to start child @@ -620,7 +583,7 @@ fn start_abci_broadcaster_shell( wasm_dir: PathBuf, setup_data: RunAuxSetup, config: config::Ledger, -) -> [task::JoinHandle>; 3] { +) { let rpc_address = convert_tm_addr_to_socket_addr(&config.cometbft.rpc.laddr); let RunAuxSetup { @@ -636,7 +599,7 @@ fn start_abci_broadcaster_shell( let genesis_time = DateTimeUtc::try_from(config.genesis_time.clone()) .expect("Should be able to parse genesis time"); // Start broadcaster - let broadcaster = if matches!( + if matches!( config.shell.tendermint_mode, TendermintMode::Validator { .. } ) { @@ -659,10 +622,8 @@ fn start_abci_broadcaster_shell( .with_cleanup(async move { let _ = bc_abort_send.send(()); }) - .spawn() - } else { - spawn_dummy_task() - }; + .spawn(); + } // Setup DB cache, it must outlive the DB instance that's in the shell let db_cache = rocksdb::Cache::new_lru_cache( @@ -690,7 +651,7 @@ fn start_abci_broadcaster_shell( let (abci_abort_send, abci_abort_recv) = tokio::sync::oneshot::channel(); // Start the ABCI server - let abci = spawner + spawner .abortable("ABCI", move |aborter| async move { let res = run_abci( abci_service, @@ -709,7 +670,7 @@ fn start_abci_broadcaster_shell( .spawn(); // Start the shell in a new OS thread - let shell_handler = spawner + spawner .abortable("Shell", move |_aborter| { tracing::info!("Namada ledger node started."); match tendermint_mode { @@ -723,9 +684,13 @@ fn start_abci_broadcaster_shell( shell.run(); Ok(()) }) + .with_cleanup(async { + tracing::info!("Namada ledger node has shut down."); + }) + // NB: pin the shell's task to allow + // resuming unwinding on panic + .pin() .spawn_blocking(); - - [abci, broadcaster, shell_handler] } /// Runs the an asynchronous ABCI server with four sub-components for consensus, @@ -771,10 +736,7 @@ async fn run_abci( /// Launches a new task managing a Tendermint process into the asynchronous /// runtime, and returns its [`task::JoinHandle`]. -fn start_tendermint( - spawner: &mut AbortableSpawner, - config: &config::Ledger, -) -> task::JoinHandle> { +fn start_tendermint(spawner: &mut AbortableSpawner, config: &config::Ledger) { let tendermint_dir = config.cometbft_dir(); let chain_id = config.chain_id.clone(); let proxy_app_address = config.cometbft.proxy_app.to_string(); @@ -829,22 +791,14 @@ fn start_tendermint( } } }) - .spawn() + .spawn(); } /// Represents a [`tokio::task`] in which an Ethereum oracle may be running, and /// if so, channels for communicating with it. enum EthereumOracleTask { - NotEnabled { - // TODO(namada#459): we have to return a dummy handle for the moment, - // until `run_aux` is refactored - at which point, we no longer need an - // enum to represent the Ethereum oracle being on/off. - handle: task::JoinHandle>, - }, - Enabled { - handle: task::JoinHandle>, - channels: EthereumOracleChannels, - }, + NotEnabled, + Enabled { channels: EthereumOracleChannels }, } /// Potentially starts an Ethereum event oracle. @@ -856,9 +810,7 @@ async fn maybe_start_ethereum_oracle( config.shell.tendermint_mode, TendermintMode::Validator { .. } ) { - return EthereumOracleTask::NotEnabled { - handle: spawn_dummy_task(), - }; + return EthereumOracleTask::NotEnabled; } let ethereum_url = config.ethereum_bridge.oracle_rpc_endpoint.clone(); @@ -872,7 +824,7 @@ async fn maybe_start_ethereum_oracle( match config.ethereum_bridge.mode { ethereum_bridge::ledger::Mode::RemoteEndpoint => { - let handle = oracle::run_oracle::>( + oracle::run_oracle::>( ethereum_url, eth_sender, control_receiver, @@ -881,7 +833,6 @@ async fn maybe_start_ethereum_oracle( ); EthereumOracleTask::Enabled { - handle, channels: EthereumOracleChannels::new( eth_receiver, control_sender, @@ -893,7 +844,7 @@ async fn maybe_start_ethereum_oracle( let (oracle_abort_send, oracle_abort_recv) = tokio::sync::oneshot::channel::>( ); - let handle = spawner + spawner .abortable( "Ethereum Events Endpoint", move |aborter| async move { @@ -934,7 +885,6 @@ async fn maybe_start_ethereum_oracle( }) .spawn(); EthereumOracleTask::Enabled { - handle, channels: EthereumOracleChannels::new( eth_receiver, control_sender, @@ -942,9 +892,7 @@ async fn maybe_start_ethereum_oracle( ), } } - ethereum_bridge::ledger::Mode::Off => EthereumOracleTask::NotEnabled { - handle: spawn_dummy_task(), - }, + ethereum_bridge::ledger::Mode::Off => EthereumOracleTask::NotEnabled, } } @@ -979,9 +927,3 @@ pub fn test_genesis_files( initializer.run_validation(chain_id, genesis); initializer.report(); } - -/// Spawn a dummy asynchronous task into the runtime, -/// which will resolve instantly. -fn spawn_dummy_task() -> task::JoinHandle> { - tokio::spawn(async { std::future::ready(Ok(())).await }) -}