From 243dba8ddfb38e03bdffbc6927b032f2c58eba36 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 3 Sep 2024 18:55:44 +0800 Subject: [PATCH 01/10] test: prepare to call stream disperse --- .../protocols/dispersal/executor/behaviour.rs | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs index 242791054..96dbad312 100644 --- a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs +++ b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs @@ -485,3 +485,57 @@ impl + 'static> Netw } } } + +#[cfg(test)] +pub mod test { + use crate::protocols::dispersal::executor::behaviour::DispersalExecutorBehaviour; + use crate::test_utils::AllNeighbours; + use crate::SubnetworkId; + use kzgrs_backend::common::blob::DaBlob; + use kzgrs_backend::common::{Chunk, Column}; + use libp2p::PeerId; + + #[tokio::test] + async fn test_stream_disperse_error_cases() { + let k1 = libp2p::identity::Keypair::generate_ed25519(); + let k2 = libp2p::identity::Keypair::generate_ed25519(); + let _validator_peer = PeerId::from_public_key(&k2.public()); + let neighbours = AllNeighbours { + neighbours: [ + PeerId::from_public_key(&k1.public()), + PeerId::from_public_key(&k2.public()), + ] + .into_iter() + .collect(), + }; + let mut executor = DispersalExecutorBehaviour::new(neighbours.clone()); + let chunk = Chunk(Vec::with_capacity(10)); + let chunks: Vec = (0..100).map(|_| chunk.clone()).collect(); + let column = Column(chunks.clone()); + let blob = DaBlob { + column_idx: 0, + column, + column_commitment: Default::default(), + aggregated_column_commitment: Default::default(), + aggregated_column_proof: Default::default(), + rows_commitments: vec![], + rows_proofs: vec![], + }; + + if let Some(stream) = executor + .idle_streams + .remove(&PeerId::from_public_key(&k1.public())) + { + let network_id = SubnetworkId::from(0u32); + let dispersed = DispersalExecutorBehaviour::::stream_disperse( + stream, + blob.clone(), + network_id, + ) + .await + .is_ok(); + + assert!(dispersed); + }; + } +} \ No newline at end of file From 9fd2594f95500ec10e81a6c8f2de5ba8960fb636 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 3 Sep 2024 18:59:41 +0800 Subject: [PATCH 02/10] fix: formatting --- .../core/src/protocols/dispersal/executor/behaviour.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs index 96dbad312..ee18d0e83 100644 --- a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs +++ b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs @@ -505,8 +505,8 @@ pub mod test { PeerId::from_public_key(&k1.public()), PeerId::from_public_key(&k2.public()), ] - .into_iter() - .collect(), + .into_iter() + .collect(), }; let mut executor = DispersalExecutorBehaviour::new(neighbours.clone()); let chunk = Chunk(Vec::with_capacity(10)); @@ -532,10 +532,10 @@ pub mod test { blob.clone(), network_id, ) - .await - .is_ok(); + .await + .is_ok(); assert!(dispersed); }; } -} \ No newline at end of file +} From 1f6f8351b236a0aadf61f13a7cd5993cc339eb3e Mon Sep 17 00:00:00 2001 From: Roman Date: Thu, 5 Sep 2024 20:08:11 +0800 Subject: [PATCH 03/10] fix: remove test_stream_disperse_error_cases --- .../protocols/dispersal/executor/behaviour.rs | 53 ------------------- 1 file changed, 53 deletions(-) diff --git a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs index ee18d0e83..a931653c4 100644 --- a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs +++ b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs @@ -486,56 +486,3 @@ impl + 'static> Netw } } -#[cfg(test)] -pub mod test { - use crate::protocols::dispersal::executor::behaviour::DispersalExecutorBehaviour; - use crate::test_utils::AllNeighbours; - use crate::SubnetworkId; - use kzgrs_backend::common::blob::DaBlob; - use kzgrs_backend::common::{Chunk, Column}; - use libp2p::PeerId; - - #[tokio::test] - async fn test_stream_disperse_error_cases() { - let k1 = libp2p::identity::Keypair::generate_ed25519(); - let k2 = libp2p::identity::Keypair::generate_ed25519(); - let _validator_peer = PeerId::from_public_key(&k2.public()); - let neighbours = AllNeighbours { - neighbours: [ - PeerId::from_public_key(&k1.public()), - PeerId::from_public_key(&k2.public()), - ] - .into_iter() - .collect(), - }; - let mut executor = DispersalExecutorBehaviour::new(neighbours.clone()); - let chunk = Chunk(Vec::with_capacity(10)); - let chunks: Vec = (0..100).map(|_| chunk.clone()).collect(); - let column = Column(chunks.clone()); - let blob = DaBlob { - column_idx: 0, - column, - column_commitment: Default::default(), - aggregated_column_commitment: Default::default(), - aggregated_column_proof: Default::default(), - rows_commitments: vec![], - rows_proofs: vec![], - }; - - if let Some(stream) = executor - .idle_streams - .remove(&PeerId::from_public_key(&k1.public())) - { - let network_id = SubnetworkId::from(0u32); - let dispersed = DispersalExecutorBehaviour::::stream_disperse( - stream, - blob.clone(), - network_id, - ) - .await - .is_ok(); - - assert!(dispersed); - }; - } -} From 82b899948c36d165f6c7765f0a01fb8f78d97cce Mon Sep 17 00:00:00 2001 From: Roman Date: Thu, 5 Sep 2024 20:09:40 +0800 Subject: [PATCH 04/10] test: utils for cli --- nomos-cli/src/test_utils.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 nomos-cli/src/test_utils.rs diff --git a/nomos-cli/src/test_utils.rs b/nomos-cli/src/test_utils.rs new file mode 100644 index 000000000..555886047 --- /dev/null +++ b/nomos-cli/src/test_utils.rs @@ -0,0 +1,25 @@ +use libp2p::PeerId; +use std::collections::HashSet; +use subnetworks_assignations::MembershipHandler; + +#[derive(Clone)] +pub struct AllNeighbours { + pub neighbours: HashSet, +} + +impl MembershipHandler for AllNeighbours { + type NetworkId = u32; + type Id = PeerId; + + fn membership(&self, _self_id: &Self::Id) -> HashSet { + [0].into_iter().collect() + } + + fn is_allowed(&self, _id: &Self::Id) -> bool { + true + } + + fn members_of(&self, _network_id: &Self::NetworkId) -> HashSet { + self.neighbours.clone() + } +} From ef3d9f1e188b47b719c99c1be3769abad5aa0436 Mon Sep 17 00:00:00 2001 From: Roman Date: Thu, 5 Sep 2024 20:14:13 +0800 Subject: [PATCH 05/10] test: test_dispersal_with_swarms - runtime preparation --- nomos-cli/src/da/network/swarm.rs | 68 +++++++++++++++++++++++++++++++ nomos-cli/src/lib.rs | 3 ++ 2 files changed, 71 insertions(+) diff --git a/nomos-cli/src/da/network/swarm.rs b/nomos-cli/src/da/network/swarm.rs index d5ef009c1..4cce4d2fa 100644 --- a/nomos-cli/src/da/network/swarm.rs +++ b/nomos-cli/src/da/network/swarm.rs @@ -129,3 +129,71 @@ where } } } + +#[cfg(test)] +pub mod test { + use crate::da::network::swarm::ExecutorSwarm; + use crate::test_utils::AllNeighbours; + use futures::StreamExt; + use libp2p::identity::Keypair; + use libp2p::PeerId; + use nomos_da_network_core::address_book::AddressBook; + use nomos_da_network_core::swarm::validator::ValidatorSwarm; + use nomos_libp2p::Multiaddr; + use overwatch_rs::overwatch::handle::OverwatchHandle; + use tokio::sync::mpsc::unbounded_channel; + use tokio::sync::{broadcast, mpsc}; + use tokio_stream::wrappers::UnboundedReceiverStream; + + #[tokio::test] + async fn test_dispersal_with_swarms() { + let k1 = Keypair::generate_ed25519(); + let k2 = Keypair::generate_ed25519(); + let executor_peer = PeerId::from_public_key(&k1.public()); + let validator_peer = PeerId::from_public_key(&k2.public()); + let neighbours = AllNeighbours { + neighbours: [ + PeerId::from_public_key(&k1.public()), + PeerId::from_public_key(&k2.public()), + ] + .into_iter() + .collect(), + }; + + let addr: Multiaddr = "/ip4/127.0.0.1/udp/5063/quic-v1".parse().unwrap(); + let addr2 = addr.clone().with_p2p(validator_peer).unwrap(); + + let addr_book = AddressBook::from_iter(vec![(executor_peer, addr.clone())]); + + let (dispersal_events_sender, dispersal_events_receiver) = unbounded_channel(); + + let mut executor_swarm = + ExecutorSwarm::new(k1, neighbours.clone(), dispersal_events_sender); + + let dispersal_request_sender = executor_swarm.blobs_sender(); + + let (mut validator_swarm, events_streams) = + ValidatorSwarm::new(k2, neighbours.clone(), addr_book); + + executor_swarm + .dial(addr) + .expect("Should schedule the dials"); + + let ov_handle = OverwatchHandle::new(tokio::runtime::Handle::current(), mpsc::channel(1).0); + + let task = ov_handle + .runtime() + .spawn(async move { executor_swarm.run().await }); + let (dispersal_broadcast_sender, dispersal_broadcast_receiver) = + broadcast::channel(128usize); + let mut dispersal_events_receiver = UnboundedReceiverStream::new(dispersal_events_receiver); + + let replies_task = ov_handle.runtime().spawn(async move { + while let Some(dispersal_event) = dispersal_events_receiver.next().await { + if let Err(e) = dispersal_broadcast_sender.send(dispersal_event) { + println!("Error in internal broadcast of dispersal event: {e:?}"); + } + } + }); + } +} diff --git a/nomos-cli/src/lib.rs b/nomos-cli/src/lib.rs index 27e0099a4..f2c30c500 100644 --- a/nomos-cli/src/lib.rs +++ b/nomos-cli/src/lib.rs @@ -2,6 +2,9 @@ pub mod api; pub mod cmds; pub mod da; +#[cfg(test)] +pub mod test_utils; + use clap::Parser; use cmds::Command; From a40b5da44896cf6fff92d1ef97572c31d7af88fb Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 6 Sep 2024 16:03:12 +0800 Subject: [PATCH 06/10] fix: cleanup - tracing_subscriber --- nomos-cli/Cargo.toml | 2 +- nomos-cli/src/da/network/swarm.rs | 90 +++++++++++++++++++------------ 2 files changed, 57 insertions(+), 35 deletions(-) diff --git a/nomos-cli/Cargo.toml b/nomos-cli/Cargo.toml index 4d9876ae2..34c3e0a0e 100644 --- a/nomos-cli/Cargo.toml +++ b/nomos-cli/Cargo.toml @@ -9,7 +9,7 @@ description = "Cli app to interact with Nomos nodes and perform various tasks" [dependencies] fraction = "0.13" tracing = "0.1" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } async-trait = "0.1" clap = { version = "4", features = ["derive"] } serde_yaml = "0.9" diff --git a/nomos-cli/src/da/network/swarm.rs b/nomos-cli/src/da/network/swarm.rs index 4cce4d2fa..71b9083dd 100644 --- a/nomos-cli/src/da/network/swarm.rs +++ b/nomos-cli/src/da/network/swarm.rs @@ -134,19 +134,26 @@ where pub mod test { use crate::da::network::swarm::ExecutorSwarm; use crate::test_utils::AllNeighbours; - use futures::StreamExt; + use kzgrs_backend::common::blob::DaBlob; + use kzgrs_backend::common::Column; use libp2p::identity::Keypair; use libp2p::PeerId; use nomos_da_network_core::address_book::AddressBook; use nomos_da_network_core::swarm::validator::ValidatorSwarm; use nomos_libp2p::Multiaddr; - use overwatch_rs::overwatch::handle::OverwatchHandle; + use std::time::Duration; + use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::unbounded_channel; - use tokio::sync::{broadcast, mpsc}; - use tokio_stream::wrappers::UnboundedReceiverStream; + use tracing_subscriber::fmt::TestWriter; + use tracing_subscriber::EnvFilter; #[tokio::test] async fn test_dispersal_with_swarms() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .compact() + .with_writer(TestWriter::default()) + .try_init(); let k1 = Keypair::generate_ed25519(); let k2 = Keypair::generate_ed25519(); let executor_peer = PeerId::from_public_key(&k1.public()); @@ -162,38 +169,53 @@ pub mod test { let addr: Multiaddr = "/ip4/127.0.0.1/udp/5063/quic-v1".parse().unwrap(); let addr2 = addr.clone().with_p2p(validator_peer).unwrap(); + let addr2_book = AddressBook::from_iter(vec![(executor_peer, addr2.clone())]); + + let (dispersal_broadcast_sender, dispersal_broadcast_receiver) = unbounded_channel(); + + let mut executor = + ExecutorSwarm::new(k1, neighbours.clone(), dispersal_broadcast_sender.clone()); + let (mut validator, events_stream) = + ValidatorSwarm::new(k2, neighbours.clone(), addr2_book); + + let join_validator = tokio::spawn(async move { validator.run().await }); + tokio::time::sleep(Duration::from_secs(1)).await; + executor.dial(addr2).unwrap(); + tokio::time::sleep(Duration::from_secs(1)).await; + + let executor_open_stream_sender = executor.swarm.behaviour().open_stream_sender(); + let executor_disperse_blob_sender = executor.swarm.behaviour().blobs_sender(); + let (sender, mut receiver) = tokio::sync::oneshot::channel(); + + let blobs_sender = executor.blobs_sender(); + + tokio::time::sleep(Duration::from_secs(1)).await; + + println!("Sending blob..."); + + if let Err(SendError((subnetwork_id, blob_id))) = blobs_sender.send(( + 0, + DaBlob { + column_idx: 0, + column: Column(vec![]), + column_commitment: Default::default(), + aggregated_column_commitment: Default::default(), + aggregated_column_proof: Default::default(), + rows_commitments: vec![], + rows_proofs: vec![], + }, + )) { + println!( + "Error requesting sample for subnetwork id : {subnetwork_id}, blob_id: {blob_id:?}" + ); + } - let addr_book = AddressBook::from_iter(vec![(executor_peer, addr.clone())]); - - let (dispersal_events_sender, dispersal_events_receiver) = unbounded_channel(); - - let mut executor_swarm = - ExecutorSwarm::new(k1, neighbours.clone(), dispersal_events_sender); - - let dispersal_request_sender = executor_swarm.blobs_sender(); - - let (mut validator_swarm, events_streams) = - ValidatorSwarm::new(k2, neighbours.clone(), addr_book); - - executor_swarm - .dial(addr) - .expect("Should schedule the dials"); - - let ov_handle = OverwatchHandle::new(tokio::runtime::Handle::current(), mpsc::channel(1).0); + println!("Blob sent."); - let task = ov_handle - .runtime() - .spawn(async move { executor_swarm.run().await }); - let (dispersal_broadcast_sender, dispersal_broadcast_receiver) = - broadcast::channel(128usize); - let mut dispersal_events_receiver = UnboundedReceiverStream::new(dispersal_events_receiver); + let executor_task = tokio::spawn(async move { executor.run().await }); - let replies_task = ov_handle.runtime().spawn(async move { - while let Some(dispersal_event) = dispersal_events_receiver.next().await { - if let Err(e) = dispersal_broadcast_sender.send(dispersal_event) { - println!("Error in internal broadcast of dispersal event: {e:?}"); - } - } - }); + join_validator.await.unwrap(); + sender.send(()).unwrap(); + executor_task.await.unwrap(); } } From 1d814767f0a852d3de4433db1fc0486394de5e61 Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 9 Sep 2024 11:40:30 +0800 Subject: [PATCH 07/10] fix: rewrite without using sampling --- nomos-cli/src/da/network/swarm.rs | 72 +++++++++++++++++++------------ 1 file changed, 44 insertions(+), 28 deletions(-) diff --git a/nomos-cli/src/da/network/swarm.rs b/nomos-cli/src/da/network/swarm.rs index 71b9083dd..c55ddae82 100644 --- a/nomos-cli/src/da/network/swarm.rs +++ b/nomos-cli/src/da/network/swarm.rs @@ -132,18 +132,24 @@ where #[cfg(test)] pub mod test { - use crate::da::network::swarm::ExecutorSwarm; + use crate::da::network::swarm::{DispersalEvent, ExecutorSwarm}; use crate::test_utils::AllNeighbours; use kzgrs_backend::common::blob::DaBlob; use kzgrs_backend::common::Column; use libp2p::identity::Keypair; + use libp2p::swarm::SwarmEvent; use libp2p::PeerId; + use log::info; use nomos_da_network_core::address_book::AddressBook; - use nomos_da_network_core::swarm::validator::ValidatorSwarm; + use nomos_da_network_core::protocols::sampling; + use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm}; + use nomos_da_network_service::backends::libp2p::validator::SamplingEvent; use nomos_libp2p::Multiaddr; use std::time::Duration; + use tokio::sync::broadcast; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::unbounded_channel; + use tokio_stream::StreamExt; use tracing_subscriber::fmt::TestWriter; use tracing_subscriber::EnvFilter; @@ -178,7 +184,14 @@ pub mod test { let (mut validator, events_stream) = ValidatorSwarm::new(k2, neighbours.clone(), addr2_book); - let join_validator = tokio::spawn(async move { validator.run().await }); + let validator_task = async move { + let validator_swarm = validator.protocol_swarm_mut(); + validator_swarm.listen_on(addr).unwrap(); + + validator.run().await; + }; + + let join_validator = tokio::spawn(validator_task); tokio::time::sleep(Duration::from_secs(1)).await; executor.dial(addr2).unwrap(); tokio::time::sleep(Duration::from_secs(1)).await; @@ -186,35 +199,38 @@ pub mod test { let executor_open_stream_sender = executor.swarm.behaviour().open_stream_sender(); let executor_disperse_blob_sender = executor.swarm.behaviour().blobs_sender(); let (sender, mut receiver) = tokio::sync::oneshot::channel(); - - let blobs_sender = executor.blobs_sender(); - + let executor_poll = async move { + loop { + tokio::select! { + Some(event) = executor.swarm.next() => { + info!("Executor event: {event:?}"); + } + _ = &mut receiver => { + break; + } + } + } + }; + let executor_task = tokio::spawn(executor_poll); + executor_open_stream_sender.send(validator_peer).unwrap(); tokio::time::sleep(Duration::from_secs(1)).await; println!("Sending blob..."); + executor_disperse_blob_sender + .send(( + 0, + DaBlob { + column_idx: 0, + column: Column(vec![]), + column_commitment: Default::default(), + aggregated_column_commitment: Default::default(), + aggregated_column_proof: Default::default(), + rows_commitments: vec![], + rows_proofs: vec![], + }, + )) + .unwrap(); - if let Err(SendError((subnetwork_id, blob_id))) = blobs_sender.send(( - 0, - DaBlob { - column_idx: 0, - column: Column(vec![]), - column_commitment: Default::default(), - aggregated_column_commitment: Default::default(), - aggregated_column_proof: Default::default(), - rows_commitments: vec![], - rows_proofs: vec![], - }, - )) { - println!( - "Error requesting sample for subnetwork id : {subnetwork_id}, blob_id: {blob_id:?}" - ); - } - - println!("Blob sent."); - - let executor_task = tokio::spawn(async move { executor.run().await }); - - join_validator.await.unwrap(); sender.send(()).unwrap(); executor_task.await.unwrap(); } From 7fdea1629b3fe22162ca09551a7b71a13b99c5dc Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 9 Sep 2024 12:17:00 +0800 Subject: [PATCH 08/10] fix: outstanding space --- .../network/core/src/protocols/dispersal/executor/behaviour.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs index a931653c4..242791054 100644 --- a/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs +++ b/nomos-da/network/core/src/protocols/dispersal/executor/behaviour.rs @@ -485,4 +485,3 @@ impl + 'static> Netw } } } - From a4103a2652e7956c322d757c8125c8abf0eb7ad5 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 10 Sep 2024 11:23:09 +0800 Subject: [PATCH 09/10] fix: use executor.run --- nomos-cli/src/da/network/swarm.rs | 48 ++++++------------------------- 1 file changed, 8 insertions(+), 40 deletions(-) diff --git a/nomos-cli/src/da/network/swarm.rs b/nomos-cli/src/da/network/swarm.rs index c55ddae82..47ffb476a 100644 --- a/nomos-cli/src/da/network/swarm.rs +++ b/nomos-cli/src/da/network/swarm.rs @@ -132,34 +132,20 @@ where #[cfg(test)] pub mod test { - use crate::da::network::swarm::{DispersalEvent, ExecutorSwarm}; + use crate::da::network::swarm::ExecutorSwarm; use crate::test_utils::AllNeighbours; use kzgrs_backend::common::blob::DaBlob; use kzgrs_backend::common::Column; use libp2p::identity::Keypair; - use libp2p::swarm::SwarmEvent; use libp2p::PeerId; - use log::info; use nomos_da_network_core::address_book::AddressBook; - use nomos_da_network_core::protocols::sampling; - use nomos_da_network_core::swarm::validator::{ValidatorEventsStream, ValidatorSwarm}; - use nomos_da_network_service::backends::libp2p::validator::SamplingEvent; + use nomos_da_network_core::swarm::validator::ValidatorSwarm; use nomos_libp2p::Multiaddr; use std::time::Duration; - use tokio::sync::broadcast; - use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::unbounded_channel; - use tokio_stream::StreamExt; - use tracing_subscriber::fmt::TestWriter; - use tracing_subscriber::EnvFilter; #[tokio::test] async fn test_dispersal_with_swarms() { - let _ = tracing_subscriber::fmt() - .with_env_filter(EnvFilter::from_default_env()) - .compact() - .with_writer(TestWriter::default()) - .try_init(); let k1 = Keypair::generate_ed25519(); let k2 = Keypair::generate_ed25519(); let executor_peer = PeerId::from_public_key(&k1.public()); @@ -177,43 +163,25 @@ pub mod test { let addr2 = addr.clone().with_p2p(validator_peer).unwrap(); let addr2_book = AddressBook::from_iter(vec![(executor_peer, addr2.clone())]); - let (dispersal_broadcast_sender, dispersal_broadcast_receiver) = unbounded_channel(); + let (dispersal_broadcast_sender, _) = unbounded_channel(); let mut executor = ExecutorSwarm::new(k1, neighbours.clone(), dispersal_broadcast_sender.clone()); - let (mut validator, events_stream) = + let (mut validator, _) = ValidatorSwarm::new(k2, neighbours.clone(), addr2_book); - let validator_task = async move { + tokio::spawn(async move { let validator_swarm = validator.protocol_swarm_mut(); validator_swarm.listen_on(addr).unwrap(); validator.run().await; - }; + }); - let join_validator = tokio::spawn(validator_task); tokio::time::sleep(Duration::from_secs(1)).await; executor.dial(addr2).unwrap(); tokio::time::sleep(Duration::from_secs(1)).await; - let executor_open_stream_sender = executor.swarm.behaviour().open_stream_sender(); let executor_disperse_blob_sender = executor.swarm.behaviour().blobs_sender(); - let (sender, mut receiver) = tokio::sync::oneshot::channel(); - let executor_poll = async move { - loop { - tokio::select! { - Some(event) = executor.swarm.next() => { - info!("Executor event: {event:?}"); - } - _ = &mut receiver => { - break; - } - } - } - }; - let executor_task = tokio::spawn(executor_poll); - executor_open_stream_sender.send(validator_peer).unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; println!("Sending blob..."); executor_disperse_blob_sender @@ -231,7 +199,7 @@ pub mod test { )) .unwrap(); - sender.send(()).unwrap(); - executor_task.await.unwrap(); + let executor_task = tokio::spawn(async move {executor.run().await;}); + assert!(executor_task.await.is_ok()); } } From ca8a3a2345f6911b906c6d90ef1ba8d5add9d336 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 10 Sep 2024 11:24:15 +0800 Subject: [PATCH 10/10] fix: formatting --- nomos-cli/src/da/network/swarm.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/nomos-cli/src/da/network/swarm.rs b/nomos-cli/src/da/network/swarm.rs index 47ffb476a..5c8fc26a4 100644 --- a/nomos-cli/src/da/network/swarm.rs +++ b/nomos-cli/src/da/network/swarm.rs @@ -167,8 +167,7 @@ pub mod test { let mut executor = ExecutorSwarm::new(k1, neighbours.clone(), dispersal_broadcast_sender.clone()); - let (mut validator, _) = - ValidatorSwarm::new(k2, neighbours.clone(), addr2_book); + let (mut validator, _) = ValidatorSwarm::new(k2, neighbours.clone(), addr2_book); tokio::spawn(async move { let validator_swarm = validator.protocol_swarm_mut(); @@ -199,7 +198,9 @@ pub mod test { )) .unwrap(); - let executor_task = tokio::spawn(async move {executor.run().await;}); + let executor_task = tokio::spawn(async move { + executor.run().await; + }); assert!(executor_task.await.is_ok()); } }