Skip to content

Commit

Permalink
[feat]: Reworks the way the ledger waits for genesis start. It now fu…
Browse files Browse the repository at this point in the history
…lly initializes the node and outputs logs before sleeping until genesis start time
  • Loading branch information
batconjurer committed Feb 1, 2024
1 parent 720304b commit 2ee5570
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 52 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ test-e2e:
NAMADA_E2E_USE_PREBUILT_BINARIES=$(NAMADA_E2E_USE_PREBUILT_BINARIES) \
NAMADA_E2E_DEBUG=$(NAMADA_E2E_DEBUG) \
RUST_BACKTRACE=$(RUST_BACKTRACE) \
$(cargo) +$(nightly) test e2e::$(TEST_FILTER) \
$(cargo) +$(nightly) test $(jobs) e2e::$(TEST_FILTER) \
-Z unstable-options \
-- \
--test-threads=1 \
Expand Down
16 changes: 14 additions & 2 deletions crates/apps/src/lib/node/ledger/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::net::SocketAddr;
use std::ops::ControlFlow;

use namada::types::control_flow::time;
use namada::types::time::{DateTimeUtc, Utc};
use tokio::sync::mpsc::UnboundedReceiver;

use crate::facade::tendermint_rpc::{Client, HttpClient};
Expand All @@ -27,7 +28,15 @@ impl Broadcaster {

/// Loop forever, broadcasting messages that have been received
/// by the receiver
async fn run_loop(&mut self) {
async fn run_loop(&mut self, genesis_time: DateTimeUtc) {
// wait for start time if necessary
if let Ok(sleep_time) =
genesis_time.0.signed_duration_since(Utc::now()).to_std()
{
if !sleep_time.is_zero() {
tokio::time::sleep(sleep_time).await;
}
}
let result = time::Sleep {
strategy: time::ExponentialBackoff {
base: 2,
Expand Down Expand Up @@ -62,6 +71,8 @@ impl Broadcaster {
if let Err(()) = result {
tracing::error!("Broadcaster failed to connect to CometBFT node");
return;
} else {
tracing::info!("Broadcaster successfully started.");
}
loop {
if let Some(msg) = self.receiver.recv().await {
Expand All @@ -75,10 +86,11 @@ impl Broadcaster {
pub async fn run(
&mut self,
abort_recv: tokio::sync::oneshot::Receiver<()>,
genesis_time: DateTimeUtc,
) {
tracing::info!("Starting broadcaster.");
tokio::select! {
_ = self.run_loop() => {
_ = self.run_loop(genesis_time) => {
tracing::error!("Broadcaster unexpectedly shut down.");
tracing::info!("Shutting down broadcaster...");
},
Expand Down
46 changes: 4 additions & 42 deletions crates/apps/src/lib/node/ledger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use futures::future::TryFutureExt;
use namada::eth_bridge::ethers::providers::{Http, Provider};
use namada::governance::storage::keys as governance_storage;
use namada::types::storage::Key;
use namada::types::time::{DateTimeUtc, Utc};
use namada::types::time::DateTimeUtc;
use namada_sdk::tendermint::abci::request::CheckTxKind;
use once_cell::unsync::Lazy;
use sysinfo::{RefreshKind, System, SystemExt};
Expand Down Expand Up @@ -242,12 +242,6 @@ pub fn rollback(config: config::Ledger) -> Result<(), shell::Error> {
///
/// All must be alive for correct functioning.
async fn run_aux(config: config::Ledger, wasm_dir: PathBuf) {
// wait for genesis time
let genesis_time = DateTimeUtc::try_from(config.genesis_time.clone())
.expect("Should be able to parse genesis time");
if let std::ops::ControlFlow::Break(_) = sleep_until(genesis_time).await {
return;
}
let setup_data = run_aux_setup(&config, &wasm_dir).await;

// Create an `AbortableSpawner` for signalling shut down from the shell or
Expand Down Expand Up @@ -441,7 +435,8 @@ fn start_abci_broadcaster_shell(
// Channels for validators to send protocol txs to be broadcast to the
// broadcaster service
let (broadcaster_sender, broadcaster_receiver) = mpsc::unbounded_channel();

let genesis_time = DateTimeUtc::try_from(config.genesis_time.clone())
.expect("Should be able to parse genesis time");
// Start broadcaster
let broadcaster = if matches!(
config.shell.tendermint_mode,
Expand All @@ -456,7 +451,7 @@ fn start_abci_broadcaster_shell(
// the ledger
let mut broadcaster =
Broadcaster::new(rpc_address, broadcaster_receiver);
broadcaster.run(bc_abort_recv).await;
broadcaster.run(bc_abort_recv, genesis_time).await;
tracing::info!("Broadcaster is no longer running.");

drop(aborter);
Expand Down Expand Up @@ -788,36 +783,3 @@ pub fn test_genesis_files(
fn spawn_dummy_task<T: Send + 'static>(ready: T) -> task::JoinHandle<T> {
tokio::spawn(async { std::future::ready(ready).await })
}

/// Sleep until the genesis time if necessary.
async fn sleep_until(time: DateTimeUtc) -> std::ops::ControlFlow<()> {
// Sleep until start time if needed
let sleep = async {
if let Ok(sleep_time) =
time.0.signed_duration_since(Utc::now()).to_std()
{
if !sleep_time.is_zero() {
tracing::info!(
"Waiting for ledger genesis time: {:?}, time left: {:?}",
time,
sleep_time
);
tokio::time::sleep(sleep_time).await
}
}
};
let shutdown_signal = async {
let (tx, rx) = tokio::sync::oneshot::channel();
namada_sdk::control_flow::shutdown_send(tx).await;
rx.await
};
tokio::select! {
_ = shutdown_signal => {
std::ops::ControlFlow::Break(())
}
_ = sleep => {
tracing::info!("Genesis time reached, starting ledger");
std::ops::ControlFlow::Continue(())
}
}
}
40 changes: 33 additions & 7 deletions crates/apps/src/lib/node/ledger/shims/abcipp_shim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use namada::tx::Tx;
use namada::types::hash::Hash;
use namada::types::key::tm_raw_hash_to_string;
use namada::types::storage::{BlockHash, BlockHeight};
use namada::types::time::Utc;
use namada_sdk::types::time::DateTimeUtc;
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
use tower::Service;
Expand Down Expand Up @@ -182,6 +184,7 @@ impl AbcippShim {
Err(err) => Err(err),
},
};

let resp = resp.map_err(|e| e.into());
if resp_sender.send(resp).is_err() {
tracing::info!("ABCI response channel is closed")
Expand Down Expand Up @@ -292,20 +295,43 @@ impl AbciService {
/// forward it normally.
fn forward_request(&mut self, req: Req) -> <Self as Service<Req>>::Future {
let (resp_send, recv) = tokio::sync::oneshot::channel();
let result = self.shell_send.send((req, resp_send));

let shell_send = self.shell_send.clone();
async move {
let genesis_time = if let Req::InitChain(ref init) = req {
Some(
DateTimeUtc::try_from(init.time)
.expect("Should be able to parse genesis time."),
)
} else {
None
};
let result = shell_send.send((req, resp_send));
if let Err(err) = result {
// The shell has shut-down
return Err(err.into());
}
match recv.await {
Ok(resp) => resp,
Err(err) => {
recv.await
.unwrap_or_else(|err| {
tracing::info!("ABCI response channel didn't respond");
Err(err.into())
}
}
})
.map(|res| {
// emit a log line stating that we are sleeping until
// genesis.
if let Some(Ok(sleep_time)) = genesis_time
.map(|t| t.0.signed_duration_since(Utc::now()).to_std())
{
if !sleep_time.is_zero() {
tracing::info!(
"Waiting for ledger genesis time: {:?}, time \
left: {:?}",
genesis_time.unwrap(),
sleep_time
);
}
}
res
})
}
.boxed()
}
Expand Down

0 comments on commit 2ee5570

Please sign in to comment.