diff --git a/Cargo.lock b/Cargo.lock index cf0cde790c3..af3ceb9eb0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -528,6 +528,23 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "autonatv2" +version = "0.1.0" +dependencies = [ + "cfg-if", + "clap", + "libp2p", + "opentelemetry 0.21.0", + "opentelemetry-jaeger", + "opentelemetry_sdk 0.21.2", + "rand 0.8.5", + "tokio", + "tracing", + "tracing-opentelemetry 0.22.0", + "tracing-subscriber", +] + [[package]] name = "axum" version = "0.6.20" @@ -1120,6 +1137,15 @@ dependencies = [ "itertools", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.3" @@ -1146,12 +1172,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" -dependencies = [ - "cfg-if", -] +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" [[package]] name = "crunchy" @@ -2404,6 +2427,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "interceptor" version = "0.10.0" @@ -2454,7 +2483,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "wasm-logger", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -2664,9 +2693,13 @@ dependencies = [ "async-std", "async-trait", "asynchronous-codec", + "bytes", + "either", "futures", + "futures-bounded", "futures-timer", "libp2p-core", + "libp2p-identify", "libp2p-identity", "libp2p-request-response", "libp2p-swarm", @@ -2674,9 +2707,13 @@ dependencies = [ "quick-protobuf", "quick-protobuf-codec", "rand 0.8.5", + "rand_core 0.6.4", + "thiserror", + "tokio", "tracing", "tracing-subscriber", - "web-time", + "void", + "web-time 1.1.0", ] [[package]] @@ -2724,7 +2761,7 @@ dependencies = [ "tracing", "unsigned-varint 0.8.0", "void", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -2758,7 +2795,7 @@ dependencies = [ "tracing", "tracing-subscriber", "void", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -2833,7 +2870,7 @@ dependencies = [ "tracing", "tracing-subscriber", "void", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -2921,7 +2958,7 @@ dependencies = [ "tracing-subscriber", "uint", "void", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -2984,7 +3021,7 @@ dependencies = [ "libp2p-swarm", "pin-project", "prometheus-client", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -3076,7 +3113,7 @@ dependencies = [ "tracing", "tracing-subscriber", "void", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -3096,7 +3133,7 @@ dependencies = [ "tracing", "tracing-subscriber", "void", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -3193,7 +3230,7 @@ dependencies = [ "tracing", "tracing-subscriber", "void", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -3223,7 +3260,7 @@ dependencies = [ "tracing", "tracing-subscriber", "void", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -3252,7 +3289,7 @@ dependencies = [ "tracing", "tracing-subscriber", "void", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -3323,7 +3360,7 @@ dependencies = [ "trybuild", "void", "wasm-bindgen-futures", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -3741,13 +3778,13 @@ dependencies = [ "axum 0.7.5", "futures", "libp2p", - "opentelemetry", + "opentelemetry 0.23.0", "opentelemetry-otlp", - "opentelemetry_sdk", + "opentelemetry_sdk 0.23.0", "prometheus-client", "tokio", "tracing", - "tracing-opentelemetry", + "tracing-opentelemetry 0.24.0", "tracing-subscriber", ] @@ -4134,6 +4171,22 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" +dependencies = [ + "futures-core", + "futures-sink", + "indexmap 2.2.1", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + [[package]] name = "opentelemetry" version = "0.23.0" @@ -4148,6 +4201,22 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry-jaeger" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e617c66fd588e40e0dbbd66932fdc87393095b125d4459b1a3a10feb1712f8a1" +dependencies = [ + "async-trait", + "futures-core", + "futures-util", + "opentelemetry 0.21.0", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk 0.21.2", + "thrift", + "tokio", +] + [[package]] name = "opentelemetry-otlp" version = "0.16.0" @@ -4157,9 +4226,9 @@ dependencies = [ "async-trait", "futures-core", "http 0.2.9", - "opentelemetry", + "opentelemetry 0.23.0", "opentelemetry-proto", - "opentelemetry_sdk", + "opentelemetry_sdk 0.23.0", "prost", "thiserror", "tokio", @@ -4172,12 +4241,43 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "984806e6cf27f2b49282e2a05e288f30594f3dbc74eb7a6e99422bc48ed78162" dependencies = [ - "opentelemetry", - "opentelemetry_sdk", + "opentelemetry 0.23.0", + "opentelemetry_sdk 0.23.0", "prost", "tonic", ] +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84" +dependencies = [ + "opentelemetry 0.21.0", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f16aec8a98a457a52664d69e0091bac3a0abd18ead9b641cb00202ba4e0efe4" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry 0.21.0", + "ordered-float 4.2.0", + "percent-encoding", + "rand 0.8.5", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "opentelemetry_sdk" version = "0.23.0" @@ -4191,8 +4291,8 @@ dependencies = [ "glob", "lazy_static", "once_cell", - "opentelemetry", - "ordered-float", + "opentelemetry 0.23.0", + "ordered-float 4.2.0", "percent-encoding", "rand 0.8.5", "thiserror", @@ -4200,6 +4300,15 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-float" version = "4.2.0" @@ -5926,6 +6035,28 @@ dependencies = [ "once_cell", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "log", + "ordered-float 2.10.1", + "threadpool", +] + [[package]] name = "time" version = "0.3.36" @@ -6230,6 +6361,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c67ac25c5407e7b961fafc6f7e9aa5958fd297aada2d20fa2ae1737357e55596" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry 0.21.0", + "opentelemetry_sdk 0.21.2", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time 0.2.4", +] + [[package]] name = "tracing-opentelemetry" version = "0.24.0" @@ -6238,14 +6387,14 @@ checksum = "f68803492bf28ab40aeccaecc7021096bd256baf7ca77c3d425d89b35a7be4e4" dependencies = [ "js-sys", "once_cell", - "opentelemetry", - "opentelemetry_sdk", + "opentelemetry 0.23.0", + "opentelemetry_sdk 0.23.0", "smallvec", "tracing", "tracing-core", "tracing-log", "tracing-subscriber", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -6429,6 +6578,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8parse" version = "0.2.1" @@ -6632,6 +6787,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa30049b1c872b72c89866d458eae9f20380ab280ffd1b1e18df2d3e2d98cfe0" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "web-time" version = "1.1.0" @@ -7227,7 +7392,7 @@ dependencies = [ "pin-project", "rand 0.8.5", "static_assertions", - "web-time", + "web-time 1.1.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7eb1517d24c..3f00734ae3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "core", "examples/autonat", + "examples/autonatv2", "examples/browser-webrtc", "examples/chat", "examples/dcutr", diff --git a/examples/autonatv2/Cargo.toml b/examples/autonatv2/Cargo.toml new file mode 100644 index 00000000000..6c862ee22e4 --- /dev/null +++ b/examples/autonatv2/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "autonatv2" +version = "0.1.0" +edition = "2021" +publish = false +license = "MIT or Apache-2.0" + +[package.metadata.release] +release = false + +[[bin]] +name = "autonatv2_client" + +[[bin]] +name = "autonatv2_server" + +[dependencies] +libp2p = { workspace = true, features = ["macros", "tokio", "tcp", "noise", "yamux", "autonat", "identify", "dns", "quic"] } +clap = { version = "4.4.18", features = ["derive"] } +tokio = { version = "1.35.1", features = ["macros", "rt-multi-thread"] } +tracing = "0.1.40" +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +rand = "0.8.5" +opentelemetry = { version = "0.21.0", optional = true } +opentelemetry_sdk = { version = "0.21.1", optional = true, features = ["rt-tokio"] } +tracing-opentelemetry = { version = "0.22.0", optional = true } +opentelemetry-jaeger = { version = "0.20.0", optional = true, features = ["rt-tokio"] } +cfg-if = "1.0.0" + +[features] +jaeger = ["opentelemetry", "opentelemetry_sdk", "tracing-opentelemetry", "opentelemetry-jaeger"] +opentelemetry = ["dep:opentelemetry"] +opentelemetry_sdk = ["dep:opentelemetry_sdk"] +tracing-opentelemetry = ["dep:tracing-opentelemetry"] +opentelemetry-jaeger = ["dep:opentelemetry-jaeger"] + +[lints] +workspace = true diff --git a/examples/autonatv2/Dockerfile b/examples/autonatv2/Dockerfile new file mode 100644 index 00000000000..5a523649d80 --- /dev/null +++ b/examples/autonatv2/Dockerfile @@ -0,0 +1,20 @@ +FROM rust:1.75-alpine as builder + +RUN apk add musl-dev + +WORKDIR /workspace +COPY . . +RUN --mount=type=cache,target=./target \ + --mount=type=cache,target=/usr/local/cargo/registry \ + cargo build --release --package autonatv2 --bin autonatv2_server -F jaeger + +RUN --mount=type=cache,target=./target \ + mv ./target/release/autonatv2_server /usr/local/bin/autonatv2_server + +FROM alpine:latest + +COPY --from=builder /usr/local/bin/autonatv2_server /app/autonatv2_server + +EXPOSE 4884 + +ENTRYPOINT [ "/app/autonatv2_server", "-l", "4884" ] diff --git a/examples/autonatv2/docker-compose.yml b/examples/autonatv2/docker-compose.yml new file mode 100644 index 00000000000..75f44e7e6f9 --- /dev/null +++ b/examples/autonatv2/docker-compose.yml @@ -0,0 +1,16 @@ +version: '3' + +services: + autonatv2: + build: + context: ../.. + dockerfile: examples/autonatv2/Dockerfile + ports: + - 4884:4884 + jaeger: + image: jaegertracing/all-in-one + ports: + - 6831:6831/udp + - 6832:6832/udp + - 16686:16686 + - 14268:14268 diff --git a/examples/autonatv2/src/bin/autonatv2_client.rs b/examples/autonatv2/src/bin/autonatv2_client.rs new file mode 100644 index 00000000000..de902514dd8 --- /dev/null +++ b/examples/autonatv2/src/bin/autonatv2_client.rs @@ -0,0 +1,111 @@ +use std::{error::Error, net::Ipv4Addr, time::Duration}; + +use clap::Parser; +use libp2p::{ + autonat, + futures::StreamExt, + identify, identity, + multiaddr::Protocol, + noise, + swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent}, + tcp, yamux, Multiaddr, SwarmBuilder, +}; +use rand::rngs::OsRng; +use tracing_subscriber::EnvFilter; + +#[derive(Debug, Parser)] +#[clap(name = "libp2p autonatv2 client")] +struct Opt { + /// Port where the client will listen for incoming connections. + #[clap(short = 'p', long, default_value_t = 0)] + listen_port: u16, + + /// Address of the server where want to connect to. + #[clap(short = 'a', long)] + server_address: Multiaddr, + + /// Probe interval in seconds. + #[clap(short = 't', long, default_value = "2")] + probe_interval: u64, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + let opt = Opt::parse(); + + let mut swarm = SwarmBuilder::with_new_identity() + .with_tokio() + .with_tcp( + tcp::Config::default(), + noise::Config::new, + yamux::Config::default, + )? + .with_quic() + .with_dns()? + .with_behaviour(|key| Behaviour::new(key.public(), opt.probe_interval))? + .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(10))) + .build(); + + swarm.listen_on( + Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED)) + .with(Protocol::Tcp(opt.listen_port)), + )?; + + swarm.dial( + DialOpts::unknown_peer_id() + .address(opt.server_address) + .build(), + )?; + + loop { + match swarm.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } => { + println!("Listening on {address:?}"); + } + SwarmEvent::Behaviour(BehaviourEvent::Autonat(autonat::v2::client::Event { + server, + tested_addr, + bytes_sent, + result: Ok(()), + })) => { + println!("Tested {tested_addr} with {server}. Sent {bytes_sent} bytes for verification. Everything Ok and verified."); + } + SwarmEvent::Behaviour(BehaviourEvent::Autonat(autonat::v2::client::Event { + server, + tested_addr, + bytes_sent, + result: Err(e), + })) => { + println!("Tested {tested_addr} with {server}. Sent {bytes_sent} bytes for verification. Failed with {e:?}."); + } + SwarmEvent::ExternalAddrConfirmed { address } => { + println!("External address confirmed: {address}"); + } + _ => {} + } + } +} + +#[derive(NetworkBehaviour)] +pub struct Behaviour { + autonat: autonat::v2::client::Behaviour, + identify: identify::Behaviour, +} + +impl Behaviour { + pub fn new(key: identity::PublicKey, probe_interval: u64) -> Self { + Self { + autonat: autonat::v2::client::Behaviour::new( + OsRng, + autonat::v2::client::Config::default() + .with_probe_interval(Duration::from_secs(probe_interval)), + ), + identify: identify::Behaviour::new(identify::Config::new("/ipfs/0.1.0".into(), key)), + } + } +} diff --git a/examples/autonatv2/src/bin/autonatv2_server.rs b/examples/autonatv2/src/bin/autonatv2_server.rs new file mode 100644 index 00000000000..849ed3b3b0a --- /dev/null +++ b/examples/autonatv2/src/bin/autonatv2_server.rs @@ -0,0 +1,87 @@ +use std::{error::Error, net::Ipv4Addr, time::Duration}; + +use cfg_if::cfg_if; +use clap::Parser; +use libp2p::{ + autonat, + futures::StreamExt, + identify, identity, + multiaddr::Protocol, + noise, + swarm::{NetworkBehaviour, SwarmEvent}, + tcp, yamux, Multiaddr, SwarmBuilder, +}; +use rand::rngs::OsRng; + +#[derive(Debug, Parser)] +#[clap(name = "libp2p autonatv2 server")] +struct Opt { + #[clap(short, long, default_value_t = 0)] + listen_port: u16, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + cfg_if! { + if #[cfg(feature = "jaeger")] { + use tracing_subscriber::layer::SubscriberExt; + use opentelemetry_sdk::runtime::Tokio; + let tracer = opentelemetry_jaeger::new_agent_pipeline() + .with_endpoint("jaeger:6831") + .with_service_name("autonatv2") + .install_batch(Tokio)?; + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + let subscriber = tracing_subscriber::Registry::default() + .with(telemetry); + } else { + let subscriber = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .finish(); + } + } + tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + + let opt = Opt::parse(); + + let mut swarm = SwarmBuilder::with_new_identity() + .with_tokio() + .with_tcp( + tcp::Config::default(), + noise::Config::new, + yamux::Config::default, + )? + .with_quic() + .with_dns()? + .with_behaviour(|key| Behaviour::new(key.public()))? + .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) + .build(); + + swarm.listen_on( + Multiaddr::empty() + .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED)) + .with(Protocol::Tcp(opt.listen_port)), + )?; + + loop { + match swarm.select_next_some().await { + SwarmEvent::NewListenAddr { address, .. } => println!("Listening on {address:?}"), + SwarmEvent::Behaviour(event) => println!("{event:?}"), + e => println!("{e:?}"), + } + } +} + +#[derive(NetworkBehaviour)] +pub struct Behaviour { + autonat: autonat::v2::server::Behaviour, + identify: identify::Behaviour, +} + +impl Behaviour { + pub fn new(key: identity::PublicKey) -> Self { + Self { + autonat: autonat::v2::server::Behaviour::new(OsRng), + identify: identify::Behaviour::new(identify::Config::new("/ipfs/0.1.0".into(), key)), + } + } +} diff --git a/protocols/autonat/CHANGELOG.md b/protocols/autonat/CHANGELOG.md index 2a799221f7c..e171412aa58 100644 --- a/protocols/autonat/CHANGELOG.md +++ b/protocols/autonat/CHANGELOG.md @@ -3,6 +3,13 @@ - Due to the refactor of `Transport` it's no longer required to create a seperate transport for AutoNAT where port reuse is disabled. This information is now passed by the behaviour. See [PR 4568](https://github.com/libp2p/rust-libp2p/pull/4568). +- Introduce the new AutoNATv2 protocol. + It's split into a client and a server part, represented in their respective modules + Features: + - The server now always dials back over a newly allocated port. + This more accurately reflects the reachability state for other peers and avoids accidental hole punching. + - The server can now test addresses different from the observed address (i.e., the connection to the server was made through a `p2p-circuit`). To mitigate against DDoS attacks, the client has to send more data to the server than the dial-back costs. + See [PR 5526](https://github.com/libp2p/rust-libp2p/pull/5526). diff --git a/protocols/autonat/Cargo.toml b/protocols/autonat/Cargo.toml index 7e31a7f3895..2c01d18dceb 100644 --- a/protocols/autonat/Cargo.toml +++ b/protocols/autonat/Cargo.toml @@ -3,32 +3,47 @@ name = "libp2p-autonat" edition = "2021" rust-version = { workspace = true } description = "NAT and firewall detection for libp2p" -authors = ["David Craven ", "Elena Frank "] version = "0.13.0" +authors = ["David Craven ", "Elena Frank ", "Hannes Furmans "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] + [dependencies] -async-trait = "0.1" +async-trait = { version = "0.1", optional = true } +asynchronous-codec = { workspace = true } +bytes = { version = "1", optional = true } +either = { version = "1.9.0", optional = true } futures = { workspace = true } +futures-bounded = { workspace = true, optional = true } futures-timer = "3.0" -web-time = { workspace = true } +web-time = { workspace = true, optional = true } libp2p-core = { workspace = true } -libp2p-swarm = { workspace = true } -libp2p-request-response = { workspace = true } libp2p-identity = { workspace = true } +libp2p-request-response = { workspace = true, optional = true } +libp2p-swarm = { workspace = true } quick-protobuf = "0.8" -rand = "0.8" tracing = { workspace = true } quick-protobuf-codec = { workspace = true } -asynchronous-codec = { workspace = true } +rand = "0.8" +rand_core = { version = "0.6", optional = true } +thiserror = { version = "1.0.52", optional = true } +void = { version = "1", optional = true } [dev-dependencies] +tokio = { version = "1", features = ["macros", "rt", "sync"]} async-std = { version = "1.10", features = ["attributes"] } libp2p-swarm-test = { path = "../../swarm-test" } -tracing-subscriber = { workspace = true, features = ["env-filter"] } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +libp2p-identify = { workspace = true } +libp2p-swarm = { workspace = true, features = ["macros"]} + +[features] +default = ["v1", "v2"] +v1 = ["dep:libp2p-request-response", "dep:web-time", "dep:async-trait"] +v2 = ["dep:bytes", "dep:either", "dep:futures-bounded", "dep:thiserror", "dep:void", "dep:rand_core"] # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling diff --git a/protocols/autonat/src/lib.rs b/protocols/autonat/src/lib.rs index 10c87b1e984..e49eaadcb83 100644 --- a/protocols/autonat/src/lib.rs +++ b/protocols/autonat/src/lib.rs @@ -1,41 +1,10 @@ -// Copyright 2021 Protocol Labs. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. +#![cfg_attr(docsrs, feature(doc_auto_cfg))] -//! Implementation of the [AutoNAT](https://github.com/libp2p/specs/blob/master/autonat/README.md) protocol. +#[cfg(feature = "v1")] +pub mod v1; -#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +#[cfg(feature = "v2")] +pub mod v2; -mod behaviour; -mod protocol; - -pub use self::{ - behaviour::{ - Behaviour, Config, Event, InboundProbeError, InboundProbeEvent, NatStatus, - OutboundProbeError, OutboundProbeEvent, ProbeId, - }, - protocol::{ResponseError, DEFAULT_PROTOCOL_NAME}, -}; -pub use libp2p_request_response::{InboundFailure, OutboundFailure}; - -mod proto { - #![allow(unreachable_pub)] - include!("generated/mod.rs"); - pub(crate) use self::structs::{mod_Message::*, Message}; -} +#[cfg(feature = "v1")] +pub use v1::*; diff --git a/protocols/autonat/src/v1.rs b/protocols/autonat/src/v1.rs new file mode 100644 index 00000000000..c60e4805f40 --- /dev/null +++ b/protocols/autonat/src/v1.rs @@ -0,0 +1,45 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the [AutoNAT](https://github.com/libp2p/specs/blob/master/autonat/README.md) protocol. +//! +//! ## Eventual Deprecation +//! This version of the protocol will eventually be deprecated in favor of [v2](crate::v2). +//! We recommend using v2 for new projects. + +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +pub(crate) mod behaviour; +pub(crate) mod protocol; + +pub use self::{ + behaviour::{ + Behaviour, Config, Event, InboundProbeError, InboundProbeEvent, NatStatus, + OutboundProbeError, OutboundProbeEvent, ProbeId, + }, + protocol::{ResponseError, DEFAULT_PROTOCOL_NAME}, +}; +pub use libp2p_request_response::{InboundFailure, OutboundFailure}; + +pub(crate) mod proto { + #![allow(unreachable_pub)] + include!("v1/generated/mod.rs"); + pub(crate) use self::structs::{mod_Message::*, Message}; +} diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/v1/behaviour.rs similarity index 99% rename from protocols/autonat/src/behaviour.rs rename to protocols/autonat/src/v1/behaviour.rs index 64bebfb6233..7a717baed8d 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/v1/behaviour.rs @@ -339,7 +339,7 @@ impl Behaviour { ConnectedPoint::Dialer { address, role_override: Endpoint::Dialer, - .. + port_use: _, } => { if let Some(event) = self.as_server().on_outbound_connection(&peer, address) { self.pending_actions @@ -349,7 +349,7 @@ impl Behaviour { ConnectedPoint::Dialer { address: _, role_override: Endpoint::Listener, - .. + port_use: _, } => { // Outgoing connection was dialed as a listener. In other words outgoing connection // was dialed as part of a hole punch. `libp2p-autonat` never attempts to hole diff --git a/protocols/autonat/src/behaviour/as_client.rs b/protocols/autonat/src/v1/behaviour/as_client.rs similarity index 100% rename from protocols/autonat/src/behaviour/as_client.rs rename to protocols/autonat/src/v1/behaviour/as_client.rs diff --git a/protocols/autonat/src/behaviour/as_server.rs b/protocols/autonat/src/v1/behaviour/as_server.rs similarity index 99% rename from protocols/autonat/src/behaviour/as_server.rs rename to protocols/autonat/src/v1/behaviour/as_server.rs index 8f1d6642de5..3ecdd3ac26e 100644 --- a/protocols/autonat/src/behaviour/as_server.rs +++ b/protocols/autonat/src/v1/behaviour/as_server.rs @@ -17,7 +17,6 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. - use super::{ Action, AutoNatCodec, Config, DialRequest, DialResponse, Event, HandleInnerEvent, ProbeId, ResponseError, diff --git a/protocols/autonat/src/generated/mod.rs b/protocols/autonat/src/v1/generated/mod.rs similarity index 100% rename from protocols/autonat/src/generated/mod.rs rename to protocols/autonat/src/v1/generated/mod.rs diff --git a/protocols/autonat/src/generated/structs.proto b/protocols/autonat/src/v1/generated/structs.proto similarity index 100% rename from protocols/autonat/src/generated/structs.proto rename to protocols/autonat/src/v1/generated/structs.proto diff --git a/protocols/autonat/src/generated/structs.rs b/protocols/autonat/src/v1/generated/structs.rs similarity index 100% rename from protocols/autonat/src/generated/structs.rs rename to protocols/autonat/src/v1/generated/structs.rs diff --git a/protocols/autonat/src/protocol.rs b/protocols/autonat/src/v1/protocol.rs similarity index 100% rename from protocols/autonat/src/protocol.rs rename to protocols/autonat/src/v1/protocol.rs diff --git a/protocols/autonat/src/v2.rs b/protocols/autonat/src/v2.rs new file mode 100644 index 00000000000..994497cb1a0 --- /dev/null +++ b/protocols/autonat/src/v2.rs @@ -0,0 +1,37 @@ +//! The second version of the autonat protocol. +//! +//! The implementation follows the [libp2p spec](https://github.com/libp2p/specs/blob/03718ef0f2dea4a756a85ba716ee33f97e4a6d6c/autonat/autonat-v2.md). +//! +//! The new version fixes the issues of the first version: +//! - The server now always dials back over a newly allocated port. This greatly reduces the risk of +//! false positives that often occured in the first version, when the clinet-server connection +//! occured over a hole-punched port. +//! - The server protects against DoS attacks by requiring the client to send more data to the +//! server then the dial back puts on the client, thus making the protocol unatractive for an +//! attacker. +//! +//! The protocol is seperated into two parts: +//! - The client part, which is implemented in the `client` module. (The client is the party that +//! wants to check if it is reachable from the outside.) +//! - The server part, which is implemented in the `server` module. (The server is the party +//! performing reachability checks on behalf of the client.) +//! +//! The two can be used together. + +use libp2p_swarm::StreamProtocol; + +pub mod client; +pub(crate) mod protocol; +pub mod server; + +pub(crate) mod generated { + #![allow(unreachable_pub)] + include!("v2/generated/mod.rs"); +} + +pub(crate) const DIAL_REQUEST_PROTOCOL: StreamProtocol = + StreamProtocol::new("/libp2p/autonat/2/dial-request"); +pub(crate) const DIAL_BACK_PROTOCOL: StreamProtocol = + StreamProtocol::new("/libp2p/autonat/2/dial-back"); + +type Nonce = u64; diff --git a/protocols/autonat/src/v2/client.rs b/protocols/autonat/src/v2/client.rs new file mode 100644 index 00000000000..d3272512f35 --- /dev/null +++ b/protocols/autonat/src/v2/client.rs @@ -0,0 +1,5 @@ +mod behaviour; +mod handler; + +pub use behaviour::Event; +pub use behaviour::{Behaviour, Config}; diff --git a/protocols/autonat/src/v2/client/behaviour.rs b/protocols/autonat/src/v2/client/behaviour.rs new file mode 100644 index 00000000000..97509c05443 --- /dev/null +++ b/protocols/autonat/src/v2/client/behaviour.rs @@ -0,0 +1,439 @@ +use std::{ + collections::{HashMap, VecDeque}, + task::{Context, Poll}, + time::Duration, +}; + +use either::Either; +use futures::FutureExt; +use futures_timer::Delay; +use libp2p_core::{transport::PortUse, Endpoint, Multiaddr}; +use libp2p_identity::PeerId; +use libp2p_swarm::{ + behaviour::ConnectionEstablished, ConnectionClosed, ConnectionDenied, ConnectionHandler, + ConnectionId, FromSwarm, NetworkBehaviour, NewExternalAddrCandidate, NotifyHandler, ToSwarm, +}; +use rand::prelude::*; +use rand_core::OsRng; +use std::fmt::{Debug, Display, Formatter}; + +use crate::v2::{protocol::DialRequest, Nonce}; + +use super::handler::{ + dial_back::{self, IncomingNonce}, + dial_request, +}; + +#[derive(Debug, Clone, Copy)] +pub struct Config { + /// How many candidates we will test at most. + pub(crate) max_candidates: usize, + + /// The interval at which we will attempt to confirm candidates as external addresses. + pub(crate) probe_interval: Duration, +} + +impl Config { + pub fn with_max_candidates(self, max_candidates: usize) -> Self { + Self { + max_candidates, + ..self + } + } + + pub fn with_probe_interval(self, probe_interval: Duration) -> Self { + Self { + probe_interval, + ..self + } + } +} + +impl Default for Config { + fn default() -> Self { + Self { + max_candidates: 10, + probe_interval: Duration::from_secs(5), + } + } +} + +pub struct Behaviour +where + R: RngCore + 'static, +{ + rng: R, + config: Config, + pending_events: VecDeque< + ToSwarm< + ::ToSwarm, + <::ConnectionHandler as ConnectionHandler>::FromBehaviour, + >, + >, + address_candidates: HashMap, + next_tick: Delay, + peer_info: HashMap, +} + +impl NetworkBehaviour for Behaviour +where + R: RngCore + 'static, +{ + type ConnectionHandler = Either; + + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result<::ConnectionHandler, ConnectionDenied> { + Ok(Either::Right(dial_back::Handler::new())) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + _: PortUse, + ) -> Result<::ConnectionHandler, ConnectionDenied> { + Ok(Either::Left(dial_request::Handler::new())) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::NewExternalAddrCandidate(NewExternalAddrCandidate { addr }) => { + self.address_candidates + .entry(addr.clone()) + .or_default() + .score += 1; + } + FromSwarm::ConnectionEstablished(ConnectionEstablished { + peer_id, + connection_id, + endpoint: _, + .. + }) => { + self.peer_info.insert( + connection_id, + ConnectionInfo { + peer_id, + supports_autonat: false, + }, + ); + } + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id, + .. + }) => { + let info = self + .peer_info + .remove(&connection_id) + .expect("inconsistent state"); + + if info.supports_autonat { + tracing::debug!(%peer_id, "Disconnected from AutoNAT server"); + } + } + _ => {} + } + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: ::ToBehaviour, + ) { + let (nonce, outcome) = match event { + Either::Right(IncomingNonce { nonce, sender }) => { + let Some((_, info)) = self + .address_candidates + .iter_mut() + .find(|(_, info)| info.is_pending_with_nonce(nonce)) + else { + let _ = sender.send(Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Received unexpected nonce: {nonce} from {peer_id}"), + ))); + return; + }; + + info.status = TestStatus::Received(nonce); + tracing::debug!(%peer_id, %nonce, "Successful dial-back"); + + let _ = sender.send(Ok(())); + + return; + } + Either::Left(dial_request::ToBehaviour::PeerHasServerSupport) => { + self.peer_info + .get_mut(&connection_id) + .expect("inconsistent state") + .supports_autonat = true; + return; + } + Either::Left(dial_request::ToBehaviour::TestOutcome { nonce, outcome }) => { + (nonce, outcome) + } + }; + + let ((tested_addr, bytes_sent), result) = match outcome { + Ok(address) => { + let received_dial_back = self + .address_candidates + .iter_mut() + .any(|(_, info)| info.is_received_with_nonce(nonce)); + + if !received_dial_back { + tracing::warn!( + %peer_id, + %nonce, + "Server reported reachbility but we never received a dial-back" + ); + return; + } + + self.pending_events + .push_back(ToSwarm::ExternalAddrConfirmed(address.0.clone())); + + (address, Ok(())) + } + Err(dial_request::Error::UnsupportedProtocol) => { + self.peer_info + .get_mut(&connection_id) + .expect("inconsistent state") + .supports_autonat = false; + + self.reset_status_to(nonce, TestStatus::Untested); // Reset so it will be tried again. + + return; + } + Err(dial_request::Error::Io(e)) => { + tracing::debug!( + %peer_id, + %nonce, + "Failed to complete AutoNAT probe: {e}" + ); + + self.reset_status_to(nonce, TestStatus::Untested); // Reset so it will be tried again. + + return; + } + Err(dial_request::Error::AddressNotReachable { + address, + bytes_sent, + error, + }) => { + self.reset_status_to(nonce, TestStatus::Failed); + + ((address, bytes_sent), Err(error)) + } + }; + + self.pending_events.push_back(ToSwarm::GenerateEvent(Event { + tested_addr, + bytes_sent, + server: peer_id, + result: result.map_err(|e| Error { inner: e }), + })); + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll::FromBehaviour>> + { + loop { + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(event); + } + + if self.next_tick.poll_unpin(cx).is_ready() { + self.next_tick.reset(self.config.probe_interval); + + self.issue_dial_requests_for_untested_candidates(); + continue; + } + + return Poll::Pending; + } + } +} + +impl Behaviour +where + R: RngCore + 'static, +{ + pub fn new(rng: R, config: Config) -> Self { + Self { + rng, + next_tick: Delay::new(config.probe_interval), + config, + pending_events: VecDeque::new(), + address_candidates: HashMap::new(), + peer_info: HashMap::new(), + } + } + + /// Issues dial requests to random AutoNAT servers for the most frequently reported, untested candidates. + /// + /// In the current implementation, we only send a single address to each AutoNAT server. + /// This spreads our candidates out across all servers we are connected to which should give us pretty fast feedback on all of them. + fn issue_dial_requests_for_untested_candidates(&mut self) { + for addr in self.untested_candidates() { + let Some((conn_id, peer_id)) = self.random_autonat_server() else { + tracing::debug!("Not connected to any AutoNAT servers"); + return; + }; + + let nonce = self.rng.gen(); + self.address_candidates + .get_mut(&addr) + .expect("only emit candidates") + .status = TestStatus::Pending(nonce); + + self.pending_events.push_back(ToSwarm::NotifyHandler { + peer_id, + handler: NotifyHandler::One(conn_id), + event: Either::Left(DialRequest { + nonce, + addrs: vec![addr], + }), + }); + } + } + + /// Returns all untested candidates, sorted by the frequency they were reported at. + /// + /// More frequently reported candidates are considered to more likely be external addresses and thus tested first. + fn untested_candidates(&self) -> impl Iterator { + let mut entries = self + .address_candidates + .iter() + .filter(|(_, info)| info.status == TestStatus::Untested) + .map(|(addr, count)| (addr.clone(), *count)) + .collect::>(); + + entries.sort_unstable_by_key(|(_, info)| info.score); + + if entries.is_empty() { + tracing::debug!("No untested address candidates"); + } + + entries + .into_iter() + .rev() // `sort_unstable` is ascending + .take(self.config.max_candidates) + .map(|(addr, _)| addr) + } + + /// Chooses an active connection to one of our peers that reported support for the [`DIAL_REQUEST_PROTOCOL`](crate::v2::DIAL_REQUEST_PROTOCOL) protocol. + fn random_autonat_server(&mut self) -> Option<(ConnectionId, PeerId)> { + let (conn_id, info) = self + .peer_info + .iter() + .filter(|(_, info)| info.supports_autonat) + .choose(&mut self.rng)?; + + Some((*conn_id, info.peer_id)) + } + + fn reset_status_to(&mut self, nonce: Nonce, new_status: TestStatus) { + let Some((_, info)) = self + .address_candidates + .iter_mut() + .find(|(_, i)| i.is_pending_with_nonce(nonce) || i.is_received_with_nonce(nonce)) + else { + return; + }; + + info.status = new_status; + } + + // FIXME: We don't want test-only APIs in our public API. + #[doc(hidden)] + pub fn validate_addr(&mut self, addr: &Multiaddr) { + if let Some(info) = self.address_candidates.get_mut(addr) { + info.status = TestStatus::Received(self.rng.next_u64()); + } + } +} + +impl Default for Behaviour { + fn default() -> Self { + Self::new(OsRng, Config::default()) + } +} + +pub struct Error { + pub(crate) inner: dial_request::DialBackError, +} + +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.inner, f) + } +} + +impl Debug for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + Debug::fmt(&self.inner, f) + } +} + +#[derive(Debug)] +pub struct Event { + /// The address that was selected for testing. + pub tested_addr: Multiaddr, + /// The amount of data that was sent to the server. + /// Is 0 if it wasn't necessary to send any data. + /// Otherwise it's a number between 30.000 and 100.000. + pub bytes_sent: usize, + /// The peer id of the server that was selected for testing. + pub server: PeerId, + /// The result of the test. If the test was successful, this is `Ok(())`. + /// Otherwise it's an error. + pub result: Result<(), Error>, +} + +struct ConnectionInfo { + peer_id: PeerId, + supports_autonat: bool, +} + +#[derive(Copy, Clone, Default)] +struct AddressInfo { + score: usize, + status: TestStatus, +} + +impl AddressInfo { + fn is_pending_with_nonce(&self, nonce: Nonce) -> bool { + match self.status { + TestStatus::Pending(c) => c == nonce, + _ => false, + } + } + + fn is_received_with_nonce(&self, nonce: Nonce) -> bool { + match self.status { + TestStatus::Received(c) => c == nonce, + _ => false, + } + } +} + +#[derive(Clone, Copy, Default, PartialEq)] +enum TestStatus { + #[default] + Untested, + Pending(Nonce), + Failed, + Received(Nonce), +} diff --git a/protocols/autonat/src/v2/client/handler.rs b/protocols/autonat/src/v2/client/handler.rs new file mode 100644 index 00000000000..e526c2fb44c --- /dev/null +++ b/protocols/autonat/src/v2/client/handler.rs @@ -0,0 +1,2 @@ +pub(crate) mod dial_back; +pub(crate) mod dial_request; diff --git a/protocols/autonat/src/v2/client/handler/dial_back.rs b/protocols/autonat/src/v2/client/handler/dial_back.rs new file mode 100644 index 00000000000..b94580e69ba --- /dev/null +++ b/protocols/autonat/src/v2/client/handler/dial_back.rs @@ -0,0 +1,141 @@ +use std::{ + io, + task::{Context, Poll}, + time::Duration, +}; + +use futures::channel::oneshot; +use futures_bounded::StreamSet; +use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; +use libp2p_swarm::{ + handler::{ConnectionEvent, FullyNegotiatedInbound, ListenUpgradeError}, + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol, +}; +use void::Void; + +use crate::v2::{protocol, Nonce, DIAL_BACK_PROTOCOL}; + +pub struct Handler { + inbound: StreamSet>, +} + +impl Handler { + pub(crate) fn new() -> Self { + Self { + inbound: StreamSet::new(Duration::from_secs(5), 2), + } + } +} + +impl ConnectionHandler for Handler { + type FromBehaviour = Void; + type ToBehaviour = IncomingNonce; + type InboundProtocol = ReadyUpgrade; + type OutboundProtocol = DeniedUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(ReadyUpgrade::new(DIAL_BACK_PROTOCOL), ()) + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + loop { + match self.inbound.poll_next_unpin(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => continue, + Poll::Ready(Some(Err(err))) => { + tracing::debug!("Stream timed out: {err}"); + continue; + } + Poll::Ready(Some(Ok(Err(err)))) => { + tracing::debug!("Dial back handler failed with: {err:?}"); + continue; + } + Poll::Ready(Some(Ok(Ok(incoming_nonce)))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(incoming_nonce)); + } + } + } + } + + fn on_behaviour_event(&mut self, _event: Self::FromBehaviour) {} + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol, .. + }) => { + if self.inbound.try_push(perform_dial_back(protocol)).is_err() { + tracing::warn!("Dial back request dropped, too many requests in flight"); + } + } + ConnectionEvent::ListenUpgradeError(ListenUpgradeError { error, .. }) => { + void::unreachable(error); + } + _ => {} + } + } +} + +struct State { + stream: libp2p_swarm::Stream, + oneshot: Option>>, +} + +#[derive(Debug)] +pub struct IncomingNonce { + pub nonce: Nonce, + pub sender: oneshot::Sender>, +} + +fn perform_dial_back( + stream: libp2p_swarm::Stream, +) -> impl futures::Stream> { + let state = State { + stream, + oneshot: None, + }; + futures::stream::unfold(state, |mut state| async move { + if let Some(ref mut receiver) = state.oneshot { + match receiver.await { + Ok(Ok(())) => {} + Ok(Err(e)) => return Some((Err(e), state)), + Err(_) => { + return Some(( + Err(io::Error::new(io::ErrorKind::Other, "Sender got cancelled")), + state, + )); + } + } + if let Err(e) = protocol::dial_back_response(&mut state.stream).await { + return Some((Err(e), state)); + } + return None; + } + + let nonce = match protocol::recv_dial_back(&mut state.stream).await { + Ok(nonce) => nonce, + Err(err) => { + return Some((Err(err), state)); + } + }; + + let (sender, receiver) = oneshot::channel(); + state.oneshot = Some(receiver); + Some((Ok(IncomingNonce { nonce, sender }), state)) + }) +} diff --git a/protocols/autonat/src/v2/client/handler/dial_request.rs b/protocols/autonat/src/v2/client/handler/dial_request.rs new file mode 100644 index 00000000000..9d2df8ee6b4 --- /dev/null +++ b/protocols/autonat/src/v2/client/handler/dial_request.rs @@ -0,0 +1,343 @@ +use futures::{channel::oneshot, AsyncWrite}; +use futures_bounded::FuturesMap; +use libp2p_core::{ + upgrade::{DeniedUpgrade, ReadyUpgrade}, + Multiaddr, +}; + +use libp2p_swarm::{ + handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedOutbound, OutboundUpgradeSend, + ProtocolsChange, + }, + ConnectionHandler, ConnectionHandlerEvent, Stream, StreamProtocol, StreamUpgradeError, + SubstreamProtocol, +}; +use std::{ + collections::VecDeque, + io, + iter::{once, repeat}, + task::{Context, Poll}, + time::Duration, +}; + +use crate::v2::{ + generated::structs::{mod_DialResponse::ResponseStatus, DialStatus}, + protocol::{ + Coder, DialDataRequest, DialDataResponse, DialRequest, Response, + DATA_FIELD_LEN_UPPER_BOUND, DATA_LEN_LOWER_BOUND, DATA_LEN_UPPER_BOUND, + }, + Nonce, DIAL_REQUEST_PROTOCOL, +}; + +#[derive(Debug)] +pub enum ToBehaviour { + TestOutcome { + nonce: Nonce, + outcome: Result<(Multiaddr, usize), Error>, + }, + PeerHasServerSupport, +} + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Address is not reachable: {error}")] + AddressNotReachable { + address: Multiaddr, + bytes_sent: usize, + error: DialBackError, + }, + #[error("Peer does not support AutoNAT dial-request protocol")] + UnsupportedProtocol, + #[error("IO error: {0}")] + Io(io::Error), +} + +impl From for Error { + fn from(value: io::Error) -> Self { + Self::Io(value) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum DialBackError { + #[error("server failed to establish a connection")] + NoConnection, + #[error("dial back stream failed")] + StreamFailed, +} + +pub struct Handler { + queued_events: VecDeque< + ConnectionHandlerEvent< + ::OutboundProtocol, + ::OutboundOpenInfo, + ::ToBehaviour, + >, + >, + outbound: FuturesMap>, + queued_streams: VecDeque< + oneshot::Sender< + Result< + Stream, + StreamUpgradeError< as OutboundUpgradeSend>::Error>, + >, + >, + >, +} + +impl Handler { + pub(crate) fn new() -> Self { + Self { + queued_events: VecDeque::new(), + outbound: FuturesMap::new(Duration::from_secs(10), 10), + queued_streams: VecDeque::default(), + } + } + + fn perform_request(&mut self, req: DialRequest) { + let (tx, rx) = oneshot::channel(); + self.queued_streams.push_back(tx); + self.queued_events + .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(ReadyUpgrade::new(DIAL_REQUEST_PROTOCOL), ()), + }); + if self + .outbound + .try_push(req.nonce, start_stream_handle(req, rx)) + .is_err() + { + tracing::debug!("Dial request dropped, too many requests in flight"); + } + } +} + +impl ConnectionHandler for Handler { + type FromBehaviour = DialRequest; + type ToBehaviour = ToBehaviour; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = ReadyUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event); + } + + match self.outbound.poll_unpin(cx) { + Poll::Ready((nonce, Ok(outcome))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + ToBehaviour::TestOutcome { nonce, outcome }, + )) + } + Poll::Ready((nonce, Err(_))) => { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + ToBehaviour::TestOutcome { + nonce, + outcome: Err(Error::Io(io::ErrorKind::TimedOut.into())), + }, + )); + } + Poll::Pending => {} + } + + Poll::Pending + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + self.perform_request(event); + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { + tracing::debug!("Dial request failed: {}", error); + match self.queued_streams.pop_front() { + Some(stream_tx) => { + let _ = stream_tx.send(Err(error)); + } + None => { + tracing::warn!( + "Opened unexpected substream without a pending dial request" + ); + } + } + } + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol, .. + }) => match self.queued_streams.pop_front() { + Some(stream_tx) => { + if stream_tx.send(Ok(protocol)).is_err() { + tracing::debug!("Failed to send stream to dead handler"); + } + } + None => { + tracing::warn!("Opened unexpected substream without a pending dial request"); + } + }, + ConnectionEvent::RemoteProtocolsChange(ProtocolsChange::Added(mut added)) => { + if added.any(|p| p.as_ref() == DIAL_REQUEST_PROTOCOL) { + self.queued_events + .push_back(ConnectionHandlerEvent::NotifyBehaviour( + ToBehaviour::PeerHasServerSupport, + )); + } + } + _ => {} + } + } +} + +async fn start_stream_handle( + req: DialRequest, + stream_recv: oneshot::Receiver>>, +) -> Result<(Multiaddr, usize), Error> { + let stream = stream_recv + .await + .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))? + .map_err(|e| match e { + StreamUpgradeError::NegotiationFailed => Error::UnsupportedProtocol, + StreamUpgradeError::Timeout => Error::Io(io::ErrorKind::TimedOut.into()), + StreamUpgradeError::Apply(v) => void::unreachable(v), + StreamUpgradeError::Io(e) => Error::Io(e), + })?; + + let mut coder = Coder::new(stream); + coder.send(req.clone()).await?; + + let (res, bytes_sent) = match coder.next().await? { + Response::Data(DialDataRequest { + addr_idx, + num_bytes, + }) => { + if addr_idx >= req.addrs.len() { + return Err(Error::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "address index out of bounds", + ))); + } + if !(DATA_LEN_LOWER_BOUND..=DATA_LEN_UPPER_BOUND).contains(&num_bytes) { + return Err(Error::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "requested bytes out of bounds", + ))); + } + + send_aap_data(&mut coder, num_bytes).await?; + + let Response::Dial(dial_response) = coder.next().await? else { + return Err(Error::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "expected message", + ))); + }; + + (dial_response, num_bytes) + } + Response::Dial(dial_response) => (dial_response, 0), + }; + match coder.close().await { + Ok(_) => {} + Err(err) => { + if err.kind() == io::ErrorKind::ConnectionReset { + // The AutoNAT server may have already closed the stream (this is normal because the probe is finished), in this case we have this error: + // Err(Custom { kind: ConnectionReset, error: Stopped(0) }) + // so we silently ignore this error + } else { + return Err(err.into()); + } + } + } + + match res.status { + ResponseStatus::E_REQUEST_REJECTED => { + return Err(Error::Io(io::Error::new( + io::ErrorKind::Other, + "server rejected request", + ))) + } + ResponseStatus::E_DIAL_REFUSED => { + return Err(Error::Io(io::Error::new( + io::ErrorKind::Other, + "server refused dial", + ))) + } + ResponseStatus::E_INTERNAL_ERROR => { + return Err(Error::Io(io::Error::new( + io::ErrorKind::Other, + "server encountered internal error", + ))) + } + ResponseStatus::OK => {} + } + + let tested_address = req + .addrs + .get(res.addr_idx) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "address index out of bounds"))? + .clone(); + + match res.dial_status { + DialStatus::UNUSED => { + return Err(Error::Io(io::Error::new( + io::ErrorKind::InvalidInput, + "unexpected message", + ))) + } + DialStatus::E_DIAL_ERROR => { + return Err(Error::AddressNotReachable { + address: tested_address, + bytes_sent, + error: DialBackError::NoConnection, + }) + } + DialStatus::E_DIAL_BACK_ERROR => { + return Err(Error::AddressNotReachable { + address: tested_address, + bytes_sent, + error: DialBackError::StreamFailed, + }) + } + DialStatus::OK => {} + } + + Ok((tested_address, bytes_sent)) +} + +async fn send_aap_data(stream: &mut Coder, num_bytes: usize) -> io::Result<()> +where + I: AsyncWrite + Unpin, +{ + let count_full = num_bytes / DATA_FIELD_LEN_UPPER_BOUND; + let partial_len = num_bytes % DATA_FIELD_LEN_UPPER_BOUND; + for req in repeat(DATA_FIELD_LEN_UPPER_BOUND) + .take(count_full) + .chain(once(partial_len)) + .filter(|e| *e > 0) + .map(|data_count| { + DialDataResponse::new(data_count).expect("data count is unexpectedly too big") + }) + { + stream.send(req).await?; + } + + Ok(()) +} diff --git a/protocols/autonat/src/v2/generated/mod.rs b/protocols/autonat/src/v2/generated/mod.rs new file mode 100644 index 00000000000..e52c5a80bc0 --- /dev/null +++ b/protocols/autonat/src/v2/generated/mod.rs @@ -0,0 +1,2 @@ +// Automatically generated mod.rs +pub mod structs; diff --git a/protocols/autonat/src/v2/generated/structs.proto b/protocols/autonat/src/v2/generated/structs.proto new file mode 100644 index 00000000000..d298f43d047 --- /dev/null +++ b/protocols/autonat/src/v2/generated/structs.proto @@ -0,0 +1,54 @@ +syntax = "proto3"; + +package structs; + +message Message { + oneof msg { + DialRequest dialRequest = 1; + DialResponse dialResponse = 2; + DialDataRequest dialDataRequest = 3; + DialDataResponse dialDataResponse = 4; + } +} + +message DialRequest { + repeated bytes addrs = 1; + fixed64 nonce = 2; +} + +message DialDataRequest { + uint32 addrIdx = 1; + uint64 numBytes = 2; +} + +enum DialStatus { + UNUSED = 0; + E_DIAL_ERROR = 100; + E_DIAL_BACK_ERROR = 101; + OK = 200; +} + +message DialResponse { + enum ResponseStatus { + E_INTERNAL_ERROR = 0; + E_REQUEST_REJECTED = 100; + E_DIAL_REFUSED = 101; + OK = 200; + } + + ResponseStatus status = 1; + uint32 addrIdx = 2; + DialStatus dialStatus = 3; +} + +message DialDataResponse { bytes data = 1; } + +message DialBack { fixed64 nonce = 1; } + +message DialBackResponse { + enum DialBackStatus { + OK = 0; + } + + DialBackStatus status = 1; +} diff --git a/protocols/autonat/src/v2/generated/structs.rs b/protocols/autonat/src/v2/generated/structs.rs new file mode 100644 index 00000000000..e188adb8a42 --- /dev/null +++ b/protocols/autonat/src/v2/generated/structs.rs @@ -0,0 +1,403 @@ +// Automatically generated rust module for 'structs.proto' file + +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(non_camel_case_types)] +#![allow(unused_imports)] +#![allow(unknown_lints)] +#![allow(clippy::all)] +#![cfg_attr(rustfmt, rustfmt_skip)] + + +use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result}; +use quick_protobuf::sizeofs::*; +use super::*; + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum DialStatus { + UNUSED = 0, + E_DIAL_ERROR = 100, + E_DIAL_BACK_ERROR = 101, + OK = 200, +} + +impl Default for DialStatus { + fn default() -> Self { + DialStatus::UNUSED + } +} + +impl From for DialStatus { + fn from(i: i32) -> Self { + match i { + 0 => DialStatus::UNUSED, + 100 => DialStatus::E_DIAL_ERROR, + 101 => DialStatus::E_DIAL_BACK_ERROR, + 200 => DialStatus::OK, + _ => Self::default(), + } + } +} + +impl<'a> From<&'a str> for DialStatus { + fn from(s: &'a str) -> Self { + match s { + "UNUSED" => DialStatus::UNUSED, + "E_DIAL_ERROR" => DialStatus::E_DIAL_ERROR, + "E_DIAL_BACK_ERROR" => DialStatus::E_DIAL_BACK_ERROR, + "OK" => DialStatus::OK, + _ => Self::default(), + } + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct Message { + pub msg: structs::mod_Message::OneOfmsg, +} + +impl<'a> MessageRead<'a> for Message { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.msg = structs::mod_Message::OneOfmsg::dialRequest(r.read_message::(bytes)?), + Ok(18) => msg.msg = structs::mod_Message::OneOfmsg::dialResponse(r.read_message::(bytes)?), + Ok(26) => msg.msg = structs::mod_Message::OneOfmsg::dialDataRequest(r.read_message::(bytes)?), + Ok(34) => msg.msg = structs::mod_Message::OneOfmsg::dialDataResponse(r.read_message::(bytes)?), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for Message { + fn get_size(&self) -> usize { + 0 + + match self.msg { + structs::mod_Message::OneOfmsg::dialRequest(ref m) => 1 + sizeof_len((m).get_size()), + structs::mod_Message::OneOfmsg::dialResponse(ref m) => 1 + sizeof_len((m).get_size()), + structs::mod_Message::OneOfmsg::dialDataRequest(ref m) => 1 + sizeof_len((m).get_size()), + structs::mod_Message::OneOfmsg::dialDataResponse(ref m) => 1 + sizeof_len((m).get_size()), + structs::mod_Message::OneOfmsg::None => 0, + } } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + match self.msg { structs::mod_Message::OneOfmsg::dialRequest(ref m) => { w.write_with_tag(10, |w| w.write_message(m))? }, + structs::mod_Message::OneOfmsg::dialResponse(ref m) => { w.write_with_tag(18, |w| w.write_message(m))? }, + structs::mod_Message::OneOfmsg::dialDataRequest(ref m) => { w.write_with_tag(26, |w| w.write_message(m))? }, + structs::mod_Message::OneOfmsg::dialDataResponse(ref m) => { w.write_with_tag(34, |w| w.write_message(m))? }, + structs::mod_Message::OneOfmsg::None => {}, + } Ok(()) + } +} + +pub mod mod_Message { + +use super::*; + +#[derive(Debug, PartialEq, Clone)] +pub enum OneOfmsg { + dialRequest(structs::DialRequest), + dialResponse(structs::DialResponse), + dialDataRequest(structs::DialDataRequest), + dialDataResponse(structs::DialDataResponse), + None, +} + +impl Default for OneOfmsg { + fn default() -> Self { + OneOfmsg::None + } +} + +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct DialRequest { + pub addrs: Vec>, + pub nonce: u64, +} + +impl<'a> MessageRead<'a> for DialRequest { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.addrs.push(r.read_bytes(bytes)?.to_owned()), + Ok(17) => msg.nonce = r.read_fixed64(bytes)?, + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for DialRequest { + fn get_size(&self) -> usize { + 0 + + self.addrs.iter().map(|s| 1 + sizeof_len((s).len())).sum::() + + if self.nonce == 0u64 { 0 } else { 1 + 8 } + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + for s in &self.addrs { w.write_with_tag(10, |w| w.write_bytes(&**s))?; } + if self.nonce != 0u64 { w.write_with_tag(17, |w| w.write_fixed64(*&self.nonce))?; } + Ok(()) + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct DialDataRequest { + pub addrIdx: u32, + pub numBytes: u64, +} + +impl<'a> MessageRead<'a> for DialDataRequest { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(8) => msg.addrIdx = r.read_uint32(bytes)?, + Ok(16) => msg.numBytes = r.read_uint64(bytes)?, + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for DialDataRequest { + fn get_size(&self) -> usize { + 0 + + if self.addrIdx == 0u32 { 0 } else { 1 + sizeof_varint(*(&self.addrIdx) as u64) } + + if self.numBytes == 0u64 { 0 } else { 1 + sizeof_varint(*(&self.numBytes) as u64) } + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if self.addrIdx != 0u32 { w.write_with_tag(8, |w| w.write_uint32(*&self.addrIdx))?; } + if self.numBytes != 0u64 { w.write_with_tag(16, |w| w.write_uint64(*&self.numBytes))?; } + Ok(()) + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct DialResponse { + pub status: structs::mod_DialResponse::ResponseStatus, + pub addrIdx: u32, + pub dialStatus: structs::DialStatus, +} + +impl<'a> MessageRead<'a> for DialResponse { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(8) => msg.status = r.read_enum(bytes)?, + Ok(16) => msg.addrIdx = r.read_uint32(bytes)?, + Ok(24) => msg.dialStatus = r.read_enum(bytes)?, + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for DialResponse { + fn get_size(&self) -> usize { + 0 + + if self.status == structs::mod_DialResponse::ResponseStatus::E_INTERNAL_ERROR { 0 } else { 1 + sizeof_varint(*(&self.status) as u64) } + + if self.addrIdx == 0u32 { 0 } else { 1 + sizeof_varint(*(&self.addrIdx) as u64) } + + if self.dialStatus == structs::DialStatus::UNUSED { 0 } else { 1 + sizeof_varint(*(&self.dialStatus) as u64) } + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if self.status != structs::mod_DialResponse::ResponseStatus::E_INTERNAL_ERROR { w.write_with_tag(8, |w| w.write_enum(*&self.status as i32))?; } + if self.addrIdx != 0u32 { w.write_with_tag(16, |w| w.write_uint32(*&self.addrIdx))?; } + if self.dialStatus != structs::DialStatus::UNUSED { w.write_with_tag(24, |w| w.write_enum(*&self.dialStatus as i32))?; } + Ok(()) + } +} + +pub mod mod_DialResponse { + + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum ResponseStatus { + E_INTERNAL_ERROR = 0, + E_REQUEST_REJECTED = 100, + E_DIAL_REFUSED = 101, + OK = 200, +} + +impl Default for ResponseStatus { + fn default() -> Self { + ResponseStatus::E_INTERNAL_ERROR + } +} + +impl From for ResponseStatus { + fn from(i: i32) -> Self { + match i { + 0 => ResponseStatus::E_INTERNAL_ERROR, + 100 => ResponseStatus::E_REQUEST_REJECTED, + 101 => ResponseStatus::E_DIAL_REFUSED, + 200 => ResponseStatus::OK, + _ => Self::default(), + } + } +} + +impl<'a> From<&'a str> for ResponseStatus { + fn from(s: &'a str) -> Self { + match s { + "E_INTERNAL_ERROR" => ResponseStatus::E_INTERNAL_ERROR, + "E_REQUEST_REJECTED" => ResponseStatus::E_REQUEST_REJECTED, + "E_DIAL_REFUSED" => ResponseStatus::E_DIAL_REFUSED, + "OK" => ResponseStatus::OK, + _ => Self::default(), + } + } +} + +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct DialDataResponse { + pub data: Vec, +} + +impl<'a> MessageRead<'a> for DialDataResponse { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(10) => msg.data = r.read_bytes(bytes)?.to_owned(), + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for DialDataResponse { + fn get_size(&self) -> usize { + 0 + + if self.data.is_empty() { 0 } else { 1 + sizeof_len((&self.data).len()) } + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if !self.data.is_empty() { w.write_with_tag(10, |w| w.write_bytes(&**&self.data))?; } + Ok(()) + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct DialBack { + pub nonce: u64, +} + +impl<'a> MessageRead<'a> for DialBack { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(9) => msg.nonce = r.read_fixed64(bytes)?, + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for DialBack { + fn get_size(&self) -> usize { + 0 + + if self.nonce == 0u64 { 0 } else { 1 + 8 } + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if self.nonce != 0u64 { w.write_with_tag(9, |w| w.write_fixed64(*&self.nonce))?; } + Ok(()) + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Debug, Default, PartialEq, Clone)] +pub struct DialBackResponse { + pub status: structs::mod_DialBackResponse::DialBackStatus, +} + +impl<'a> MessageRead<'a> for DialBackResponse { + fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result { + let mut msg = Self::default(); + while !r.is_eof() { + match r.next_tag(bytes) { + Ok(8) => msg.status = r.read_enum(bytes)?, + Ok(t) => { r.read_unknown(bytes, t)?; } + Err(e) => return Err(e), + } + } + Ok(msg) + } +} + +impl MessageWrite for DialBackResponse { + fn get_size(&self) -> usize { + 0 + + if self.status == structs::mod_DialBackResponse::DialBackStatus::OK { 0 } else { 1 + sizeof_varint(*(&self.status) as u64) } + } + + fn write_message(&self, w: &mut Writer) -> Result<()> { + if self.status != structs::mod_DialBackResponse::DialBackStatus::OK { w.write_with_tag(8, |w| w.write_enum(*&self.status as i32))?; } + Ok(()) + } +} + +pub mod mod_DialBackResponse { + + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum DialBackStatus { + OK = 0, +} + +impl Default for DialBackStatus { + fn default() -> Self { + DialBackStatus::OK + } +} + +impl From for DialBackStatus { + fn from(i: i32) -> Self { + match i { + 0 => DialBackStatus::OK, + _ => Self::default(), + } + } +} + +impl<'a> From<&'a str> for DialBackStatus { + fn from(s: &'a str) -> Self { + match s { + "OK" => DialBackStatus::OK, + _ => Self::default(), + } + } +} + +} + diff --git a/protocols/autonat/src/v2/protocol.rs b/protocols/autonat/src/v2/protocol.rs new file mode 100644 index 00000000000..4077fd65f5d --- /dev/null +++ b/protocols/autonat/src/v2/protocol.rs @@ -0,0 +1,337 @@ +// change to quick-protobuf-codec + +use std::io; +use std::io::ErrorKind; + +use asynchronous_codec::{Framed, FramedRead, FramedWrite}; + +use futures::{AsyncRead, AsyncWrite, SinkExt, StreamExt}; +use libp2p_core::Multiaddr; + +use quick_protobuf_codec::Codec; +use rand::Rng; + +use crate::v2::{generated::structs as proto, Nonce}; + +const REQUEST_MAX_SIZE: usize = 4104; +pub(super) const DATA_LEN_LOWER_BOUND: usize = 30_000u32 as usize; +pub(super) const DATA_LEN_UPPER_BOUND: usize = 100_000u32 as usize; +pub(super) const DATA_FIELD_LEN_UPPER_BOUND: usize = 4096; + +fn new_io_invalid_data_err(msg: impl Into) -> io::Error { + io::Error::new(io::ErrorKind::InvalidData, msg.into()) +} + +pub(crate) struct Coder { + inner: Framed>, +} + +impl Coder +where + I: AsyncWrite + AsyncRead + Unpin, +{ + pub(crate) fn new(io: I) -> Self { + Self { + inner: Framed::new(io, Codec::new(REQUEST_MAX_SIZE)), + } + } + pub(crate) async fn close(mut self) -> io::Result<()> { + self.inner.close().await?; + Ok(()) + } +} + +impl Coder +where + I: AsyncRead + Unpin, +{ + pub(crate) async fn next(&mut self) -> io::Result + where + proto::Message: TryInto, + io::Error: From, + { + Ok(self.next_msg().await?.try_into()?) + } + + async fn next_msg(&mut self) -> io::Result { + self.inner + .next() + .await + .ok_or(io::Error::new( + ErrorKind::UnexpectedEof, + "no request to read", + ))? + .map_err(|e| io::Error::new(ErrorKind::InvalidData, e)) + } +} + +impl Coder +where + I: AsyncWrite + Unpin, +{ + pub(crate) async fn send(&mut self, msg: M) -> io::Result<()> + where + M: Into, + { + self.inner.send(msg.into()).await?; + Ok(()) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum Request { + Dial(DialRequest), + Data(DialDataResponse), +} + +impl From for proto::Message { + fn from(val: DialRequest) -> Self { + let addrs = val.addrs.iter().map(|e| e.to_vec()).collect(); + let nonce = val.nonce; + + proto::Message { + msg: proto::mod_Message::OneOfmsg::dialRequest(proto::DialRequest { addrs, nonce }), + } + } +} + +impl From for proto::Message { + fn from(val: DialDataResponse) -> Self { + debug_assert!( + val.data_count <= DATA_FIELD_LEN_UPPER_BOUND, + "data_count too large" + ); + proto::Message { + msg: proto::mod_Message::OneOfmsg::dialDataResponse(proto::DialDataResponse { + data: vec![0; val.data_count], // One could use Cow::Borrowed here, but it will require a modification of the generated code and that will fail the CI + }), + } + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct DialRequest { + pub(crate) addrs: Vec, + pub(crate) nonce: u64, +} + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct DialDataResponse { + data_count: usize, +} + +impl DialDataResponse { + pub(crate) fn new(data_count: usize) -> Option { + if data_count <= DATA_FIELD_LEN_UPPER_BOUND { + Some(Self { data_count }) + } else { + None + } + } + + pub(crate) fn get_data_count(&self) -> usize { + self.data_count + } +} + +impl TryFrom for Request { + type Error = io::Error; + + fn try_from(msg: proto::Message) -> Result { + match msg.msg { + proto::mod_Message::OneOfmsg::dialRequest(proto::DialRequest { addrs, nonce }) => { + let addrs = addrs + .into_iter() + .map(|e| e.to_vec()) + .map(|e| { + Multiaddr::try_from(e).map_err(|err| { + new_io_invalid_data_err(format!("invalid multiaddr: {}", err)) + }) + }) + .collect::, io::Error>>()?; + Ok(Self::Dial(DialRequest { addrs, nonce })) + } + proto::mod_Message::OneOfmsg::dialDataResponse(proto::DialDataResponse { data }) => { + let data_count = data.len(); + Ok(Self::Data(DialDataResponse { data_count })) + } + _ => Err(new_io_invalid_data_err( + "expected dialResponse or dialDataRequest", + )), + } + } +} + +#[derive(Debug, Clone)] +pub(crate) enum Response { + Dial(DialResponse), + Data(DialDataRequest), +} + +#[derive(Debug, Clone)] +pub(crate) struct DialDataRequest { + pub(crate) addr_idx: usize, + pub(crate) num_bytes: usize, +} + +#[derive(Debug, Clone)] +pub(crate) struct DialResponse { + pub(crate) status: proto::mod_DialResponse::ResponseStatus, + pub(crate) addr_idx: usize, + pub(crate) dial_status: proto::DialStatus, +} + +impl TryFrom for Response { + type Error = io::Error; + + fn try_from(msg: proto::Message) -> Result { + match msg.msg { + proto::mod_Message::OneOfmsg::dialResponse(proto::DialResponse { + status, + addrIdx, + dialStatus, + }) => Ok(Response::Dial(DialResponse { + status, + addr_idx: addrIdx as usize, + dial_status: dialStatus, + })), + proto::mod_Message::OneOfmsg::dialDataRequest(proto::DialDataRequest { + addrIdx, + numBytes, + }) => Ok(Self::Data(DialDataRequest { + addr_idx: addrIdx as usize, + num_bytes: numBytes as usize, + })), + _ => Err(new_io_invalid_data_err( + "invalid message type, expected dialResponse or dialDataRequest", + )), + } + } +} + +impl From for proto::Message { + fn from(val: Response) -> Self { + match val { + Response::Dial(DialResponse { + status, + addr_idx, + dial_status, + }) => proto::Message { + msg: proto::mod_Message::OneOfmsg::dialResponse(proto::DialResponse { + status, + addrIdx: addr_idx as u32, + dialStatus: dial_status, + }), + }, + Response::Data(DialDataRequest { + addr_idx, + num_bytes, + }) => proto::Message { + msg: proto::mod_Message::OneOfmsg::dialDataRequest(proto::DialDataRequest { + addrIdx: addr_idx as u32, + numBytes: num_bytes as u64, + }), + }, + } + } +} + +impl DialDataRequest { + pub(crate) fn from_rng(addr_idx: usize, mut rng: R) -> Self { + let num_bytes = rng.gen_range(DATA_LEN_LOWER_BOUND..=DATA_LEN_UPPER_BOUND); + Self { + addr_idx, + num_bytes, + } + } +} + +const DIAL_BACK_MAX_SIZE: usize = 10; + +pub(crate) async fn dial_back(stream: impl AsyncWrite + Unpin, nonce: Nonce) -> io::Result<()> { + let msg = proto::DialBack { nonce }; + let mut framed = FramedWrite::new(stream, Codec::::new(DIAL_BACK_MAX_SIZE)); + + framed + .send(msg) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + Ok(()) +} + +pub(crate) async fn recv_dial_back(stream: impl AsyncRead + Unpin) -> io::Result { + let framed = &mut FramedRead::new(stream, Codec::::new(DIAL_BACK_MAX_SIZE)); + let proto::DialBack { nonce } = framed + .next() + .await + .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??; + Ok(nonce) +} + +pub(crate) async fn dial_back_response(stream: impl AsyncWrite + Unpin) -> io::Result<()> { + let msg = proto::DialBackResponse { + status: proto::mod_DialBackResponse::DialBackStatus::OK, + }; + let mut framed = FramedWrite::new( + stream, + Codec::::new(DIAL_BACK_MAX_SIZE), + ); + framed + .send(msg) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + Ok(()) +} + +pub(crate) async fn recv_dial_back_response( + stream: impl AsyncRead + AsyncWrite + Unpin, +) -> io::Result<()> { + let framed = &mut FramedRead::new( + stream, + Codec::::new(DIAL_BACK_MAX_SIZE), + ); + let proto::DialBackResponse { status } = framed + .next() + .await + .ok_or(io::Error::from(io::ErrorKind::UnexpectedEof))??; + + if proto::mod_DialBackResponse::DialBackStatus::OK == status { + Ok(()) + } else { + Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid dial back response", + )) + } +} + +#[cfg(test)] +mod tests { + use crate::v2::generated::structs::{ + mod_Message::OneOfmsg, DialDataResponse as GenDialDataResponse, Message, + }; + + #[test] + fn message_correct_max_size() { + let message_bytes = quick_protobuf::serialize_into_vec(&Message { + msg: OneOfmsg::dialDataResponse(GenDialDataResponse { + data: vec![0; 4096], + }), + }) + .unwrap(); + assert_eq!(message_bytes.len(), super::REQUEST_MAX_SIZE); + } + + #[test] + fn dial_back_correct_size() { + let dial_back = super::proto::DialBack { nonce: 0 }; + let buf = quick_protobuf::serialize_into_vec(&dial_back).unwrap(); + assert!(buf.len() <= super::DIAL_BACK_MAX_SIZE); + + let dial_back_max_nonce = super::proto::DialBack { nonce: u64::MAX }; + let buf = quick_protobuf::serialize_into_vec(&dial_back_max_nonce).unwrap(); + assert!(buf.len() <= super::DIAL_BACK_MAX_SIZE); + } +} diff --git a/protocols/autonat/src/v2/server.rs b/protocols/autonat/src/v2/server.rs new file mode 100644 index 00000000000..25819307784 --- /dev/null +++ b/protocols/autonat/src/v2/server.rs @@ -0,0 +1,5 @@ +mod behaviour; +mod handler; + +pub use behaviour::Behaviour; +pub use behaviour::Event; diff --git a/protocols/autonat/src/v2/server/behaviour.rs b/protocols/autonat/src/v2/server/behaviour.rs new file mode 100644 index 00000000000..5f7b21d165b --- /dev/null +++ b/protocols/autonat/src/v2/server/behaviour.rs @@ -0,0 +1,156 @@ +use std::{ + collections::{HashMap, VecDeque}, + io, + task::{Context, Poll}, +}; + +use crate::v2::server::handler::dial_request::DialBackStatus; +use either::Either; +use libp2p_core::{transport::PortUse, Endpoint, Multiaddr}; +use libp2p_identity::PeerId; +use libp2p_swarm::dial_opts::PeerCondition; +use libp2p_swarm::{ + dial_opts::DialOpts, dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, + FromSwarm, NetworkBehaviour, ToSwarm, +}; +use rand_core::{OsRng, RngCore}; + +use crate::v2::server::handler::{ + dial_back, + dial_request::{self, DialBackCommand}, + Handler, +}; + +pub struct Behaviour +where + R: Clone + Send + RngCore + 'static, +{ + dialing_dial_back: HashMap, + pending_events: VecDeque< + ToSwarm< + ::ToSwarm, + <::ConnectionHandler as ConnectionHandler>::FromBehaviour, + >, + >, + rng: R, +} + +impl Default for Behaviour { + fn default() -> Self { + Self::new(OsRng) + } +} + +impl Behaviour +where + R: RngCore + Send + Clone + 'static, +{ + pub fn new(rng: R) -> Self { + Self { + dialing_dial_back: HashMap::new(), + pending_events: VecDeque::new(), + rng, + } + } +} + +impl NetworkBehaviour for Behaviour +where + R: RngCore + Send + Clone + 'static, +{ + type ConnectionHandler = Handler; + + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + _local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result<::ConnectionHandler, ConnectionDenied> { + Ok(Either::Right(dial_request::Handler::new( + peer, + remote_addr.clone(), + self.rng.clone(), + ))) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + _peer: PeerId, + _addr: &Multiaddr, + _role_override: Endpoint, + _port_use: PortUse, + ) -> Result<::ConnectionHandler, ConnectionDenied> { + Ok(match self.dialing_dial_back.remove(&connection_id) { + Some(cmd) => Either::Left(Either::Left(dial_back::Handler::new(cmd))), + None => Either::Left(Either::Right(dummy::ConnectionHandler)), + }) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + if let FromSwarm::DialFailure(DialFailure { connection_id, .. }) = event { + if let Some(DialBackCommand { back_channel, .. }) = + self.dialing_dial_back.remove(&connection_id) + { + let dial_back_status = DialBackStatus::DialErr; + let _ = back_channel.send(Err(dial_back_status)); + } + } + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + _connection_id: ConnectionId, + event: as ConnectionHandler>::ToBehaviour, + ) { + match event { + Either::Left(Either::Left(Ok(_))) => {} + Either::Left(Either::Left(Err(e))) => { + tracing::debug!("dial back error: {e:?}"); + } + Either::Left(Either::Right(v)) => void::unreachable(v), + Either::Right(Either::Left(cmd)) => { + let addr = cmd.addr.clone(); + let opts = DialOpts::peer_id(peer_id) + .addresses(Vec::from([addr])) + .condition(PeerCondition::Always) + .allocate_new_port() + .build(); + let conn_id = opts.connection_id(); + self.dialing_dial_back.insert(conn_id, cmd); + self.pending_events.push_back(ToSwarm::Dial { opts }); + } + Either::Right(Either::Right(status_update)) => self + .pending_events + .push_back(ToSwarm::GenerateEvent(status_update)), + } + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll as ConnectionHandler>::FromBehaviour>> { + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(event); + } + Poll::Pending + } +} + +#[derive(Debug)] +pub struct Event { + /// All address that were submitted for testing. + pub all_addrs: Vec, + /// The address that was eventually tested. + pub tested_addr: Multiaddr, + /// The peer id of the client that submitted addresses for testing. + pub client: PeerId, + /// The amount of data that was requested by the server and was transmitted. + pub data_amount: usize, + /// The result of the test. + pub result: Result<(), io::Error>, +} diff --git a/protocols/autonat/src/v2/server/handler.rs b/protocols/autonat/src/v2/server/handler.rs new file mode 100644 index 00000000000..ffdad69c86f --- /dev/null +++ b/protocols/autonat/src/v2/server/handler.rs @@ -0,0 +1,8 @@ +use either::Either; +use libp2p_swarm::dummy; + +pub(crate) mod dial_back; +pub(crate) mod dial_request; + +pub(crate) type Handler = + Either, dial_request::Handler>; diff --git a/protocols/autonat/src/v2/server/handler/dial_back.rs b/protocols/autonat/src/v2/server/handler/dial_back.rs new file mode 100644 index 00000000000..3cacd4ff32b --- /dev/null +++ b/protocols/autonat/src/v2/server/handler/dial_back.rs @@ -0,0 +1,140 @@ +use std::{ + convert::identity, + io, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{AsyncRead, AsyncWrite}; +use futures_bounded::FuturesSet; +use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; +use libp2p_swarm::{ + handler::{ConnectionEvent, DialUpgradeError, FullyNegotiatedOutbound}, + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError, + SubstreamProtocol, +}; + +use crate::v2::{ + protocol::{dial_back, recv_dial_back_response}, + DIAL_BACK_PROTOCOL, +}; + +use super::dial_request::{DialBackCommand, DialBackStatus as DialBackRes}; + +pub(crate) type ToBehaviour = io::Result<()>; + +pub struct Handler { + pending_nonce: Option, + requested_substream_nonce: Option, + outbound: FuturesSet, +} + +impl Handler { + pub(crate) fn new(cmd: DialBackCommand) -> Self { + Self { + pending_nonce: Some(cmd), + requested_substream_nonce: None, + outbound: FuturesSet::new(Duration::from_secs(10), 5), + } + } +} + +impl ConnectionHandler for Handler { + type FromBehaviour = (); + type ToBehaviour = ToBehaviour; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = ReadyUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + if let Poll::Ready(result) = self.outbound.poll_unpin(cx) { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + result + .map_err(|timeout| io::Error::new(io::ErrorKind::TimedOut, timeout)) + .and_then(identity), + )); + } + if let Some(cmd) = self.pending_nonce.take() { + self.requested_substream_nonce = Some(cmd); + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(ReadyUpgrade::new(DIAL_BACK_PROTOCOL), ()), + }); + } + Poll::Pending + } + + fn on_behaviour_event(&mut self, _event: Self::FromBehaviour) {} + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol, .. + }) => { + if let Some(cmd) = self.requested_substream_nonce.take() { + if self + .outbound + .try_push(perform_dial_back(protocol, cmd)) + .is_err() + { + tracing::warn!("Dial back dropped, too many requests in flight"); + } + } else { + tracing::warn!("received dial back substream without nonce"); + } + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: StreamUpgradeError::NegotiationFailed | StreamUpgradeError::Timeout, + .. + }) => { + if let Some(cmd) = self.requested_substream_nonce.take() { + let _ = cmd.back_channel.send(Err(DialBackRes::DialBackErr)); + } + } + _ => {} + } + } +} + +async fn perform_dial_back( + mut stream: impl AsyncRead + AsyncWrite + Unpin, + DialBackCommand { + nonce, + back_channel, + .. + }: DialBackCommand, +) -> io::Result<()> { + let res = dial_back(&mut stream, nonce) + .await + .map_err(|_| DialBackRes::DialBackErr) + .map(|_| ()); + + let res = match res { + Ok(()) => recv_dial_back_response(stream) + .await + .map_err(|_| DialBackRes::DialBackErr) + .map(|_| ()), + Err(e) => Err(e), + }; + back_channel + .send(res) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "send error"))?; + Ok(()) +} diff --git a/protocols/autonat/src/v2/server/handler/dial_request.rs b/protocols/autonat/src/v2/server/handler/dial_request.rs new file mode 100644 index 00000000000..9a3729d4ccf --- /dev/null +++ b/protocols/autonat/src/v2/server/handler/dial_request.rs @@ -0,0 +1,332 @@ +use std::{ + io, + task::{Context, Poll}, + time::Duration, +}; + +use either::Either; +use futures::{ + channel::{mpsc, oneshot}, + AsyncRead, AsyncWrite, SinkExt, StreamExt, +}; +use futures_bounded::FuturesSet; +use libp2p_core::{ + upgrade::{DeniedUpgrade, ReadyUpgrade}, + Multiaddr, +}; +use libp2p_identity::PeerId; +use libp2p_swarm::{ + handler::{ConnectionEvent, FullyNegotiatedInbound, ListenUpgradeError}, + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, SubstreamProtocol, +}; +use rand_core::RngCore; + +use crate::v2::{ + generated::structs::{mod_DialResponse::ResponseStatus, DialStatus}, + protocol::{Coder, DialDataRequest, DialRequest, DialResponse, Request, Response}, + server::behaviour::Event, + Nonce, DIAL_REQUEST_PROTOCOL, +}; + +#[derive(Debug, PartialEq)] +pub(crate) enum DialBackStatus { + /// Failure during dial + DialErr, + /// Failure during dial back + DialBackErr, +} + +#[derive(Debug)] +pub struct DialBackCommand { + pub(crate) addr: Multiaddr, + pub(crate) nonce: Nonce, + pub(crate) back_channel: oneshot::Sender>, +} + +pub struct Handler { + client_id: PeerId, + observed_multiaddr: Multiaddr, + dial_back_cmd_sender: mpsc::Sender, + dial_back_cmd_receiver: mpsc::Receiver, + inbound: FuturesSet, + rng: R, +} + +impl Handler +where + R: RngCore, +{ + pub(crate) fn new(client_id: PeerId, observed_multiaddr: Multiaddr, rng: R) -> Self { + let (dial_back_cmd_sender, dial_back_cmd_receiver) = mpsc::channel(10); + Self { + client_id, + observed_multiaddr, + dial_back_cmd_sender, + dial_back_cmd_receiver, + inbound: FuturesSet::new(Duration::from_secs(10), 10), + rng, + } + } +} + +impl ConnectionHandler for Handler +where + R: RngCore + Send + Clone + 'static, +{ + type FromBehaviour = void::Void; + type ToBehaviour = Either; + type InboundProtocol = ReadyUpgrade; + type OutboundProtocol = DeniedUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(ReadyUpgrade::new(DIAL_REQUEST_PROTOCOL), ()) + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + loop { + match self.inbound.poll_unpin(cx) { + Poll::Ready(Ok(event)) => { + if let Err(e) = &event.result { + tracing::warn!("inbound request handle failed: {:?}", e); + } + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Either::Right( + event, + ))); + } + Poll::Ready(Err(e)) => { + tracing::warn!("inbound request handle timed out {e:?}"); + } + Poll::Pending => break, + } + } + if let Poll::Ready(Some(cmd)) = self.dial_back_cmd_receiver.poll_next_unpin(cx) { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(Either::Left(cmd))); + } + Poll::Pending + } + + fn on_behaviour_event(&mut self, _event: Self::FromBehaviour) {} + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol, .. + }) => { + if self + .inbound + .try_push(handle_request( + protocol, + self.observed_multiaddr.clone(), + self.client_id, + self.dial_back_cmd_sender.clone(), + self.rng.clone(), + )) + .is_err() + { + tracing::warn!( + "failed to push inbound request handler, too many requests in flight" + ); + } + } + ConnectionEvent::ListenUpgradeError(ListenUpgradeError { error, .. }) => { + tracing::debug!("inbound request failed: {:?}", error); + } + _ => {} + } + } +} + +enum HandleFail { + InternalError(usize), + RequestRejected, + DialRefused, + DialBack { + idx: usize, + result: Result<(), DialBackStatus>, + }, +} + +impl From for DialResponse { + fn from(value: HandleFail) -> Self { + match value { + HandleFail::InternalError(addr_idx) => Self { + status: ResponseStatus::E_INTERNAL_ERROR, + addr_idx, + dial_status: DialStatus::UNUSED, + }, + HandleFail::RequestRejected => Self { + status: ResponseStatus::E_REQUEST_REJECTED, + addr_idx: 0, + dial_status: DialStatus::UNUSED, + }, + HandleFail::DialRefused => Self { + status: ResponseStatus::E_DIAL_REFUSED, + addr_idx: 0, + dial_status: DialStatus::UNUSED, + }, + HandleFail::DialBack { idx, result } => Self { + status: ResponseStatus::OK, + addr_idx: idx, + dial_status: match result { + Err(DialBackStatus::DialErr) => DialStatus::E_DIAL_ERROR, + Err(DialBackStatus::DialBackErr) => DialStatus::E_DIAL_BACK_ERROR, + Ok(()) => DialStatus::OK, + }, + }, + } + } +} + +async fn handle_request( + stream: impl AsyncRead + AsyncWrite + Unpin, + observed_multiaddr: Multiaddr, + client: PeerId, + dial_back_cmd_sender: mpsc::Sender, + rng: impl RngCore, +) -> Event { + let mut coder = Coder::new(stream); + let mut all_addrs = Vec::new(); + let mut tested_addr_opt = None; + let mut data_amount = 0; + let response = handle_request_internal( + &mut coder, + observed_multiaddr.clone(), + dial_back_cmd_sender, + rng, + &mut all_addrs, + &mut tested_addr_opt, + &mut data_amount, + ) + .await + .unwrap_or_else(|e| e.into()); + let Some(tested_addr) = tested_addr_opt else { + return Event { + all_addrs, + tested_addr: observed_multiaddr, + client, + data_amount, + result: Err(io::Error::new( + io::ErrorKind::Other, + "client is not conformint to protocol. the tested address is not the observed address", + )), + }; + }; + if let Err(e) = coder.send(Response::Dial(response)).await { + return Event { + all_addrs, + tested_addr, + client, + data_amount, + result: Err(e), + }; + } + if let Err(e) = coder.close().await { + return Event { + all_addrs, + tested_addr, + client, + data_amount, + result: Err(e), + }; + } + Event { + all_addrs, + tested_addr, + client, + data_amount, + result: Ok(()), + } +} + +async fn handle_request_internal( + coder: &mut Coder, + observed_multiaddr: Multiaddr, + dial_back_cmd_sender: mpsc::Sender, + mut rng: impl RngCore, + all_addrs: &mut Vec, + tested_addrs: &mut Option, + data_amount: &mut usize, +) -> Result +where + I: AsyncRead + AsyncWrite + Unpin, +{ + let DialRequest { mut addrs, nonce } = match coder + .next() + .await + .map_err(|_| HandleFail::InternalError(0))? + { + Request::Dial(dial_request) => dial_request, + Request::Data(_) => { + return Err(HandleFail::RequestRejected); + } + }; + all_addrs.clone_from(&addrs); + let idx = 0; + let addr = addrs.pop().ok_or(HandleFail::DialRefused)?; + *tested_addrs = Some(addr.clone()); + *data_amount = 0; + if addr != observed_multiaddr { + let dial_data_request = DialDataRequest::from_rng(idx, &mut rng); + let mut rem_data = dial_data_request.num_bytes; + coder + .send(Response::Data(dial_data_request)) + .await + .map_err(|_| HandleFail::InternalError(idx))?; + while rem_data > 0 { + let data_count = match coder + .next() + .await + .map_err(|_e| HandleFail::InternalError(idx))? + { + Request::Dial(_) => { + return Err(HandleFail::RequestRejected); + } + Request::Data(dial_data_response) => dial_data_response.get_data_count(), + }; + rem_data = rem_data.saturating_sub(data_count); + *data_amount += data_count; + } + } + let (back_channel, rx) = oneshot::channel(); + let dial_back_cmd = DialBackCommand { + addr, + nonce, + back_channel, + }; + dial_back_cmd_sender + .clone() + .send(dial_back_cmd) + .await + .map_err(|_| HandleFail::DialBack { + idx, + result: Err(DialBackStatus::DialErr), + })?; + + let dial_back = rx.await.map_err(|_e| HandleFail::InternalError(idx))?; + if let Err(err) = dial_back { + return Err(HandleFail::DialBack { + idx, + result: Err(err), + }); + } + Ok(DialResponse { + status: ResponseStatus::OK, + addr_idx: idx, + dial_status: DialStatus::OK, + }) +} diff --git a/protocols/autonat/tests/autonatv2.rs b/protocols/autonat/tests/autonatv2.rs new file mode 100644 index 00000000000..abd0c4bd8eb --- /dev/null +++ b/protocols/autonat/tests/autonatv2.rs @@ -0,0 +1,568 @@ +use libp2p_autonat::v2::client::{self, Config}; +use libp2p_autonat::v2::server; +use libp2p_core::transport::TransportError; +use libp2p_core::Multiaddr; +use libp2p_swarm::{ + DialError, FromSwarm, NetworkBehaviour, NewExternalAddrCandidate, Swarm, SwarmEvent, +}; +use libp2p_swarm_test::SwarmExt; +use rand_core::OsRng; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::oneshot; +use tracing_subscriber::EnvFilter; + +#[tokio::test] +async fn confirm_successful() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + let (mut alice, mut bob) = start_and_connect().await; + + let cor_server_peer = *alice.local_peer_id(); + let cor_client_peer = *bob.local_peer_id(); + let bob_external_addrs = Arc::new(bob.external_addresses().cloned().collect::>()); + let alice_bob_external_addrs = bob_external_addrs.clone(); + + let alice_task = async { + let _ = alice + .wait(|event| match event { + SwarmEvent::NewExternalAddrCandidate { .. } => Some(()), + _ => None, + }) + .await; + + let (dialed_peer_id, dialed_connection_id) = alice + .wait(|event| match event { + SwarmEvent::Dialing { + peer_id, + connection_id, + .. + } => peer_id.map(|peer_id| (peer_id, connection_id)), + _ => None, + }) + .await; + + assert_eq!(dialed_peer_id, cor_client_peer); + + let _ = alice + .wait(|event| match event { + SwarmEvent::ConnectionEstablished { + peer_id, + connection_id, + .. + } if peer_id == dialed_peer_id + && peer_id == cor_client_peer + && connection_id == dialed_connection_id => + { + Some(()) + } + _ => None, + }) + .await; + + let server::Event { + all_addrs, + tested_addr, + client, + data_amount, + result, + } = alice + .wait(|event| match event { + SwarmEvent::Behaviour(CombinedServerEvent::Autonat(status_update)) => { + Some(status_update) + } + _ => None, + }) + .await; + + assert_eq!(tested_addr, bob_external_addrs.first().cloned().unwrap()); + assert_eq!(data_amount, 0); + assert_eq!(client, cor_client_peer); + assert_eq!(&all_addrs[..], &bob_external_addrs[..]); + assert!(result.is_ok(), "Result: {result:?}"); + }; + + let bob_task = async { + bob.wait(|event| match event { + SwarmEvent::NewExternalAddrCandidate { address } => Some(address), + _ => None, + }) + .await; + let incoming_conn_id = bob + .wait(|event| match event { + SwarmEvent::IncomingConnection { connection_id, .. } => Some(connection_id), + _ => None, + }) + .await; + + let _ = bob + .wait(|event| match event { + SwarmEvent::ConnectionEstablished { + connection_id, + peer_id, + .. + } if incoming_conn_id == connection_id && peer_id == cor_server_peer => Some(()), + _ => None, + }) + .await; + + let client::Event { + tested_addr, + bytes_sent, + server, + result, + } = bob + .wait(|event| match event { + SwarmEvent::Behaviour(CombinedClientEvent::Autonat(status_update)) => { + Some(status_update) + } + _ => None, + }) + .await; + assert_eq!( + tested_addr, + alice_bob_external_addrs.first().cloned().unwrap() + ); + assert_eq!(bytes_sent, 0); + assert_eq!(server, cor_server_peer); + assert!(result.is_ok(), "Result is {result:?}"); + }; + + tokio::join!(alice_task, bob_task); +} + +#[tokio::test] +async fn dial_back_to_unsupported_protocol() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + let (mut alice, mut bob) = bootstrap().await; + + let alice_peer_id = *alice.local_peer_id(); + + let test_addr: Multiaddr = "/ip4/127.0.0.1/udp/1234/quic/webtransport".parse().unwrap(); + let bob_test_addr = test_addr.clone(); + bob.behaviour_mut() + .autonat + .on_swarm_event(FromSwarm::NewExternalAddrCandidate( + NewExternalAddrCandidate { addr: &test_addr }, + )); + + let (bob_done_tx, bob_done_rx) = oneshot::channel(); + + let alice_task = async { + let (alice_dialing_peer, alice_conn_id) = alice + .wait(|event| match event { + SwarmEvent::Dialing { + peer_id, + connection_id, + } => peer_id.map(|e| (e, connection_id)), + _ => None, + }) + .await; + let mut outgoing_conn_error = alice + .wait(|event| match event { + SwarmEvent::OutgoingConnectionError { + connection_id, + peer_id: Some(peer_id), + error: DialError::Transport(transport_errs), + } if connection_id == alice_conn_id && alice_dialing_peer == peer_id => { + Some(transport_errs) + } + _ => None, + }) + .await; + if let Some((multiaddr, TransportError::MultiaddrNotSupported(not_supported_addr))) = + outgoing_conn_error.pop() + { + assert_eq!( + multiaddr, + test_addr.clone().with_p2p(alice_dialing_peer).unwrap() + ); + assert_eq!(not_supported_addr, multiaddr,); + } else { + panic!("Peers are empty"); + } + assert_eq!(outgoing_conn_error.len(), 0); + let data_amount = alice + .wait(|event| match event { + SwarmEvent::Behaviour(CombinedServerEvent::Autonat(server::Event { + all_addrs, + tested_addr, + client, + data_amount, + result: Ok(()), + })) if all_addrs == vec![test_addr.clone()] + && tested_addr == test_addr.clone() + && client == alice_dialing_peer => + { + Some(data_amount) + } + _ => None, + }) + .await; + + let handler = tokio::spawn(async move { + alice.loop_on_next().await; + }); + let _ = bob_done_rx.await; + handler.abort(); + data_amount + }; + + let bob_task = async { + let data_amount = bob + .wait(|event| match event { + SwarmEvent::Behaviour(CombinedClientEvent::Autonat(client::Event { + tested_addr, + bytes_sent, + server, + result: Err(_), + })) if server == alice_peer_id && tested_addr == bob_test_addr => Some(bytes_sent), + _ => None, + }) + .await; + bob_done_tx.send(()).unwrap(); + data_amount + }; + let (alice_amount, bob_amount) = tokio::join!(alice_task, bob_task); + assert_eq!(alice_amount, bob_amount); +} + +#[tokio::test] +async fn dial_back_to_non_libp2p() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + let (mut alice, mut bob) = bootstrap().await; + let alice_peer_id = *alice.local_peer_id(); + + for addr_str in ["/ip4/169.150.247.38/tcp/32", "/ip6/::1/tcp/1000"] { + let addr: Multiaddr = addr_str.parse().unwrap(); + let bob_addr = addr.clone(); + bob.behaviour_mut() + .autonat + .on_swarm_event(FromSwarm::NewExternalAddrCandidate( + NewExternalAddrCandidate { addr: &addr }, + )); + + let alice_task = async { + let (alice_dialing_peer, alice_conn_id) = alice + .wait(|event| match event { + SwarmEvent::Dialing { + peer_id, + connection_id, + } => peer_id.map(|p| (p, connection_id)), + _ => None, + }) + .await; + let mut outgoing_conn_error = alice + .wait(|event| match event { + SwarmEvent::OutgoingConnectionError { + connection_id, + peer_id: Some(peer_id), + error: DialError::Transport(peers), + } if connection_id == alice_conn_id && peer_id == alice_dialing_peer => { + Some(peers) + } + _ => None, + }) + .await; + + if let Some((multiaddr, TransportError::Other(o))) = outgoing_conn_error.pop() { + assert_eq!( + multiaddr, + addr.clone().with_p2p(alice_dialing_peer).unwrap() + ); + let error_string = o.to_string(); + assert!( + error_string.contains("Connection refused"), + "Correct error string: {error_string} for {addr_str}" + ); + } else { + panic!("No outgoing connection errors"); + } + + alice + .wait(|event| match event { + SwarmEvent::Behaviour(CombinedServerEvent::Autonat(server::Event { + all_addrs, + tested_addr, + client, + data_amount, + result: Ok(()), + })) if all_addrs == vec![addr.clone()] + && tested_addr == addr + && alice_dialing_peer == client => + { + Some(data_amount) + } + _ => None, + }) + .await + }; + let bob_task = async { + bob.wait(|event| match event { + SwarmEvent::Behaviour(CombinedClientEvent::Autonat(client::Event { + tested_addr, + bytes_sent, + server, + result: Err(_), + })) if tested_addr == bob_addr && server == alice_peer_id => Some(bytes_sent), + _ => None, + }) + .await + }; + + let (alice_bytes_sent, bob_bytes_sent) = tokio::join!(alice_task, bob_task); + assert_eq!(alice_bytes_sent, bob_bytes_sent); + bob.behaviour_mut().autonat.validate_addr(&addr); + } +} + +#[tokio::test] +async fn dial_back_to_not_supporting() { + let _ = tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .try_init(); + + let (mut alice, mut bob) = bootstrap().await; + let alice_peer_id = *alice.local_peer_id(); + + let (bob_done_tx, bob_done_rx) = oneshot::channel(); + + let hannes = new_dummy().await; + let hannes_peer_id = *hannes.local_peer_id(); + let unreachable_address = hannes.external_addresses().next().unwrap().clone(); + let bob_unreachable_address = unreachable_address.clone(); + bob.behaviour_mut() + .autonat + .on_swarm_event(FromSwarm::NewExternalAddrCandidate( + NewExternalAddrCandidate { + addr: &unreachable_address, + }, + )); + + let handler = tokio::spawn(async { hannes.loop_on_next().await }); + + let alice_task = async { + let (alice_dialing_peer, alice_conn_id) = alice + .wait(|event| match event { + SwarmEvent::Dialing { + peer_id, + connection_id, + } => peer_id.map(|p| (p, connection_id)), + _ => None, + }) + .await; + alice + .wait(|event| match event { + SwarmEvent::OutgoingConnectionError { + connection_id, + peer_id: Some(peer_id), + error: DialError::WrongPeerId { obtained, .. }, + } if connection_id == alice_conn_id + && peer_id == alice_dialing_peer + && obtained == hannes_peer_id => + { + Some(()) + } + _ => None, + }) + .await; + + let data_amount = alice + .wait(|event| match event { + SwarmEvent::Behaviour(CombinedServerEvent::Autonat(server::Event { + all_addrs, + tested_addr, + client, + data_amount, + result: Ok(()), + })) if all_addrs == vec![unreachable_address.clone()] + && tested_addr == unreachable_address + && alice_dialing_peer == client => + { + Some(data_amount) + } + _ => None, + }) + .await; + tokio::select! { + _ = bob_done_rx => { + data_amount + } + _ = alice.loop_on_next() => { + unreachable!(); + } + } + }; + + let bob_task = async { + let bytes_sent = bob + .wait(|event| match event { + SwarmEvent::Behaviour(CombinedClientEvent::Autonat(client::Event { + tested_addr, + bytes_sent, + server, + result: Err(_), + })) if tested_addr == bob_unreachable_address && server == alice_peer_id => { + Some(bytes_sent) + } + _ => None, + }) + .await; + bob_done_tx.send(()).unwrap(); + bytes_sent + }; + + let (alice_bytes_sent, bob_bytes_sent) = tokio::join!(alice_task, bob_task); + assert_eq!(alice_bytes_sent, bob_bytes_sent); + handler.abort(); +} + +async fn new_server() -> Swarm { + let mut node = Swarm::new_ephemeral(|identity| CombinedServer { + autonat: libp2p_autonat::v2::server::Behaviour::default(), + identify: libp2p_identify::Behaviour::new(libp2p_identify::Config::new( + "/libp2p-test/1.0.0".into(), + identity.public().clone(), + )), + }); + node.listen().with_tcp_addr_external().await; + + node +} + +async fn new_client() -> Swarm { + let mut node = Swarm::new_ephemeral(|identity| CombinedClient { + autonat: libp2p_autonat::v2::client::Behaviour::new( + OsRng, + Config::default().with_probe_interval(Duration::from_millis(100)), + ), + identify: libp2p_identify::Behaviour::new(libp2p_identify::Config::new( + "/libp2p-test/1.0.0".into(), + identity.public().clone(), + )), + }); + node.listen().with_tcp_addr_external().await; + node +} + +#[derive(libp2p_swarm::NetworkBehaviour)] +#[behaviour(prelude = "libp2p_swarm::derive_prelude")] +struct CombinedServer { + autonat: libp2p_autonat::v2::server::Behaviour, + identify: libp2p_identify::Behaviour, +} + +#[derive(libp2p_swarm::NetworkBehaviour)] +#[behaviour(prelude = "libp2p_swarm::derive_prelude")] +struct CombinedClient { + autonat: libp2p_autonat::v2::client::Behaviour, + identify: libp2p_identify::Behaviour, +} + +async fn new_dummy() -> Swarm { + let mut node = Swarm::new_ephemeral(|identity| { + libp2p_identify::Behaviour::new(libp2p_identify::Config::new( + "/libp2p-test/1.0.0".into(), + identity.public().clone(), + )) + }); + node.listen().with_tcp_addr_external().await; + node +} + +async fn start_and_connect() -> (Swarm, Swarm) { + let mut alice = new_server().await; + let mut bob = new_client().await; + + bob.connect(&mut alice).await; + (alice, bob) +} + +async fn bootstrap() -> (Swarm, Swarm) { + let (mut alice, mut bob) = start_and_connect().await; + + let cor_server_peer = *alice.local_peer_id(); + let cor_client_peer = *bob.local_peer_id(); + + let alice_task = async { + let _ = alice + .wait(|event| match event { + SwarmEvent::NewExternalAddrCandidate { .. } => Some(()), + _ => None, + }) + .await; + + let (dialed_peer_id, dialed_connection_id) = alice + .wait(|event| match event { + SwarmEvent::Dialing { + peer_id, + connection_id, + .. + } => peer_id.map(|peer_id| (peer_id, connection_id)), + _ => None, + }) + .await; + + let _ = alice + .wait(|event| match event { + SwarmEvent::ConnectionEstablished { + peer_id, + connection_id, + .. + } if peer_id == dialed_peer_id + && peer_id == cor_client_peer + && connection_id == dialed_connection_id => + { + Some(()) + } + _ => None, + }) + .await; + + alice + .wait(|event| match event { + SwarmEvent::Behaviour(CombinedServerEvent::Autonat(_)) => Some(()), + _ => None, + }) + .await; + }; + + let bob_task = async { + bob.wait(|event| match event { + SwarmEvent::NewExternalAddrCandidate { address } => Some(address), + _ => None, + }) + .await; + let incoming_conn_id = bob + .wait(|event| match event { + SwarmEvent::IncomingConnection { connection_id, .. } => Some(connection_id), + _ => None, + }) + .await; + + let _ = bob + .wait(|event| match event { + SwarmEvent::ConnectionEstablished { + connection_id, + peer_id, + .. + } if incoming_conn_id == connection_id && peer_id == cor_server_peer => Some(()), + _ => None, + }) + .await; + + bob.wait(|event| match event { + SwarmEvent::Behaviour(CombinedClientEvent::Autonat(_)) => Some(()), + _ => None, + }) + .await; + }; + + tokio::join!(alice_task, bob_task); + (alice, bob) +}