diff --git a/.changelog/unreleased/improvements/2502-wait-for-genesis-logs.md b/.changelog/unreleased/improvements/2502-wait-for-genesis-logs.md new file mode 100644 index 0000000000..ca54776342 --- /dev/null +++ b/.changelog/unreleased/improvements/2502-wait-for-genesis-logs.md @@ -0,0 +1,4 @@ +- Reworks the way the ledger waits for genesis start. It now fully initializes the node and + outputs logs before sleeping until genesis start time. Previously it would not start any + processes until genesis times, giving no feedback to users until genesis time was reached. + ([\#2502](https://github.com/anoma/namada/pull/2502)) \ No newline at end of file diff --git a/Makefile b/Makefile index 3dc399433d..f1ea9b7480 100644 --- a/Makefile +++ b/Makefile @@ -168,7 +168,7 @@ test-e2e: NAMADA_E2E_USE_PREBUILT_BINARIES=$(NAMADA_E2E_USE_PREBUILT_BINARIES) \ NAMADA_E2E_DEBUG=$(NAMADA_E2E_DEBUG) \ RUST_BACKTRACE=$(RUST_BACKTRACE) \ - $(cargo) +$(nightly) test e2e::$(TEST_FILTER) \ + $(cargo) +$(nightly) test $(jobs) e2e::$(TEST_FILTER) \ -Z unstable-options \ -- \ --test-threads=1 \ diff --git a/crates/apps/src/lib/node/ledger/broadcaster.rs b/crates/apps/src/lib/node/ledger/broadcaster.rs index 62c4083a11..6a0d9bf149 100644 --- a/crates/apps/src/lib/node/ledger/broadcaster.rs +++ b/crates/apps/src/lib/node/ledger/broadcaster.rs @@ -2,6 +2,7 @@ use std::net::SocketAddr; use std::ops::ControlFlow; use namada::types::control_flow::time; +use namada::types::time::{DateTimeUtc, Utc}; use tokio::sync::mpsc::UnboundedReceiver; use crate::facade::tendermint_rpc::{Client, HttpClient}; @@ -27,7 +28,15 @@ impl Broadcaster { /// Loop forever, broadcasting messages that have been received /// by the receiver - async fn run_loop(&mut self) { + async fn run_loop(&mut self, genesis_time: DateTimeUtc) { + // wait for start time if necessary + if let Ok(sleep_time) = + genesis_time.0.signed_duration_since(Utc::now()).to_std() + { + if !sleep_time.is_zero() { + tokio::time::sleep(sleep_time).await; + } + } let result = time::Sleep { strategy: time::ExponentialBackoff { base: 2, @@ -62,6 +71,8 @@ impl Broadcaster { if let Err(()) = result { tracing::error!("Broadcaster failed to connect to CometBFT node"); return; + } else { + tracing::info!("Broadcaster successfully started."); } loop { if let Some(msg) = self.receiver.recv().await { @@ -75,10 +86,11 @@ impl Broadcaster { pub async fn run( &mut self, abort_recv: tokio::sync::oneshot::Receiver<()>, + genesis_time: DateTimeUtc, ) { tracing::info!("Starting broadcaster."); tokio::select! { - _ = self.run_loop() => { + _ = self.run_loop(genesis_time) => { tracing::error!("Broadcaster unexpectedly shut down."); tracing::info!("Shutting down broadcaster..."); }, diff --git a/crates/apps/src/lib/node/ledger/mod.rs b/crates/apps/src/lib/node/ledger/mod.rs index 7ed221f260..8075081c45 100644 --- a/crates/apps/src/lib/node/ledger/mod.rs +++ b/crates/apps/src/lib/node/ledger/mod.rs @@ -17,7 +17,7 @@ use futures::future::TryFutureExt; use namada::eth_bridge::ethers::providers::{Http, Provider}; use namada::governance::storage::keys as governance_storage; use namada::types::storage::Key; -use namada::types::time::{DateTimeUtc, Utc}; +use namada::types::time::DateTimeUtc; use namada_sdk::tendermint::abci::request::CheckTxKind; use once_cell::unsync::Lazy; use sysinfo::{RefreshKind, System, SystemExt}; @@ -242,12 +242,6 @@ pub fn rollback(config: config::Ledger) -> Result<(), shell::Error> { /// /// All must be alive for correct functioning. async fn run_aux(config: config::Ledger, wasm_dir: PathBuf) { - // wait for genesis time - let genesis_time = DateTimeUtc::try_from(config.genesis_time.clone()) - .expect("Should be able to parse genesis time"); - if let std::ops::ControlFlow::Break(_) = sleep_until(genesis_time).await { - return; - } let setup_data = run_aux_setup(&config, &wasm_dir).await; // Create an `AbortableSpawner` for signalling shut down from the shell or @@ -441,7 +435,8 @@ fn start_abci_broadcaster_shell( // Channels for validators to send protocol txs to be broadcast to the // broadcaster service let (broadcaster_sender, broadcaster_receiver) = mpsc::unbounded_channel(); - + let genesis_time = DateTimeUtc::try_from(config.genesis_time.clone()) + .expect("Should be able to parse genesis time"); // Start broadcaster let broadcaster = if matches!( config.shell.tendermint_mode, @@ -456,7 +451,7 @@ fn start_abci_broadcaster_shell( // the ledger let mut broadcaster = Broadcaster::new(rpc_address, broadcaster_receiver); - broadcaster.run(bc_abort_recv).await; + broadcaster.run(bc_abort_recv, genesis_time).await; tracing::info!("Broadcaster is no longer running."); drop(aborter); @@ -788,36 +783,3 @@ pub fn test_genesis_files( fn spawn_dummy_task(ready: T) -> task::JoinHandle { tokio::spawn(async { std::future::ready(ready).await }) } - -/// Sleep until the genesis time if necessary. -async fn sleep_until(time: DateTimeUtc) -> std::ops::ControlFlow<()> { - // Sleep until start time if needed - let sleep = async { - if let Ok(sleep_time) = - time.0.signed_duration_since(Utc::now()).to_std() - { - if !sleep_time.is_zero() { - tracing::info!( - "Waiting for ledger genesis time: {:?}, time left: {:?}", - time, - sleep_time - ); - tokio::time::sleep(sleep_time).await - } - } - }; - let shutdown_signal = async { - let (tx, rx) = tokio::sync::oneshot::channel(); - namada_sdk::control_flow::shutdown_send(tx).await; - rx.await - }; - tokio::select! { - _ = shutdown_signal => { - std::ops::ControlFlow::Break(()) - } - _ = sleep => { - tracing::info!("Genesis time reached, starting ledger"); - std::ops::ControlFlow::Continue(()) - } - } -} diff --git a/crates/apps/src/lib/node/ledger/shims/abcipp_shim.rs b/crates/apps/src/lib/node/ledger/shims/abcipp_shim.rs index 12848d86e0..53452aabbf 100644 --- a/crates/apps/src/lib/node/ledger/shims/abcipp_shim.rs +++ b/crates/apps/src/lib/node/ledger/shims/abcipp_shim.rs @@ -11,6 +11,8 @@ use namada::tx::Tx; use namada::types::hash::Hash; use namada::types::key::tm_raw_hash_to_string; use namada::types::storage::{BlockHash, BlockHeight}; +use namada::types::time::Utc; +use namada_sdk::types::time::DateTimeUtc; use tokio::sync::broadcast; use tokio::sync::mpsc::UnboundedSender; use tower::Service; @@ -182,6 +184,7 @@ impl AbcippShim { Err(err) => Err(err), }, }; + let resp = resp.map_err(|e| e.into()); if resp_sender.send(resp).is_err() { tracing::info!("ABCI response channel is closed") @@ -292,20 +295,43 @@ impl AbciService { /// forward it normally. fn forward_request(&mut self, req: Req) -> >::Future { let (resp_send, recv) = tokio::sync::oneshot::channel(); - let result = self.shell_send.send((req, resp_send)); - + let shell_send = self.shell_send.clone(); async move { + let genesis_time = if let Req::InitChain(ref init) = req { + Some( + DateTimeUtc::try_from(init.time) + .expect("Should be able to parse genesis time."), + ) + } else { + None + }; + let result = shell_send.send((req, resp_send)); if let Err(err) = result { // The shell has shut-down return Err(err.into()); } - match recv.await { - Ok(resp) => resp, - Err(err) => { + recv.await + .unwrap_or_else(|err| { tracing::info!("ABCI response channel didn't respond"); Err(err.into()) - } - } + }) + .map(|res| { + // emit a log line stating that we are sleeping until + // genesis. + if let Some(Ok(sleep_time)) = genesis_time + .map(|t| t.0.signed_duration_since(Utc::now()).to_std()) + { + if !sleep_time.is_zero() { + tracing::info!( + "Waiting for ledger genesis time: {:?}, time \ + left: {:?}", + genesis_time.unwrap(), + sleep_time + ); + } + } + res + }) } .boxed() }