Skip to content

Commit

Permalink
DA: Use executor in tests (#818)
Browse files Browse the repository at this point in the history
* Use executor in tests instead of nomos node

* Executor config from node config

* Bring generics to the testing game

* Fill in missing gaps in test

* Implement testnode wrapper

* Use sleep on dispersal service instead

* Fix cfgsync

* Clippy happy

* Clippy happy tests

* Mixnet config in tests for validator

* Tests: General config and multiple nodes (#832)

* Use executor in tests instead of nomos node

* Bring generics to the testing game

* Fill in missing gaps in test

* Clippy happy

* Mixnet config in tests for validator

* Derive different types of configs from general in tests

* Validator and executor in cfgsync

---------

Co-authored-by: danielSanchezQ <[email protected]>

* Tests executor node mix config (#834)

* Merge branch 'master' into tests-executor-node-mix-config

* add mix configs

---------

Co-authored-by: danielSanchezQ <[email protected]>
Co-authored-by: Youngjoon Lee <[email protected]>
  • Loading branch information
3 people authored Oct 21, 2024
1 parent 83d9ef7 commit 328398c
Show file tree
Hide file tree
Showing 25 changed files with 1,380 additions and 877 deletions.
14 changes: 10 additions & 4 deletions clients/executor-http-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
// crates
use reqwest::{Client, ClientBuilder, StatusCode, Url};
// internal
use nomos_executor::api::paths;
use nomos_executor::api::{handlers::DispersalRequest, paths};
use serde::Serialize;

#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -40,15 +41,20 @@ impl ExecutorHttpClient {
}

/// Send a `Blob` to be dispersed
pub async fn publish_blob(&self, blob: Vec<u8>) -> Result<(), Error> {
pub async fn publish_blob<Metadata: Serialize>(
&self,
data: Vec<u8>,
metadata: Metadata,
) -> Result<(), Error> {
let req = DispersalRequest { data, metadata };
let url = self
.executor_address
.join(paths::DA_ADD_BLOB)
.join(paths::DISPERSE_DATA)
.expect("Url should build properly");
let response = self
.client
.post(url)
.body(blob)
.json(&req)
.send()
.await
.map_err(Error::Request)?;
Expand Down
12 changes: 11 additions & 1 deletion nodes/nomos-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@ use nomos_mempool::backend::mockpool::MockPool;
use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend;
use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter;
use nomos_mix_service::MixService;
use nomos_node::*;
use nomos_node::DispersedBlobInfo;
use nomos_node::HeaderId;
use nomos_node::MempoolNetworkAdapter;
#[cfg(feature = "metrics")]
use nomos_node::Metrics;
use nomos_node::NetworkBackend;
use nomos_node::{
BlobInfo, Cryptarchia, DaIndexer, DaMempool, DaNetworkService, DaSampling, DaVerifier, Logger,
NetworkService, NomosDaMembership, RocksBackend, StorageService, SystemSig, Tx, TxMempool,
Wire, MB16,
};
use overwatch_derive::Services;
use overwatch_rs::services::handle::ServiceHandle;

Expand Down
2 changes: 1 addition & 1 deletion nomos-da/network/core/src/swarm/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ where
pub async fn run(mut self) {
loop {
if let Some(event) = self.swarm.next().await {
debug!("Da swarm event received: {event:?}");
tracing::info!("Da swarm event received: {event:?}");
match event {
SwarmEvent::Behaviour(behaviour_event) => {
self.handle_behaviour_event(behaviour_event).await;
Expand Down
3 changes: 3 additions & 0 deletions nomos-services/data-availability/dispersal/src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::adapters::{mempool::DaMempoolAdapter, network::DispersalNetworkAdapter};
use std::time::Duration;

use nomos_core::da::{blob::metadata, DaDispersal, DaEncoder};
use overwatch_rs::DynError;
Expand Down Expand Up @@ -42,6 +43,8 @@ pub trait DispersalBackend {
) -> Result<(), DynError> {
let (blob_id, encoded_data) = self.encode(data).await?;
self.disperse(encoded_data).await?;
// let disperse and replication happen before pushing to mempool
tokio::time::sleep(Duration::from_secs(1)).await;
self.publish_to_mempool(blob_id, metadata).await?;
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::collections::HashSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::Pin;
use std::time::Duration;
use subnetworks_assignations::MembershipHandler;
use tokio::sync::broadcast;
use tokio::sync::mpsc::UnboundedSender;
Expand Down Expand Up @@ -148,6 +149,7 @@ where

let task = overwatch_handle.runtime().spawn(executor_swarm.run());

std::thread::sleep(Duration::from_secs(1));
// open streams to dispersal peers
for peer_id in dispersal_peers.iter() {
executor_open_stream_sender.send(*peer_id).unwrap();
Expand Down
2 changes: 2 additions & 0 deletions testnet/cfgsync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ edition = "2021"
[dependencies]
axum = { version = "0.6" }
clap = { version = "4", features = ["derive"] }
nomos-executor = { path = "../../nodes/nomos-executor" }
nomos-libp2p = { path = "../../nomos-libp2p" }
nomos-node = { path = "../../nodes/nomos-node" }
rand = "0.8"
reqwest = { version = "0.11", features = ["json", "rustls-tls"] }
tests = { path = "../../tests" }
tokio = { version = "1.24", features = ["rt-multi-thread"] }
Expand Down
20 changes: 17 additions & 3 deletions testnet/cfgsync/src/bin/cfgsync-client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// std
use std::{env, fs, net::Ipv4Addr, process};
// crates
use nomos_node::Config as NodeConfig;
use nomos_executor::config::Config as ExecutorConfig;
use nomos_node::Config as ValidatorConfig;
use reqwest::Client;
use serde::{de::DeserializeOwned, Serialize};
// internal
Expand Down Expand Up @@ -57,9 +58,22 @@ async fn main() {
let server_addr = env::var("CFG_SERVER_ADDR").unwrap_or("http://127.0.0.1:4400".to_string());
let ip = parse_ip(env::var("CFG_HOST_IP").unwrap_or_else(|_| "127.0.0.1".to_string()));

let node_config_endpoint = format!("{}/node", server_addr);
let host_kind = env::var("CFG_HOST_KIND").unwrap_or_else(|_| "validator".to_string());

if let Err(err) = get_config::<NodeConfig>(ip, &node_config_endpoint, &config_file_path).await {
let node_config_endpoint = match host_kind.as_str() {
"executor" => format!("{}/executor", server_addr),
_ => format!("{}/validator", server_addr),
};

let config_result = match host_kind.as_str() {
"executor" => {
get_config::<ExecutorConfig>(ip, &node_config_endpoint, &config_file_path).await
}
_ => get_config::<ValidatorConfig>(ip, &node_config_endpoint, &config_file_path).await,
};

// Handle error if the config request fails
if let Err(err) = config_result {
eprintln!("Error: {}", err);
process::exit(1);
}
Expand Down
52 changes: 39 additions & 13 deletions testnet/cfgsync/src/bin/cfgsync-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use cfgsync::config::Host;
use cfgsync::repo::{ConfigRepo, RepoResponse};
use clap::Parser;
use serde::{Deserialize, Serialize};
use tests::{ConsensusConfig, DaConfig};
use tests::nodes::executor::create_executor_config;
use tests::nodes::validator::create_validator_config;
use tests::topology::configs::consensus::ConsensusParams;
use tests::topology::configs::da::DaParams;
use tokio::sync::oneshot::channel;
// internal

Expand Down Expand Up @@ -50,16 +53,16 @@ impl CfgSyncConfig {
.map_err(|err| format!("Failed to parse config file: {}", err))
}

fn to_consensus_config(&self) -> ConsensusConfig {
ConsensusConfig {
fn to_consensus_params(&self) -> ConsensusParams {
ConsensusParams {
n_participants: self.n_hosts,
security_param: self.security_param,
active_slot_coeff: self.active_slot_coeff,
}
}

fn to_da_config(&self) -> DaConfig {
DaConfig {
fn to_da_params(&self) -> DaParams {
DaParams {
subnetwork_size: self.subnetwork_size,
dispersal_factor: self.dispersal_factor,
num_samples: self.num_samples,
Expand All @@ -76,21 +79,43 @@ struct ClientIp {
ip: Ipv4Addr,
}

async fn node_config(
async fn validator_config(
State(config_repo): State<Arc<ConfigRepo>>,
Json(payload): Json<ClientIp>,
) -> impl IntoResponse {
let ClientIp { ip } = payload;

let (reply_tx, reply_rx) = channel();
config_repo.register(Host::default_node_from_ip(ip), reply_tx);
config_repo.register(Host::default_validator_from_ip(ip), reply_tx);

match reply_rx.await {
Ok(config_response) => match config_response {
RepoResponse::Config(config) => (StatusCode::OK, Json(config)).into_response(),
RepoResponse::Timeout => {
(StatusCode::REQUEST_TIMEOUT, Json(RepoResponse::Timeout)).into_response()
RepoResponse::Config(config) => {
let config = create_validator_config(*config);
(StatusCode::OK, Json(config)).into_response()
}
RepoResponse::Timeout => (StatusCode::REQUEST_TIMEOUT).into_response(),
},
Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(),
}
}

async fn executor_config(
State(config_repo): State<Arc<ConfigRepo>>,
Json(payload): Json<ClientIp>,
) -> impl IntoResponse {
let ClientIp { ip } = payload;

let (reply_tx, reply_rx) = channel();
config_repo.register(Host::default_executor_from_ip(ip), reply_tx);

match reply_rx.await {
Ok(config_response) => match config_response {
RepoResponse::Config(config) => {
let config = create_executor_config(*config);
(StatusCode::OK, Json(config)).into_response()
}
RepoResponse::Timeout => (StatusCode::REQUEST_TIMEOUT).into_response(),
},
Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Error receiving config").into_response(),
}
Expand All @@ -104,8 +129,8 @@ async fn main() {
eprintln!("{}", err);
process::exit(1);
});
let consensus_config = config.to_consensus_config();
let da_config = config.to_da_config();
let consensus_config = config.to_consensus_params();
let da_config = config.to_da_params();

let config_repo = ConfigRepo::new(
config.n_hosts,
Expand All @@ -114,7 +139,8 @@ async fn main() {
Duration::from_secs(config.timeout),
);
let app = Router::new()
.route("/node", post(node_config))
.route("/validator", post(validator_config))
.route("/executor", post(executor_config))
.with_state(config_repo.clone());

println!("Server running on http://0.0.0.0:{}", config.port);
Expand Down
Loading

0 comments on commit 328398c

Please sign in to comment.