From a15290725d0fa68ab1714f2925e051e43b0c0a92 Mon Sep 17 00:00:00 2001 From: SimaoMoreira5228 Date: Wed, 8 Oct 2025 20:28:43 +0100 Subject: [PATCH 01/25] Adds WebTransport over HTTP/3 support Enables handling of WebTransport sessions, uni- and bidirectional streams, and updates dependencies and Bazel integration for WebTransport crate. --- crates/http/Cargo.toml | 8 + crates/http/README.md | 4 +- crates/http/examples/webtransport_tls.rs | 117 ++++++ crates/http/src/backend/h3.rs | 362 +++++++++++++++--- crates/http/src/lib.rs | 1 - misc/toolchains/rust.MODULE.bazel | 1 + vendor/cargo/BUILD.bazel | 12 + vendor/cargo/BUILD.h3-0.0.8.bazel | 1 + vendor/cargo/BUILD.h3-datagram-0.0.2.bazel | 68 ++++ vendor/cargo/BUILD.h3-quinn-0.0.10.bazel | 2 + .../cargo/BUILD.h3-webtransport-0.1.2.bazel | 73 ++++ vendor/cargo/defs.bzl | 33 ++ 12 files changed, 633 insertions(+), 49 deletions(-) create mode 100644 crates/http/examples/webtransport_tls.rs create mode 100644 vendor/cargo/BUILD.h3-datagram-0.0.2.bazel create mode 100644 vendor/cargo/BUILD.h3-webtransport-0.1.2.bazel diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 305a7169e..aeca4e363 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -28,6 +28,11 @@ name = "scuffle-http-axum" path = "examples/axum.rs" required-features = ["default", "tls-rustls", "http3", "tower", "tracing"] +[[example]] +name = "scuffle-http-webtransport-tls" +path = "examples/webtransport_tls.rs" +required-features = ["default", "tls-rustls", "http3", "webtransport", "tracing"] + [features] default = ["http1", "http2", "tower"] ## Enables tracing support @@ -56,6 +61,8 @@ http2 = [ ] ## Enables http3 support http3 = ["dep:quinn", "dep:h3-quinn", "dep:h3"] +## Enables WebTransport over HTTP/3 support (requires http3) +webtransport = ["http3", "dep:h3-webtransport", "h3-quinn?/datagram"] ## Enables tls via rustls tls-rustls = ["dep:tokio-rustls"] ## Alias for ["http3", "tls-rustls"] @@ -90,6 +97,7 @@ libc = { default-features = false, optional = true, version = "0.2" } h3 = { default-features = false, optional = true, version = "0.0.8" } h3-quinn = { default-features = false, optional = true, version = "0.0.10" } quinn = { default-features = false, features = ["platform-verifier", "runtime-tokio", "rustls-aws-lc-rs"], optional = true, version = "0.11" } +h3-webtransport = { default-features = false, optional = true, version = "0.1" } # TLS tokio-rustls = { default-features = false, features = ["aws_lc_rs", "tls12"], optional = true, version = "0.26" } diff --git a/crates/http/README.md b/crates/http/README.md index ccb96869a..b51965b33 100644 --- a/crates/http/README.md +++ b/crates/http/README.md @@ -3,7 +3,7 @@ # scuffle-http -> [!WARNING] +> [!WARNING] > This crate is under active development and may not be stable. @@ -32,6 +32,7 @@ See the [changelog](./CHANGELOG.md) for a full release history. * **`http1`** *(enabled by default)* — Enables http1 support * **`http2`** *(enabled by default)* — Enabled http2 support * **`http3`** — Enables http3 support +* **`webtransport`** — Enables WebTransport over HTTP/3 support (requires http3) * **`tls-rustls`** — Enables tls via rustls * **`http3-tls-rustls`** — Alias for \[“http3”, “tls-rustls”\] * **`tower`** *(enabled by default)* — Enables tower service support @@ -68,7 +69,6 @@ scuffle_http::HttpServer::builder() #### Missing Features -* HTTP/3 webtransport support * Upgrading to websocket connections from HTTP/3 connections (this is usually done via HTTP/1.1 anyway) ### License diff --git a/crates/http/examples/webtransport_tls.rs b/crates/http/examples/webtransport_tls.rs new file mode 100644 index 000000000..f953b3744 --- /dev/null +++ b/crates/http/examples/webtransport_tls.rs @@ -0,0 +1,117 @@ +use std::convert::Infallible; +use std::net::SocketAddr; + +use http::{Method, StatusCode}; +use scuffle_http as http_srv; +use scuffle_http::service::{fn_http_service, service_clone_factory}; + +fn assets_path(item: &str) -> std::path::PathBuf { + if let Some(env) = std::env::var_os("ASSETS_DIR") { + std::path::PathBuf::from(env).join(item) + } else { + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(format!("../../assets/{item}")) + } +} + +fn rustls_config() -> rustls::ServerConfig { + static ONCE: std::sync::Once = std::sync::Once::new(); + ONCE.call_once(|| { + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .expect("failed to install aws lc provider"); + }); + + let certfile = std::fs::File::open(assets_path("cert.pem")).expect("cert not found"); + let certs = rustls_pemfile::certs(&mut std::io::BufReader::new(certfile)) + .collect::, _>>() + .expect("failed to load certs"); + let keyfile = std::fs::File::open(assets_path("key.pem")).expect("key not found"); + let key = rustls_pemfile::private_key(&mut std::io::BufReader::new(keyfile)) + .expect("failed to load key") + .expect("no key found"); + + rustls::ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(certs, key) + .expect("failed to build config") +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let addr: SocketAddr = "[::]:4443".parse().unwrap(); + + let service = fn_http_service(|req: http_srv::IncomingRequest| async move { + if req.uri().path() == "/" && req.method() == Method::GET { + let html = r#" + + +

WebTransport demo

+ + + + "#; + let resp = http::Response::builder() + .status(StatusCode::OK) + .header(http::header::CONTENT_TYPE, "text/html; charset=utf-8") + .body(html.to_string()) + .unwrap(); + Ok::<_, Infallible>(resp) + } else if req.uri().path() == "/wt" && req.method() == Method::CONNECT { + Ok::<_, Infallible>(http::Response::builder().status(StatusCode::NO_CONTENT).body(String::new()).unwrap()) + } else { + Ok::<_, Infallible>(http::Response::builder().status(StatusCode::NOT_FOUND).body(String::new()).unwrap()) + } + }); + + let server = http_srv::HttpServer::builder() + .service_factory(service_clone_factory(service)) + .bind(addr) + .rustls_config(rustls_config()) + .enable_http3(true) + .build(); + + tracing::info!(%addr, "serving WebTransport demo over TLS (HTTP/3)"); + if let Err(e) = server.run().await { + eprintln!("server error: {e}"); + } +} diff --git a/crates/http/src/backend/h3.rs b/crates/http/src/backend/h3.rs index 8f669ba64..9e472bfb5 100644 --- a/crates/http/src/backend/h3.rs +++ b/crates/http/src/backend/h3.rs @@ -9,6 +9,8 @@ use scuffle_context::ContextFutExt; #[cfg(feature = "tracing")] use tracing::Instrument; use utils::copy_response_body; +#[cfg(feature = "webtransport")] +use {h3::ext::Protocol, h3_webtransport as h3wt}; use crate::error::HttpError; use crate::service::{HttpService, HttpServiceFactory}; @@ -82,23 +84,24 @@ where let socket = socket.try_clone().expect("failed to clone socket"); let runtime = Arc::clone(&runtime); - let worker_fut = async move { - let endpoint = h3_quinn::quinn::Endpoint::new( - h3_quinn::quinn::EndpointConfig::default(), - Some(server_config), - socket, - runtime, - )?; + let worker_fut = + async move { + let endpoint = h3_quinn::quinn::Endpoint::new( + h3_quinn::quinn::EndpointConfig::default(), + Some(server_config), + socket, + runtime, + )?; - #[cfg(feature = "tracing")] - tracing::trace!("waiting for connections"); + #[cfg(feature = "tracing")] + tracing::trace!("waiting for connections"); - while let Some(Some(new_conn)) = endpoint.accept().with_context(&ctx).await { - let mut service_factory = service_factory.clone(); - let ctx = ctx.clone(); + while let Some(Some(new_conn)) = endpoint.accept().with_context(&ctx).await { + let mut service_factory = service_factory.clone(); + let ctx = ctx.clone(); - tokio::spawn(async move { - let _res: Result<_, HttpError> = async move { + tokio::spawn(async move { + let _res: Result<_, HttpError> = async move { let Some(conn) = new_conn.with_context(&ctx).await.transpose()? else { #[cfg(feature = "tracing")] tracing::trace!("context done while accepting connection"); @@ -130,7 +133,7 @@ where } // make a new service for this connection - let http_service = service_factory + let mut http_service = service_factory .new_service(addr) .await .map_err(|e| HttpError::ServiceFactoryError(e))?; @@ -138,6 +141,7 @@ where loop { match h3_conn.accept().with_context(&ctx).await { Some(Ok(Some(resolver))) => { + // Resolve the request let (req, stream) = match resolver.resolve_request().await { Ok(r) => r, Err(_err) => { @@ -150,6 +154,29 @@ where #[cfg(feature = "tracing")] tracing::debug!(method = %req.method(), uri = %req.uri(), "received request"); + // Check if this is a WebTransport CONNECT request + #[cfg(feature = "webtransport")] + if req.extensions().get::() == Some(&Protocol::WEB_TRANSPORT) + && req.method() == http::Method::CONNECT + { + #[cfg(feature = "tracing")] + tracing::debug!("starting WebTransport session"); + + match drive_webtransport_session::( + req, stream, h3_conn, ctx, &mut http_service + ).await { + Ok(_) => { + #[cfg(feature = "tracing")] + tracing::debug!("WebTransport session ended"); + } + Err(err) => { + #[cfg(feature = "tracing")] + tracing::warn!(err = %err, "WebTransport session error"); + } + } + break; + } + let (mut send, recv) = stream.split(); let size_hint = req @@ -161,30 +188,32 @@ where req.extensions_mut().extend(extra_extensions.clone()); - let ctx = ctx.clone(); - let mut http_service = http_service.clone(); - tokio::spawn(async move { - let _res: Result<_, HttpError> = async move { - let resp = http_service - .call(req) - .await - .map_err(|e| HttpError::ServiceError(e))?; - let (parts, body) = resp.into_parts(); + tokio::spawn({ + let ctx = ctx.clone(); + let mut http_service = http_service.clone(); + async move { + let _res: Result<_, HttpError> = async move { + let resp = http_service + .call(req) + .await + .map_err(|e| HttpError::ServiceError(e))?; + let (parts, body) = resp.into_parts(); - send.send_response(http::Response::from_parts(parts, ())).await?; - copy_response_body(send, body).await?; + send.send_response(http::Response::from_parts(parts, ())).await?; + copy_response_body(send, body).await?; - Ok(()) - } - .await; + Ok(()) + } + .await; - #[cfg(feature = "tracing")] - if let Err(e) = _res { - tracing::warn!(err = %e, "error handling request"); - } + #[cfg(feature = "tracing")] + if let Err(e) = _res { + tracing::warn!(err = %e, "error handling request"); + } - // This moves the context into the async block because it is dropped here - drop(ctx); + // This moves the context into the async block because it is dropped here + drop(ctx); + } }); } // indicating no more streams to be received @@ -214,19 +243,19 @@ where } .await; - #[cfg(feature = "tracing")] - if let Err(err) = _res { - tracing::warn!(err = %err, "error handling connection"); - } - }); - } + #[cfg(feature = "tracing")] + if let Err(err) = _res { + tracing::warn!(err = %err, "error handling connection"); + } + }); + } - // shut down gracefully - // wait for connections to be closed before exiting - endpoint.wait_idle().await; + // shut down gracefully + // wait for connections to be closed before exiting + endpoint.wait_idle().await; - Ok::<_, crate::error::HttpError>(()) - }; + Ok::<_, crate::error::HttpError>(()) + }; #[cfg(feature = "tracing")] let worker_fut = worker_fut.instrument(tracing::trace_span!("worker", n = _n)); @@ -248,3 +277,244 @@ where Ok(()) } } + +#[cfg(feature = "webtransport")] +async fn drive_webtransport_session( + req: http::Request<()>, + stream: h3::server::RequestStream, bytes::Bytes>, + h3_conn: h3::server::Connection, + ctx: scuffle_context::Context, + http_service: &mut F::Service, +) -> Result<(), crate::error::HttpError> +where + F: HttpServiceFactory + Clone + Send + 'static, + F::Error: std::error::Error + Send, + F::Service: Clone + Send + 'static, + ::Error: std::error::Error + Send + Sync, + ::ResBody: Send, + <::ResBody as http_body::Body>::Data: Send, + <::ResBody as http_body::Body>::Error: std::error::Error + Send + Sync, +{ + // Accept the WebTransport session + let session = std::sync::Arc::new(h3wt::server::WebTransportSession::accept(req, stream, h3_conn).await?); + + // Spawn task to handle unidirectional streams + let uni_handle = tokio::spawn({ + let ctx = ctx.clone(); + let session = session.clone(); + async move { + loop { + match session.accept_uni().with_context(&ctx).await { + Some(Ok(Some((sid, stream)))) => { + #[cfg(feature = "tracing")] + tracing::debug!(session_id = ?sid, "received WebTransport uni stream"); + + tokio::spawn({ + let ctx = ctx.clone(); + async move { + if let Err(err) = handle_webtransport_uni_stream(stream, ctx).await { + #[cfg(feature = "tracing")] + tracing::warn!(err = %err, "error handling WebTransport uni stream"); + } + } + }); + } + Some(Ok(None)) => break, + Some(Err(err)) => { + #[cfg(feature = "tracing")] + tracing::warn!(err = %err, "WebTransport uni stream accept error"); + break; + } + None => break, + } + } + } + }); + + // Handle bidirectional streams and requests + loop { + match session.accept_bi().with_context(&ctx).await { + Some(Ok(Some(h3wt::server::AcceptedBi::Request(req, stream)))) => { + let (mut send, recv) = stream.split(); + let size_hint = req + .headers() + .get(http::header::CONTENT_LENGTH) + .and_then(|len| len.to_str().ok().and_then(|x| x.parse().ok())); + let body = QuicIncomingBody::new(recv, size_hint); + let req = req.map(|_| crate::body::IncomingBody::from(body)); + + let resp = match http_service.call(req).await { + Ok(r) => r, + Err(_e) => { + #[cfg(feature = "tracing")] + tracing::warn!(err = %_e, "service error in WebTransport request"); + let _ = send + .send_response( + http::Response::builder() + .status(http::StatusCode::INTERNAL_SERVER_ERROR) + .body(()) + .unwrap(), + ) + .await; + continue; + } + }; + + let (parts, body) = resp.into_parts(); + if let Err(err) = send.send_response(http::Response::from_parts(parts, ())).await { + #[cfg(feature = "tracing")] + tracing::warn!(err = %err, "failed to send WebTransport response headers"); + continue; + } + if let Err(err) = copy_response_body::<_, F>(send, body).await { + #[cfg(feature = "tracing")] + tracing::warn!(err = %err, "failed to send WebTransport response body"); + } + } + Some(Ok(Some(h3wt::server::AcceptedBi::BidiStream(sid, bidi)))) => { + // Handle raw bidirectional WebTransport stream + #[cfg(feature = "tracing")] + tracing::debug!(session_id = ?sid, "received WebTransport bidi stream"); + + tokio::spawn({ + let ctx = ctx.clone(); + async move { + if let Err(err) = handle_webtransport_bidi_stream(bidi, ctx).await { + #[cfg(feature = "tracing")] + tracing::warn!(err = %err, "error handling WebTransport bidi stream"); + } + } + }); + } + Some(Ok(None)) => break, + Some(Err(err)) => { + #[cfg(feature = "tracing")] + tracing::warn!(err = %err, "WebTransport session error"); + break; + } + None => break, + } + } + + // Wait for uni stream handler to complete + let _ = uni_handle.await; + + Ok(()) +} + +#[cfg(feature = "webtransport")] +async fn handle_webtransport_uni_stream( + mut stream: h3wt::stream::RecvStream, + ctx: scuffle_context::Context, +) -> Result<(), Box> { + use bytes::Buf; + use h3::quic::RecvStream; + + let mut buffer = Vec::new(); + loop { + match std::future::poll_fn(|cx| stream.poll_data(cx)).with_context(&ctx).await { + Some(Ok(Some(mut chunk))) => { + let chunk_size = chunk.remaining(); + let chunk_bytes = chunk.copy_to_bytes(chunk_size); + buffer.extend_from_slice(&chunk_bytes); + + #[cfg(feature = "tracing")] + tracing::debug!( + bytes = chunk_size, + total = buffer.len(), + "received data on WebTransport uni stream" + ); + } + Some(Ok(None)) => { + #[cfg(feature = "tracing")] + tracing::debug!( + total_bytes = buffer.len(), + data = ?String::from_utf8_lossy(&buffer), + "WebTransport uni stream finished" + ); + break; + } + Some(Err(err)) => { + #[cfg(feature = "tracing")] + tracing::warn!(err = %err, "error reading from WebTransport uni stream"); + return Err(err.into()); + } + None => { + #[cfg(feature = "tracing")] + tracing::trace!("context done while reading WebTransport uni stream"); + return Ok(()); + } + } + } + + Ok(()) +} + +#[cfg(feature = "webtransport")] +async fn handle_webtransport_bidi_stream( + mut stream: h3wt::stream::BidiStream, bytes::Bytes>, + ctx: scuffle_context::Context, +) -> Result<(), Box> { + use bytes::Buf; + use h3::quic::RecvStream; + + // Read data from the receive side + let mut buffer = Vec::new(); + loop { + match std::future::poll_fn(|cx| stream.poll_data(cx)).with_context(&ctx).await { + Some(Ok(Some(mut chunk))) => { + let chunk_size = chunk.remaining(); + let chunk_bytes = chunk.copy_to_bytes(chunk_size); + buffer.extend_from_slice(&chunk_bytes); + + #[cfg(feature = "tracing")] + tracing::debug!( + bytes = chunk_size, + total = buffer.len(), + "received data on WebTransport bidi stream" + ); + } + Some(Ok(None)) => { + // Stream finished + #[cfg(feature = "tracing")] + tracing::debug!(total_bytes = buffer.len(), "WebTransport bidi stream finished reading"); + break; + } + Some(Err(err)) => { + #[cfg(feature = "tracing")] + tracing::warn!(err = %err, "error reading from WebTransport bidi stream"); + return Err(err.into()); + } + None => { + // Context cancelled + #[cfg(feature = "tracing")] + tracing::trace!("context done while reading WebTransport bidi stream"); + return Ok(()); + } + } + } + + // Echo back the received data + if !buffer.is_empty() { + #[cfg(feature = "tracing")] + tracing::debug!(bytes = buffer.len(), "echoing data back on WebTransport bidi stream"); + + let response_msg = format!("Echo: received {} bytes", buffer.len()); + let response_bytes = bytes::Bytes::from(response_msg); + + // Send the response using SendStreamUnframed + use h3::quic::{SendStream, SendStreamUnframed}; + let mut bytes_buf = response_bytes.clone(); + while bytes_buf.has_remaining() { + let written = std::future::poll_fn(|cx| stream.poll_send(cx, &mut bytes_buf)).await?; + if written == 0 { + break; + } + } + std::future::poll_fn(|cx| stream.poll_finish(cx)).await?; + #[cfg(feature = "tracing")] + tracing::debug!("successfully echoed data on WebTransport bidi stream"); + } + + Ok(()) +} diff --git a/crates/http/src/lib.rs b/crates/http/src/lib.rs index 852f0188d..05d018eac 100644 --- a/crates/http/src/lib.rs +++ b/crates/http/src/lib.rs @@ -43,7 +43,6 @@ //! //! ### Missing Features //! -//! - HTTP/3 webtransport support //! - Upgrading to websocket connections from HTTP/3 connections (this is usually done via HTTP/1.1 anyway) //! //! ## License diff --git a/misc/toolchains/rust.MODULE.bazel b/misc/toolchains/rust.MODULE.bazel index fd3088392..09e74c5be 100644 --- a/misc/toolchains/rust.MODULE.bazel +++ b/misc/toolchains/rust.MODULE.bazel @@ -223,6 +223,7 @@ use_repo( "cargo_vendor__guppy-0.17.22", "cargo_vendor__h3-0.0.8", "cargo_vendor__h3-quinn-0.0.10", + "cargo_vendor__h3-webtransport-0.1.2", "cargo_vendor__heck-0.5.0", "cargo_vendor__hex-0.4.3", "cargo_vendor__hmac-0.12.1", diff --git a/vendor/cargo/BUILD.bazel b/vendor/cargo/BUILD.bazel index e16a41ebc..27ec34363 100644 --- a/vendor/cargo/BUILD.bazel +++ b/vendor/cargo/BUILD.bazel @@ -621,6 +621,18 @@ transition_alias_opt( tags = ["manual"], ) +transition_alias_opt( + name = "h3-webtransport-0.1.2", + actual = "@cargo_vendor__h3-webtransport-0.1.2//:h3_webtransport", + tags = ["manual"], +) + +transition_alias_opt( + name = "h3-webtransport", + actual = "@cargo_vendor__h3-webtransport-0.1.2//:h3_webtransport", + tags = ["manual"], +) + transition_alias_opt( name = "heck-0.5.0", actual = "@cargo_vendor__heck-0.5.0//:heck", diff --git a/vendor/cargo/BUILD.h3-0.0.8.bazel b/vendor/cargo/BUILD.h3-0.0.8.bazel index ac58139a3..a3daab49e 100644 --- a/vendor/cargo/BUILD.h3-0.0.8.bazel +++ b/vendor/cargo/BUILD.h3-0.0.8.bazel @@ -35,6 +35,7 @@ rust_library( ], ), crate_features = [ + "i-implement-a-third-party-backend-and-opt-into-breaking-changes", "tracing", ], crate_root = "src/lib.rs", diff --git a/vendor/cargo/BUILD.h3-datagram-0.0.2.bazel b/vendor/cargo/BUILD.h3-datagram-0.0.2.bazel new file mode 100644 index 000000000..b05ec0f3b --- /dev/null +++ b/vendor/cargo/BUILD.h3-datagram-0.0.2.bazel @@ -0,0 +1,68 @@ +############################################################################### +# @generated +# DO NOT MODIFY: This file is auto-generated by a crate_universe tool. To +# regenerate this file, run the following: +# +# bazel run @@//vendor:cargo_vendor +############################################################################### + +load("@rules_rust//cargo:defs.bzl", "cargo_toml_env_vars") +load("@rules_rust//rust:defs.bzl", "rust_library") + +package(default_visibility = ["//visibility:public"]) + +cargo_toml_env_vars( + name = "cargo_toml_env_vars", + src = "Cargo.toml", +) + +rust_library( + name = "h3_datagram", + srcs = glob( + include = ["**/*.rs"], + allow_empty = True, + ), + compile_data = glob( + include = ["**"], + allow_empty = True, + exclude = [ + "**/* *", + ".tmp_git_root/**/*", + "BUILD", + "BUILD.bazel", + "WORKSPACE", + "WORKSPACE.bazel", + ], + ), + crate_root = "src/lib.rs", + edition = "2021", + rustc_env_files = [ + ":cargo_toml_env_vars", + ], + rustc_flags = [ + "--cap-lints=allow", + ], + tags = [ + "cargo-bazel", + "crate-name=h3-datagram", + "manual", + "noclippy", + "norustfmt", + ], + target_compatible_with = select({ + "@rules_rust//rust/platform:aarch64-apple-darwin": [], + "@rules_rust//rust/platform:aarch64-pc-windows-msvc": [], + "@rules_rust//rust/platform:aarch64-unknown-linux-gnu": [], + "@rules_rust//rust/platform:wasm32-unknown-unknown": [], + "@rules_rust//rust/platform:x86_64-apple-darwin": [], + "@rules_rust//rust/platform:x86_64-pc-windows-msvc": [], + "@rules_rust//rust/platform:x86_64-unknown-linux-gnu": [], + "//conditions:default": ["@platforms//:incompatible"], + }), + version = "0.0.2", + deps = [ + "@cargo_vendor__bytes-1.10.1//:bytes", + "@cargo_vendor__h3-0.0.8//:h3", + "@cargo_vendor__pin-project-lite-0.2.16//:pin_project_lite", + ], +) diff --git a/vendor/cargo/BUILD.h3-quinn-0.0.10.bazel b/vendor/cargo/BUILD.h3-quinn-0.0.10.bazel index 6b7e328f5..adafe61dd 100644 --- a/vendor/cargo/BUILD.h3-quinn-0.0.10.bazel +++ b/vendor/cargo/BUILD.h3-quinn-0.0.10.bazel @@ -35,6 +35,7 @@ rust_library( ], ), crate_features = [ + "datagram", "tracing", ], crate_root = "src/lib.rs", @@ -67,6 +68,7 @@ rust_library( "@cargo_vendor__bytes-1.10.1//:bytes", "@cargo_vendor__futures-0.3.31//:futures", "@cargo_vendor__h3-0.0.8//:h3", + "@cargo_vendor__h3-datagram-0.0.2//:h3_datagram", "@cargo_vendor__quinn-0.11.9//:quinn", "@cargo_vendor__tokio-1.47.1//:tokio", "@cargo_vendor__tokio-util-0.7.16//:tokio_util", diff --git a/vendor/cargo/BUILD.h3-webtransport-0.1.2.bazel b/vendor/cargo/BUILD.h3-webtransport-0.1.2.bazel new file mode 100644 index 000000000..eb095280d --- /dev/null +++ b/vendor/cargo/BUILD.h3-webtransport-0.1.2.bazel @@ -0,0 +1,73 @@ +############################################################################### +# @generated +# DO NOT MODIFY: This file is auto-generated by a crate_universe tool. To +# regenerate this file, run the following: +# +# bazel run @@//vendor:cargo_vendor +############################################################################### + +load("@rules_rust//cargo:defs.bzl", "cargo_toml_env_vars") +load("@rules_rust//rust:defs.bzl", "rust_library") + +package(default_visibility = ["//visibility:public"]) + +cargo_toml_env_vars( + name = "cargo_toml_env_vars", + src = "Cargo.toml", +) + +rust_library( + name = "h3_webtransport", + srcs = glob( + include = ["**/*.rs"], + allow_empty = True, + ), + compile_data = glob( + include = ["**"], + allow_empty = True, + exclude = [ + "**/* *", + ".tmp_git_root/**/*", + "BUILD", + "BUILD.bazel", + "WORKSPACE", + "WORKSPACE.bazel", + ], + ), + crate_root = "src/lib.rs", + edition = "2021", + rustc_env_files = [ + ":cargo_toml_env_vars", + ], + rustc_flags = [ + "--cap-lints=allow", + ], + tags = [ + "cargo-bazel", + "crate-name=h3-webtransport", + "manual", + "noclippy", + "norustfmt", + ], + target_compatible_with = select({ + "@rules_rust//rust/platform:aarch64-apple-darwin": [], + "@rules_rust//rust/platform:aarch64-pc-windows-msvc": [], + "@rules_rust//rust/platform:aarch64-unknown-linux-gnu": [], + "@rules_rust//rust/platform:wasm32-unknown-unknown": [], + "@rules_rust//rust/platform:x86_64-apple-darwin": [], + "@rules_rust//rust/platform:x86_64-pc-windows-msvc": [], + "@rules_rust//rust/platform:x86_64-unknown-linux-gnu": [], + "//conditions:default": ["@platforms//:incompatible"], + }), + version = "0.1.2", + deps = [ + "@cargo_vendor__bytes-1.10.1//:bytes", + "@cargo_vendor__futures-util-0.3.31//:futures_util", + "@cargo_vendor__h3-0.0.8//:h3", + "@cargo_vendor__h3-datagram-0.0.2//:h3_datagram", + "@cargo_vendor__http-1.3.1//:http", + "@cargo_vendor__pin-project-lite-0.2.16//:pin_project_lite", + "@cargo_vendor__tokio-1.47.1//:tokio", + "@cargo_vendor__tracing-0.1.41//:tracing", + ], +) diff --git a/vendor/cargo/defs.bzl b/vendor/cargo/defs.bzl index b2634a1b6..7ceb0b866 100644 --- a/vendor/cargo/defs.bzl +++ b/vendor/cargo/defs.bzl @@ -943,6 +943,11 @@ _NORMAL_DEPENDENCIES = { "tracing": Label("@cargo_vendor//:tracing-0.1.41"), }, }, + "webtransport": { + _COMMON_CONDITION: { + "h3-webtransport": Label("@cargo_vendor//:h3-webtransport-0.1.2"), + }, + }, }, "crates/metrics": { _REQUIRED_FEATURE: { @@ -1717,6 +1722,10 @@ _NORMAL_ALIASES = { _COMMON_CONDITION: { }, }, + "webtransport": { + _COMMON_CONDITION: { + }, + }, }, "crates/metrics": { _REQUIRED_FEATURE: { @@ -4354,6 +4363,9 @@ _FEATURE_FLAGS = { ], "tracing": [ ], + "webtransport": [ + "http3", + ], }, "crates/metrics": { "default": [ @@ -7028,6 +7040,16 @@ def crate_repositories(): build_file = Label("//vendor/cargo:BUILD.h3-0.0.8.bazel"), ) + maybe( + http_archive, + name = "cargo_vendor__h3-datagram-0.0.2", + sha256 = "9d2c9f77921668673721ae40f17c729fc48b9e38a663858097cea547484fdf0f", + type = "tar.gz", + urls = ["https://static.crates.io/crates/h3-datagram/0.0.2/download"], + strip_prefix = "h3-datagram-0.0.2", + build_file = Label("//vendor/cargo:BUILD.h3-datagram-0.0.2.bazel"), + ) + maybe( http_archive, name = "cargo_vendor__h3-quinn-0.0.10", @@ -7038,6 +7060,16 @@ def crate_repositories(): build_file = Label("//vendor/cargo:BUILD.h3-quinn-0.0.10.bazel"), ) + maybe( + http_archive, + name = "cargo_vendor__h3-webtransport-0.1.2", + sha256 = "2d91a50fd582a5d67b1f756fba3cd9c66367ff4f23e1017c882f664d63b350a7", + type = "tar.gz", + urls = ["https://static.crates.io/crates/h3-webtransport/0.1.2/download"], + strip_prefix = "h3-webtransport-0.1.2", + build_file = Label("//vendor/cargo:BUILD.h3-webtransport-0.1.2.bazel"), + ) + maybe( http_archive, name = "cargo_vendor__half-1.8.3", @@ -11887,6 +11919,7 @@ def crate_repositories(): struct(repo = "cargo_vendor__guppy-0.17.22", is_dev_dep = False), struct(repo = "cargo_vendor__h3-0.0.8", is_dev_dep = False), struct(repo = "cargo_vendor__h3-quinn-0.0.10", is_dev_dep = False), + struct(repo = "cargo_vendor__h3-webtransport-0.1.2", is_dev_dep = False), struct(repo = "cargo_vendor__heck-0.5.0", is_dev_dep = False), struct(repo = "cargo_vendor__hex-0.4.3", is_dev_dep = False), struct(repo = "cargo_vendor__hmac-0.12.1", is_dev_dep = False), From ef5e3aa6412c32e5f0fb708ce40bda27c04a77f1 Mon Sep 17 00:00:00 2001 From: SimaoMoreira5228 Date: Sat, 11 Oct 2025 14:14:17 +0100 Subject: [PATCH 02/25] Adds WebTransport datagram support over HTTP/3 Introduces WebTransport session management with unified handling for bidirectional streams, unidirectional streams, and datagrams over HTTP/3. Improves WebTransport demo to showcase datagram echo and logging, and refactors backend logic to simplify session integration. --- crates/http/Cargo.toml | 3 +- crates/http/examples/webtransport_tls.rs | 143 +++++- crates/http/src/backend/h3.rs | 356 ++++----------- crates/http/src/backend/h3/webtransport.rs | 506 +++++++++++++++++++++ crates/http/src/body.rs | 5 + misc/toolchains/rust.MODULE.bazel | 1 + vendor/cargo/BUILD.bazel | 12 + vendor/cargo/defs.bzl | 2 + 8 files changed, 741 insertions(+), 287 deletions(-) create mode 100644 crates/http/src/backend/h3/webtransport.rs diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index aeca4e363..7ebb85607 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -62,7 +62,7 @@ http2 = [ ## Enables http3 support http3 = ["dep:quinn", "dep:h3-quinn", "dep:h3"] ## Enables WebTransport over HTTP/3 support (requires http3) -webtransport = ["http3", "dep:h3-webtransport", "h3-quinn?/datagram"] +webtransport = ["http3", "dep:h3-webtransport", "dep:h3-datagram", "h3-quinn?/datagram"] ## Enables tls via rustls tls-rustls = ["dep:tokio-rustls"] ## Alias for ["http3", "tls-rustls"] @@ -95,6 +95,7 @@ libc = { default-features = false, optional = true, version = "0.2" } # QUIC + HTTP/3 h3 = { default-features = false, optional = true, version = "0.0.8" } +h3-datagram = { default-features = false, optional = true, version = "0.0.2" } h3-quinn = { default-features = false, optional = true, version = "0.0.10" } quinn = { default-features = false, features = ["platform-verifier", "runtime-tokio", "rustls-aws-lc-rs"], optional = true, version = "0.11" } h3-webtransport = { default-features = false, optional = true, version = "0.1" } diff --git a/crates/http/examples/webtransport_tls.rs b/crates/http/examples/webtransport_tls.rs index f953b3744..b829ffed5 100644 --- a/crates/http/examples/webtransport_tls.rs +++ b/crates/http/examples/webtransport_tls.rs @@ -50,40 +50,62 @@ async fn main() {

WebTransport demo

+
@@ -97,7 +119,110 @@ async fn main() { .unwrap(); Ok::<_, Infallible>(resp) } else if req.uri().path() == "/wt" && req.method() == Method::CONNECT { - Ok::<_, Infallible>(http::Response::builder().status(StatusCode::NO_CONTENT).body(String::new()).unwrap()) + // Extract the WebTransport session from the request + if let Some(session) = req.extensions().get::() { + let session = session.clone(); + tracing::info!("WebTransport session established"); + + // Spawn a task to handle incoming bidirectional streams + tokio::spawn({ + let session = session.clone(); + async move { + use http_srv::backend::h3::webtransport::AcceptedBi; + while let Some(Ok(accepted)) = session.accept_bi().await { + match accepted { + AcceptedBi::BidiStream(mut stream) => { + tokio::spawn(async move { + // Echo server: read all data and send it back + match stream.read_to_end(64 * 1024).await { + Ok(data) => { + tracing::info!("Received {} bytes on bidi stream, echoing back", data.len()); + if let Err(e) = stream.write(data.clone()).await { + tracing::warn!("Failed to write to bidi stream: {}", e); + } else if let Err(e) = stream.finish().await { + tracing::warn!("Failed to finish bidi stream: {}", e); + } + } + Err(e) => { + tracing::warn!("Failed to read from bidi stream: {}", e); + } + } + }); + } + AcceptedBi::Request(_req, _stream) => { + tracing::info!("Received HTTP request over WebTransport"); + // Handle HTTP-over-WebTransport requests if needed + } + } + } + tracing::info!("Bidi stream acceptor finished"); + } + }); + + // Spawn a task to handle incoming unidirectional streams + tokio::spawn({ + let session = session.clone(); + async move { + while let Some(Ok((_id, mut stream))) = session.accept_uni().await { + tokio::spawn(async move { + match stream.read_to_end(64 * 1024).await { + Ok(data) => { + tracing::info!("Received {} bytes on uni stream: {:?}", + data.len(), + String::from_utf8_lossy(&data)); + } + Err(e) => { + tracing::warn!("Failed to read from uni stream: {}", e); + } + } + }); + } + tracing::info!("Uni stream acceptor finished"); + } + }); + + // Spawn a task to handle incoming datagrams + tokio::spawn({ + let session = session.clone(); + async move { + let mut datagram_reader = session.datagram_reader(); + let mut datagram_sender = session.datagram_sender(); + + loop { + match datagram_reader.read_datagram().await { + Ok(datagram) => { + let payload = datagram.into_payload(); + tracing::info!("Received datagram: {} bytes", payload.len()); + let response = format!("Echo: {}", String::from_utf8_lossy(&payload)); + if let Err(e) = datagram_sender.send_datagram(bytes::Bytes::from(response)) { + tracing::warn!("Failed to send datagram response: {}", e); + break; + } + } + Err(e) => { + tracing::warn!("Failed to read datagram: {}", e); + break; + } + } + } + tracing::info!("Datagram handler finished"); + } + }); + + return Ok::<_, Infallible>( + http::Response::builder() + .status(StatusCode::OK) + .body(String::new()) + .unwrap() + ); + } + + Ok::<_, Infallible>( + http::Response::builder() + .status(StatusCode::BAD_REQUEST) + .body("WebTransport session not found".to_string()) + .unwrap() + ) } else { Ok::<_, Infallible>(http::Response::builder().status(StatusCode::NOT_FOUND).body(String::new()).unwrap()) } diff --git a/crates/http/src/backend/h3.rs b/crates/http/src/backend/h3.rs index 9e472bfb5..dbf9a5b90 100644 --- a/crates/http/src/backend/h3.rs +++ b/crates/http/src/backend/h3.rs @@ -17,6 +17,8 @@ use crate::service::{HttpService, HttpServiceFactory}; pub mod body; mod utils; +#[cfg(feature = "webtransport")] +pub mod webtransport; /// A backend that handles incoming HTTP3 connections. /// @@ -84,24 +86,23 @@ where let socket = socket.try_clone().expect("failed to clone socket"); let runtime = Arc::clone(&runtime); - let worker_fut = - async move { - let endpoint = h3_quinn::quinn::Endpoint::new( - h3_quinn::quinn::EndpointConfig::default(), - Some(server_config), - socket, - runtime, - )?; + let worker_fut = async move { + let endpoint = h3_quinn::quinn::Endpoint::new( + h3_quinn::quinn::EndpointConfig::default(), + Some(server_config), + socket, + runtime, + )?; - #[cfg(feature = "tracing")] - tracing::trace!("waiting for connections"); + #[cfg(feature = "tracing")] + tracing::trace!("waiting for connections"); - while let Some(Some(new_conn)) = endpoint.accept().with_context(&ctx).await { - let mut service_factory = service_factory.clone(); - let ctx = ctx.clone(); + while let Some(Some(new_conn)) = endpoint.accept().with_context(&ctx).await { + let mut service_factory = service_factory.clone(); + let ctx = ctx.clone(); - tokio::spawn(async move { - let _res: Result<_, HttpError> = async move { + tokio::spawn(async move { + let _res: Result<_, HttpError> = async move { let Some(conn) = new_conn.with_context(&ctx).await.transpose()? else { #[cfg(feature = "tracing")] tracing::trace!("context done while accepting connection"); @@ -133,7 +134,7 @@ where } // make a new service for this connection - let mut http_service = service_factory + let http_service = service_factory .new_service(addr) .await .map_err(|e| HttpError::ServiceFactoryError(e))?; @@ -162,21 +163,63 @@ where #[cfg(feature = "tracing")] tracing::debug!("starting WebTransport session"); - match drive_webtransport_session::( - req, stream, h3_conn, ctx, &mut http_service - ).await { - Ok(_) => { + // Store the original request for passing to the service + let (parts, _) = req.into_parts(); + + // Accept the WebTransport session + let session = match h3wt::server::WebTransportSession::accept( + http::Request::from_parts(parts.clone(), ()), + stream, + h3_conn, + ) + .await + { + Ok(session) => session, + Err(_err) => { #[cfg(feature = "tracing")] - tracing::debug!("WebTransport session ended"); + tracing::warn!(err = %_err, "failed to accept WebTransport session"); + break; } - Err(err) => { + }; + + let wt_session = + webtransport::WebTransportSession::new(std::sync::Arc::new(session)); + + // Create an empty body for the WebTransport request + // Since WebTransport operates on streams, not the request body + let empty_body = crate::body::IncomingBody::Empty; + + // Reconstruct the request with the session in extensions + let mut wt_req = http::Request::from_parts(parts, empty_body); + wt_req.extensions_mut().insert(wt_session); // Call the service with the WebTransport request + tokio::spawn({ + let ctx = ctx.clone(); + let mut http_service = http_service.clone(); + async move { + let _res: Result<_, HttpError> = async move { + let _resp = http_service + .call(wt_req) + .await + .map_err(|e| HttpError::ServiceError(e))?; + + #[cfg(feature = "tracing")] + tracing::debug!("WebTransport session handler completed"); + + Ok(()) + } + .await; + #[cfg(feature = "tracing")] - tracing::warn!(err = %err, "WebTransport session error"); + if let Err(e) = _res { + tracing::warn!(err = %e, "WebTransport session handler error"); + } + + drop(ctx); } - } + }); + break; } - let (mut send, recv) = stream.split(); let size_hint = req @@ -243,19 +286,19 @@ where } .await; - #[cfg(feature = "tracing")] - if let Err(err) = _res { - tracing::warn!(err = %err, "error handling connection"); - } - }); - } + #[cfg(feature = "tracing")] + if let Err(err) = _res { + tracing::warn!(err = %err, "error handling connection"); + } + }); + } - // shut down gracefully - // wait for connections to be closed before exiting - endpoint.wait_idle().await; + // shut down gracefully + // wait for connections to be closed before exiting + endpoint.wait_idle().await; - Ok::<_, crate::error::HttpError>(()) - }; + Ok::<_, crate::error::HttpError>(()) + }; #[cfg(feature = "tracing")] let worker_fut = worker_fut.instrument(tracing::trace_span!("worker", n = _n)); @@ -277,244 +320,3 @@ where Ok(()) } } - -#[cfg(feature = "webtransport")] -async fn drive_webtransport_session( - req: http::Request<()>, - stream: h3::server::RequestStream, bytes::Bytes>, - h3_conn: h3::server::Connection, - ctx: scuffle_context::Context, - http_service: &mut F::Service, -) -> Result<(), crate::error::HttpError> -where - F: HttpServiceFactory + Clone + Send + 'static, - F::Error: std::error::Error + Send, - F::Service: Clone + Send + 'static, - ::Error: std::error::Error + Send + Sync, - ::ResBody: Send, - <::ResBody as http_body::Body>::Data: Send, - <::ResBody as http_body::Body>::Error: std::error::Error + Send + Sync, -{ - // Accept the WebTransport session - let session = std::sync::Arc::new(h3wt::server::WebTransportSession::accept(req, stream, h3_conn).await?); - - // Spawn task to handle unidirectional streams - let uni_handle = tokio::spawn({ - let ctx = ctx.clone(); - let session = session.clone(); - async move { - loop { - match session.accept_uni().with_context(&ctx).await { - Some(Ok(Some((sid, stream)))) => { - #[cfg(feature = "tracing")] - tracing::debug!(session_id = ?sid, "received WebTransport uni stream"); - - tokio::spawn({ - let ctx = ctx.clone(); - async move { - if let Err(err) = handle_webtransport_uni_stream(stream, ctx).await { - #[cfg(feature = "tracing")] - tracing::warn!(err = %err, "error handling WebTransport uni stream"); - } - } - }); - } - Some(Ok(None)) => break, - Some(Err(err)) => { - #[cfg(feature = "tracing")] - tracing::warn!(err = %err, "WebTransport uni stream accept error"); - break; - } - None => break, - } - } - } - }); - - // Handle bidirectional streams and requests - loop { - match session.accept_bi().with_context(&ctx).await { - Some(Ok(Some(h3wt::server::AcceptedBi::Request(req, stream)))) => { - let (mut send, recv) = stream.split(); - let size_hint = req - .headers() - .get(http::header::CONTENT_LENGTH) - .and_then(|len| len.to_str().ok().and_then(|x| x.parse().ok())); - let body = QuicIncomingBody::new(recv, size_hint); - let req = req.map(|_| crate::body::IncomingBody::from(body)); - - let resp = match http_service.call(req).await { - Ok(r) => r, - Err(_e) => { - #[cfg(feature = "tracing")] - tracing::warn!(err = %_e, "service error in WebTransport request"); - let _ = send - .send_response( - http::Response::builder() - .status(http::StatusCode::INTERNAL_SERVER_ERROR) - .body(()) - .unwrap(), - ) - .await; - continue; - } - }; - - let (parts, body) = resp.into_parts(); - if let Err(err) = send.send_response(http::Response::from_parts(parts, ())).await { - #[cfg(feature = "tracing")] - tracing::warn!(err = %err, "failed to send WebTransport response headers"); - continue; - } - if let Err(err) = copy_response_body::<_, F>(send, body).await { - #[cfg(feature = "tracing")] - tracing::warn!(err = %err, "failed to send WebTransport response body"); - } - } - Some(Ok(Some(h3wt::server::AcceptedBi::BidiStream(sid, bidi)))) => { - // Handle raw bidirectional WebTransport stream - #[cfg(feature = "tracing")] - tracing::debug!(session_id = ?sid, "received WebTransport bidi stream"); - - tokio::spawn({ - let ctx = ctx.clone(); - async move { - if let Err(err) = handle_webtransport_bidi_stream(bidi, ctx).await { - #[cfg(feature = "tracing")] - tracing::warn!(err = %err, "error handling WebTransport bidi stream"); - } - } - }); - } - Some(Ok(None)) => break, - Some(Err(err)) => { - #[cfg(feature = "tracing")] - tracing::warn!(err = %err, "WebTransport session error"); - break; - } - None => break, - } - } - - // Wait for uni stream handler to complete - let _ = uni_handle.await; - - Ok(()) -} - -#[cfg(feature = "webtransport")] -async fn handle_webtransport_uni_stream( - mut stream: h3wt::stream::RecvStream, - ctx: scuffle_context::Context, -) -> Result<(), Box> { - use bytes::Buf; - use h3::quic::RecvStream; - - let mut buffer = Vec::new(); - loop { - match std::future::poll_fn(|cx| stream.poll_data(cx)).with_context(&ctx).await { - Some(Ok(Some(mut chunk))) => { - let chunk_size = chunk.remaining(); - let chunk_bytes = chunk.copy_to_bytes(chunk_size); - buffer.extend_from_slice(&chunk_bytes); - - #[cfg(feature = "tracing")] - tracing::debug!( - bytes = chunk_size, - total = buffer.len(), - "received data on WebTransport uni stream" - ); - } - Some(Ok(None)) => { - #[cfg(feature = "tracing")] - tracing::debug!( - total_bytes = buffer.len(), - data = ?String::from_utf8_lossy(&buffer), - "WebTransport uni stream finished" - ); - break; - } - Some(Err(err)) => { - #[cfg(feature = "tracing")] - tracing::warn!(err = %err, "error reading from WebTransport uni stream"); - return Err(err.into()); - } - None => { - #[cfg(feature = "tracing")] - tracing::trace!("context done while reading WebTransport uni stream"); - return Ok(()); - } - } - } - - Ok(()) -} - -#[cfg(feature = "webtransport")] -async fn handle_webtransport_bidi_stream( - mut stream: h3wt::stream::BidiStream, bytes::Bytes>, - ctx: scuffle_context::Context, -) -> Result<(), Box> { - use bytes::Buf; - use h3::quic::RecvStream; - - // Read data from the receive side - let mut buffer = Vec::new(); - loop { - match std::future::poll_fn(|cx| stream.poll_data(cx)).with_context(&ctx).await { - Some(Ok(Some(mut chunk))) => { - let chunk_size = chunk.remaining(); - let chunk_bytes = chunk.copy_to_bytes(chunk_size); - buffer.extend_from_slice(&chunk_bytes); - - #[cfg(feature = "tracing")] - tracing::debug!( - bytes = chunk_size, - total = buffer.len(), - "received data on WebTransport bidi stream" - ); - } - Some(Ok(None)) => { - // Stream finished - #[cfg(feature = "tracing")] - tracing::debug!(total_bytes = buffer.len(), "WebTransport bidi stream finished reading"); - break; - } - Some(Err(err)) => { - #[cfg(feature = "tracing")] - tracing::warn!(err = %err, "error reading from WebTransport bidi stream"); - return Err(err.into()); - } - None => { - // Context cancelled - #[cfg(feature = "tracing")] - tracing::trace!("context done while reading WebTransport bidi stream"); - return Ok(()); - } - } - } - - // Echo back the received data - if !buffer.is_empty() { - #[cfg(feature = "tracing")] - tracing::debug!(bytes = buffer.len(), "echoing data back on WebTransport bidi stream"); - - let response_msg = format!("Echo: received {} bytes", buffer.len()); - let response_bytes = bytes::Bytes::from(response_msg); - - // Send the response using SendStreamUnframed - use h3::quic::{SendStream, SendStreamUnframed}; - let mut bytes_buf = response_bytes.clone(); - while bytes_buf.has_remaining() { - let written = std::future::poll_fn(|cx| stream.poll_send(cx, &mut bytes_buf)).await?; - if written == 0 { - break; - } - } - std::future::poll_fn(|cx| stream.poll_finish(cx)).await?; - #[cfg(feature = "tracing")] - tracing::debug!("successfully echoed data on WebTransport bidi stream"); - } - - Ok(()) -} diff --git a/crates/http/src/backend/h3/webtransport.rs b/crates/http/src/backend/h3/webtransport.rs new file mode 100644 index 000000000..acd40b5f0 --- /dev/null +++ b/crates/http/src/backend/h3/webtransport.rs @@ -0,0 +1,506 @@ +//! WebTransport session management for HTTP/3. +//! +//! This module provides types for handling WebTransport sessions over HTTP/3. +//! WebTransport allows bidirectional streams and datagrams to be established over QUIC. + +use std::sync::Arc; +use std::{fmt, io}; + +use bytes::Bytes; +use h3::quic::StreamErrorIncoming; +use h3_webtransport::server::{AcceptedBi as H3AcceptedBi, WebTransportSession as H3WebTransportSession}; +use h3_webtransport::stream::{BidiStream, RecvStream as WtRecvStream, SendStream as WtSendStream}; + +/// A WebTransport session handle. +/// +/// This type provides access to bidirectional and unidirectional streams +/// for a WebTransport session established over HTTP/3. +/// +/// The session can be retrieved from the request extensions when handling +/// a WebTransport CONNECT request. +/// +/// # Example +/// +/// ```rust,ignore +/// use scuffle_http::{IncomingRequest, Response}; +/// use scuffle_http::backend::h3::webtransport::WebTransportSession; +/// +/// async fn handle_webtransport(req: IncomingRequest) -> Result, std::convert::Infallible> { +/// if let Some(session) = req.extensions().get::() { +/// // Handle WebTransport session +/// tokio::spawn({ +/// let session = session.clone(); +/// async move { +/// while let Some(Ok(accepted)) = session.accept_bi().await { +/// // Handle bidirectional streams +/// } +/// } +/// }); +/// +/// return Ok(Response::builder() +/// .status(200) +/// .body(String::new()) +/// .unwrap()); +/// } +/// +/// Ok(Response::builder() +/// .status(404) +/// .body(String::new()) +/// .unwrap()) +/// } +/// ``` +#[derive(Clone)] +pub struct WebTransportSession { + session: Arc>, +} + +impl WebTransportSession { + /// Create a new WebTransport session from an h3-webtransport session. + pub(crate) fn new(session: Arc>) -> Self { + Self { session } + } + + /// Accept the next incoming bidirectional stream or request. + /// + /// Returns `None` when the session is closed or no more streams are available. + /// + /// # Example + /// + /// ```rust,ignore + /// use scuffle_http::backend::h3::webtransport::{WebTransportSession, AcceptedBi}; + /// + /// async fn handle_session(session: WebTransportSession) { + /// while let Some(Ok(accepted)) = session.accept_bi().await { + /// match accepted { + /// AcceptedBi::BidiStream(stream) => { + /// // Handle raw bidirectional stream + /// } + /// AcceptedBi::Request(req, stream) => { + /// // Handle HTTP request over WebTransport + /// } + /// } + /// } + /// } + /// ``` + pub async fn accept_bi(&self) -> Option> { + match self.session.accept_bi().await { + Ok(Some(H3AcceptedBi::BidiStream(id, stream))) => { + Some(Ok(AcceptedBi::BidiStream(WebTransportBidiStream { stream, _id: id }))) + } + Ok(Some(H3AcceptedBi::Request(req, stream))) => { + Some(Ok(AcceptedBi::Request(req, WebTransportRequestStream { stream }))) + } + Ok(None) => None, + Err(e) => Some(Err(e)), + } + } + + /// Accept the next incoming unidirectional stream. + /// + /// Returns `None` when the session is closed or no more streams are available. + pub async fn accept_uni( + &self, + ) -> Option> { + match self.session.accept_uni().await { + Ok(Some((id, stream))) => Some(Ok((WebTransportStreamId(id), WebTransportRecvStream { stream }))), + Ok(None) => None, + Err(e) => Some(Err(e)), + } + } + + /// Open a new bidirectional stream. + /// + /// # Example + /// + /// ```rust,ignore + /// let (mut send, mut recv) = session.open_bi().await?; + /// send.write(Bytes::from("Hello")).await?; + /// send.finish().await?; + /// ``` + pub async fn open_bi(&self) -> Result<(WebTransportSendStream, WebTransportRecvStream), h3::error::StreamError> { + let stream = self.session.open_bi(WebTransportStreamId::next_session_id()).await?; + use h3::quic::BidiStream; + let (send, recv) = stream.split(); + Ok(( + WebTransportSendStream { stream: send }, + WebTransportRecvStream { stream: recv }, + )) + } + + /// Open a new unidirectional stream. + /// + /// # Example + /// + /// ```rust,ignore + /// let mut send = session.open_uni().await?; + /// send.write(Bytes::from("Hello")).await?; + /// send.finish().await?; + /// ``` + pub async fn open_uni(&self) -> Result { + let send = self.session.open_uni(WebTransportStreamId::next_session_id()).await?; + Ok(WebTransportSendStream { stream: send }) + } + + /// Get the session ID for this WebTransport session. + pub fn session_id(&self) -> h3_webtransport::SessionId { + self.session.session_id() + } + + /// Get a datagram sender for sending datagrams over this session. + /// + /// Datagrams are unreliable and unordered messages. + /// + /// # Example + /// + /// ```rust,ignore + /// let mut sender = session.datagram_sender(); + /// sender.send_datagram(Bytes::from("Hello"))?; + /// ``` + pub fn datagram_sender( + &self, + ) -> h3_datagram::datagram_handler::DatagramSender< + >::SendDatagramHandler, + Bytes, + > { + self.session.datagram_sender() + } + + /// Get a datagram reader for receiving datagrams over this session. + /// + /// # Example + /// + /// ```rust,ignore + /// let mut reader = session.datagram_reader(); + /// while let Ok(datagram) = reader.read_datagram().await { + /// println!("Received: {} bytes", datagram.payload().len()); + /// } + /// ``` + pub fn datagram_reader( + &self, + ) -> h3_datagram::datagram_handler::DatagramReader< + >::RecvDatagramHandler, + > { + self.session.datagram_reader() + } +} + +impl fmt::Debug for WebTransportSession { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WebTransportSession").finish_non_exhaustive() + } +} + +/// An accepted bidirectional stream or request. +#[derive(Debug)] +pub enum AcceptedBi { + /// A raw bidirectional stream. + BidiStream(WebTransportBidiStream), + /// An HTTP request over WebTransport. + Request(http::Request<()>, WebTransportRequestStream), +} + +/// A bidirectional WebTransport stream. +pub struct WebTransportBidiStream { + stream: BidiStream, Bytes>, + _id: h3_webtransport::SessionId, +} + +impl WebTransportBidiStream { + /// Split this stream into separate send and receive halves. + /// + /// # Example + /// + /// ```rust,ignore + /// let (mut send, mut recv) = bidi_stream.split(); + /// tokio::spawn(async move { + /// while let Ok(Some(data)) = recv.read().await { + /// println!("Received: {:?}", data); + /// } + /// }); + /// send.write(Bytes::from("Hello")).await?; + /// ``` + pub fn split(self) -> (WebTransportSendStream, WebTransportRecvStream) { + use h3::quic::BidiStream; + let (send, recv) = self.stream.split(); + ( + WebTransportSendStream { stream: send }, + WebTransportRecvStream { stream: recv }, + ) + } + + /// Read data from the receive side of the stream. + pub async fn read(&mut self) -> Result, StreamErrorIncoming> { + use h3::quic::RecvStream; + std::future::poll_fn(|cx| self.stream.poll_data(cx)).await + } + + /// Read all remaining data from the receive side until the stream is finished. + /// + /// # Example + /// + /// ```rust,ignore + /// let data = bidi_stream.read_to_end(1024 * 1024).await?; // max 1MB + /// ``` + pub async fn read_to_end(&mut self, max_size: usize) -> Result { + let mut chunks = Vec::new(); + let mut total_size = 0; + + while let Some(chunk) = self + .read() + .await + .map_err(|e| io::Error::other(format!("stream read error: {}", e)))? + { + total_size += chunk.len(); + if total_size > max_size { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "stream data too large: {} bytes exceeds maximum of {} bytes", + total_size, max_size + ), + )); + } + chunks.push(chunk); + } + + if chunks.is_empty() { + Ok(Bytes::new()) + } else if chunks.len() == 1 { + Ok(chunks.into_iter().next().unwrap()) + } else { + let mut combined = bytes::BytesMut::with_capacity(total_size); + for chunk in chunks { + combined.extend_from_slice(&chunk); + } + Ok(combined.freeze()) + } + } + + /// Write data to the send side of the stream. + pub async fn write(&mut self, data: Bytes) -> Result<(), StreamErrorIncoming> { + use bytes::Buf; + use h3::quic::{SendStream, SendStreamUnframed}; + + std::future::poll_fn(|cx| self.stream.poll_ready(cx)).await?; + let mut buf = data; + while buf.has_remaining() { + let written = std::future::poll_fn(|cx| self.stream.poll_send(cx, &mut buf)).await?; + if written == 0 { + break; + } + } + Ok(()) + } + + /// Finish writing to the stream. + pub async fn finish(&mut self) -> Result<(), StreamErrorIncoming> { + use h3::quic::SendStream; + std::future::poll_fn(|cx| self.stream.poll_finish(cx)).await + } +} + +impl fmt::Debug for WebTransportBidiStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WebTransportBidiStream").finish() + } +} + +/// A receive-only WebTransport stream. +pub struct WebTransportRecvStream { + stream: WtRecvStream, +} + +impl WebTransportRecvStream { + /// Read data from the stream. + /// + /// Returns `Ok(None)` when the stream is finished. + /// + /// # Example + /// + /// ```rust,ignore + /// while let Ok(Some(data)) = recv_stream.read().await { + /// println!("Received {} bytes", data.len()); + /// } + /// ``` + pub async fn read(&mut self) -> Result, StreamErrorIncoming> { + use h3::quic::RecvStream; + std::future::poll_fn(|cx| self.stream.poll_data(cx)).await + } + + /// Read all remaining data from the stream until it's finished. + /// + /// This collects all chunks into a single `Bytes` object. + /// + /// # Example + /// + /// ```rust,ignore + /// let data = recv_stream.read_to_end(1024 * 1024).await?; // max 1MB + /// println!("Received complete message: {} bytes", data.len()); + /// ``` + pub async fn read_to_end(&mut self, max_size: usize) -> Result { + let mut chunks = Vec::new(); + let mut total_size = 0; + + while let Some(chunk) = self + .read() + .await + .map_err(|e| io::Error::other(format!("stream read error: {}", e)))? + { + total_size += chunk.len(); + if total_size > max_size { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "stream data too large: {} bytes exceeds maximum of {} bytes", + total_size, max_size + ), + )); + } + chunks.push(chunk); + } + + if chunks.is_empty() { + Ok(Bytes::new()) + } else if chunks.len() == 1 { + Ok(chunks.into_iter().next().unwrap()) + } else { + // Combine all chunks into a single buffer + let mut combined = bytes::BytesMut::with_capacity(total_size); + for chunk in chunks { + combined.extend_from_slice(&chunk); + } + Ok(combined.freeze()) + } + } + + /// Stop receiving data on this stream with an error code. + pub fn stop_sending(&mut self, error_code: u64) { + use h3::quic::RecvStream; + self.stream.stop_sending(error_code) + } +} + +impl fmt::Debug for WebTransportRecvStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WebTransportRecvStream").finish_non_exhaustive() + } +} + +/// A send-only WebTransport stream. +pub struct WebTransportSendStream { + stream: WtSendStream, Bytes>, +} + +impl WebTransportSendStream { + /// Write data to the stream. + /// + /// # Example + /// + /// ```rust,ignore + /// send_stream.write(Bytes::from("Hello, world!")).await?; + /// send_stream.finish().await?; + /// ``` + pub async fn write(&mut self, data: Bytes) -> Result<(), StreamErrorIncoming> { + use bytes::Buf; + use h3::quic::{SendStream, SendStreamUnframed}; + + std::future::poll_fn(|cx| self.stream.poll_ready(cx)).await?; + let mut buf = data; + while buf.has_remaining() { + let written = std::future::poll_fn(|cx| self.stream.poll_send(cx, &mut buf)).await?; + if written == 0 { + break; + } + } + Ok(()) + } + + /// Write all data and finish the stream in one operation. + /// + /// This is a convenience method that writes the data and then finishes the stream. + /// + /// # Example + /// + /// ```rust,ignore + /// send_stream.write_all(Bytes::from("Complete message")).await?; + /// ``` + pub async fn write_all(&mut self, data: Bytes) -> Result<(), StreamErrorIncoming> { + self.write(data).await?; + self.finish().await + } + + /// Finish writing to the stream. + /// + /// This signals that no more data will be sent on this stream. + pub async fn finish(&mut self) -> Result<(), StreamErrorIncoming> { + use h3::quic::SendStream; + std::future::poll_fn(|cx| self.stream.poll_finish(cx)).await + } + + /// Reset the stream with an error code. + pub fn reset(&mut self, reset_code: u64) { + use h3::quic::SendStream; + self.stream.reset(reset_code) + } +} +impl fmt::Debug for WebTransportSendStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WebTransportSendStream").finish_non_exhaustive() + } +} + +/// A stream for handling HTTP requests over WebTransport. +pub struct WebTransportRequestStream { + stream: h3::server::RequestStream, Bytes>, +} + +impl WebTransportRequestStream { + /// Split this stream into separate send and receive halves. + pub fn split( + self, + ) -> ( + h3::server::RequestStream, Bytes>, + h3::server::RequestStream, + ) { + self.stream.split() + } +} + +impl fmt::Debug for WebTransportRequestStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WebTransportRequestStream").finish_non_exhaustive() + } +} + +/// A WebTransport stream identifier. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct WebTransportStreamId(h3_webtransport::SessionId); + +impl WebTransportStreamId { + fn next_session_id() -> h3_webtransport::SessionId { + use std::sync::atomic::{AtomicU64, Ordering}; + static COUNTER: AtomicU64 = AtomicU64::new(0); + let id = COUNTER.fetch_add(1, Ordering::Relaxed); + // SessionId is created from a VarInt-encoded StreamId + let varint = h3::proto::varint::VarInt::from_u64(id).expect("valid varint"); + let stream_id = h3::quic::StreamId::from(varint); + h3_webtransport::SessionId::from(stream_id) + } + + /// Get the inner session ID. + pub fn inner(&self) -> h3_webtransport::SessionId { + self.0 + } +} + +impl From for WebTransportStreamId { + fn from(id: h3_webtransport::SessionId) -> Self { + Self(id) + } +} + +impl fmt::Display for WebTransportStreamId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self.0) + } +} diff --git a/crates/http/src/body.rs b/crates/http/src/body.rs index 02b98c400..4b2c2444b 100644 --- a/crates/http/src/body.rs +++ b/crates/http/src/body.rs @@ -31,6 +31,8 @@ pub enum IncomingBody { /// The body of an incoming h3 request. #[cfg(feature = "http3")] Quic(crate::backend::h3::body::QuicIncomingBody), + /// An empty body (used for WebTransport sessions). + Empty, } #[cfg(any(feature = "http1", feature = "http2"))] @@ -57,6 +59,7 @@ impl http_body::Body for IncomingBody { IncomingBody::Hyper(body) => body.is_end_stream(), #[cfg(feature = "http3")] IncomingBody::Quic(body) => body.is_end_stream(), + IncomingBody::Empty => true, #[cfg(not(any(feature = "http1", feature = "http2", feature = "http3")))] _ => false, } @@ -71,6 +74,7 @@ impl http_body::Body for IncomingBody { IncomingBody::Hyper(body) => std::pin::Pin::new(body).poll_frame(_cx).map_err(Into::into), #[cfg(feature = "http3")] IncomingBody::Quic(body) => std::pin::Pin::new(body).poll_frame(_cx).map_err(Into::into), + IncomingBody::Empty => std::task::Poll::Ready(None), #[cfg(not(any(feature = "http1", feature = "http2", feature = "http3")))] _ => std::task::Poll::Ready(None), } @@ -82,6 +86,7 @@ impl http_body::Body for IncomingBody { IncomingBody::Hyper(body) => body.size_hint(), #[cfg(feature = "http3")] IncomingBody::Quic(body) => body.size_hint(), + IncomingBody::Empty => http_body::SizeHint::with_exact(0), #[cfg(not(any(feature = "http1", feature = "http2", feature = "http3")))] _ => http_body::SizeHint::default(), } diff --git a/misc/toolchains/rust.MODULE.bazel b/misc/toolchains/rust.MODULE.bazel index 09e74c5be..cf340e405 100644 --- a/misc/toolchains/rust.MODULE.bazel +++ b/misc/toolchains/rust.MODULE.bazel @@ -222,6 +222,7 @@ use_repo( "cargo_vendor__glob-0.3.3", "cargo_vendor__guppy-0.17.22", "cargo_vendor__h3-0.0.8", + "cargo_vendor__h3-datagram-0.0.2", "cargo_vendor__h3-quinn-0.0.10", "cargo_vendor__h3-webtransport-0.1.2", "cargo_vendor__heck-0.5.0", diff --git a/vendor/cargo/BUILD.bazel b/vendor/cargo/BUILD.bazel index 27ec34363..759bfc053 100644 --- a/vendor/cargo/BUILD.bazel +++ b/vendor/cargo/BUILD.bazel @@ -609,6 +609,18 @@ transition_alias_opt( tags = ["manual"], ) +transition_alias_opt( + name = "h3-datagram-0.0.2", + actual = "@cargo_vendor__h3-datagram-0.0.2//:h3_datagram", + tags = ["manual"], +) + +transition_alias_opt( + name = "h3-datagram", + actual = "@cargo_vendor__h3-datagram-0.0.2//:h3_datagram", + tags = ["manual"], +) + transition_alias_opt( name = "h3-quinn-0.0.10", actual = "@cargo_vendor__h3-quinn-0.0.10//:h3_quinn", diff --git a/vendor/cargo/defs.bzl b/vendor/cargo/defs.bzl index 7ceb0b866..3c07ef655 100644 --- a/vendor/cargo/defs.bzl +++ b/vendor/cargo/defs.bzl @@ -945,6 +945,7 @@ _NORMAL_DEPENDENCIES = { }, "webtransport": { _COMMON_CONDITION: { + "h3-datagram": Label("@cargo_vendor//:h3-datagram-0.0.2"), "h3-webtransport": Label("@cargo_vendor//:h3-webtransport-0.1.2"), }, }, @@ -11918,6 +11919,7 @@ def crate_repositories(): struct(repo = "cargo_vendor__glob-0.3.3", is_dev_dep = False), struct(repo = "cargo_vendor__guppy-0.17.22", is_dev_dep = False), struct(repo = "cargo_vendor__h3-0.0.8", is_dev_dep = False), + struct(repo = "cargo_vendor__h3-datagram-0.0.2", is_dev_dep = False), struct(repo = "cargo_vendor__h3-quinn-0.0.10", is_dev_dep = False), struct(repo = "cargo_vendor__h3-webtransport-0.1.2", is_dev_dep = False), struct(repo = "cargo_vendor__heck-0.5.0", is_dev_dep = False), From 3e2a1dbe797d5c3d55984b406004a2f534e85230 Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sat, 11 Oct 2025 15:44:46 +0200 Subject: [PATCH 03/25] refactor(http): webtransport client html into file --- .../http/examples/web_transport_client.html | 65 +++++++++++++++++ crates/http/examples/webtransport_tls.rs | 70 +------------------ 2 files changed, 68 insertions(+), 67 deletions(-) create mode 100644 crates/http/examples/web_transport_client.html diff --git a/crates/http/examples/web_transport_client.html b/crates/http/examples/web_transport_client.html new file mode 100644 index 000000000..948fd07fd --- /dev/null +++ b/crates/http/examples/web_transport_client.html @@ -0,0 +1,65 @@ + + + +

WebTransport demo

+
+ + + diff --git a/crates/http/examples/webtransport_tls.rs b/crates/http/examples/webtransport_tls.rs index b829ffed5..f06b9366f 100644 --- a/crates/http/examples/webtransport_tls.rs +++ b/crates/http/examples/webtransport_tls.rs @@ -36,6 +36,8 @@ fn rustls_config() -> rustls::ServerConfig { .expect("failed to build config") } +const WT_CLIENT_HTML: &str = include_str!("web_transport_client.html"); + #[tokio::main] async fn main() { tracing_subscriber::fmt() @@ -46,76 +48,10 @@ async fn main() { let service = fn_http_service(|req: http_srv::IncomingRequest| async move { if req.uri().path() == "/" && req.method() == Method::GET { - let html = r#" - - -

WebTransport demo

-
- - - - "#; let resp = http::Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "text/html; charset=utf-8") - .body(html.to_string()) + .body(WT_CLIENT_HTML) .unwrap(); Ok::<_, Infallible>(resp) } else if req.uri().path() == "/wt" && req.method() == Method::CONNECT { From cdf8c0b5805076823bc62ebc763407bfb45d4eee Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sat, 11 Oct 2025 15:48:35 +0200 Subject: [PATCH 04/25] docs(http): fix --- crates/http/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 7ebb85607..f05b99ad9 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -61,8 +61,8 @@ http2 = [ ] ## Enables http3 support http3 = ["dep:quinn", "dep:h3-quinn", "dep:h3"] -## Enables WebTransport over HTTP/3 support (requires http3) -webtransport = ["http3", "dep:h3-webtransport", "dep:h3-datagram", "h3-quinn?/datagram"] +## Enables WebTransport over HTTP/3 support (enables http3) +webtransport = ["http3", "dep:h3-webtransport", "dep:h3-datagram", "h3-quinn/datagram"] ## Enables tls via rustls tls-rustls = ["dep:tokio-rustls"] ## Alias for ["http3", "tls-rustls"] From ab34f151a5b6e8bcdbfd6c283f58dab340bb9a59 Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sat, 11 Oct 2025 15:56:09 +0200 Subject: [PATCH 05/25] fix(http): remove unused h3_quinn feature --- crates/http/Cargo.toml | 2 +- vendor/cargo/BUILD.h3-quinn-0.0.10.bazel | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index f05b99ad9..84368fdf2 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -62,7 +62,7 @@ http2 = [ ## Enables http3 support http3 = ["dep:quinn", "dep:h3-quinn", "dep:h3"] ## Enables WebTransport over HTTP/3 support (enables http3) -webtransport = ["http3", "dep:h3-webtransport", "dep:h3-datagram", "h3-quinn/datagram"] +webtransport = ["http3", "dep:h3-webtransport", "dep:h3-datagram"] ## Enables tls via rustls tls-rustls = ["dep:tokio-rustls"] ## Alias for ["http3", "tls-rustls"] diff --git a/vendor/cargo/BUILD.h3-quinn-0.0.10.bazel b/vendor/cargo/BUILD.h3-quinn-0.0.10.bazel index adafe61dd..6b7e328f5 100644 --- a/vendor/cargo/BUILD.h3-quinn-0.0.10.bazel +++ b/vendor/cargo/BUILD.h3-quinn-0.0.10.bazel @@ -35,7 +35,6 @@ rust_library( ], ), crate_features = [ - "datagram", "tracing", ], crate_root = "src/lib.rs", @@ -68,7 +67,6 @@ rust_library( "@cargo_vendor__bytes-1.10.1//:bytes", "@cargo_vendor__futures-0.3.31//:futures", "@cargo_vendor__h3-0.0.8//:h3", - "@cargo_vendor__h3-datagram-0.0.2//:h3_datagram", "@cargo_vendor__quinn-0.11.9//:quinn", "@cargo_vendor__tokio-1.47.1//:tokio", "@cargo_vendor__tokio-util-0.7.16//:tokio_util", From b093ab296a9c071b167bd5d759b88e4a9647d4b2 Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sat, 11 Oct 2025 16:00:08 +0200 Subject: [PATCH 06/25] Revert "fix(http): remove unused h3_quinn feature" This reverts commit 8632fbcbf3f7b3b45d9883d75830b79c938da08d. --- crates/http/Cargo.toml | 2 +- vendor/cargo/BUILD.h3-quinn-0.0.10.bazel | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 84368fdf2..f05b99ad9 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -62,7 +62,7 @@ http2 = [ ## Enables http3 support http3 = ["dep:quinn", "dep:h3-quinn", "dep:h3"] ## Enables WebTransport over HTTP/3 support (enables http3) -webtransport = ["http3", "dep:h3-webtransport", "dep:h3-datagram"] +webtransport = ["http3", "dep:h3-webtransport", "dep:h3-datagram", "h3-quinn/datagram"] ## Enables tls via rustls tls-rustls = ["dep:tokio-rustls"] ## Alias for ["http3", "tls-rustls"] diff --git a/vendor/cargo/BUILD.h3-quinn-0.0.10.bazel b/vendor/cargo/BUILD.h3-quinn-0.0.10.bazel index 6b7e328f5..adafe61dd 100644 --- a/vendor/cargo/BUILD.h3-quinn-0.0.10.bazel +++ b/vendor/cargo/BUILD.h3-quinn-0.0.10.bazel @@ -35,6 +35,7 @@ rust_library( ], ), crate_features = [ + "datagram", "tracing", ], crate_root = "src/lib.rs", @@ -67,6 +68,7 @@ rust_library( "@cargo_vendor__bytes-1.10.1//:bytes", "@cargo_vendor__futures-0.3.31//:futures", "@cargo_vendor__h3-0.0.8//:h3", + "@cargo_vendor__h3-datagram-0.0.2//:h3_datagram", "@cargo_vendor__quinn-0.11.9//:quinn", "@cargo_vendor__tokio-1.47.1//:tokio", "@cargo_vendor__tokio-util-0.7.16//:tokio_util", From e03276eb1e984d5838854f844000d0ed529bda02 Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sat, 11 Oct 2025 16:10:36 +0200 Subject: [PATCH 07/25] fix(http): test --- crates/http/examples/webtransport_tls.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/http/examples/webtransport_tls.rs b/crates/http/examples/webtransport_tls.rs index f06b9366f..a0cfc9cb0 100644 --- a/crates/http/examples/webtransport_tls.rs +++ b/crates/http/examples/webtransport_tls.rs @@ -13,10 +13,10 @@ fn assets_path(item: &str) -> std::path::PathBuf { } } -fn rustls_config() -> rustls::ServerConfig { +fn rustls_config() -> tokio_rustls::rustls::ServerConfig { static ONCE: std::sync::Once = std::sync::Once::new(); ONCE.call_once(|| { - rustls::crypto::aws_lc_rs::default_provider() + tokio_rustls::rustls::crypto::aws_lc_rs::default_provider() .install_default() .expect("failed to install aws lc provider"); }); @@ -30,7 +30,7 @@ fn rustls_config() -> rustls::ServerConfig { .expect("failed to load key") .expect("no key found"); - rustls::ServerConfig::builder() + tokio_rustls::rustls::ServerConfig::builder() .with_no_client_auth() .with_single_cert(certs, key) .expect("failed to build config") @@ -51,7 +51,7 @@ async fn main() { let resp = http::Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "text/html; charset=utf-8") - .body(WT_CLIENT_HTML) + .body(WT_CLIENT_HTML.to_string()) .unwrap(); Ok::<_, Infallible>(resp) } else if req.uri().path() == "/wt" && req.method() == Method::CONNECT { From ff21176c9eedc76111141a26a9f204ca55791663 Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sun, 12 Oct 2025 14:29:28 +0200 Subject: [PATCH 08/25] feat(http): some improvements to the wt example --- crates/http/examples/web_transport_client.html | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/http/examples/web_transport_client.html b/crates/http/examples/web_transport_client.html index 948fd07fd..cdeda4b9e 100644 --- a/crates/http/examples/web_transport_client.html +++ b/crates/http/examples/web_transport_client.html @@ -2,11 +2,11 @@

WebTransport demo

-
+

     
-
+                    log("All tests completed successfully!");
+                } catch (e) {
+                    console.error(e);
+                    log("WebTransport failed: " + e);
+                }
+            })();
+        
+    
 
diff --git a/vendor/cargo/defs.bzl b/vendor/cargo/defs.bzl
index 3c07ef655..3e1c518c6 100644
--- a/vendor/cargo/defs.bzl
+++ b/vendor/cargo/defs.bzl
@@ -946,6 +946,7 @@ _NORMAL_DEPENDENCIES = {
         "webtransport": {
             _COMMON_CONDITION: {
                 "h3-datagram": Label("@cargo_vendor//:h3-datagram-0.0.2"),
+                "h3-quinn": Label("@cargo_vendor//:h3-quinn-0.0.10"),
                 "h3-webtransport": Label("@cargo_vendor//:h3-webtransport-0.1.2"),
             },
         },

From aacab99f105d133d8ad5d6931032aec3b586e6e5 Mon Sep 17 00:00:00 2001
From: Lennart Kloock 
Date: Sun, 12 Oct 2025 16:42:46 +0200
Subject: [PATCH 10/25] fix(http): webtransport connections

---
 .../http/examples/web_transport_client.html   |  8 +++----
 crates/http/src/backend/h3.rs                 | 23 ++++++++++++++++---
 2 files changed, 24 insertions(+), 7 deletions(-)

diff --git a/crates/http/examples/web_transport_client.html b/crates/http/examples/web_transport_client.html
index 618351704..c2ed11e1a 100644
--- a/crates/http/examples/web_transport_client.html
+++ b/crates/http/examples/web_transport_client.html
@@ -19,16 +19,16 @@ 

WebTransport demo

return; } try { - const url = "https://" + location.host - + "/wt"; + const url = "https://" + location.host + "/wt"; log("Connecting to " + url); const wt = new WebTransport(url); await wt.ready; log("WebTransport connected"); // Test unidirectional stream - const uniWriter = await wt - .createUnidirectionalStream(); + const writableStream = await wt.createUnidirectionalStream(); + const uniWriter = writableStream.getWriter(); + const encoder = new TextEncoder(); await uniWriter.write( encoder.encode("Hello from uni stream"), diff --git a/crates/http/src/backend/h3.rs b/crates/http/src/backend/h3.rs index dbf9a5b90..9241254dc 100644 --- a/crates/http/src/backend/h3.rs +++ b/crates/http/src/backend/h3.rs @@ -1,5 +1,5 @@ //! HTTP3 backend. -use std::fmt::Debug; +use std::{fmt::Debug, time::Duration}; use std::io; use std::net::SocketAddr; use std::sync::Arc; @@ -68,7 +68,10 @@ where // not quite sure why this is necessary but it is self.rustls_config.max_early_data_size = u32::MAX; let crypto = h3_quinn::quinn::crypto::rustls::QuicServerConfig::try_from(self.rustls_config)?; - let server_config = h3_quinn::quinn::ServerConfig::with_crypto(Arc::new(crypto)); + let mut server_config = h3_quinn::quinn::ServerConfig::with_crypto(Arc::new(crypto)); + let mut transport_config = quinn::TransportConfig::default(); + transport_config.keep_alive_interval(Some(Duration::from_secs(2))); + server_config.transport = Arc::new(transport_config); // Bind the UDP socket let socket = std::net::UdpSocket::bind(self.bind)?; @@ -117,7 +120,21 @@ where tracing::debug!(addr = %addr, "accepted quic connection"); let connection_fut = async move { - let Some(mut h3_conn) = h3::server::Connection::new(h3_quinn::Connection::new(conn)) + #[cfg(not(feature = "webtransport"))] + let h3_conn_builder = h3::server::builder(); + + #[cfg(feature = "webtransport")] + let h3_conn_builder = { + let mut builder = h3::server::builder(); + builder.enable_webtransport(true) + .enable_extended_connect(true) + .enable_datagram(true) + .max_webtransport_sessions(1) + .send_grease(true); + builder + }; + + let Some(mut h3_conn) = h3_conn_builder.build(h3_quinn::Connection::new(conn)) .with_context(&ctx) .await .transpose()? From d7f79037b86fb4a3f8902c97806c15a2ffb2f51b Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sun, 12 Oct 2025 17:05:46 +0200 Subject: [PATCH 11/25] feat(http): add webtransport server options --- crates/http/src/backend/h3.rs | 38 +++++++++++++++++---- crates/http/src/server.rs | 63 ++++++++++++++++++++++++++++++----- 2 files changed, 86 insertions(+), 15 deletions(-) diff --git a/crates/http/src/backend/h3.rs b/crates/http/src/backend/h3.rs index 9241254dc..1d1049fc0 100644 --- a/crates/http/src/backend/h3.rs +++ b/crates/http/src/backend/h3.rs @@ -40,6 +40,13 @@ pub struct Http3Backend { /// Use `[::]` for a dual-stack listener. /// For example, use `[::]:80` to bind to port 80 on both IPv4 and IPv6. bind: SocketAddr, + /// Enable WebTransport support. + #[builder(default = false)] + #[cfg(feature = "webtransport")] + enable_webtransport: bool, + #[builder(default = 1, setters(vis = "", name = max_webtransport_sessions_internal))] + #[cfg(feature = "webtransport")] + max_webtransport_sessions: u64, /// rustls config. /// /// Use this field to set the server into TLS mode. @@ -47,6 +54,23 @@ pub struct Http3Backend { rustls_config: tokio_rustls::rustls::ServerConfig, } +#[cfg(feature = "webtransport")] +impl Http3BackendBuilder +where + S: http3_backend_builder::State, + S::MaxWebtransportSessions: http3_backend_builder::IsUnset, + S::EnableWebtransport: http3_backend_builder::IsSet, +{ + /// Set the maximum number of concurrent WebTransport sessions. + /// + /// Corresponds to [h3::server::Builder::max_webtransport_sessions]. + /// + /// Default is 1 when WebTransport is enabled. + pub fn max_webtransport_sessions(self, max_webtransport_sessions: u64) -> Http3BackendBuilder> { + self.max_webtransport_sessions_internal(max_webtransport_sessions) + } +} + impl Http3Backend where F: HttpServiceFactory + Clone + Send + 'static, @@ -122,17 +146,17 @@ where let connection_fut = async move { #[cfg(not(feature = "webtransport"))] let h3_conn_builder = h3::server::builder(); + #[cfg(feature = "webtransport")] + let mut h3_conn_builder = h3::server::builder(); #[cfg(feature = "webtransport")] - let h3_conn_builder = { - let mut builder = h3::server::builder(); - builder.enable_webtransport(true) + if self.enable_webtransport { + h3_conn_builder.enable_webtransport(true) .enable_extended_connect(true) .enable_datagram(true) - .max_webtransport_sessions(1) + .max_webtransport_sessions(self.max_webtransport_sessions) .send_grease(true); - builder - }; + } let Some(mut h3_conn) = h3_conn_builder.build(h3_quinn::Connection::new(conn)) .with_context(&ctx) @@ -174,7 +198,7 @@ where // Check if this is a WebTransport CONNECT request #[cfg(feature = "webtransport")] - if req.extensions().get::() == Some(&Protocol::WEB_TRANSPORT) + if self.enable_webtransport && req.extensions().get::() == Some(&Protocol::WEB_TRANSPORT) && req.method() == http::Method::CONNECT { #[cfg(feature = "tracing")] diff --git a/crates/http/src/server.rs b/crates/http/src/server.rs index 969bc6d20..e8670af0c 100644 --- a/crates/http/src/server.rs +++ b/crates/http/src/server.rs @@ -37,6 +37,12 @@ pub struct HttpServer { #[builder(default = false, setters(vis = "", name = enable_http3_internal))] #[cfg(feature = "http3")] enable_http3: bool, + #[builder(default = false, setters(vis = "", name = enable_webtransport_internal))] + #[cfg(feature = "webtransport")] + enable_webtransport: bool, + #[builder(default = 1, setters(vis = "", name = max_webtransport_sessions_internal))] + #[cfg(feature = "webtransport")] + max_webtransport_sessions: u64, /// rustls config. /// /// Use this field to set the server into TLS mode. @@ -60,6 +66,38 @@ where } } +#[cfg(feature = "webtransport")] +impl HttpServerBuilder +where + S: http_server_builder::State, + S::EnableWebtransport: http_server_builder::IsUnset, + S::EnableHttp3: http_server_builder::IsSet, +{ + /// Enable WebTransport support. + /// + /// First enable HTTP/3 by calling [`enable_http3`](HttpServerBuilder::enable_http3) to enable WebTransport. + pub fn enable_webtransport(self, enable_webtransport: bool) -> HttpServerBuilder> { + self.enable_webtransport_internal(enable_webtransport) + } +} + +#[cfg(feature = "webtransport")] +impl HttpServerBuilder +where + S: http_server_builder::State, + S::MaxWebtransportSessions: http_server_builder::IsUnset, + S::EnableWebtransport: http_server_builder::IsSet, +{ + /// Set the maximum number of concurrent WebTransport sessions. + /// + /// Corresponds to [h3::server::Builder::max_webtransport_sessions]. + /// + /// Default is 1 when WebTransport is enabled. + pub fn max_webtransport_sessions(self, max_webtransport_sessions: u64) -> HttpServerBuilder> { + self.max_webtransport_sessions_internal(max_webtransport_sessions) + } +} + #[cfg(feature = "tower")] impl HttpServerBuilder, S> where @@ -165,6 +203,10 @@ where #[cfg(feature = "http3")] if self.enable_http3 { rustls_config.alpn_protocols.push(b"h3".to_vec()); + rustls_config.alpn_protocols.push(b"h3-32".to_vec()); + rustls_config.alpn_protocols.push(b"h3-31".to_vec()); + rustls_config.alpn_protocols.push(b"h3-30".to_vec()); + rustls_config.alpn_protocols.push(b"h3-29".to_vec()); } } } @@ -199,15 +241,17 @@ where match (start_tcp_backend, enable_http3) { #[cfg(feature = "http3")] (false, true) => { - let backend = crate::backend::h3::Http3Backend::builder() + let builder = crate::backend::h3::Http3Backend::builder() .ctx(self.ctx) .worker_tasks(self.worker_tasks) .service_factory(self.service_factory) .bind(self.bind) - .rustls_config(_rustls_config) - .build(); + .rustls_config(_rustls_config); - return backend.run().await; + #[cfg(feature = "webtransport")] + let builder = builder.enable_webtransport(self.enable_webtransport).max_webtransport_sessions(self.max_webtransport_sessions); + + return builder.build().run().await; } #[cfg(any(feature = "http1", feature = "http2"))] (true, false) => { @@ -243,14 +287,17 @@ where let hyper = std::pin::pin!(builder.build().run()); - let http3 = crate::backend::h3::Http3Backend::builder() + let http3_builder = crate::backend::h3::Http3Backend::builder() .ctx(self.ctx) .worker_tasks(self.worker_tasks) .service_factory(self.service_factory) .bind(self.bind) - .rustls_config(_rustls_config) - .build() - .run(); + .rustls_config(_rustls_config); + + #[cfg(feature = "webtransport")] + let http3_builder = http3_builder.enable_webtransport(self.enable_webtransport).max_webtransport_sessions(self.max_webtransport_sessions); + + let http3 = http3_builder.build().run(); let http3 = std::pin::pin!(http3); let res = futures::future::select(hyper, http3).await; From acf3ece45946e0c418a69cac6eadd5af89f907b7 Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sun, 12 Oct 2025 17:08:08 +0200 Subject: [PATCH 12/25] chore: fmt --- crates/http/examples/web_transport_client.html | 9 ++++++--- crates/http/src/backend/h3.rs | 17 ++++++++++++----- crates/http/src/server.rs | 18 ++++++++++++++---- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/crates/http/examples/web_transport_client.html b/crates/http/examples/web_transport_client.html index c2ed11e1a..9448fa34b 100644 --- a/crates/http/examples/web_transport_client.html +++ b/crates/http/examples/web_transport_client.html @@ -19,15 +19,18 @@

WebTransport demo

return; } try { - const url = "https://" + location.host + "/wt"; + const url = "https://" + location.host + + "/wt"; log("Connecting to " + url); const wt = new WebTransport(url); await wt.ready; log("WebTransport connected"); // Test unidirectional stream - const writableStream = await wt.createUnidirectionalStream(); - const uniWriter = writableStream.getWriter(); + const writableStream = await wt + .createUnidirectionalStream(); + const uniWriter = writableStream + .getWriter(); const encoder = new TextEncoder(); await uniWriter.write( diff --git a/crates/http/src/backend/h3.rs b/crates/http/src/backend/h3.rs index 1d1049fc0..3c4d357c3 100644 --- a/crates/http/src/backend/h3.rs +++ b/crates/http/src/backend/h3.rs @@ -1,8 +1,9 @@ //! HTTP3 backend. -use std::{fmt::Debug, time::Duration}; +use std::fmt::Debug; use std::io; use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use body::QuicIncomingBody; use scuffle_context::ContextFutExt; @@ -66,7 +67,10 @@ where /// Corresponds to [h3::server::Builder::max_webtransport_sessions]. /// /// Default is 1 when WebTransport is enabled. - pub fn max_webtransport_sessions(self, max_webtransport_sessions: u64) -> Http3BackendBuilder> { + pub fn max_webtransport_sessions( + self, + max_webtransport_sessions: u64, + ) -> Http3BackendBuilder> { self.max_webtransport_sessions_internal(max_webtransport_sessions) } } @@ -151,14 +155,16 @@ where #[cfg(feature = "webtransport")] if self.enable_webtransport { - h3_conn_builder.enable_webtransport(true) + h3_conn_builder + .enable_webtransport(true) .enable_extended_connect(true) .enable_datagram(true) .max_webtransport_sessions(self.max_webtransport_sessions) .send_grease(true); } - let Some(mut h3_conn) = h3_conn_builder.build(h3_quinn::Connection::new(conn)) + let Some(mut h3_conn) = h3_conn_builder + .build(h3_quinn::Connection::new(conn)) .with_context(&ctx) .await .transpose()? @@ -198,7 +204,8 @@ where // Check if this is a WebTransport CONNECT request #[cfg(feature = "webtransport")] - if self.enable_webtransport && req.extensions().get::() == Some(&Protocol::WEB_TRANSPORT) + if self.enable_webtransport + && req.extensions().get::() == Some(&Protocol::WEB_TRANSPORT) && req.method() == http::Method::CONNECT { #[cfg(feature = "tracing")] diff --git a/crates/http/src/server.rs b/crates/http/src/server.rs index e8670af0c..76bbb4be2 100644 --- a/crates/http/src/server.rs +++ b/crates/http/src/server.rs @@ -76,7 +76,10 @@ where /// Enable WebTransport support. /// /// First enable HTTP/3 by calling [`enable_http3`](HttpServerBuilder::enable_http3) to enable WebTransport. - pub fn enable_webtransport(self, enable_webtransport: bool) -> HttpServerBuilder> { + pub fn enable_webtransport( + self, + enable_webtransport: bool, + ) -> HttpServerBuilder> { self.enable_webtransport_internal(enable_webtransport) } } @@ -93,7 +96,10 @@ where /// Corresponds to [h3::server::Builder::max_webtransport_sessions]. /// /// Default is 1 when WebTransport is enabled. - pub fn max_webtransport_sessions(self, max_webtransport_sessions: u64) -> HttpServerBuilder> { + pub fn max_webtransport_sessions( + self, + max_webtransport_sessions: u64, + ) -> HttpServerBuilder> { self.max_webtransport_sessions_internal(max_webtransport_sessions) } } @@ -249,7 +255,9 @@ where .rustls_config(_rustls_config); #[cfg(feature = "webtransport")] - let builder = builder.enable_webtransport(self.enable_webtransport).max_webtransport_sessions(self.max_webtransport_sessions); + let builder = builder + .enable_webtransport(self.enable_webtransport) + .max_webtransport_sessions(self.max_webtransport_sessions); return builder.build().run().await; } @@ -295,7 +303,9 @@ where .rustls_config(_rustls_config); #[cfg(feature = "webtransport")] - let http3_builder = http3_builder.enable_webtransport(self.enable_webtransport).max_webtransport_sessions(self.max_webtransport_sessions); + let http3_builder = http3_builder + .enable_webtransport(self.enable_webtransport) + .max_webtransport_sessions(self.max_webtransport_sessions); let http3 = http3_builder.build().run(); let http3 = std::pin::pin!(http3); From 0a5a83bee33f7cf8489b1357d94056eab2c38faa Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sun, 12 Oct 2025 19:10:10 +0200 Subject: [PATCH 13/25] fix: sync readme and feature gate --- crates/http/README.md | 2 +- crates/http/src/body.rs | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/http/README.md b/crates/http/README.md index b51965b33..a6b6d9e83 100644 --- a/crates/http/README.md +++ b/crates/http/README.md @@ -32,7 +32,7 @@ See the [changelog](./CHANGELOG.md) for a full release history. * **`http1`** *(enabled by default)* — Enables http1 support * **`http2`** *(enabled by default)* — Enabled http2 support * **`http3`** — Enables http3 support -* **`webtransport`** — Enables WebTransport over HTTP/3 support (requires http3) +* **`webtransport`** — Enables WebTransport over HTTP/3 support (enables http3) * **`tls-rustls`** — Enables tls via rustls * **`http3-tls-rustls`** — Alias for \[“http3”, “tls-rustls”\] * **`tower`** *(enabled by default)* — Enables tower service support diff --git a/crates/http/src/body.rs b/crates/http/src/body.rs index 4b2c2444b..9afc26753 100644 --- a/crates/http/src/body.rs +++ b/crates/http/src/body.rs @@ -32,6 +32,7 @@ pub enum IncomingBody { #[cfg(feature = "http3")] Quic(crate::backend::h3::body::QuicIncomingBody), /// An empty body (used for WebTransport sessions). + #[cfg(feature = "webtransport")] Empty, } @@ -59,6 +60,7 @@ impl http_body::Body for IncomingBody { IncomingBody::Hyper(body) => body.is_end_stream(), #[cfg(feature = "http3")] IncomingBody::Quic(body) => body.is_end_stream(), + #[cfg(feature = "webtransport")] IncomingBody::Empty => true, #[cfg(not(any(feature = "http1", feature = "http2", feature = "http3")))] _ => false, @@ -74,6 +76,7 @@ impl http_body::Body for IncomingBody { IncomingBody::Hyper(body) => std::pin::Pin::new(body).poll_frame(_cx).map_err(Into::into), #[cfg(feature = "http3")] IncomingBody::Quic(body) => std::pin::Pin::new(body).poll_frame(_cx).map_err(Into::into), + #[cfg(feature = "webtransport")] IncomingBody::Empty => std::task::Poll::Ready(None), #[cfg(not(any(feature = "http1", feature = "http2", feature = "http3")))] _ => std::task::Poll::Ready(None), @@ -86,6 +89,7 @@ impl http_body::Body for IncomingBody { IncomingBody::Hyper(body) => body.size_hint(), #[cfg(feature = "http3")] IncomingBody::Quic(body) => body.size_hint(), + #[cfg(feature = "webtransport")] IncomingBody::Empty => http_body::SizeHint::with_exact(0), #[cfg(not(any(feature = "http1", feature = "http2", feature = "http3")))] _ => http_body::SizeHint::default(), From 9744bf749031d6b46e95a527c663cb42b55d86a8 Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sun, 12 Oct 2025 20:51:27 +0200 Subject: [PATCH 14/25] fix(http): doctests --- crates/http/src/backend/h3/webtransport.rs | 55 ++++++++++++++++++++-- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/crates/http/src/backend/h3/webtransport.rs b/crates/http/src/backend/h3/webtransport.rs index acd40b5f0..9c71b990d 100644 --- a/crates/http/src/backend/h3/webtransport.rs +++ b/crates/http/src/backend/h3/webtransport.rs @@ -22,9 +22,8 @@ use h3_webtransport::stream::{BidiStream, RecvStream as WtRecvStream, SendStream /// # Example /// /// ```rust,ignore -/// use scuffle_http::{IncomingRequest, Response}; -/// use scuffle_http::backend::h3::webtransport::WebTransportSession; -/// +/// # use scuffle_http::{IncomingRequest, Response}; +/// # use scuffle_http::backend::h3::webtransport::WebTransportSession; /// async fn handle_webtransport(req: IncomingRequest) -> Result, std::convert::Infallible> { /// if let Some(session) = req.extensions().get::() { /// // Handle WebTransport session @@ -67,8 +66,7 @@ impl WebTransportSession { /// # Example /// /// ```rust,ignore - /// use scuffle_http::backend::h3::webtransport::{WebTransportSession, AcceptedBi}; - /// + /// # use scuffle_http::backend::h3::webtransport::{WebTransportSession, AcceptedBi}; /// async fn handle_session(session: WebTransportSession) { /// while let Some(Ok(accepted)) = session.accept_bi().await { /// match accepted { @@ -113,9 +111,14 @@ impl WebTransportSession { /// # Example /// /// ```rust,ignore + /// # use bytes::Bytes; + /// # use scuffle_http::backend::h3::webtransport::WebTransportSession; + /// # async fn dummy(session: WebTransportSession) -> Result<(), Box> { /// let (mut send, mut recv) = session.open_bi().await?; /// send.write(Bytes::from("Hello")).await?; /// send.finish().await?; + /// # Ok(()) + /// # } /// ``` pub async fn open_bi(&self) -> Result<(WebTransportSendStream, WebTransportRecvStream), h3::error::StreamError> { let stream = self.session.open_bi(WebTransportStreamId::next_session_id()).await?; @@ -132,9 +135,14 @@ impl WebTransportSession { /// # Example /// /// ```rust,ignore + /// # use bytes::Bytes; + /// # use scuffle_http::backend::h3::webtransport::WebTransportSession; + /// # async fn dummy(session: WebTransportSession) -> Result<(), Box> { /// let mut send = session.open_uni().await?; /// send.write(Bytes::from("Hello")).await?; /// send.finish().await?; + /// # Ok(()) + /// # } /// ``` pub async fn open_uni(&self) -> Result { let send = self.session.open_uni(WebTransportStreamId::next_session_id()).await?; @@ -153,8 +161,13 @@ impl WebTransportSession { /// # Example /// /// ```rust,ignore + /// # use bytes::Bytes; + /// # use scuffle_http::backend::h3::webtransport::WebTransportSession; + /// # async fn dummy(session: WebTransportSession) -> Result<(), h3_datagram::datagram_handler::SendDatagramError> { /// let mut sender = session.datagram_sender(); /// sender.send_datagram(Bytes::from("Hello"))?; + /// # Ok(()) + /// # } /// ``` pub fn datagram_sender( &self, @@ -170,10 +183,13 @@ impl WebTransportSession { /// # Example /// /// ```rust,ignore + /// # use scuffle_http::backend::h3::webtransport::WebTransportSession; + /// # async fn dummy(session: WebTransportSession) { /// let mut reader = session.datagram_reader(); /// while let Ok(datagram) = reader.read_datagram().await { /// println!("Received: {} bytes", datagram.payload().len()); /// } + /// # } /// ``` pub fn datagram_reader( &self, @@ -211,6 +227,10 @@ impl WebTransportBidiStream { /// # Example /// /// ```rust,ignore + /// # use bytes::Bytes; + /// # use h3::quic::StreamErrorIncoming; + /// # use scuffle_http::backend::h3::webtransport::WebTransportBidiStream; + /// # async fn dummy(bidi_stream: WebTransportBidiStream) -> Result<(), StreamErrorIncoming> { /// let (mut send, mut recv) = bidi_stream.split(); /// tokio::spawn(async move { /// while let Ok(Some(data)) = recv.read().await { @@ -218,6 +238,8 @@ impl WebTransportBidiStream { /// } /// }); /// send.write(Bytes::from("Hello")).await?; + /// # Ok(()) + /// # } /// ``` pub fn split(self) -> (WebTransportSendStream, WebTransportRecvStream) { use h3::quic::BidiStream; @@ -239,7 +261,11 @@ impl WebTransportBidiStream { /// # Example /// /// ```rust,ignore + /// # use scuffle_http::backend::h3::webtransport::WebTransportBidiStream; + /// # async fn dummy(mut bidi_stream: WebTransportBidiStream) -> Result<(), std::io::Error> { /// let data = bidi_stream.read_to_end(1024 * 1024).await?; // max 1MB + /// # Ok(()) + /// # } /// ``` pub async fn read_to_end(&mut self, max_size: usize) -> Result { let mut chunks = Vec::new(); @@ -318,9 +344,12 @@ impl WebTransportRecvStream { /// # Example /// /// ```rust,ignore + /// # use scuffle_http::backend::h3::webtransport::WebTransportRecvStream; + /// # async fn dummy(mut recv_stream: WebTransportRecvStream) { /// while let Ok(Some(data)) = recv_stream.read().await { /// println!("Received {} bytes", data.len()); /// } + /// # } /// ``` pub async fn read(&mut self) -> Result, StreamErrorIncoming> { use h3::quic::RecvStream; @@ -334,8 +363,12 @@ impl WebTransportRecvStream { /// # Example /// /// ```rust,ignore + /// # use scuffle_http::backend::h3::webtransport::WebTransportRecvStream; + /// # async fn dummy(mut recv_stream: WebTransportRecvStream) -> Result<(), std::io::Error> { /// let data = recv_stream.read_to_end(1024 * 1024).await?; // max 1MB /// println!("Received complete message: {} bytes", data.len()); + /// # Ok(()) + /// # } /// ``` pub async fn read_to_end(&mut self, max_size: usize) -> Result { let mut chunks = Vec::new(); @@ -397,8 +430,14 @@ impl WebTransportSendStream { /// # Example /// /// ```rust,ignore + /// # use bytes::Bytes; + /// # use h3::quic::StreamErrorIncoming; + /// # use scuffle_http::backend::h3::webtransport::WebTransportSendStream; + /// # async fn dummy(mut send_stream: WebTransportSendStream) -> Result<(), StreamErrorIncoming> { /// send_stream.write(Bytes::from("Hello, world!")).await?; /// send_stream.finish().await?; + /// # Ok(()) + /// # } /// ``` pub async fn write(&mut self, data: Bytes) -> Result<(), StreamErrorIncoming> { use bytes::Buf; @@ -422,7 +461,13 @@ impl WebTransportSendStream { /// # Example /// /// ```rust,ignore + /// # use bytes::Bytes; + /// # use h3::quic::StreamErrorIncoming; + /// # use scuffle_http::backend::h3::webtransport::WebTransportSendStream; + /// # async fn dummy(mut send_stream: WebTransportSendStream) -> Result<(), StreamErrorIncoming> { /// send_stream.write_all(Bytes::from("Complete message")).await?; + /// # Ok(()) + /// # } /// ``` pub async fn write_all(&mut self, data: Bytes) -> Result<(), StreamErrorIncoming> { self.write(data).await?; From a266c115deeb66a95fe11e2c1546d77a2456251c Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sun, 12 Oct 2025 20:53:42 +0200 Subject: [PATCH 15/25] docs: add changelog file --- changes.d/pr-621.toml | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changes.d/pr-621.toml diff --git a/changes.d/pr-621.toml b/changes.d/pr-621.toml new file mode 100644 index 000000000..ccc08d66d --- /dev/null +++ b/changes.d/pr-621.toml @@ -0,0 +1,4 @@ +[[scuffle-http]] +category = "feat" +description = "WebTransport over HTTP/3 support" +authors = ["@SimaoMoreira5228"] From fddc4375abede80cabf4364e4731901ee836c638 Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sun, 12 Oct 2025 21:44:04 +0200 Subject: [PATCH 16/25] fix(http): doctests --- crates/http/src/backend/h3/webtransport.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/http/src/backend/h3/webtransport.rs b/crates/http/src/backend/h3/webtransport.rs index 9c71b990d..cfb3b88fe 100644 --- a/crates/http/src/backend/h3/webtransport.rs +++ b/crates/http/src/backend/h3/webtransport.rs @@ -24,7 +24,7 @@ use h3_webtransport::stream::{BidiStream, RecvStream as WtRecvStream, SendStream /// ```rust,ignore /// # use scuffle_http::{IncomingRequest, Response}; /// # use scuffle_http::backend::h3::webtransport::WebTransportSession; -/// async fn handle_webtransport(req: IncomingRequest) -> Result, std::convert::Infallible> { +/// async fn handle_webtransport(req: IncomingRequest) -> Result, std::convert::Infallible> { /// if let Some(session) = req.extensions().get::() { /// // Handle WebTransport session /// tokio::spawn({ @@ -38,13 +38,13 @@ use h3_webtransport::stream::{BidiStream, RecvStream as WtRecvStream, SendStream /// /// return Ok(Response::builder() /// .status(200) -/// .body(String::new()) +/// .body(()) /// .unwrap()); /// } /// /// Ok(Response::builder() /// .status(404) -/// .body(String::new()) +/// .body(()) /// .unwrap()) /// } /// ``` From 15601ccb24235d51e49cc35cd072ec111822a8d6 Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sun, 12 Oct 2025 22:15:51 +0200 Subject: [PATCH 17/25] refactor(http): rename example --- crates/http/Cargo.toml | 4 ++-- crates/http/examples/README.md | 7 ++++--- .../http/examples/{webtransport_tls.rs => webtransport.rs} | 2 +- ...{web_transport_client.html => webtransport_client.html} | 0 4 files changed, 7 insertions(+), 6 deletions(-) rename crates/http/examples/{webtransport_tls.rs => webtransport.rs} (99%) rename crates/http/examples/{web_transport_client.html => webtransport_client.html} (100%) diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index cc8927469..b8bb8c9d1 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -29,8 +29,8 @@ path = "examples/axum.rs" required-features = ["default", "tls-rustls", "http3", "tower", "tracing"] [[example]] -name = "scuffle-http-webtransport-tls" -path = "examples/webtransport_tls.rs" +name = "scuffle-http-webtransport" +path = "examples/webtransport.rs" required-features = ["default", "tls-rustls", "http3", "webtransport", "tracing"] [features] diff --git a/crates/http/examples/README.md b/crates/http/examples/README.md index 99f987811..2152801ad 100644 --- a/crates/http/examples/README.md +++ b/crates/http/examples/README.md @@ -2,6 +2,7 @@ Examples of using the `scuffle-http` crate. -- [echo](./src/echo.rs) - A simple echo server. -- [echo-tls](./src/echo_tls.rs) - A simple echo server with encryption (HTTPS) and HTTP/3. -- [axum](./src/axum.rs) - Example of using the `axum` web framework with `scuffle-http`. +- [echo](./echo.rs) - A simple echo server. +- [echo-tls](./echo_tls.rs) - A simple echo server with encryption (HTTPS) and HTTP/3. +- [axum](./axum.rs) - Example of using the `axum` web framework with `scuffle-http`. +- [webtransport](./webtransport.rs) - Example of setting up a WebTransport server. diff --git a/crates/http/examples/webtransport_tls.rs b/crates/http/examples/webtransport.rs similarity index 99% rename from crates/http/examples/webtransport_tls.rs rename to crates/http/examples/webtransport.rs index a0cfc9cb0..e2fa9dfb2 100644 --- a/crates/http/examples/webtransport_tls.rs +++ b/crates/http/examples/webtransport.rs @@ -36,7 +36,7 @@ fn rustls_config() -> tokio_rustls::rustls::ServerConfig { .expect("failed to build config") } -const WT_CLIENT_HTML: &str = include_str!("web_transport_client.html"); +const WT_CLIENT_HTML: &str = include_str!("webtransport_client.html"); #[tokio::main] async fn main() { diff --git a/crates/http/examples/web_transport_client.html b/crates/http/examples/webtransport_client.html similarity index 100% rename from crates/http/examples/web_transport_client.html rename to crates/http/examples/webtransport_client.html From b27e9d3d2a828c50f032c5c630e8fac797e56aad Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Sun, 12 Oct 2025 23:09:20 +0200 Subject: [PATCH 18/25] fix(http): clean up wt example Similar to #637 --- crates/http/examples/webtransport.rs | 39 +++++++++++++++------------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/crates/http/examples/webtransport.rs b/crates/http/examples/webtransport.rs index e2fa9dfb2..1e65e47b8 100644 --- a/crates/http/examples/webtransport.rs +++ b/crates/http/examples/webtransport.rs @@ -4,6 +4,7 @@ use std::net::SocketAddr; use http::{Method, StatusCode}; use scuffle_http as http_srv; use scuffle_http::service::{fn_http_service, service_clone_factory}; +use tokio_rustls::rustls::pki_types::{CertificateDer, PrivateKeyDer, pem::PemObject}; fn assets_path(item: &str) -> std::path::PathBuf { if let Some(env) = std::env::var_os("ASSETS_DIR") { @@ -21,14 +22,11 @@ fn rustls_config() -> tokio_rustls::rustls::ServerConfig { .expect("failed to install aws lc provider"); }); - let certfile = std::fs::File::open(assets_path("cert.pem")).expect("cert not found"); - let certs = rustls_pemfile::certs(&mut std::io::BufReader::new(certfile)) + let certs = CertificateDer::pem_file_iter(assets_path("cert.pem")) + .expect("failed to load certfile") .collect::, _>>() - .expect("failed to load certs"); - let keyfile = std::fs::File::open(assets_path("key.pem")).expect("key not found"); - let key = rustls_pemfile::private_key(&mut std::io::BufReader::new(keyfile)) - .expect("failed to load key") - .expect("no key found"); + .expect("failed to load cert"); + let key = PrivateKeyDer::from_pem_file(assets_path("key.pem")).expect("failed to load key"); tokio_rustls::rustls::ServerConfig::builder() .with_no_client_auth() @@ -56,7 +54,10 @@ async fn main() { Ok::<_, Infallible>(resp) } else if req.uri().path() == "/wt" && req.method() == Method::CONNECT { // Extract the WebTransport session from the request - if let Some(session) = req.extensions().get::() { + if let Some(session) = req + .extensions() + .get::() + { let session = session.clone(); tracing::info!("WebTransport session established"); @@ -103,9 +104,11 @@ async fn main() { tokio::spawn(async move { match stream.read_to_end(64 * 1024).await { Ok(data) => { - tracing::info!("Received {} bytes on uni stream: {:?}", + tracing::info!( + "Received {} bytes on uni stream: {:?}", data.len(), - String::from_utf8_lossy(&data)); + String::from_utf8_lossy(&data) + ); } Err(e) => { tracing::warn!("Failed to read from uni stream: {}", e); @@ -145,22 +148,22 @@ async fn main() { } }); - return Ok::<_, Infallible>( - http::Response::builder() - .status(StatusCode::OK) - .body(String::new()) - .unwrap() - ); + return Ok::<_, Infallible>(http::Response::builder().status(StatusCode::OK).body(String::new()).unwrap()); } Ok::<_, Infallible>( http::Response::builder() .status(StatusCode::BAD_REQUEST) .body("WebTransport session not found".to_string()) - .unwrap() + .unwrap(), ) } else { - Ok::<_, Infallible>(http::Response::builder().status(StatusCode::NOT_FOUND).body(String::new()).unwrap()) + Ok::<_, Infallible>( + http::Response::builder() + .status(StatusCode::NOT_FOUND) + .body(String::new()) + .unwrap(), + ) } }); From 20f93796169d21108be154a00c2b5dcf825d40a0 Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Mon, 13 Oct 2025 09:50:30 +0200 Subject: [PATCH 19/25] feat(http): add into_inner functions --- crates/http/src/backend/h3/webtransport.rs | 64 +++++++++++++++++----- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/crates/http/src/backend/h3/webtransport.rs b/crates/http/src/backend/h3/webtransport.rs index cfb3b88fe..e692b1499 100644 --- a/crates/http/src/backend/h3/webtransport.rs +++ b/crates/http/src/backend/h3/webtransport.rs @@ -30,7 +30,7 @@ use h3_webtransport::stream::{BidiStream, RecvStream as WtRecvStream, SendStream /// tokio::spawn({ /// let session = session.clone(); /// async move { -/// while let Some(Ok(accepted)) = session.accept_bi().await { +/// while let Ok(Some(accepted)) = session.accept_bi().await { /// // Handle bidirectional streams /// } /// } @@ -68,7 +68,7 @@ impl WebTransportSession { /// ```rust,ignore /// # use scuffle_http::backend::h3::webtransport::{WebTransportSession, AcceptedBi}; /// async fn handle_session(session: WebTransportSession) { - /// while let Some(Ok(accepted)) = session.accept_bi().await { + /// while let Ok(Some(accepted)) = session.accept_bi().await { /// match accepted { /// AcceptedBi::BidiStream(stream) => { /// // Handle raw bidirectional stream @@ -80,16 +80,16 @@ impl WebTransportSession { /// } /// } /// ``` - pub async fn accept_bi(&self) -> Option> { + pub async fn accept_bi(&self) -> Result, h3::error::StreamError> { match self.session.accept_bi().await { Ok(Some(H3AcceptedBi::BidiStream(id, stream))) => { - Some(Ok(AcceptedBi::BidiStream(WebTransportBidiStream { stream, _id: id }))) + Ok(Some(AcceptedBi::BidiStream(WebTransportBidiStream { stream, _id: id }))) } Ok(Some(H3AcceptedBi::Request(req, stream))) => { - Some(Ok(AcceptedBi::Request(req, WebTransportRequestStream { stream }))) + Ok(Some(AcceptedBi::Request(req, WebTransportRequestStream { stream }))) } - Ok(None) => None, - Err(e) => Some(Err(e)), + Ok(None) => Ok(None), + Err(e) => Err(e), } } @@ -98,12 +98,11 @@ impl WebTransportSession { /// Returns `None` when the session is closed or no more streams are available. pub async fn accept_uni( &self, - ) -> Option> { - match self.session.accept_uni().await { - Ok(Some((id, stream))) => Some(Ok((WebTransportStreamId(id), WebTransportRecvStream { stream }))), - Ok(None) => None, - Err(e) => Some(Err(e)), - } + ) -> Result, h3::error::ConnectionError> { + self.session + .accept_uni() + .await + .map(|o| o.map(|(id, stream)| (WebTransportStreamId(id), WebTransportRecvStream { stream }))) } /// Open a new bidirectional stream. @@ -222,6 +221,13 @@ pub struct WebTransportBidiStream { } impl WebTransportBidiStream { + /// Get the inner [`h3_webtransport::stream::BidiStream`]. + /// + /// Can be used to access lower-level functionality. + pub fn into_inner(self) -> BidiStream, Bytes> { + self.stream + } + /// Split this stream into separate send and receive halves. /// /// # Example @@ -258,6 +264,11 @@ impl WebTransportBidiStream { /// Read all remaining data from the receive side until the stream is finished. /// + /// This collects all chunks into a single [`Bytes`] object. + /// + /// Returns an [`io::Error`] if the total size exceeds `max_size` or any [`read`](WebTransportBidiStream::read) + /// call errors. + /// /// # Example /// /// ```rust,ignore @@ -337,6 +348,13 @@ pub struct WebTransportRecvStream { } impl WebTransportRecvStream { + /// Get the inner [`h3_webtransport::stream::RecvStream`]. + /// + /// Can be used to access lower-level functionality. + pub fn into_inner(self) -> WtRecvStream { + self.stream + } + /// Read data from the stream. /// /// Returns `Ok(None)` when the stream is finished. @@ -358,7 +376,10 @@ impl WebTransportRecvStream { /// Read all remaining data from the stream until it's finished. /// - /// This collects all chunks into a single `Bytes` object. + /// This collects all chunks into a single [`Bytes`] object. + /// + /// Returns an [`io::Error`] if the total size exceeds `max_size` or any [`read`](WebTransportRecvStream::read) + /// call errors. /// /// # Example /// @@ -425,6 +446,13 @@ pub struct WebTransportSendStream { } impl WebTransportSendStream { + /// Get the inner [`h3_webtransport::stream::SendStream`]. + /// + /// Can be used to access lower-level functionality. + pub fn into_inner(self) -> WtSendStream, Bytes> { + self.stream + } + /// Write data to the stream. /// /// # Example @@ -488,6 +516,7 @@ impl WebTransportSendStream { self.stream.reset(reset_code) } } + impl fmt::Debug for WebTransportSendStream { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("WebTransportSendStream").finish_non_exhaustive() @@ -500,6 +529,13 @@ pub struct WebTransportRequestStream { } impl WebTransportRequestStream { + /// Get the inner [`h3::server::RequestStream`]. + /// + /// Can be used to access lower-level functionality. + pub fn into_inner(self) -> h3::server::RequestStream, Bytes> { + self.stream + } + /// Split this stream into separate send and receive halves. pub fn split( self, From 5c3998b607e5324708039a85efebf76d106c3193 Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Mon, 13 Oct 2025 18:48:28 +0200 Subject: [PATCH 20/25] chore: vendor --- Cargo.lock | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index e5e578869..c9e9664af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2377,6 +2377,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "h3-datagram" +version = "0.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d2c9f77921668673721ae40f17c729fc48b9e38a663858097cea547484fdf0f" +dependencies = [ + "bytes", + "h3", + "pin-project-lite", +] + [[package]] name = "h3-quinn" version = "0.0.10" @@ -2386,12 +2397,29 @@ dependencies = [ "bytes", "futures", "h3", + "h3-datagram", "quinn", "tokio", "tokio-util", "tracing", ] +[[package]] +name = "h3-webtransport" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d91a50fd582a5d67b1f756fba3cd9c66367ff4f23e1017c882f664d63b350a7" +dependencies = [ + "bytes", + "futures-util", + "h3", + "h3-datagram", + "http", + "pin-project-lite", + "tokio", + "tracing", +] + [[package]] name = "half" version = "1.8.3" @@ -2841,7 +2869,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5" dependencies = [ "equivalent", - "hashbrown 0.15.5", + "hashbrown 0.16.0", "serde", "serde_core", ] @@ -5772,7 +5800,9 @@ dependencies = [ "document-features", "futures", "h3", + "h3-datagram", "h3-quinn", + "h3-webtransport", "http", "http-body", "hyper", From 5d8a3b3de15757c7aac29d6c20ea26e64dae277e Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Mon, 13 Oct 2025 18:49:17 +0200 Subject: [PATCH 21/25] fix(http): add webtransport example bazel target --- crates/http/BUILD.bazel | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/http/BUILD.bazel b/crates/http/BUILD.bazel index 392044b17..acfce83a7 100644 --- a/crates/http/BUILD.bazel +++ b/crates/http/BUILD.bazel @@ -36,3 +36,8 @@ scuffle_example( name = "examples_echo_tls", srcs = ["examples/echo_tls.rs"], ) + +scuffle_example( + name = "examples_webtransport", + srcs = ["examples/webtransport.rs"], +) From 07a3339b1710f5a6785c69ba9ccacd3fc115430b Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Mon, 13 Oct 2025 20:11:46 +0200 Subject: [PATCH 22/25] fix(http): wt example --- crates/http/BUILD.bazel | 1 + crates/http/examples/webtransport.rs | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/http/BUILD.bazel b/crates/http/BUILD.bazel index acfce83a7..3cabb4bae 100644 --- a/crates/http/BUILD.bazel +++ b/crates/http/BUILD.bazel @@ -40,4 +40,5 @@ scuffle_example( scuffle_example( name = "examples_webtransport", srcs = ["examples/webtransport.rs"], + data = ["examples/webtransport_client.html"], ) diff --git a/crates/http/examples/webtransport.rs b/crates/http/examples/webtransport.rs index 1e65e47b8..dd63cae05 100644 --- a/crates/http/examples/webtransport.rs +++ b/crates/http/examples/webtransport.rs @@ -66,7 +66,7 @@ async fn main() { let session = session.clone(); async move { use http_srv::backend::h3::webtransport::AcceptedBi; - while let Some(Ok(accepted)) = session.accept_bi().await { + while let Ok(Some(accepted)) = session.accept_bi().await { match accepted { AcceptedBi::BidiStream(mut stream) => { tokio::spawn(async move { @@ -100,7 +100,7 @@ async fn main() { tokio::spawn({ let session = session.clone(); async move { - while let Some(Ok((_id, mut stream))) = session.accept_uni().await { + while let Ok(Some((_id, mut stream))) = session.accept_uni().await { tokio::spawn(async move { match stream.read_to_end(64 * 1024).await { Ok(data) => { From ee41cb019fe4b819cb692401a8d37a3af339f17b Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Mon, 13 Oct 2025 20:55:47 +0200 Subject: [PATCH 23/25] fix(http): clean up feature flags --- crates/http/Cargo.toml | 4 ++-- crates/http/src/lib.rs | 3 +++ vendor/cargo/defs.bzl | 2 -- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index b8bb8c9d1..57e140154 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -61,8 +61,8 @@ http2 = [ ] ## Enables http3 support http3 = ["dep:quinn", "dep:h3-quinn", "dep:h3"] -## Enables WebTransport over HTTP/3 support (enables http3) -webtransport = ["http3", "dep:h3-webtransport", "dep:h3-datagram", "h3-quinn/datagram"] +## Enables WebTransport over HTTP/3 support +webtransport = ["dep:h3-webtransport", "dep:h3-datagram", "h3-quinn?/datagram"] ## Enables tls via rustls tls-rustls = ["dep:tokio-rustls"] ## Alias for ["http3", "tls-rustls"] diff --git a/crates/http/src/lib.rs b/crates/http/src/lib.rs index 05d018eac..213c62687 100644 --- a/crates/http/src/lib.rs +++ b/crates/http/src/lib.rs @@ -61,6 +61,9 @@ #[cfg(all(feature = "http3", not(feature = "tls-rustls")))] compile_error!("feature \"tls-rustls\" must be enabled when \"http3\" is enabled."); +#[cfg(all(feature = "webtransport", not(feature = "http3")))] +compile_error!("feature \"http3\" must be enabled when \"webtransport\" is enabled."); + #[cfg(any(feature = "http1", feature = "http2", feature = "http3"))] pub mod backend; pub mod body; diff --git a/vendor/cargo/defs.bzl b/vendor/cargo/defs.bzl index 3e1c518c6..72f7bfaf4 100644 --- a/vendor/cargo/defs.bzl +++ b/vendor/cargo/defs.bzl @@ -946,7 +946,6 @@ _NORMAL_DEPENDENCIES = { "webtransport": { _COMMON_CONDITION: { "h3-datagram": Label("@cargo_vendor//:h3-datagram-0.0.2"), - "h3-quinn": Label("@cargo_vendor//:h3-quinn-0.0.10"), "h3-webtransport": Label("@cargo_vendor//:h3-webtransport-0.1.2"), }, }, @@ -4366,7 +4365,6 @@ _FEATURE_FLAGS = { "tracing": [ ], "webtransport": [ - "http3", ], }, "crates/metrics": { From f4267a725776308d02312a99e71512542f2cd5ae Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Wed, 15 Oct 2025 16:55:41 +0200 Subject: [PATCH 24/25] feat(http): include wt session as an extension --- crates/http/src/extensions.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/http/src/extensions.rs b/crates/http/src/extensions.rs index 0f8f0963e..0096d0d41 100644 --- a/crates/http/src/extensions.rs +++ b/crates/http/src/extensions.rs @@ -2,6 +2,9 @@ use std::ops::Deref; +#[cfg(feature = "webtransport")] +pub use crate::backend::h3::webtransport::WebTransportSession; + /// This extension is always present on the request and contains the remote address of the client. #[derive(Clone, Debug)] pub struct ClientAddr(pub std::net::SocketAddr); From 737074fb66b5713ec350bfe306717a5e33037cf3 Mon Sep 17 00:00:00 2001 From: Lennart Kloock Date: Wed, 15 Oct 2025 17:07:52 +0200 Subject: [PATCH 25/25] fix(http): sync readme --- crates/http/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/http/README.md b/crates/http/README.md index a6b6d9e83..aae010684 100644 --- a/crates/http/README.md +++ b/crates/http/README.md @@ -32,7 +32,7 @@ See the [changelog](./CHANGELOG.md) for a full release history. * **`http1`** *(enabled by default)* — Enables http1 support * **`http2`** *(enabled by default)* — Enabled http2 support * **`http3`** — Enables http3 support -* **`webtransport`** — Enables WebTransport over HTTP/3 support (enables http3) +* **`webtransport`** — Enables WebTransport over HTTP/3 support * **`tls-rustls`** — Enables tls via rustls * **`http3-tls-rustls`** — Alias for \[“http3”, “tls-rustls”\] * **`tower`** *(enabled by default)* — Enables tower service support