Skip to content

Commit

Permalink
feat: applied rayon while flatten results
Browse files Browse the repository at this point in the history
  • Loading branch information
sifnoc committed Dec 12, 2023
1 parent 7dba825 commit 3fecd8b
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 16 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target
/benches/csv
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ futures = "0.3.29"
bollard = "0.15.0"
tokio-util = "0.7.10"
serde_yaml = "0.9.27"
rayon = "1.8.0"

[[bin]]
name = "mini-tree-server"
Expand Down
12 changes: 8 additions & 4 deletions src/executor/local_spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use std::{
env,
error::Error,
future::Future,
net::{SocketAddr, TcpListener, IpAddr},
net::{IpAddr, SocketAddr, TcpListener},
pin::Pin,
sync::atomic::{AtomicUsize, Ordering}, str::FromStr,
str::FromStr,
sync::atomic::{AtomicUsize, Ordering},
};
use tokio;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -87,7 +88,10 @@ impl LocalSpawner {

let config = Config {
image: Some(image_name),
exposed_ports: Some(HashMap::from([("4000/tcp".to_string(), HashMap::<(), ()>::new())])), // Expose the container port
exposed_ports: Some(HashMap::from([(
"4000/tcp".to_string(),
HashMap::<(), ()>::new(),
)])), // Expose the container port
host_config: Some(HostConfig {
port_bindings: Some(port_bindings),
..Default::default()
Expand Down Expand Up @@ -152,7 +156,7 @@ impl ExecutorSpawner for LocalSpawner {

// Return a Future that resolves to Executor
Box::pin(async move {
// the container_info also has exposed port as 'host_port` field but it looks ugly to use it
// the container_info also has exposed port as 'host_port` field but it looks ugly to use it
let (exposed_port, container_info) = rx.await.expect("Failed to receive worker URL");
let worker_url = format!(
"http://127.0.0.1:{}", // This port is exposed to the host
Expand Down
22 changes: 10 additions & 12 deletions src/orchestrator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod test;

use futures::future::join_all;
use rayon::prelude::*;
use std::{cmp::min, error::Error};
use summa_backend::merkle_sum_tree::{utils::parse_csv_to_entries, Cryptocurrency, MerkleSumTree};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -223,21 +224,18 @@ impl<const N_CURRENCIES: usize, const N_BYTES: usize> Orchestrator<N_CURRENCIES,

let all_tree_results = join_all(all_tree_responses).await;

// Aggregate results from all workers in order
let mut ordered_tree_results = vec![None; self.entry_csvs.len()];
for result in all_tree_results {
let (index, worker_results) = result.unwrap();
let start = index * entries_per_executor;
for (i, res) in worker_results.iter().enumerate() {
ordered_tree_results[start + i] = Some(res.clone());
}
}

// Terminate executors
self.executor_spawner.terminate_executors().await;

let all_merkle_sum_tree: Vec<MerkleSumTree<N_CURRENCIES, N_BYTES>> =
ordered_tree_results.into_iter().flatten().collect();
// TODO: this merkle sum tree is not ordered, so we need to sort it
let all_merkle_sum_tree = all_tree_results
.into_par_iter()
.map(|result| {
let (_, worker_results) = result.unwrap();
worker_results
})
.flatten()
.collect::<Vec<MerkleSumTree<N_CURRENCIES, N_BYTES>>>();

// Occur error if the number of mini_tree in 'all_merkle_sum_tree' is not equal to the number of entry_csvs.
if all_merkle_sum_tree.len() != self.entry_csvs.len() {
Expand Down

0 comments on commit 3fecd8b

Please sign in to comment.