From 09565f4580d9cf647bcf1a4441fcb6a19bde2ef6 Mon Sep 17 00:00:00 2001 From: Chaitanya Munukutla Date: Sun, 11 Aug 2024 13:07:12 +0530 Subject: [PATCH] Handle swarm peer additions and expiration Signed-off-by: Chaitanya Munukutla --- server/src/handlers/swarm.rs | 49 +++++++++++++++++++++++++++++++++--- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/server/src/handlers/swarm.rs b/server/src/handlers/swarm.rs index 4a44d42..6d7a356 100644 --- a/server/src/handlers/swarm.rs +++ b/server/src/handlers/swarm.rs @@ -1,14 +1,23 @@ use crate::p2p::behaviour::{Behaviour, BehaviourEvent}; -use async_std::io; use futures::{prelude::*, select}; use libp2p::kad::store::MemoryStore; use libp2p::kad::Mode; use libp2p::swarm::SwarmEvent; -use libp2p::{kad, mdns, noise, tcp, yamux}; +use libp2p::{kad, mdns, noise, tcp, yamux, Multiaddr}; +use std::env; use std::error::Error; use std::time::Duration; pub(crate) async fn init_swarm() -> Result<(), Box> { + let default_swarm_port = 7001; + let swarm_port: u16 = env::var("SWARM_PORT") + .ok() + .and_then(|p| p.parse().ok()) + .unwrap_or(default_swarm_port); + + let default_swarm_host = String::from("0.0.0.0"); + let swarm_host = env::var("SWARM_HOST").ok().unwrap_or(default_swarm_host); + let mut swarm = libp2p::SwarmBuilder::with_new_identity() .with_async_std() .with_tcp( @@ -23,7 +32,11 @@ pub(crate) async fn init_swarm() -> Result<(), Box> { MemoryStore::new(key.public().to_peer_id()), ), mdns: mdns::async_io::Behaviour::new( - mdns::Config::default(), + mdns::Config { + ttl: Duration::from_secs(20), + query_interval: Duration::from_secs(5), + enable_ipv6: false, + }, key.public().to_peer_id(), )?, }) @@ -34,7 +47,28 @@ pub(crate) async fn init_swarm() -> Result<(), Box> { swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server)); // Listen on all interfaces and whatever port the OS assigns. - swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; + + let swarm_addr: Multiaddr = format!("/ip4/{}/tcp/{}", swarm_host, swarm_port).parse()?; + match swarm.listen_on(swarm_addr) { + Ok(listener_id) => { + println!("Swarm active, listener_id={}", listener_id); + } + Err(err) => { + eprintln!("failed to initialise swarm {:?}", err); + } + } + + if let Some(swarm_peer_url) = env::var("SWARM_PEER").ok() { + let peer_addr: Multiaddr = swarm_peer_url.parse()?; + match swarm.dial(peer_addr.clone()) { + Ok(_) => { + println!("dialed peer={}", peer_addr); + } + Err(err) => { + eprintln!("failed to dial peer={}, err={:?}", peer_addr, err); + } + } + } tokio::spawn(async move { loop { @@ -45,9 +79,16 @@ pub(crate) async fn init_swarm() -> Result<(), Box> { }, SwarmEvent::Behaviour(BehaviourEvent::Mdns(mdns::Event::Discovered(list))) => { for (peer_id, multiaddr) in list { + println!("Connected to peer: peer_id={}, addr={}", peer_id, multiaddr); swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr); } } + SwarmEvent::Behaviour(BehaviourEvent::Mdns(mdns::Event::Expired(list))) => { + for (peer_id, multiaddr) in list { + println!("Disconnected from peer: peer_id={}, addr={}", peer_id, multiaddr); + swarm.behaviour_mut().kademlia.remove_address(&peer_id, &multiaddr); + } + } SwarmEvent::Behaviour(BehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, ..})) => { match result { kad::QueryResult::GetProviders(Ok(kad::GetProvidersOk::FoundProviders { key, providers, .. })) => {