Skip to content

Commit

Permalink
feat: setup bootnodes in multi-node tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Serial-ATA committed Feb 12, 2025
1 parent 480cc63 commit 9f924ac
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 66 deletions.
8 changes: 8 additions & 0 deletions crates/config/src/context_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ pub enum GadgetCLICoreSettings {
#[arg(long, value_parser = <Multiaddr as gadget_std::str::FromStr>::from_str, action = clap::ArgAction::Append, env)]
#[serde(default)]
bootnodes: Option<Vec<Multiaddr>>,
#[cfg(feature = "networking")]
#[arg(long, env)]
#[serde(default)]
network_bind_port: Option<u16>,
#[arg(long, short = 'd', env)]
keystore_uri: String,
#[arg(long, value_enum, env)]
Expand Down Expand Up @@ -212,6 +216,8 @@ impl Default for GadgetCLICoreSettings {
ws_rpc_url: default_ws_rpc_url(),
#[cfg(feature = "networking")]
bootnodes: None,
#[cfg(feature = "networking")]
network_bind_port: None,
keystore_uri: String::new(),
chain: SupportedChains::default(),
verbose: 0,
Expand Down Expand Up @@ -342,6 +348,8 @@ impl ContextConfig {
http_rpc_url,
#[cfg(feature = "networking")]
bootnodes: None,
#[cfg(feature = "networking")]
network_bind_port: None,
keystore_uri,
chain,
verbose: 3,
Expand Down
9 changes: 8 additions & 1 deletion crates/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ pub struct GadgetConfiguration {
/// The list of bootnodes to connect to
#[cfg(feature = "networking")]
pub bootnodes: Vec<Multiaddr>,
/// The port to bind the network to
#[cfg(feature = "networking")]
pub network_bind_port: u16,
/// The type of protocol the gadget is executing on.
pub protocol: Protocol,
/// Protocol-specific settings
Expand Down Expand Up @@ -160,7 +163,7 @@ impl GadgetConfiguration {
network_identity,
ecdsa_pair,
self.bootnodes.clone(),
0,
self.network_bind_port,
network_name,
);

Expand All @@ -186,6 +189,8 @@ fn load_inner(config: ContextConfig) -> Result<GadgetConfiguration, Error> {
ws_rpc_url,
#[cfg(feature = "networking")]
bootnodes,
#[cfg(feature = "networking")]
network_bind_port,
keystore_uri,
protocol,
#[cfg(feature = "tangle")]
Expand Down Expand Up @@ -303,6 +308,8 @@ fn load_inner(config: ContextConfig) -> Result<GadgetConfiguration, Error> {
data_dir: None,
#[cfg(feature = "networking")]
bootnodes: bootnodes.unwrap_or_default(),
#[cfg(feature = "networking")]
network_bind_port: network_bind_port.unwrap_or_default(),
protocol,
protocol_settings,
})
Expand Down
172 changes: 108 additions & 64 deletions crates/testing-utils/tangle/src/multi_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
Error,
};
use futures::future::join_all;
use gadget_config::GadgetConfiguration;
use gadget_config::{GadgetConfiguration, Multiaddr};
use gadget_contexts::tangle::TangleClientContext;
use gadget_contexts::{keystore::KeystoreContext, tangle::TangleClient};
use gadget_core_testing_utils::runner::TestEnv;
Expand All @@ -16,74 +16,12 @@ use gadget_keystore::crypto::sp_core::SpSr25519;
use gadget_runners::core::error::RunnerError;
use gadget_runners::tangle::tangle::TangleConfig;
use std::fmt::{Debug, Formatter};
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tangle_subxt::subxt::tx::Signer;
use tokio::sync::{broadcast, mpsc, oneshot, RwLock};

/// Represents a single node in the multi-node test environment
pub struct NodeHandle {
node_id: usize,
client: TangleClient,
signer: TanglePairSigner<sp_core::sr25519::Pair>,
state: Arc<RwLock<NodeState>>,
command_tx: mpsc::Sender<NodeCommand>,
test_env: Arc<RwLock<TangleTestEnv>>,
}

impl NodeHandle {
/// Adds a job to the node to be executed when the test is run.
///
/// The job is added to the end of the list of jobs and can be stopped using the `stop_job`
/// method.
pub async fn add_job<K: InitializableEventHandler + Send + Sync + 'static>(&self, job: K) {
self.test_env.write().await.add_job(job)
}

pub async fn gadget_config(&self) -> GadgetConfiguration {
self.test_env.read().await.get_gadget_config()
}
}

impl Debug for NodeHandle {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NodeHandle")
.field("node_id", &self.node_id)
.field("signer", &self.signer.address())
.field("test_env", &self.test_env)
.finish()
}
}

struct NodeState {
is_running: bool,
}

impl Debug for NodeState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NodeState")
.field("is_running", &self.is_running)
.finish()
}
}

/// Commands that can be sent to individual nodes
enum NodeCommand {
StartRunner {
result_tx: oneshot::Sender<Result<(), Error>>,
},
Shutdown,
}

impl Debug for NodeCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
NodeCommand::StartRunner { .. } => f.write_str("StartRunner"),
NodeCommand::Shutdown => f.write_str("Shutdown"),
}
}
}

#[derive(Clone, Debug)]
enum NodeSlot {
Occupied(Arc<NodeHandle>),
Expand Down Expand Up @@ -167,6 +105,28 @@ impl MultiNodeTestEnv {
self.add_node(node_id).await?;
}

// Setup the bootnodes
if initial_node_count > 1 {
let nodes = self.nodes.read().await;
for (index, node) in nodes.iter().enumerate() {
let NodeSlot::Occupied(node) = node else {
panic!("Not all nodes were initialized");
};

let mut bootnodes = Vec::new();
for node in nodes.iter().enumerate().filter(|(n, _)| *n != index) {
let NodeSlot::Occupied(node) = node.1 else {
panic!("Not all nodes were initialized");
};

bootnodes.push(node.addr.clone());
}

let mut env = node.test_env.write().await;
env.update_networking_config(bootnodes, node.port);
}
}

// Signal initialization is complete
if let Some(tx) = self.initialized_tx.take() {
let _ = tx.send(());
Expand Down Expand Up @@ -370,6 +330,71 @@ impl MultiNodeTestEnv {
}
}

struct NodeState {
is_running: bool,
}

impl Debug for NodeState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NodeState")
.field("is_running", &self.is_running)
.finish()
}
}

/// Commands that can be sent to individual nodes
enum NodeCommand {
StartRunner {
result_tx: oneshot::Sender<Result<(), Error>>,
},
Shutdown,
}

impl Debug for NodeCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
NodeCommand::StartRunner { .. } => f.write_str("StartRunner"),
NodeCommand::Shutdown => f.write_str("Shutdown"),
}
}
}

/// Represents a single node in the multi-node test environment
pub struct NodeHandle {
node_id: usize,
addr: Multiaddr,
port: u16,
client: TangleClient,
signer: TanglePairSigner<sp_core::sr25519::Pair>,
state: Arc<RwLock<NodeState>>,
command_tx: mpsc::Sender<NodeCommand>,
test_env: Arc<RwLock<TangleTestEnv>>,
}

impl NodeHandle {
/// Adds a job to the node to be executed when the test is run.
///
/// The job is added to the end of the list of jobs and can be stopped using the `stop_job`
/// method.
pub async fn add_job<K: InitializableEventHandler + Send + Sync + 'static>(&self, job: K) {
self.test_env.write().await.add_job(job)
}

pub async fn gadget_config(&self) -> GadgetConfiguration {
self.test_env.read().await.get_gadget_config()
}
}

impl Debug for NodeHandle {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NodeHandle")
.field("node_id", &self.node_id)
.field("signer", &self.signer.address())
.field("test_env", &self.test_env)
.finish()
}
}

// Implementation for NodeHandle
impl NodeHandle {
async fn new(node_id: usize, config: &TangleTestConfig) -> Result<Arc<Self>, Error> {
Expand Down Expand Up @@ -398,8 +423,16 @@ impl NodeHandle {
// Create TangleTestEnv for this node
let test_env = TangleTestEnv::new(TangleConfig::default(), env.clone())?;

let port = find_open_tcp_bind_port();
gadget_logging::info!("Binding node {node_id} to port {port}");

let addr = Multiaddr::from_str(&format!("/ip4/127.0.0.1/tcp/{port}"))
.expect("Should parse MultiAddr");

let node = Arc::new(Self {
node_id,
addr,
port,
client,
signer: sr25519_signer,
state,
Expand Down Expand Up @@ -470,3 +503,14 @@ impl NodeHandle {
&self.signer
}
}

pub fn find_open_tcp_bind_port() -> u16 {
let listener = std::net::TcpListener::bind("127.0.0.1:0")
.expect("Should bind to localhost");
let port = listener
.local_addr()
.expect("Should have a local address")
.port();
drop(listener);
port
}
9 changes: 8 additions & 1 deletion crates/testing-utils/tangle/src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(dead_code)]

use gadget_config::GadgetConfiguration;
use gadget_config::{GadgetConfiguration, Multiaddr};
use gadget_core_testing_utils::runner::{TestEnv, TestRunner};
use gadget_event_listeners::core::InitializableEventHandler;
use gadget_runners::core::error::RunnerError as Error;
Expand All @@ -17,6 +17,13 @@ pub struct TangleTestEnv {
runner_handle: Mutex<Option<JoinHandle<Result<(), Error>>>>,
}

impl TangleTestEnv {
pub(crate) fn update_networking_config(&mut self, bootnodes: Vec<Multiaddr>, network_bind_port: u16) {
self.gadget_config.bootnodes = bootnodes;
self.gadget_config.network_bind_port = network_bind_port;
}
}

impl Debug for TangleTestEnv {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TangleTestEnv")
Expand Down

0 comments on commit 9f924ac

Please sign in to comment.