From 4b4e10e2c83faebcf94c1aa870b01f6a3778ebd9 Mon Sep 17 00:00:00 2001 From: sifnoc Date: Wed, 6 Dec 2023 11:21:51 +0000 Subject: [PATCH] fix: more error handling; added retry on executor --- .github/workflows/rust.yml | 23 ++++++++++++++++++++ src/aggregation_merkle_sum_tree.rs | 3 ++- src/executor/local_spawner.rs | 27 ++++++++++++++++------- src/executor/mod.rs | 35 +++++++++++++++++++----------- src/orchestrator/mod.rs | 22 ++++++++++--------- src/orchestrator/test.rs | 28 ++++++++++++++++++------ 6 files changed, 99 insertions(+), 39 deletions(-) create mode 100644 .github/workflows/rust.yml diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..f7ffac5 --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,23 @@ +name: Aggregation Test + +on: + push: + branches: ["*"] + pull_request: + branches: ["*"] + +env: + CARGO_TERM_COLOR: always + +jobs: + build-and-test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + + - name: Pull mini-tree image + run: docker pull summadev/summa-aggregation-mini-tree:latest + + - name: Run tests + run: cargo test --features docker diff --git a/src/aggregation_merkle_sum_tree.rs b/src/aggregation_merkle_sum_tree.rs index 9880f73..3a5abbb 100644 --- a/src/aggregation_merkle_sum_tree.rs +++ b/src/aggregation_merkle_sum_tree.rs @@ -48,7 +48,7 @@ impl Tree &[Entry] { - &self.mini_trees[0].entries() + self.mini_trees[0].entries() } fn get_entry(&self, user_index: usize) -> &Entry { @@ -97,6 +97,7 @@ impl Tree Self { + let docker = match env::var("DOCKER_HOST") { + // Read `DOCKER_HOST` environment variable as default + Ok(host) => Docker::connect_with_http_defaults() + .unwrap_or_else(|_| panic!("Failed to connect to {} for using Docker", host)), + _ => Docker::connect_with_local_defaults() + .unwrap_or_else(|_| panic!("Failed to connect to Docker")), + }; + LocalSpawner { - docker: Docker::connect_with_local_defaults().unwrap(), + docker, worker_counter: AtomicUsize::new(0), image_name, container_name, @@ -52,6 +61,8 @@ impl LocalSpawner { platform: None, }; + println!("docker-info: {:?}", docker.info().await?); + docker .create_container(Some(create_container_options), config.clone()) .await?; @@ -66,6 +77,8 @@ impl LocalSpawner { let container_info: ContainerInspectResponse = docker.inspect_container(&container_name, None).await?; + println!("container_info: {:?}", container_info); + Ok(container_info) } } @@ -124,19 +137,17 @@ impl ExecutorSpawner for LocalSpawner { } } +#[cfg(feature = "docker")] #[cfg(test)] mod tests { use super::*; - use std::sync::atomic::AtomicUsize; #[tokio::test] async fn test_executor_spawner() { - let spawner = LocalSpawner { - docker: Docker::connect_with_local_defaults().unwrap(), - worker_counter: AtomicUsize::new(0), - image_name: "summadev/summa-aggregation-mini-tree:latest".to_string(), // Should exist on local registry - container_name: "mini_tree_generator".to_string(), - }; + let spawner = LocalSpawner::new( + "summadev/summa-aggregation-mini-tree:latest".to_string(), + "executor_test".to_string(), + ); // Spawn 2 executors let executor_1 = spawner.spawn_executor().await; diff --git a/src/executor/mod.rs b/src/executor/mod.rs index f1ca432..cd986cd 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -12,6 +12,7 @@ pub use spawner::ExecutorSpawner; use reqwest::Client; use std::error::Error; +use tokio::time::{sleep, Duration}; use crate::json_mst::{JsonEntry, JsonMerkleSumTree}; use summa_backend::merkle_sum_tree::MerkleSumTree; @@ -48,21 +49,29 @@ impl Executor { [usize; N_CURRENCIES + 1]: Sized, [usize; N_CURRENCIES + 2]: Sized, { - let response = self - .client - .post(&self.url) - .json(&json_entries) - .send() - .await - .map_err(|err| Box::new(err) as Box)?; + const MAX_RETRIES: u32 = 5; + const RETRY_DELAY: Duration = Duration::from_secs(1); - let json_tree = response - .json::() - .await - .map_err(|err| Box::new(err) as Box)?; + let mut attempts = 0; + loop { + attempts += 1; + let response = self.client.post(&self.url).json(&json_entries).send().await; - let tree = json_tree.to_mst().unwrap(); + match response { + Ok(response) => { + let json_tree = response + .json::() + .await + .map_err(|err| Box::new(err) as Box)?; - Ok(tree) + let tree = json_tree.to_mst().unwrap(); + return Ok(tree); + } + Err(_err) if attempts < MAX_RETRIES => { + sleep(RETRY_DELAY).await; + } + Err(err) => return Err(Box::new(err) as Box), + } + } } } diff --git a/src/orchestrator/mod.rs b/src/orchestrator/mod.rs index b3c60ac..5720e84 100644 --- a/src/orchestrator/mod.rs +++ b/src/orchestrator/mod.rs @@ -2,7 +2,7 @@ mod test; use futures::future::join_all; use std::{cmp::min, error::Error}; -use summa_backend::merkle_sum_tree::{utils::parse_csv_to_entries, Cryptocurrency}; +use summa_backend::merkle_sum_tree::{utils::parse_csv_to_entries, Cryptocurrency, MerkleSumTree}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; @@ -87,7 +87,6 @@ impl Orchestrator Orchestrator> = + ordered_tree_results.into_iter().flatten().collect(); + + // Occur error if the number of mini_tree in 'all_merkle_sum_tree' is not equal to the number of entry_csvs. + if all_merkle_sum_tree.len() != self.entry_csvs.len() { + return Err("Mismatch in generated mini tree counts and given CSV counts".into()); + } AggregationMerkleSumTree::new( all_merkle_sum_tree, vec![ Cryptocurrency { - name: "BTC".to_string(), - chain: "mainnet".to_string(), - }, - Cryptocurrency { - name: "ETH".to_string(), - chain: "mainnet".to_string(), - }, + name: "DUMMY".to_string(), + chain: "ETH".to_string(), + }; + N_CURRENCIES ], ) } diff --git a/src/orchestrator/test.rs b/src/orchestrator/test.rs index 5977598..9a74c4e 100644 --- a/src/orchestrator/test.rs +++ b/src/orchestrator/test.rs @@ -30,8 +30,14 @@ async fn test_none_exist_csv() { "./src/orchestrator/csv/no_exist.csv".to_string(), ], ); - let one_mini_tree_result = orchestrator.create_aggregation_mst(2).await.unwrap(); - assert_eq!(&0, one_mini_tree_result.depth()); + match orchestrator.create_aggregation_mst(2).await { + Ok(_) => panic!("Expected an error"), + Err(e) => { + assert!(e + .to_string() + .contains("Mismatch in generated mini tree counts and given CSV counts")); + } + } } #[tokio::test] @@ -46,15 +52,22 @@ async fn test_none_exist_worker() { "./src/orchestrator/csv/entry_16_2.csv".to_string(), ], ); - let empty_mini_tree_error = orchestrator.create_aggregation_mst(2).await.unwrap_err(); - assert_eq!("Empty mini tree inputs", empty_mini_tree_error.to_string()); + + match orchestrator.create_aggregation_mst(2).await { + Ok(_) => panic!("Expected an error"), + Err(e) => { + assert!(e + .to_string() + .contains("Mismatch in generated mini tree counts and given CSV counts")); + } + } } #[cfg(feature = "docker")] #[tokio::test] async fn test_with_containers() { let spawner = LocalSpawner::new( - "summa-aggregation".to_string(), + "summadev/summa-aggregation-mini-tree:latest".to_string(), "orchestrator_test".to_string(), ); @@ -66,8 +79,9 @@ async fn test_with_containers() { ], ); let aggregation_merkle_sum_tree = orchestrator.create_aggregation_mst(2).await.unwrap(); + assert_eq!(16, aggregation_merkle_sum_tree.mini_tree(0).entries().len()); - assert_eq!(16, aggregation_merkle_sum_tree.mini_tree(0).entries().len()); + assert_eq!(16, aggregation_merkle_sum_tree.mini_tree(1).entries().len()); } #[cfg(feature = "docker-swarm")] @@ -88,5 +102,5 @@ async fn test_with_swarm_service() { ); let aggregation_merkle_sum_tree = orchestrator.create_aggregation_mst(2).await.unwrap(); assert_eq!(16, aggregation_merkle_sum_tree.mini_tree(0).entries().len()); - assert_eq!(16, aggregation_merkle_sum_tree.mini_tree(0).entries().len()); + assert_eq!(16, aggregation_merkle_sum_tree.mini_tree(1).entries().len()); }