Skip to content

Commit

Permalink
DA: Integration tests for sampling and dispersal (#733)
Browse files Browse the repository at this point in the history
* Use common test node in da tests

* Pick ranged subnetids when sampling

* Da network settings for integration tests

* Predefined node keys and deterministic rng

* Disperse kzgrs encoded blob

* Cli swarm fixes

* Increase indexer integration test timeout

* Check dispersal responses in cli da adapter

* DA membership configuration in node tests

* Nomos Cli act as a node in tests

* Increase timeout for dispersal tests

* Different node configurations for da tests

* Collect unique ids for sampling
  • Loading branch information
bacv authored Sep 18, 2024
1 parent fadf6d0 commit 9f4f139
Show file tree
Hide file tree
Showing 16 changed files with 797 additions and 445 deletions.
42 changes: 40 additions & 2 deletions nomos-cli/src/da/network/adapters/libp2p.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use futures::future::join_all;
// std
use std::collections::HashSet;
// crates
use futures::{future::join_all, StreamExt};
use kzgrs_backend::{common::blob::DaBlob, encoder::EncodedData as KzgEncodedData};
use nomos_core::da::DaDispersal;
use nomos_da_network_core::SubnetworkId;
use nomos_da_network_service::{DaNetworkMsg, NetworkService};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use thiserror::Error;
use tokio::sync::oneshot;
// internal
use crate::da::{network::backend::Command, NetworkBackend};
use crate::da::{
network::{backend::Command, swarm::DispersalEvent},
NetworkBackend,
};

type Relay<T> = OutboundRelay<<NetworkService<T> as ServiceData>::Message>;

Expand Down Expand Up @@ -39,6 +45,14 @@ impl DaDispersal for Libp2pExecutorDispersalAdapter {
async fn disperse(&self, encoded_data: Self::EncodedData) -> Result<(), Self::Error> {
let mut tasks = Vec::new();

let (sender, receiver) = oneshot::channel();
self.network_relay
.send(DaNetworkMsg::Subscribe { kind: (), sender })
.await
.map_err(|(e, _)| e.to_string())?;
let mut event_stream = receiver.await.map_err(|e| e.to_string())?;
let mut expected_acknowledgments = HashSet::new();

for (i, column) in encoded_data.extended_data.columns().enumerate() {
let blob = DaBlob {
column: column.clone(),
Expand All @@ -56,6 +70,8 @@ impl DaDispersal for Libp2pExecutorDispersalAdapter {
.collect(),
};

expected_acknowledgments.insert((blob.id().clone(), i as SubnetworkId));

let relay = self.network_relay.clone();
let command = DaNetworkMsg::Process(Command::Disperse {
blob,
Expand All @@ -73,6 +89,28 @@ impl DaDispersal for Libp2pExecutorDispersalAdapter {
result?;
}

while !expected_acknowledgments.is_empty() {
let event = event_stream.next().await;
match event {
Some(event) => match event {
DispersalEvent::DispersalSuccess {
blob_id,
subnetwork_id,
} => {
expected_acknowledgments.remove(&(blob_id.to_vec(), subnetwork_id));
}
DispersalEvent::DispersalError { error } => {
return Err(DispersalError(format!("Received dispersal error: {error}")));
}
},
None => {
return Err(DispersalError(
"Event stream ended before receiving all acknowledgments".into(),
));
}
}
}

Ok(())
}
}
34 changes: 30 additions & 4 deletions nomos-cli/src/da/network/backend.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// std
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::Pin;
Expand Down Expand Up @@ -59,6 +59,7 @@ pub struct ExecutorBackendSettings<Membership> {
/// Membership of DA network PoV set
pub membership: Membership,
pub node_addrs: HashMap<PeerId, Multiaddr>,
pub num_subnets: u16,
}

impl<Membership> ExecutorBackend<Membership> {
Expand Down Expand Up @@ -96,18 +97,43 @@ where
let keypair =
libp2p::identity::Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
let mut executor_swarm =
ExecutorSwarm::new(keypair, config.membership, dispersal_events_sender);
ExecutorSwarm::new(keypair, config.membership.clone(), dispersal_events_sender);
let dispersal_request_sender = executor_swarm.blobs_sender();

for (_, addr) in config.node_addrs {
let mut connected_peers = HashSet::new();

let local_peer_id = *executor_swarm.local_peer_id();
for subnetwork_id in 0..config.num_subnets {
// Connect to one peer in a subnet.
let mut members = config.membership.members_of(&(subnetwork_id as u32));
members.remove(&local_peer_id);
let peer_id = *members
.iter()
.next()
.expect("Subnet should have at least one node which is not the nomos_cli");

let addr = config
.node_addrs
.get(&peer_id)
.expect("Peer address should be in the list");

executor_swarm
.dial(addr)
.dial(addr.clone())
.expect("Should schedule the dials");

connected_peers.insert(peer_id);
}

let executor_open_stream_sender = executor_swarm.open_stream_sender();

let task = overwatch_handle
.runtime()
.spawn(async move { executor_swarm.run().await });

for peer_id in connected_peers.iter() {
executor_open_stream_sender.send(*peer_id).unwrap();
}

let (dispersal_broadcast_sender, dispersal_broadcast_receiver) =
broadcast::channel(BROADCAST_CHANNEL_SIZE);
let dispersal_events_receiver = UnboundedReceiverStream::new(dispersal_events_receiver);
Expand Down
59 changes: 37 additions & 22 deletions nomos-cli/src/da/network/swarm.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// std
use std::time::Duration;
// crates
use kzgrs_backend::common::blob::DaBlob;
use libp2p::futures::StreamExt;
Expand Down Expand Up @@ -56,6 +57,10 @@ where
self.swarm.behaviour().blobs_sender()
}

pub fn open_stream_sender(&self) -> UnboundedSender<PeerId> {
self.swarm.behaviour().open_stream_sender()
}

fn build_swarm(
key: Keypair,
membership: Membership,
Expand All @@ -65,6 +70,9 @@ where
.with_quic()
.with_behaviour(|_key| DispersalExecutorBehaviour::new(membership))
.expect("Validator behaviour should build")
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))
})
.build()
}

Expand All @@ -73,29 +81,36 @@ where
Ok(())
}

pub fn local_peer_id(&self) -> &PeerId {
self.swarm.local_peer_id()
}

pub async fn run(&mut self) {
tokio::select! {
Some(event) = self.swarm.next() => {
match event {
SwarmEvent::Behaviour(behaviour_event) => {
self.handle_dispersal_event(behaviour_event).await;
},
SwarmEvent::ConnectionEstablished{ .. } => {}
SwarmEvent::ConnectionClosed{ .. } => {}
SwarmEvent::IncomingConnection{ .. } => {}
SwarmEvent::IncomingConnectionError{ .. } => {}
SwarmEvent::OutgoingConnectionError{ .. } => {}
SwarmEvent::NewListenAddr{ .. } => {}
SwarmEvent::ExpiredListenAddr{ .. } => {}
SwarmEvent::ListenerClosed{ .. } => {}
SwarmEvent::ListenerError{ .. } => {}
SwarmEvent::Dialing{ .. } => {}
SwarmEvent::NewExternalAddrCandidate{ .. } => {}
SwarmEvent::ExternalAddrConfirmed{ .. } => {}
SwarmEvent::ExternalAddrExpired{ .. } => {}
SwarmEvent::NewExternalAddrOfPeer{ .. } => {}
event => {
debug!("Unsupported validator swarm event: {event:?}");
loop {
tokio::select! {
Some(event) = self.swarm.next() => {
debug!("Executor received an event: {event:?}");
match event {
SwarmEvent::Behaviour(behaviour_event) => {
self.handle_dispersal_event(behaviour_event).await;
},
SwarmEvent::ConnectionEstablished{ .. } => {}
SwarmEvent::ConnectionClosed{ .. } => {}
SwarmEvent::IncomingConnection{ .. } => {}
SwarmEvent::IncomingConnectionError{ .. } => {}
SwarmEvent::OutgoingConnectionError{ .. } => {}
SwarmEvent::NewListenAddr{ .. } => {}
SwarmEvent::ExpiredListenAddr{ .. } => {}
SwarmEvent::ListenerClosed{ .. } => {}
SwarmEvent::ListenerError{ .. } => {}
SwarmEvent::Dialing{ .. } => {}
SwarmEvent::NewExternalAddrCandidate{ .. } => {}
SwarmEvent::ExternalAddrConfirmed{ .. } => {}
SwarmEvent::ExternalAddrExpired{ .. } => {}
SwarmEvent::NewExternalAddrOfPeer{ .. } => {}
event => {
debug!("Unsupported validator swarm event: {event:?}");
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions nomos-da/network/core/src/swarm/validator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// std
use std::time::Duration;
// crates
use futures::StreamExt;
use kzgrs_backend::common::blob::DaBlob;
Expand Down Expand Up @@ -68,6 +69,9 @@ where
.with_quic()
.with_behaviour(|key| ValidatorBehaviour::new(key, membership, addresses))
.expect("Validator behaviour should build")
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))
})
.build()
}

Expand Down
51 changes: 42 additions & 9 deletions nomos-services/data-availability/sampling/src/backend/kzgrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::time::{Duration, Instant};

// crates
use hex;
use rand::distributions::Standard;
use rand::prelude::*;
use serde::{Deserialize, Serialize};
use tokio::time;
Expand All @@ -28,6 +27,7 @@ pub struct SamplingContext {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KzgrsSamplingBackendSettings {
pub num_samples: u16,
pub num_subnets: u16,
pub old_blobs_check_interval: Duration,
pub blobs_validity_duration: Duration,
}
Expand Down Expand Up @@ -116,10 +116,9 @@ impl<R: Rng + Sync + Send> DaSamplingServiceBackend<R> for KzgrsSamplingBackend<
return SamplingState::Terminated;
}

let subnets: Vec<SubnetworkId> = Standard
.sample_iter(&mut self.rng)
.take(self.settings.num_samples as usize)
.collect();
let subnets: Vec<SubnetworkId> = (0..self.settings.num_subnets as SubnetworkId)
.choose_multiple(&mut self.rng, self.settings.num_samples.into());

let ctx: SamplingContext = SamplingContext {
subnets: HashSet::new(),
started: Instant::now(),
Expand Down Expand Up @@ -149,23 +148,57 @@ mod test {
use kzgrs_backend::common::{blob::DaBlob, Column};
use nomos_core::da::BlobId;

fn create_sampler(subnet_num: usize) -> KzgrsSamplingBackend<StdRng> {
fn create_sampler(num_samples: usize, num_subnets: usize) -> KzgrsSamplingBackend<StdRng> {
let settings = KzgrsSamplingBackendSettings {
num_samples: subnet_num as u16,
num_samples: num_samples as u16,
num_subnets: num_subnets as u16,
old_blobs_check_interval: Duration::from_millis(20),
blobs_validity_duration: Duration::from_millis(10),
};
let rng = StdRng::from_entropy();
KzgrsSamplingBackend::new(settings, rng)
}

#[tokio::test]
async fn test_init_sampling_subnet_range() {
let number_of_subnets = 42;
let num_samples = 50; // Testing with more samples than subnets to check the limit
let mut backend = create_sampler(num_samples, number_of_subnets);

let blob_id = BlobId::default();
let state = backend.init_sampling(blob_id).await;

if let SamplingState::Init(subnets) = state {
let unique_subnet_ids: HashSet<_> = subnets.iter().cloned().collect();

assert_eq!(
unique_subnet_ids.len(),
subnets.len(),
"Subnet IDs are not unique"
);

assert_eq!(
subnets.len(),
number_of_subnets.min(num_samples),
"Incorrect number of subnet IDs selected"
);

for subnet_id in subnets {
assert!(
(subnet_id as usize) < number_of_subnets,
"Subnet ID is out of range"
);
}
}
}

#[tokio::test]
async fn test_sampler() {
// fictitious number of subnets
let subnet_num: usize = 42;

// create a sampler instance
let sampler = &mut create_sampler(subnet_num);
let sampler = &mut create_sampler(subnet_num, 42);

// create some blobs and blob_ids
let b1: BlobId = sampler.rng.gen();
Expand Down Expand Up @@ -309,7 +342,7 @@ mod test {

#[tokio::test]
async fn test_pruning() {
let mut sampler = create_sampler(42);
let mut sampler = create_sampler(42, 42);

// create some sampling contexes
// first set will go through as in time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,8 @@ use nomos_da_network_service::{DaNetworkMsg, NetworkService};
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
use overwatch_rs::DynError;
use serde::{Deserialize, Serialize};
use subnetworks_assignations::MembershipHandler;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaNetworkSamplingSettings {
pub num_samples: u16,
pub subnet_size: SubnetworkId,
}

pub struct Libp2pAdapter<Membership>
where
Membership: MembershipHandler<NetworkId = SubnetworkId, Id = PeerId>
Expand All @@ -51,7 +44,7 @@ where
+ 'static,
{
type Backend = DaNetworkValidatorBackend<Membership>;
type Settings = DaNetworkSamplingSettings;
type Settings = ();

async fn new(
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
Expand Down
3 changes: 2 additions & 1 deletion nomos-services/data-availability/tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ nomos-storage = { path = "../../../nomos-services/storage", features = ["rocksdb
nomos-log = { path = "../../log" }
nomos-network = { path = "../../network", features = ["mock"] }
nomos-libp2p = { path = "../../../nomos-libp2p" }
libp2p = { version = "0.53.2", features = ["ed25519"] }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", rev = "ac28d01" }
rand = "0.8"
Expand All @@ -40,6 +41,6 @@ time = "0.3"
blake2 = { version = "0.10" }

[features]
default = []
default = ["libp2p"]
libp2p = []
mixnet = []
Loading

0 comments on commit 9f4f139

Please sign in to comment.