chore: Da dispersal unit tests update #720

Open
wants to merge 18 commits into base: master
Changes from 17 commits
2 changes: 1 addition & 1 deletion nomos-cli/Cargo.toml
@@ -9,7 +9,7 @@ description = "Cli app to interact with Nomos nodes and perform various tasks"
[dependencies]
fraction = "0.13"
tracing = "0.1"
-tracing-subscriber = "0.3"
+tracing-subscriber = { version = "0.3", features = ["env-filter"] }
async-trait = "0.1"
clap = { version = "4", features = ["derive"] }
serde_yaml = "0.9"
75 changes: 75 additions & 0 deletions nomos-cli/src/da/network/swarm.rs
@@ -129,3 +129,78 @@ where
}
}
}

#[cfg(test)]
pub mod test {
    use crate::da::network::swarm::ExecutorSwarm;
    use crate::test_utils::AllNeighbours;
    use kzgrs_backend::common::blob::DaBlob;
    use kzgrs_backend::common::Column;
    use libp2p::identity::Keypair;
    use libp2p::PeerId;
    use nomos_da_network_core::address_book::AddressBook;
    use nomos_da_network_core::swarm::validator::ValidatorSwarm;
    use nomos_libp2p::Multiaddr;
    use std::time::Duration;
    use tokio::sync::mpsc::unbounded_channel;

    #[tokio::test]
    async fn test_dispersal_with_swarms() {
        let k1 = Keypair::generate_ed25519();
        let k2 = Keypair::generate_ed25519();
        let executor_peer = PeerId::from_public_key(&k1.public());
        let validator_peer = PeerId::from_public_key(&k2.public());
        let neighbours = AllNeighbours {
            neighbours: [
                PeerId::from_public_key(&k1.public()),
                PeerId::from_public_key(&k2.public()),
            ]
            .into_iter()
            .collect(),
        };

        let addr: Multiaddr = "/ip4/127.0.0.1/udp/5063/quic-v1".parse().unwrap();
        let addr2 = addr.clone().with_p2p(validator_peer).unwrap();
        let addr2_book = AddressBook::from_iter(vec![(executor_peer, addr2.clone())]);

        let (dispersal_broadcast_sender, _) = unbounded_channel();

        let mut executor =
            ExecutorSwarm::new(k1, neighbours.clone(), dispersal_broadcast_sender.clone());
        let (mut validator, _) = ValidatorSwarm::new(k2, neighbours.clone(), addr2_book);

        tokio::spawn(async move {
            let validator_swarm = validator.protocol_swarm_mut();
            validator_swarm.listen_on(addr).unwrap();

            validator.run().await;
        });

        tokio::time::sleep(Duration::from_secs(1)).await;
        executor.dial(addr2).unwrap();
Member:

After dialing, the executor needs to signal the other end about the new stream:

    let executor_open_stream_sender = executor.open_stream_sender();
    executor_open_stream_sender.send(*validator_peer).unwrap();

To have open_stream_sender you'd need to target the da-integration-tests branch.

Contributor Author:

I could not wrap my head around one problem. When I call executor_disperse_blob_sender.send(), handle_dispersal_event() is never called from the run() loop. What else should I do?

Contributor Author:

I feel like our Swarm abstractions are only wrappers around libp2p's concepts? And we just forward all good and bad into upper layers.

Collaborator:

> I feel like our Swarm abstractions are only wrappers around libp2p's concepts? And we just forward all good and bad into upper layers.

Well, they are. This is just a networking layer; it should not do anything other than forward/receive messages according to the specs.

Member:

> I could not wrap my head around one problem. When I call executor_disperse_blob_sender.send(), handle_dispersal_event() is never called from the run() loop. What else should I do?

If you inspect the executor dispersal behaviour, you'll see that before a blob is sent, the peer_id for the desired subnetwork needs to be available (.../executor/behaviour.rs:289). For this reason, try to apply the changes I suggested and call open_stream_sender().

Contributor Author:

Let me invest one more day and finish this exercise. Perhaps it will be useful for error-case tests, which require a deep understanding of streams.

Contributor Author:

> I feel like our Swarm abstractions are only wrappers around libp2p's concepts? And we just forward all good and bad into upper layers.

> Well, they are. This is just a networking layer, it should not do anything else that forward/received messages according to the specs.

Let me study Swarms in libp2p more. Both go-waku and nwaku don't use them, which is why I need to take a deeper look.

> You can think of Swarm as a hook or controller for all the underlying behaviours.

I feel uncomfortable that we bring channels/hooks into higher levels. Channels are like pipes; no house has them visible unless it is for aesthetic reasons :)

Here is an example of how things get confusing: at one point dispersal_broadcast_sender became dispersal_events_sender. Just one level away, we already start forgetting and shifting names. Let my un-smartness be a guide here.

    dispersal_broadcast_sender: UnboundedSender<DispersalEvent>,

    let (dispersal_events_sender, dispersal_events_receiver) = unbounded_channel();

@danielSanchezQ Couldn't we have something like an Event Manager that understands messages and errors? The backend user could just drop data and it would be taken care of. Any performance penalty for this one extra redirect? WDYT?

Collaborator:

> Just one level away we already start forgetting and shifting

What do you mean by this?

Contributor Author:

> Just one level away we already start forgetting and shifting

> What do you mean by this?

A channel with the same purpose has different names at different layers.

Collaborator:

> Just one level away we already start forgetting and shifting

> What do you mean by this?

> Channel with the same purpose has different names.

Well, yeah, we could make the naming more consistent. But as long as the channels do not escape the layer, that should be good. I may be missing the point, but I do not think having an extra layer (an event manager) would solve anything.

We usually have 3 layers:

-----------------------------
 Core (behaviours + Swarm)
-----------------------------
 Backend (network backend service)
-----------------------------
 Service
-----------------------------

The most important one is the Service, as its API should not change at all if possible. Imagine we change the core for something else (my most famous example: pigeons). We just need to create a new (pigeons) backend while the Service is left untouched. That makes the pieces easy to change: it is basically changing a type in the final overwatch app.

What I wanted to say with all this is that:

  • Wrapping types is OK for the service and backend, if changing types is just a matter of rewrapping (into the stream, from whatever).
  • Escaping types are not, because otherwise it is difficult to refactor.
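The layering argument above can be sketched in plain Rust. This is a minimal illustration, not the actual Nomos code: NetworkBackend, Libp2pBackend, PigeonBackend, and DispersalService are hypothetical names standing in for the real core/backend/service types, and the trait method is a placeholder for the real backend API.

```rust
// Hypothetical sketch of the three-layer split: the Service is generic over a
// backend trait, so swapping the core (libp2p -> pigeons) never touches it.
trait NetworkBackend {
    fn disperse(&self, blob: &[u8]) -> Result<usize, String>;
}

struct Libp2pBackend; // stands in for the real swarm-based backend
struct PigeonBackend; // the "pigeons" thought experiment

impl NetworkBackend for Libp2pBackend {
    fn disperse(&self, blob: &[u8]) -> Result<usize, String> {
        Ok(blob.len()) // pretend every byte was dispersed over libp2p
    }
}

impl NetworkBackend for PigeonBackend {
    fn disperse(&self, blob: &[u8]) -> Result<usize, String> {
        Ok(blob.len()) // same contract, entirely different transport
    }
}

// The service only sees the trait; backend types never escape this boundary.
struct DispersalService<B: NetworkBackend> {
    backend: B,
}

impl<B: NetworkBackend> DispersalService<B> {
    fn send_blob(&self, blob: &[u8]) -> Result<usize, String> {
        self.backend.disperse(blob)
    }
}

fn main() {
    let svc = DispersalService { backend: Libp2pBackend };
    assert_eq!(svc.send_blob(b"data").unwrap(), 4);
    let svc = DispersalService { backend: PigeonBackend };
    assert_eq!(svc.send_blob(b"data").unwrap(), 4);
}
```

Because both backends wrap into the same trait, changing the transport is just changing the type parameter in the final app, which is the point of the layering.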

        tokio::time::sleep(Duration::from_secs(1)).await;

        let executor_disperse_blob_sender = executor.swarm.behaviour().blobs_sender();

        println!("Sending blob...");
        executor_disperse_blob_sender
            .send((
                0,
                DaBlob {
                    column_idx: 0,
                    column: Column(vec![]),
                    column_commitment: Default::default(),
                    aggregated_column_commitment: Default::default(),
                    aggregated_column_proof: Default::default(),
                    rows_commitments: vec![],
                    rows_proofs: vec![],
                },
            ))
            .unwrap();

        let executor_task = tokio::spawn(async move {
            executor.run().await;
        });
        assert!(executor_task.await.is_ok());
Member:

It would be great if you could update the asserts to listen for responses; we should consider the test successful only if we get DispersalEvent::DispersalSuccess for every dispersed blob.
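A possible shape for that assertion, as a self-contained sketch: DispersalEvent here is a stand-in enum and all_dispersals_succeeded a hypothetical helper (the real test would read DispersalEvent::DispersalSuccess from the executor's event receiver), and std::sync::mpsc replaces the tokio channel to keep the sketch runnable on its own.

```rust
use std::sync::mpsc::{channel, Receiver};

// Stand-in for the real nomos-da event type; names are illustrative only.
#[allow(dead_code)]
#[derive(Debug)]
enum DispersalEvent {
    DispersalSuccess { blob_id: u64 },
    DispersalError { blob_id: u64 },
}

/// Returns true only if `expected` success events arrive; any error event
/// or a closed channel fails the check immediately.
fn all_dispersals_succeeded(rx: Receiver<DispersalEvent>, expected: usize) -> bool {
    let mut successes = 0;
    while successes < expected {
        match rx.recv() {
            Ok(DispersalEvent::DispersalSuccess { .. }) => successes += 1,
            _ => return false, // error event or sender dropped early
        }
    }
    true
}

fn main() {
    let (tx, rx) = channel();
    tx.send(DispersalEvent::DispersalSuccess { blob_id: 0 }).unwrap();
    drop(tx);
    assert!(all_dispersals_succeeded(rx, 1));
}
```

In the real test this loop would replace the bare `executor_task.await.is_ok()` assert, with a timeout so a silent dispersal failure cannot hang the test forever.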

    }
}
3 changes: 3 additions & 0 deletions nomos-cli/src/lib.rs
@@ -2,6 +2,9 @@ pub mod api;
pub mod cmds;
pub mod da;

#[cfg(test)]
pub mod test_utils;

use clap::Parser;
use cmds::Command;

25 changes: 25 additions & 0 deletions nomos-cli/src/test_utils.rs
@@ -0,0 +1,25 @@
use libp2p::PeerId;
use std::collections::HashSet;
use subnetworks_assignations::MembershipHandler;

#[derive(Clone)]
pub struct AllNeighbours {
    pub neighbours: HashSet<PeerId>,
}

impl MembershipHandler for AllNeighbours {
    type NetworkId = u32;
    type Id = PeerId;

    fn membership(&self, _self_id: &Self::Id) -> HashSet<Self::NetworkId> {
        [0].into_iter().collect()
    }

    fn is_allowed(&self, _id: &Self::Id) -> bool {
        true
    }

    fn members_of(&self, _network_id: &Self::NetworkId) -> HashSet<Self::Id> {
        self.neighbours.clone()
    }
}