diff --git a/Cargo.lock b/Cargo.lock index 3d6cdd9f93e..e9352059bd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2215,6 +2215,7 @@ name = "mithril-end-to-end" version = "0.1.29" dependencies = [ "anyhow", + "async-recursion", "async-trait", "clap", "glob", diff --git a/mithril-test-lab/mithril-end-to-end/Cargo.toml b/mithril-test-lab/mithril-end-to-end/Cargo.toml index 7d35ad6e9b8..601c2e26c5f 100644 --- a/mithril-test-lab/mithril-end-to-end/Cargo.toml +++ b/mithril-test-lab/mithril-end-to-end/Cargo.toml @@ -15,6 +15,7 @@ bench = false [dependencies] anyhow = "1.0.71" +async-recursion = "1.0.4" async-trait = "0.1.52" clap = { version = "4.0.18", features = ["derive"] } glob = "0.3" diff --git a/mithril-test-lab/mithril-end-to-end/src/bin/load-aggregator/main.rs b/mithril-test-lab/mithril-end-to-end/src/bin/load-aggregator/main.rs index d54b885643c..6c90b451e4c 100644 --- a/mithril-test-lab/mithril-end-to-end/src/bin/load-aggregator/main.rs +++ b/mithril-test-lab/mithril-end-to-end/src/bin/load-aggregator/main.rs @@ -7,6 +7,7 @@ use std::{ }; use anyhow::Context; +use async_recursion::async_recursion; use clap::Parser; use indicatif::{ProgressBar, ProgressDrawTarget}; @@ -15,13 +16,17 @@ use mithril_common::{ entities::{ Epoch, PartyId, ProtocolMessage, ProtocolParameters, SignedEntityType, SingleSignatures, }, - messages::{EpochSettingsMessage, RegisterSignatureMessage, RegisterSignerMessage}, + messages::{ + CertificateListItemMessage, EpochSettingsMessage, MithrilStakeDistributionListItemMessage, + RegisterSignatureMessage, RegisterSignerMessage, + }, test_utils::{MithrilFixture, MithrilFixtureBuilder}, StdResult, }; use mithril_end_to_end::{Aggregator, BftNode}; use reqwest::StatusCode; +use serde::Deserialize; use slog::Level; use slog_scope::{info, warn}; use thiserror::Error; @@ -40,9 +45,9 @@ macro_rules! spin_while_waiting { let probe = async move { $block }; select! { - _ = spinner => Err(String::new().into()), - _ = sleep($timeout) => Err($timeout_message.into()), - _ = probe => Ok(()) + _ = spinner => Err(String::new().into()), + _ = sleep($timeout) => Err($timeout_message.into()), + res = probe => res } }}; } @@ -129,6 +134,7 @@ pub async fn wait_for_http_response(url: &str, timeout: Duration, message: &str) while reqwest::get(url).await.is_err() { sleep(Duration::from_millis(300)).await; } + Ok(()) }, timeout, message.to_owned(), @@ -165,6 +171,7 @@ pub async fn wait_for_epoch_settings_at_epoch( _ => sleep(Duration::from_millis(300)).await, } } + Ok(()) }, timeout, format!("Waiting for epoch {epoch}"), @@ -195,6 +202,7 @@ pub async fn wait_for_pending_certificate( _ => sleep(Duration::from_millis(300)).await, } } + Ok(()) }, timeout, format!("Waiting for pending certificate"), @@ -202,6 +210,73 @@ pub async fn wait_for_pending_certificate( ) } +#[async_recursion] +async fn request_first_list_item(url: &str) -> Result +where + for<'a> I: Deserialize<'a> + Sync + Send + Clone, +{ + sleep(Duration::from_millis(300)).await; + + match reqwest::get(url).await { + Ok(response) => match response.status() { + StatusCode::OK => match response.json::>().await.as_deref() { + Ok([first_item, ..]) => Ok(first_item.to_owned()), + Ok(&[]) => request_first_list_item::(url).await, + Err(err) => Err(format!("Invalid list body : {err}")), + }, + s if s.is_server_error() => { + let message = format!( + "Server error while waiting for the Aggregator, http code: {}", + s + ); + warn!("{message}"); + Err(message) + } + _ => request_first_list_item::(url).await, + }, + Err(err) => Err(format!("Request to `{url}` failed: {err}")), + } +} + +/// Wait for certificates +pub async fn wait_for_certificates( + aggregator: &Aggregator, + timeout: Duration, +) -> StdResult { + let url = &format!("{}/certificates", aggregator.endpoint()); + spin_while_waiting!( + { + request_first_list_item::(url) + .await + .map_err(|e| e.into()) + }, + timeout, + format!("Waiting for certificates"), + format!("Aggregator did not get a response after {timeout:?} from '{url}'") + ) +} + +/// Wait for Mithril Stake Distribution artifacts +pub async fn wait_for_mithril_stake_distribution_artifacts( + aggregator: &Aggregator, + timeout: Duration, +) -> StdResult { + let url = &format!( + "{}/artifact/mithril-stake-distributions", + aggregator.endpoint() + ); + spin_while_waiting!( + { + request_first_list_item::(url) + .await + .map_err(|e| e.into()) + }, + timeout, + format!("Waiting for mithril stake distribution artifacts"), + format!("Aggregator did not get a response after {timeout:?} from '{url}'") + ) +} + pub async fn register_signers_to_aggregator( aggregator: &Aggregator, signers_fixture: &MithrilFixture, @@ -538,6 +613,12 @@ async fn main() -> StdResult<()> { .await?; assert_eq!(0, errors); + info!(">> Wait for certificates to be available..."); + wait_for_certificates(&aggregator, Duration::from_secs(30)).await?; + + info!(">> Wait for artifacts to be available..."); + wait_for_mithril_stake_distribution_artifacts(&aggregator, Duration::from_secs(30)).await?; + info!(">> All steps executed successfully, stopping all tasks..."); aggregator.stop().await.unwrap();