diff --git a/Cargo.lock b/Cargo.lock index ec519b247..fed28431d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1255,6 +1255,7 @@ dependencies = [ "sqlx", "ssh-key", "tempfile", + "test-log", "tokio", "tokio-stream", "tracing", diff --git a/beetle/iroh-bitswap/src/handler.rs b/beetle/iroh-bitswap/src/handler.rs index 8cd0846ce..320ec2db5 100644 --- a/beetle/iroh-bitswap/src/handler.rs +++ b/beetle/iroh-bitswap/src/handler.rs @@ -141,7 +141,7 @@ impl Debug for BitswapHandler { impl BitswapHandler { /// Builds a new [`BitswapHandler`]. - // TODO(nathanielc): Remove uses of KeepAlive::Until + // TODO(WS1-1291): Remove uses of KeepAlive::Until #[allow(deprecated)] pub fn new(protocol_config: ProtocolConfig, idle_timeout: Duration) -> Self { Self { @@ -175,7 +175,7 @@ impl ConnectionHandler for BitswapHandler { self.keep_alive } - // TODO(nathanielc): Remove uses of KeepAlive::Until + // TODO(WS1-1291): Remove uses of KeepAlive::Until #[allow(deprecated)] fn poll(&mut self, cx: &mut Context<'_>) -> Poll { inc!(BitswapMetrics::HandlerPollCount); @@ -276,7 +276,7 @@ impl ConnectionHandler for BitswapHandler { } } - // TODO(nathanielc): Remove uses of KeepAlive::Until + // TODO(WS1-1291): Remove uses of KeepAlive::Until #[allow(deprecated)] fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { match event { diff --git a/beetle/iroh-bitswap/src/lib.rs b/beetle/iroh-bitswap/src/lib.rs index 5e43bba6f..7bb9d1f21 100644 --- a/beetle/iroh-bitswap/src/lib.rs +++ b/beetle/iroh-bitswap/src/lib.rs @@ -610,7 +610,7 @@ mod tests { use libp2p::swarm::SwarmEvent; use libp2p::tcp::{tokio::Transport as TcpTransport, Config as TcpConfig}; use libp2p::yamux; - use libp2p::{core::muxing::StreamMuxerBox, swarm::SwarmBuilder}; + use libp2p::{core::muxing::StreamMuxerBox, swarm}; use libp2p::{noise, PeerId, Swarm, Transport}; use tokio::sync::{mpsc, RwLock}; use tracing::{info, trace}; @@ -741,7 +741,8 @@ mod tests { let store1 = TestStore::default(); let bs1 = Bitswap::new(peer1_id, store1.clone(), Config::default()).await; - let mut swarm1 = SwarmBuilder::with_tokio_executor(trans, bs1, peer1_id).build(); + let config = swarm::Config::with_tokio_executor(); + let mut swarm1 = Swarm::new(trans, bs1, peer1_id, config); let blocks = (0..N).map(|_| create_random_block_v1()).collect::>(); for block in &blocks { @@ -774,7 +775,8 @@ mod tests { let store2 = TestStore::default(); let bs2 = Bitswap::new(peer2_id, store2.clone(), Config::default()).await; - let mut swarm2 = SwarmBuilder::with_tokio_executor(trans, bs2, peer2_id).build(); + let config = swarm::Config::with_tokio_executor(); + let mut swarm2 = Swarm::new(trans, bs2, peer2_id, config); let swarm2_bs = swarm2.behaviour().clone(); let peer2 = tokio::task::spawn(async move { @@ -790,8 +792,8 @@ mod tests { swarm2.behaviour().on_identify( &peer_id, &[ - "/ipfs/bitswap/1.2.0".to_string(), - "/ipfs/bitswap/1.1.0".to_string(), + StreamProtocol::new("/ipfs/bitswap/1.2.0"), + StreamProtocol::new("/ipfs/bitswap/1.1.0"), ], ); } diff --git a/beetle/iroh-bitswap/src/protocol.rs b/beetle/iroh-bitswap/src/protocol.rs index ed80954a2..9eec63236 100644 --- a/beetle/iroh-bitswap/src/protocol.rs +++ b/beetle/iroh-bitswap/src/protocol.rs @@ -197,39 +197,9 @@ impl Decoder for BitswapCodec { #[cfg(test)] mod tests { - use futures::prelude::*; - use libp2p::core::upgrade; - use tokio::net::{TcpListener, TcpStream}; - use tokio_util::compat::*; use super::*; - #[tokio::test] - async fn test_upgrade() { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let listener_addr = listener.local_addr().unwrap(); - - let server = async move { - let (incoming, _) = listener.accept().await.unwrap(); - upgrade::apply_inbound(incoming.compat(), ProtocolConfig::default()) - .await - .unwrap(); - }; - - let client = async move { - let stream = TcpStream::connect(&listener_addr).await.unwrap(); - upgrade::apply_outbound( - stream.compat(), - ProtocolConfig::default(), - upgrade::Version::V1Lazy, - ) - .await - .unwrap(); - }; - - future::select(Box::pin(server), Box::pin(client)).await; - } - #[test] fn test_ord() { let mut protocols = [ diff --git a/beetle/iroh-car/src/header.rs b/beetle/iroh-car/src/header.rs index 7b948dbf4..7717f5179 100644 --- a/beetle/iroh-car/src/header.rs +++ b/beetle/iroh-car/src/header.rs @@ -80,8 +80,8 @@ impl From> for CarHeaderV1 { #[cfg(test)] mod tests { - use ipld::codec::{Decode, Encode}; - use ipld_cbor::DagCborCodec; + use libipld::codec::{Decode, Encode}; + use libipld_cbor::DagCborCodec; use multihash::MultihashDigest; use super::*; diff --git a/beetle/iroh-car/src/reader.rs b/beetle/iroh-car/src/reader.rs index 991759629..700baba1b 100644 --- a/beetle/iroh-car/src/reader.rs +++ b/beetle/iroh-car/src/reader.rs @@ -64,7 +64,7 @@ mod tests { use cid::Cid; use futures::TryStreamExt; - use ipld_cbor::DagCborCodec; + use libipld_cbor::DagCborCodec; use multihash::MultihashDigest; use crate::{header::CarHeaderV1, writer::CarWriter}; diff --git a/beetle/iroh-metrics/Cargo.toml b/beetle/iroh-metrics/Cargo.toml index be01f009c..26540a647 100644 --- a/beetle/iroh-metrics/Cargo.toml +++ b/beetle/iroh-metrics/Cargo.toml @@ -40,7 +40,6 @@ features = [ "yamux", "tcp", "dns", - #"mplex", "request-response", "websocket", "serde", diff --git a/beetle/iroh-util/tests/config.a.toml b/beetle/iroh-util/tests/config.a.toml deleted file mode 100644 index 2eca94b06..000000000 --- a/beetle/iroh-util/tests/config.a.toml +++ /dev/null @@ -1,6 +0,0 @@ -port = 5000 - -[map] -seven = 7 -eight = 8 -nine = 9 diff --git a/beetle/iroh-util/tests/config.b.toml b/beetle/iroh-util/tests/config.b.toml deleted file mode 100644 index 573f8dd04..000000000 --- a/beetle/iroh-util/tests/config.b.toml +++ /dev/null @@ -1,8 +0,0 @@ -port = 5000 -enabled = true -list = ["changed", "values"] - -[map] -four = 4 -five = 5 -six = 6 diff --git a/beetle/iroh-util/tests/config.rs b/beetle/iroh-util/tests/config.rs deleted file mode 100644 index ba214ef5e..000000000 --- a/beetle/iroh-util/tests/config.rs +++ /dev/null @@ -1,196 +0,0 @@ -use std::collections::HashMap; -use std::net::SocketAddr; -use std::path::PathBuf; - -use config::{ConfigError, Map, Source, Value}; -use serde::{Deserialize, Serialize}; - -use iroh_util::{insert_into_config_map, make_config}; - -const CONFIG_A: &str = "tests/config.a.toml"; -const CONFIG_B: &str = "tests/config.b.toml"; - -// write test config with nested tables & lists -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -struct TestConfig { - port: u16, - addr: SocketAddr, - enabled: bool, - list: Vec, - map: HashMap, - metrics: Metrics, -} - -// impl default -impl TestConfig { - fn new() -> Self { - let mut map: HashMap = HashMap::default(); - map.insert("one".to_string(), 1); - map.insert("two".to_string(), 2); - map.insert("three".to_string(), 3); - Self { - port: 3030, - addr: "0.0.0.0:3031".parse().unwrap(), - enabled: true, - list: vec!["hello".to_string(), "world".to_string()], - map, - metrics: Metrics::new(), - } - } -} - -impl Source for TestConfig { - fn clone_into_box(&self) -> Box { - Box::new(self.clone()) - } - - fn collect(&self) -> Result, ConfigError> { - let mut map: Map = Map::default(); - insert_into_config_map(&mut map, "port", self.port as i32); - insert_into_config_map(&mut map, "addr", self.addr.to_string()); - insert_into_config_map(&mut map, "enabled", self.enabled); - insert_into_config_map(&mut map, "list", self.list.clone()); - insert_into_config_map(&mut map, "map", self.map.clone()); - let metrics = self.metrics.collect().unwrap(); - insert_into_config_map(&mut map, "metrics", metrics); - Ok(map) - } -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -struct Metrics { - service_env: String, - instance_id: String, - foo: bool, - bar: i32, -} - -impl Metrics { - fn new() -> Self { - Self { - instance_id: "test_instance_id".to_string(), - service_env: "test_service".to_string(), - foo: false, - bar: 0, - } - } -} - -impl Source for Metrics { - fn clone_into_box(&self) -> Box { - Box::new(self.clone()) - } - - fn collect(&self) -> Result, ConfigError> { - let mut map = Map::new(); - insert_into_config_map(&mut map, "service_env", self.service_env.clone()); - insert_into_config_map(&mut map, "instance_id", self.instance_id.clone()); - insert_into_config_map(&mut map, "foo", self.foo); - insert_into_config_map(&mut map, "bar", self.bar); - Ok(map) - } -} - -#[test] -fn test_collect() { - let default = TestConfig::new(); - let mut expect: Map = Map::new(); - expect.insert("port".to_string(), Value::new(None, default.port as i32)); - expect.insert( - "addr".to_string(), - Value::new(None, default.addr.to_string()), - ); - expect.insert("enabled".to_string(), Value::new(None, default.enabled)); - expect.insert("list".to_string(), Value::new(None, default.list)); - expect.insert("map".to_string(), Value::new(None, default.map)); - let mut metrics = Map::new(); - metrics.insert( - "service_env".to_string(), - Value::new(None, default.metrics.service_env), - ); - metrics.insert( - "instance_id".to_string(), - Value::new(None, default.metrics.instance_id), - ); - metrics.insert("foo".to_string(), Value::new(None, default.metrics.foo)); - metrics.insert("bar".to_string(), Value::new(None, default.metrics.bar)); - expect.insert("metrics".to_string(), Value::new(None, metrics)); - - let got = TestConfig::new().collect().unwrap(); - for key in got.keys() { - let left = expect.get(key).unwrap_or_else(|| panic!("{}", key)); - let right = got.get(key).unwrap(); - assert_eq!(left, right); - } -} - -#[test] -fn test_make_config() { - let map = HashMap::from([ - ("one".to_string(), 1), - ("two".to_string(), 2), - ("three".to_string(), 3), - ("four".to_string(), 4), - ("five".to_string(), 5), - ("six".to_string(), 6), - ("seven".to_string(), 7), - ("eight".to_string(), 8), - ("nine".to_string(), 9), - ]); - - // expect - let expect = TestConfig { - // changed by env var - port: 4000, - // stays default - addr: "0.0.0.0:3031".parse().unwrap(), - // changed by flag - enabled: false, - // changed by CONFIG_A - list: vec!["changed".to_string(), "values".to_string()], - // added to by by default, CONFIG_A, & CONFIG_B - map, - metrics: Metrics { - // set by custom metrics env var - service_env: "new_service_env".to_string(), - // set by custom metrics env var - instance_id: "new_id".to_string(), - // set by `env_prefix` env var - foo: true, - // set by metrics env var - bar: 10, - }, - }; - - temp_env::with_vars( - vec![ - // set config field using env var - ("IROH_TEST__PORT", Some("4000")), - // set metrics fiels using `env_prefix`, double-underbar for deep - // nesting - ("IROH_TEST__METRICS__FOO", Some("true")), - // set metrics field using `IROH_METRICS` prefix - ("IROH_METRICS__BAR", Some("10")), - // custom metrics env var - ("IROH_INSTANCE_ID", Some("new_id")), - // custom metrics env var - ("IROH_ENV", Some("new_service_env")), - ], - || { - let got = make_config( - TestConfig::new(), - &[ - Some(&PathBuf::from(CONFIG_A)), - Some(&PathBuf::from(CONFIG_B)), - None, - ], - "IROH_TEST", - HashMap::from([("enabled", "false"), ("metrics.debug", "true")]), - ) - .unwrap(); - assert_eq!(expect, got); - }, - ); -} - -// add metrics diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index 3f30dbcd4..53a9fa0d9 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -77,6 +77,7 @@ features = [ [dev-dependencies] criterion.workspace = true rand_chacha.workspace = true +test-log.workspace = true [[bench]] name = "lru_cache" diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index dc0d2ed08..b8743972b 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -158,12 +158,6 @@ where warn!("Could not parse bootstrap addr {}", multiaddr); } } - - // Trigger initial bootstrap - if let Err(e) = kademlia.bootstrap() { - warn!("Kademlia bootstrap failed: {}", e); - } - Some(kademlia) } else { None diff --git a/p2p/src/config.rs b/p2p/src/config.rs index fcc6caa58..37779b3d3 100644 --- a/p2p/src/config.rs +++ b/p2p/src/config.rs @@ -64,6 +64,11 @@ pub struct Libp2pConfig { pub connection_event_buffer_size: usize, pub dial_concurrency_factor: NonZeroU8, pub idle_connection_timeout: Duration, + /// Trust as a confirmed external address any reported observed address. + /// + /// NOTE: It is generally not safe to trust observed addresses received from arbitrary peers. + /// Only enable this option if its known that all connecting peers can be trusted. + pub trust_observed_addrs: bool, } impl Default for Libp2pConfig { @@ -91,7 +96,8 @@ impl Default for Libp2pConfig { notify_handler_buffer_size: NonZeroUsize::new(256).expect("should not be zero"), connection_event_buffer_size: 256, dial_concurrency_factor: NonZeroU8::new(8).expect("should not be zero"), - idle_connection_timeout: Duration::from_secs(10), + idle_connection_timeout: Duration::from_secs(30), + trust_observed_addrs: false, } } } diff --git a/p2p/src/node.rs b/p2p/src/node.rs index c5b2130d4..d5822fd0b 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -15,23 +15,22 @@ use iroh_metrics::{core::MRecorder, inc, libp2p_metrics, p2p::P2PMetrics}; use iroh_rpc_client::Client as RpcClient; use iroh_rpc_client::Lookup; use iroh_rpc_types::p2p::P2pAddr; -use libp2p::core::Multiaddr; -pub use libp2p::gossipsub::{IdentTopic, Topic}; -use libp2p::identify::{Event as IdentifyEvent, Info as IdentifyInfo}; -use libp2p::kad::{ - self, BootstrapOk, GetClosestPeersError, GetClosestPeersOk, GetProvidersOk, KBucketDistance, - NodeStatus, QueryId, QueryResult, RecordKey, +use libp2p::{ + autonat::{self, OutboundProbeEvent}, + core::Multiaddr, + gossipsub::IdentTopic, + identify, + identity::Keypair, + kad::{ + self, BootstrapOk, GetClosestPeersError, GetClosestPeersOk, GetProvidersOk, QueryId, + QueryResult, RecordKey, + }, + mdns, + metrics::Recorder, + multiaddr::Protocol, + swarm::{dial_opts::DialOpts, ConnectionHandler, DialError, NetworkBehaviour, SwarmEvent}, + PeerId, StreamProtocol, Swarm, }; -use libp2p::mdns; -use libp2p::metrics::Recorder; -use libp2p::multiaddr::Protocol; -use libp2p::swarm::{ - dial_opts::{DialOpts, PeerCondition}, - DialError, -}; -use libp2p::swarm::{ConnectionHandler, NetworkBehaviour, SwarmEvent}; -use libp2p::{identity::Keypair, StreamProtocol}; -use libp2p::{PeerId, Swarm}; use sqlx::SqlitePool; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::oneshot::{self, Sender as OneShotSender}; @@ -70,14 +69,12 @@ where swarm: Swarm>, net_receiver_in: Receiver, dial_queries: AHashMap>>>, - lookup_queries: AHashMap>>>, + lookup_queries: AHashMap>>>, // TODO(ramfox): use new providers queue instead find_on_dht_queries: AHashMap, DHTQuery>, network_events: Vec<(Arc, Sender)>, #[allow(dead_code)] rpc_client: RpcClient, - #[allow(dead_code)] - kad_last_range: Option<(KBucketDistance, KBucketDistance)>, rpc_task: JoinHandle<()>, use_dht: bool, bitswap_sessions: BitswapSessions, @@ -86,6 +83,8 @@ where ceramic_peers_key: RecordKey, ceramic_peers_query_id: Option, + + trust_observed_addrs: bool, } impl fmt::Debug for Node @@ -102,7 +101,6 @@ where .field("find_on_dht_queries", &self.find_on_dht_queries) .field("network_events", &self.network_events) .field("rpc_client", &self.rpc_client) - .field("kad_last_range", &self.kad_last_range) .field("rpc_task", &self.rpc_task) .field("use_dht", &self.use_dht) .field("bitswap_sessions", &self.bitswap_sessions) @@ -193,7 +191,6 @@ where find_on_dht_queries: Default::default(), network_events: Vec::new(), rpc_client, - kad_last_range: None, rpc_task, use_dht: libp2p_config.kademlia, bitswap_sessions: Default::default(), @@ -201,6 +198,7 @@ where listen_addrs, ceramic_peers_key: RecordKey::new(&ceramic_peers_key), ceramic_peers_query_id: None, + trust_observed_addrs: libp2p_config.trust_observed_addrs, }) } @@ -271,6 +269,8 @@ where _ = bootstrap_interval.tick() => { if let Err(e) = self.swarm.behaviour_mut().kad_bootstrap() { warn!("kad bootstrap failed: {:?}", e); + } else { + debug!("kad bootstrap succeeded"); } } _ = expiry_interval.tick() => { @@ -311,49 +311,6 @@ where Ok(()) } - /// Check the next node in the DHT. - #[tracing::instrument(skip(self))] - async fn dht_nice_tick(&mut self) { - let mut to_dial: Option<(DialOpts, (KBucketDistance, KBucketDistance))> = None; - if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() { - for kbucket in kad.kbuckets() { - if let Some(range) = self.kad_last_range { - if kbucket.range() == range { - continue; - } - } - - // find the first disconnected node - for entry in kbucket.iter() { - if entry.status == NodeStatus::Disconnected { - let peer_id = entry.node.key.preimage(); - - let dial_opts = DialOpts::peer_id(*peer_id) - .condition(PeerCondition::Disconnected) - .addresses(entry.node.value.clone().into_vec()) - .extend_addresses_through_behaviour() - .build(); - to_dial = Some((dial_opts, kbucket.range())); - break; - } - } - } - } - - if let Some((dial_opts, range)) = to_dial { - trace!( - "checking node {:?} in bucket range ({:?})", - dial_opts.get_peer_id().unwrap(), - range - ); - - if let Err(e) = self.swarm.dial(dial_opts) { - warn!("failed to dial: {:?}", e); - } - self.kad_last_range = Some(range); - } - } - /// Subscribe to [`NetworkEvent`]s. #[tracing::instrument(skip(self))] pub fn network_events(&mut self) -> Receiver { @@ -563,6 +520,10 @@ where } = e { match result { + QueryResult::StartProviding(result) => match result { + Ok(_) => {} + Err(err) => warn!("kad: failed to provide record: {}", err), + }, QueryResult::GetProviders(Ok(p)) => { match p { GetProvidersOk::FoundProviders { key, providers } => { @@ -585,8 +546,16 @@ where .map(|i| i == id) .unwrap_or_default() { - info!(peers = providers.len(), "discovered ceramic peers"); - for peer in providers { + let local_peer_id = *self.swarm.local_peer_id(); + let mut providers = providers + .into_iter() + .filter(|peer| *peer != local_peer_id); + info!( + peers.count = providers.by_ref().count(), + "discovered ceramic peers" + ); + providers.for_each(|peer| { + debug!(?peer, "dialing ceramic peer"); if let Err(err) = self.swarm.dial(peer) { if !matches!( err, @@ -595,7 +564,7 @@ where warn!(%err, "failed to dial ceramic peer") } } - } + }); } else if let Some(kad) = behaviour.kad.as_mut() { debug!( "provider results for {:?} last: {}", @@ -687,7 +656,34 @@ where Event::Identify(e) => { libp2p_metrics().record(&*e); trace!("tick: identify {:?}", e); - if let IdentifyEvent::Received { peer_id, info } = *e { + if let identify::Event::Received { peer_id, info } = *e { + // Did we learn about a new external address? + if !self + .swarm + .external_addresses() + .any(|addr| addr == &info.observed_addr) + { + if self.trust_observed_addrs { + debug!( + address=%info.observed_addr, + %peer_id, + "adding trusted external address observed from peer", + ); + // Explicily trust any observed address from any peer. + self.swarm.add_external_address(info.observed_addr.clone()); + } else if let Some(autonat) = self.swarm.behaviour_mut().autonat.as_mut() { + // Probe the observed addr for external connectivity. + // See OutboundProbeEvent case for + + debug!( + address=%info.observed_addr, + %peer_id, + "probing observed address from peer for external connectivity", + ); + autonat.probe_address(info.observed_addr.clone()); + }; + }; + for protocol in &info.protocols { if protocol == &kad::PROTOCOL_NAME { for addr in &info.listen_addrs { @@ -719,7 +715,7 @@ where chan.send(Ok(info.clone())).ok(); } } - } else if let IdentifyEvent::Error { peer_id, error } = *e { + } else if let identify::Event::Error { peer_id, error } = *e { if let Some(channels) = self.lookup_queries.remove(&peer_id) { for chan in channels { chan.send(Err(anyhow!( @@ -790,6 +786,18 @@ where } mdns::Event::Expired(_) => {} }, + Event::Autonat(autonat::Event::OutboundProbe(OutboundProbeEvent::Response { + address, + .. + })) => { + if !self.swarm.external_addresses().any(|addr| addr == &address) { + debug!( + %address, + "adding external address after successful autonat probe", + ); + self.swarm.add_external_address(address); + } + } _ => { // TODO: check all important events are handled } @@ -1169,6 +1177,7 @@ mod tests { use rand_chacha::ChaCha8Rng; use recon::Sha256a; use ssh_key::private::Ed25519Keypair; + use test_log::test; use libp2p::{identity::Keypair as Libp2pKeypair, kad::record::Key}; @@ -1228,6 +1237,8 @@ mod tests { seed: Option, /// Optional `Keys` the node should provide to the DHT on start up. keys: Option>, + /// Pass through to node.trust_observed_addrs + trust_observed_addrs: bool, } #[derive(Clone)] @@ -1275,6 +1286,7 @@ mod tests { bootstrap: true, seed: None, keys: None, + trust_observed_addrs: false, } } @@ -1297,6 +1309,10 @@ mod tests { self.seed = Some(seed); self } + fn with_trust_observed_addrs(mut self, trust_observed_addrs: bool) -> Self { + self.trust_observed_addrs = trust_observed_addrs; + self + } async fn build(self) -> Result { let (rpc_server_addr, rpc_client_addr) = match self.rpc_addrs { @@ -1307,9 +1323,10 @@ mod tests { } }; let mut network_config = Config::default_with_rpc(rpc_client_addr.clone()); + network_config.libp2p.trust_observed_addrs = self.trust_observed_addrs; - if let Some(addr) = self.addrs { - network_config.libp2p.listening_multiaddrs = addr; + if let Some(addrs) = self.addrs { + network_config.libp2p.listening_multiaddrs = addrs; } else { network_config.libp2p.listening_multiaddrs = vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()]; @@ -1732,7 +1749,7 @@ mod tests { Ok(()) } - #[tokio::test] + #[test(tokio::test)] async fn test_dht() -> Result<()> { // set up three nodes // two connect to one @@ -1741,12 +1758,21 @@ mod tests { .parse() .unwrap(); - let test_runner_a = TestRunnerBuilder::new().no_bootstrap().build().await?; + let test_runner_a = TestRunnerBuilder::new() + .no_bootstrap() + // We can trust all peers as they are the other test runners. + // + // We need to trust the observed_addrs because otherwise kademlia will not switch into server mode for the + // established connections because there is no external address to be used. + .with_trust_observed_addrs(true) + .build() + .await?; println!("peer_a: {:?}", test_runner_a.peer_id); // peer_id 12D3KooWLo6JTNKXfjkZtKf8ooLQoXVXUEeuu4YDY3CYqK6rxHXt let mut test_runner_b = TestRunnerBuilder::new() .no_bootstrap() + .with_trust_observed_addrs(true) .with_seed(ChaCha8Rng::from_seed([0; 32])) .build() .await?; @@ -1756,6 +1782,7 @@ mod tests { let test_runner_c = TestRunnerBuilder::new() .no_bootstrap() + .with_trust_observed_addrs(true) .with_seed(ChaCha8Rng::from_seed([1; 32])) .build() .await?; diff --git a/recon/src/libp2p/handler.rs b/recon/src/libp2p/handler.rs index 70fcfded8..9a262fc9d 100644 --- a/recon/src/libp2p/handler.rs +++ b/recon/src/libp2p/handler.rs @@ -66,7 +66,7 @@ where // See doc comment for State, each row of the transitions table // should map to exactly one call of this transition_state function. // - // TODO(nathanielc): Remove uses of KeepAlive::Until + // TODO(WS1-1291): Remove uses of KeepAlive::Until #[allow(deprecated)] fn transition_state(&mut self, state: State) { debug!( @@ -374,8 +374,12 @@ where | State::Inbound(_) => {} } } - libp2p::swarm::handler::ConnectionEvent::LocalProtocolsChange(_) => todo!(), - libp2p::swarm::handler::ConnectionEvent::RemoteProtocolsChange(_) => todo!(), + libp2p::swarm::handler::ConnectionEvent::LocalProtocolsChange(changes) => { + debug!(?changes, "local protocols change") + } + libp2p::swarm::handler::ConnectionEvent::RemoteProtocolsChange(changes) => { + debug!(?changes, "remote protocols change") + } } } }