Skip to content

Commit

Permalink
fix: more error handling; added retry on executor
Browse files Browse the repository at this point in the history
  • Loading branch information
sifnoc committed Dec 6, 2023
1 parent d6dd01e commit 4b4e10e
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 39 deletions.
23 changes: 23 additions & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: Aggregation Test

on:
push:
branches: ["*"]
pull_request:
branches: ["*"]

env:
CARGO_TERM_COLOR: always

jobs:
build-and-test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3

- name: Pull mini-tree image
run: docker pull summadev/summa-aggregation-mini-tree:latest

- name: Run tests
run: cargo test --features docker
3 changes: 2 additions & 1 deletion src/aggregation_merkle_sum_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl<const N_CURRENCIES: usize, const N_BYTES: usize> Tree<N_CURRENCIES, N_BYTES
}

fn entries(&self) -> &[Entry<N_CURRENCIES>] {
&self.mini_trees[0].entries()
self.mini_trees[0].entries()
}

fn get_entry(&self, user_index: usize) -> &Entry<N_CURRENCIES> {
Expand Down Expand Up @@ -97,6 +97,7 @@ impl<const N_CURRENCIES: usize, const N_BYTES: usize> Tree<N_CURRENCIES, N_BYTES
let mut current_index = mini_tree_index;
let mut path_indices = vec![Fp::from(0); self.depth];

#[allow(clippy::needless_range_loop)]
for level in 0..self.depth {
let position = current_index % 2;
path_indices[level] = Fp::from(position as u64);
Expand Down
27 changes: 19 additions & 8 deletions src/executor/local_spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use bollard::{
};
use std::{
default::Default,
env,
error::Error,
future::Future,
pin::Pin,
Expand All @@ -24,8 +25,16 @@ pub struct LocalSpawner {

impl LocalSpawner {
pub fn new(image_name: String, container_name: String) -> Self {
let docker = match env::var("DOCKER_HOST") {
// Read `DOCKER_HOST` environment variable as default
Ok(host) => Docker::connect_with_http_defaults()
.unwrap_or_else(|_| panic!("Failed to connect to {} for using Docker", host)),
_ => Docker::connect_with_local_defaults()
.unwrap_or_else(|_| panic!("Failed to connect to Docker")),
};

LocalSpawner {
docker: Docker::connect_with_local_defaults().unwrap(),
docker,
worker_counter: AtomicUsize::new(0),
image_name,
container_name,
Expand All @@ -52,6 +61,8 @@ impl LocalSpawner {
platform: None,
};

println!("docker-info: {:?}", docker.info().await?);

docker
.create_container(Some(create_container_options), config.clone())
.await?;
Expand All @@ -66,6 +77,8 @@ impl LocalSpawner {
let container_info: ContainerInspectResponse =
docker.inspect_container(&container_name, None).await?;

println!("container_info: {:?}", container_info);

Ok(container_info)
}
}
Expand Down Expand Up @@ -124,19 +137,17 @@ impl ExecutorSpawner for LocalSpawner {
}
}

#[cfg(feature = "docker")]
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::AtomicUsize;

#[tokio::test]
async fn test_executor_spawner() {
let spawner = LocalSpawner {
docker: Docker::connect_with_local_defaults().unwrap(),
worker_counter: AtomicUsize::new(0),
image_name: "summadev/summa-aggregation-mini-tree:latest".to_string(), // Should exist on local registry
container_name: "mini_tree_generator".to_string(),
};
let spawner = LocalSpawner::new(
"summadev/summa-aggregation-mini-tree:latest".to_string(),
"executor_test".to_string(),
);

// Spawn 2 executors
let executor_1 = spawner.spawn_executor().await;
Expand Down
35 changes: 22 additions & 13 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub use spawner::ExecutorSpawner;

use reqwest::Client;
use std::error::Error;
use tokio::time::{sleep, Duration};

use crate::json_mst::{JsonEntry, JsonMerkleSumTree};
use summa_backend::merkle_sum_tree::MerkleSumTree;
Expand Down Expand Up @@ -48,21 +49,29 @@ impl Executor {
[usize; N_CURRENCIES + 1]: Sized,
[usize; N_CURRENCIES + 2]: Sized,
{
let response = self
.client
.post(&self.url)
.json(&json_entries)
.send()
.await
.map_err(|err| Box::new(err) as Box<dyn Error + Send>)?;
const MAX_RETRIES: u32 = 5;
const RETRY_DELAY: Duration = Duration::from_secs(1);

let json_tree = response
.json::<JsonMerkleSumTree>()
.await
.map_err(|err| Box::new(err) as Box<dyn Error + Send>)?;
let mut attempts = 0;
loop {
attempts += 1;
let response = self.client.post(&self.url).json(&json_entries).send().await;

let tree = json_tree.to_mst().unwrap();
match response {
Ok(response) => {
let json_tree = response
.json::<JsonMerkleSumTree>()
.await
.map_err(|err| Box::new(err) as Box<dyn Error + Send>)?;

Ok(tree)
let tree = json_tree.to_mst().unwrap();
return Ok(tree);
}
Err(_err) if attempts < MAX_RETRIES => {
sleep(RETRY_DELAY).await;
}
Err(err) => return Err(Box::new(err) as Box<dyn Error + Send>),
}
}
}
}
22 changes: 12 additions & 10 deletions src/orchestrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod test;

use futures::future::join_all;
use std::{cmp::min, error::Error};
use summa_backend::merkle_sum_tree::{utils::parse_csv_to_entries, Cryptocurrency};
use summa_backend::merkle_sum_tree::{utils::parse_csv_to_entries, Cryptocurrency, MerkleSumTree};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

Expand Down Expand Up @@ -87,7 +87,6 @@ impl<const N_CURRENCIES: usize, const N_BYTES: usize> Orchestrator<N_CURRENCIES,
//
let (entries_tx, mut entries_rx) = mpsc::channel(channel_size);
let (tree_tx, tree_rx) = mpsc::channel(channel_size);

// Executor
//
// Spawn executors that process entries with Worker.
Expand Down Expand Up @@ -214,19 +213,22 @@ impl<const N_CURRENCIES: usize, const N_BYTES: usize> Orchestrator<N_CURRENCIES,
// Terminate executors
self.executor_spawner.terminate_executors().await;

let all_merkle_sum_tree = ordered_tree_results.into_iter().flatten().collect();
let all_merkle_sum_tree: Vec<MerkleSumTree<N_CURRENCIES, N_BYTES>> =
ordered_tree_results.into_iter().flatten().collect();

// Occur error if the number of mini_tree in 'all_merkle_sum_tree' is not equal to the number of entry_csvs.
if all_merkle_sum_tree.len() != self.entry_csvs.len() {
return Err("Mismatch in generated mini tree counts and given CSV counts".into());
}

AggregationMerkleSumTree::new(
all_merkle_sum_tree,
vec![
Cryptocurrency {
name: "BTC".to_string(),
chain: "mainnet".to_string(),
},
Cryptocurrency {
name: "ETH".to_string(),
chain: "mainnet".to_string(),
},
name: "DUMMY".to_string(),
chain: "ETH".to_string(),
};
N_CURRENCIES
],
)
}
Expand Down
28 changes: 21 additions & 7 deletions src/orchestrator/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ async fn test_none_exist_csv() {
"./src/orchestrator/csv/no_exist.csv".to_string(),
],
);
let one_mini_tree_result = orchestrator.create_aggregation_mst(2).await.unwrap();
assert_eq!(&0, one_mini_tree_result.depth());
match orchestrator.create_aggregation_mst(2).await {
Ok(_) => panic!("Expected an error"),
Err(e) => {
assert!(e
.to_string()
.contains("Mismatch in generated mini tree counts and given CSV counts"));
}
}
}

#[tokio::test]
Expand All @@ -46,15 +52,22 @@ async fn test_none_exist_worker() {
"./src/orchestrator/csv/entry_16_2.csv".to_string(),
],
);
let empty_mini_tree_error = orchestrator.create_aggregation_mst(2).await.unwrap_err();
assert_eq!("Empty mini tree inputs", empty_mini_tree_error.to_string());

match orchestrator.create_aggregation_mst(2).await {
Ok(_) => panic!("Expected an error"),
Err(e) => {
assert!(e
.to_string()
.contains("Mismatch in generated mini tree counts and given CSV counts"));
}
}
}

#[cfg(feature = "docker")]
#[tokio::test]
async fn test_with_containers() {
let spawner = LocalSpawner::new(
"summa-aggregation".to_string(),
"summadev/summa-aggregation-mini-tree:latest".to_string(),
"orchestrator_test".to_string(),
);

Expand All @@ -66,8 +79,9 @@ async fn test_with_containers() {
],
);
let aggregation_merkle_sum_tree = orchestrator.create_aggregation_mst(2).await.unwrap();

assert_eq!(16, aggregation_merkle_sum_tree.mini_tree(0).entries().len());
assert_eq!(16, aggregation_merkle_sum_tree.mini_tree(0).entries().len());
assert_eq!(16, aggregation_merkle_sum_tree.mini_tree(1).entries().len());
}

#[cfg(feature = "docker-swarm")]
Expand All @@ -88,5 +102,5 @@ async fn test_with_swarm_service() {
);
let aggregation_merkle_sum_tree = orchestrator.create_aggregation_mst(2).await.unwrap();
assert_eq!(16, aggregation_merkle_sum_tree.mini_tree(0).entries().len());
assert_eq!(16, aggregation_merkle_sum_tree.mini_tree(0).entries().len());
assert_eq!(16, aggregation_merkle_sum_tree.mini_tree(1).entries().len());
}

0 comments on commit 4b4e10e

Please sign in to comment.