Skip to content

Commit

Permalink
Add wait for certificates and artifacts
Browse files Browse the repository at this point in the history
  • Loading branch information
jpraynaud committed Jul 24, 2023
1 parent f3e0cd7 commit 3493ad6
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mithril-test-lab/mithril-end-to-end/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bench = false

[dependencies]
anyhow = "1.0.71"
async-recursion = "1.0.4"
async-trait = "0.1.52"
clap = { version = "4.0.18", features = ["derive"] }
glob = "0.3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use anyhow::Context;
use async_recursion::async_recursion;
use clap::Parser;

use indicatif::{ProgressBar, ProgressDrawTarget};
Expand All @@ -15,13 +16,17 @@ use mithril_common::{
entities::{
Epoch, PartyId, ProtocolMessage, ProtocolParameters, SignedEntityType, SingleSignatures,
},
messages::{EpochSettingsMessage, RegisterSignatureMessage, RegisterSignerMessage},
messages::{
CertificateListItemMessage, EpochSettingsMessage, MithrilStakeDistributionListItemMessage,
RegisterSignatureMessage, RegisterSignerMessage,
},
test_utils::{MithrilFixture, MithrilFixtureBuilder},
StdResult,
};

use mithril_end_to_end::{Aggregator, BftNode};
use reqwest::StatusCode;
use serde::Deserialize;
use slog::Level;
use slog_scope::{info, warn};
use thiserror::Error;
Expand All @@ -40,9 +45,9 @@ macro_rules! spin_while_waiting {
let probe = async move { $block };

select! {
_ = spinner => Err(String::new().into()),
_ = sleep($timeout) => Err($timeout_message.into()),
_ = probe => Ok(())
_ = spinner => Err(String::new().into()),
_ = sleep($timeout) => Err($timeout_message.into()),
res = probe => res
}
}};
}
Expand Down Expand Up @@ -129,6 +134,7 @@ pub async fn wait_for_http_response(url: &str, timeout: Duration, message: &str)
while reqwest::get(url).await.is_err() {
sleep(Duration::from_millis(300)).await;
}
Ok(())
},
timeout,
message.to_owned(),
Expand Down Expand Up @@ -165,6 +171,7 @@ pub async fn wait_for_epoch_settings_at_epoch(
_ => sleep(Duration::from_millis(300)).await,
}
}
Ok(())
},
timeout,
format!("Waiting for epoch {epoch}"),
Expand Down Expand Up @@ -195,13 +202,81 @@ pub async fn wait_for_pending_certificate(
_ => sleep(Duration::from_millis(300)).await,
}
}
Ok(())
},
timeout,
format!("Waiting for pending certificate"),
format!("Aggregator did not get a response after {timeout:?} from '{url}'")
)
}

#[async_recursion]
async fn request_first_list_item<I>(url: &str) -> Result<I, String>
where
for<'a> I: Deserialize<'a> + Sync + Send + Clone,
{
sleep(Duration::from_millis(300)).await;

match reqwest::get(url).await {
Ok(response) => match response.status() {
StatusCode::OK => match response.json::<Vec<I>>().await.as_deref() {
Ok([first_item, ..]) => Ok(first_item.to_owned()),
Ok(&[]) => request_first_list_item::<I>(url).await,
Err(err) => Err(format!("Invalid list body : {err}")),
},
s if s.is_server_error() => {
let message = format!(
"Server error while waiting for the Aggregator, http code: {}",
s
);
warn!("{message}");
Err(message)
}
_ => request_first_list_item::<I>(url).await,
},
Err(err) => Err(format!("Request to `{url}` failed: {err}")),
}
}

/// Wait for certificates
pub async fn wait_for_certificates(
aggregator: &Aggregator,
timeout: Duration,
) -> StdResult<CertificateListItemMessage> {
let url = &format!("{}/certificates", aggregator.endpoint());
spin_while_waiting!(
{
request_first_list_item::<CertificateListItemMessage>(url)
.await
.map_err(|e| e.into())
},
timeout,
format!("Waiting for certificates"),
format!("Aggregator did not get a response after {timeout:?} from '{url}'")
)
}

/// Wait for Mithril Stake Distribution artifacts
pub async fn wait_for_mithril_stake_distribution_artifacts(
aggregator: &Aggregator,
timeout: Duration,
) -> StdResult<MithrilStakeDistributionListItemMessage> {
let url = &format!(
"{}/artifact/mithril-stake-distributions",
aggregator.endpoint()
);
spin_while_waiting!(
{
request_first_list_item::<MithrilStakeDistributionListItemMessage>(url)
.await
.map_err(|e| e.into())
},
timeout,
format!("Waiting for mithril stake distribution artifacts"),
format!("Aggregator did not get a response after {timeout:?} from '{url}'")
)
}

pub async fn register_signers_to_aggregator(
aggregator: &Aggregator,
signers_fixture: &MithrilFixture,
Expand Down Expand Up @@ -538,6 +613,12 @@ async fn main() -> StdResult<()> {
.await?;
assert_eq!(0, errors);

info!(">> Wait for certificates to be available...");
wait_for_certificates(&aggregator, Duration::from_secs(30)).await?;

info!(">> Wait for artifacts to be available...");
wait_for_mithril_stake_distribution_artifacts(&aggregator, Duration::from_secs(30)).await?;

info!(">> All steps executed successfully, stopping all tasks...");

aggregator.stop().await.unwrap();
Expand Down

0 comments on commit 3493ad6

Please sign in to comment.