Skip to content

Commit

Permalink
Handle swarm peer additions and expiration
Browse files Browse the repository at this point in the history
Signed-off-by: Chaitanya Munukutla <[email protected]>
  • Loading branch information
c16a committed Aug 11, 2024
1 parent 218f243 commit 09565f4
Showing 1 changed file with 45 additions and 4 deletions.
49 changes: 45 additions & 4 deletions server/src/handlers/swarm.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
use crate::p2p::behaviour::{Behaviour, BehaviourEvent};
use async_std::io;
use futures::{prelude::*, select};
use libp2p::kad::store::MemoryStore;
use libp2p::kad::Mode;
use libp2p::swarm::SwarmEvent;
use libp2p::{kad, mdns, noise, tcp, yamux};
use libp2p::{kad, mdns, noise, tcp, yamux, Multiaddr};
use std::env;
use std::error::Error;
use std::time::Duration;

pub(crate) async fn init_swarm() -> Result<(), Box<dyn Error>> {
let default_swarm_port = 7001;
let swarm_port: u16 = env::var("SWARM_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(default_swarm_port);

let default_swarm_host = String::from("0.0.0.0");
let swarm_host = env::var("SWARM_HOST").ok().unwrap_or(default_swarm_host);

let mut swarm = libp2p::SwarmBuilder::with_new_identity()
.with_async_std()
.with_tcp(
Expand All @@ -23,7 +32,11 @@ pub(crate) async fn init_swarm() -> Result<(), Box<dyn Error>> {
MemoryStore::new(key.public().to_peer_id()),
),
mdns: mdns::async_io::Behaviour::new(
mdns::Config::default(),
mdns::Config {
ttl: Duration::from_secs(20),
query_interval: Duration::from_secs(5),
enable_ipv6: false,
},
key.public().to_peer_id(),
)?,
})
Expand All @@ -34,7 +47,28 @@ pub(crate) async fn init_swarm() -> Result<(), Box<dyn Error>> {
swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server));

// Listen on all interfaces and whatever port the OS assigns.
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

let swarm_addr: Multiaddr = format!("/ip4/{}/tcp/{}", swarm_host, swarm_port).parse()?;
match swarm.listen_on(swarm_addr) {
Ok(listener_id) => {
println!("Swarm active, listener_id={}", listener_id);
}
Err(err) => {
eprintln!("failed to initialise swarm {:?}", err);
}
}

if let Some(swarm_peer_url) = env::var("SWARM_PEER").ok() {
let peer_addr: Multiaddr = swarm_peer_url.parse()?;
match swarm.dial(peer_addr.clone()) {
Ok(_) => {
println!("dialed peer={}", peer_addr);
}
Err(err) => {
eprintln!("failed to dial peer={}, err={:?}", peer_addr, err);
}
}
}

tokio::spawn(async move {
loop {
Expand All @@ -45,9 +79,16 @@ pub(crate) async fn init_swarm() -> Result<(), Box<dyn Error>> {
},
SwarmEvent::Behaviour(BehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
for (peer_id, multiaddr) in list {
println!("Connected to peer: peer_id={}, addr={}", peer_id, multiaddr);
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr);
}
}
SwarmEvent::Behaviour(BehaviourEvent::Mdns(mdns::Event::Expired(list))) => {
for (peer_id, multiaddr) in list {
println!("Disconnected from peer: peer_id={}, addr={}", peer_id, multiaddr);
swarm.behaviour_mut().kademlia.remove_address(&peer_id, &multiaddr);
}
}
SwarmEvent::Behaviour(BehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, ..})) => {
match result {
kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders { key, providers, .. })) => {
Expand Down

0 comments on commit 09565f4

Please sign in to comment.