From df3494dd1723d0e89a4191d23657f0e4ed757465 Mon Sep 17 00:00:00 2001 From: Markus Stange Date: Mon, 12 Feb 2024 19:31:06 -0500 Subject: [PATCH] Update hyper to 1.1.0. We now build hyper twice because reqwest still depends on hyper 0.14. This will be solved once reqwest is on hyper 1. https://github.com/seanmonstar/reqwest/issues/2039 --- Cargo.lock | 117 +++++++++++++++++++++++--- samply/Cargo.toml | 6 +- samply/src/server.rs | 195 ++++++++++++++++++++++++------------------- 3 files changed, 218 insertions(+), 100 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 19b4d35c..a1e4e58a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -794,7 +794,26 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.11", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", "indexmap", "slab", "tokio", @@ -834,6 +853,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -841,7 +871,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.11", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -867,9 +920,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.24", + "http 0.2.11", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -881,6 +934,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.2", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -888,13 +961,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.11", + "hyper 0.14.28", "rustls", "tokio", "tokio-rustls", ] +[[package]] +name = "hyper-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.1.0", + "pin-project-lite", + "socket2", + "tokio", +] + [[package]] name = "idna" version = "0.5.0" @@ -1463,10 +1552,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.24", + "http 0.2.11", + "http-body 0.4.6", + "hyper 0.14.28", "hyper-rustls", "ipnet", "js-sys", @@ -1596,9 +1685,12 @@ dependencies = [ "dirs", "flate2", "framehop", + "futures-util", "fxhash", "fxprof-processed-profile", - "hyper", + "http-body-util", + "hyper 1.1.0", + "hyper-util", "lazy_static", "libc", "linux-perf-data", @@ -1623,6 +1715,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-util", "uname", "uuid", "wholesym", diff --git a/samply/Cargo.toml b/samply/Cargo.toml index 1ae11692..970cc96e 100644 --- a/samply/Cargo.toml +++ b/samply/Cargo.toml @@ -18,7 +18,11 @@ framehop = "0.9.0" linux-perf-data = "0.8.2" tokio = { version = "1.26.0", features = ["rt", "rt-multi-thread", "macros"] } -hyper = { version = "0.14.25", features = ["full"] } +tokio-util = "0.7.10" +hyper = { version = "1", features = ["full"] } +hyper-util = { version = "0.1.3", features = ["server", "http1", "tokio"] } +http-body-util = "0.1" +futures-util = "0.3" clap = { version = "4", features = ["derive"] } byteorder = "1.4.3" debugid = "0.8.0" diff --git a/samply/src/server.rs b/samply/src/server.rs index 0df02439..c831f6cf 100644 --- a/samply/src/server.rs +++ b/samply/src/server.rs @@ -1,19 +1,21 @@ use flate2::read::GzDecoder; -use hyper::body::Bytes; -use hyper::server::conn::AddrIncoming; -use hyper::server::Builder; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{header, Body, Request, Response, Server}; -use hyper::{Method, StatusCode}; +use futures_util::TryStreamExt; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Either, StreamBody}; +use hyper::body::{Bytes, Frame}; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{header, Method, Request, Response, StatusCode}; +use hyper_util::rt::TokioIo; use percent_encoding::{utf8_percent_encode, AsciiSet, CONTROLS}; use rand::RngCore; use serde_derive::Deserialize; -use tokio::io::AsyncReadExt; +use tokio::net::TcpListener; +use tokio_util::io::ReaderStream; use wholesym::debugid::DebugId; use wholesym::{CodeId, LibraryInfo, SymbolManager, SymbolManagerConfig}; use std::collections::HashMap; -use std::convert::Infallible; use std::ffi::{OsStr, OsString}; use std::io::BufReader; use std::net::SocketAddr; @@ -85,7 +87,7 @@ async fn start_server( HashMap::new() }; - let (builder, addr) = make_builder_at_port(port_selection); + let (listener, addr) = make_listener(port_selection).await; let token = generate_token(); let path_prefix = format!("/{token}"); @@ -137,25 +139,14 @@ async fn start_server( symbol_manager.add_known_library(lib_info); } let symbol_manager = Arc::new(symbol_manager); - let new_service = make_service_fn(move |_conn| { - let symbol_manager = symbol_manager.clone(); - let profile_filename = profile_filename.map(PathBuf::from); - let template_values = template_values.clone(); - let path_prefix = path_prefix.clone(); - async { - Ok::<_, Infallible>(service_fn(move |req| { - symbolication_service( - req, - template_values.clone(), - symbol_manager.clone(), - profile_filename.clone(), - path_prefix.clone(), - ) - })) - } - }); - let server = builder.serve(new_service); + let server = tokio::task::spawn(run_server( + listener, + symbol_manager, + profile_filename.map(PathBuf::from), + template_values, + path_prefix, + )); eprintln!("Local server listening at {server_origin}"); if !open_in_browser { @@ -223,12 +214,12 @@ fn generate_token() -> String { nix_base32::to_nix_base32(&bytes) } -fn make_builder_at_port(port_selection: PortSelection) -> (Builder, SocketAddr) { +async fn make_listener(port_selection: PortSelection) -> (TcpListener, SocketAddr) { match port_selection { PortSelection::OnePort(port) => { let addr = SocketAddr::from(([127, 0, 0, 1], port)); - match Server::try_bind(&addr) { - Ok(builder) => (builder, addr), + match TcpListener::bind(&addr).await { + Ok(listener) => (listener, addr), Err(e) => { eprintln!("Could not bind to port {port}: {e}"); std::process::exit(1) @@ -239,8 +230,8 @@ fn make_builder_at_port(port_selection: PortSelection) -> (Builder let mut error = None; for port in range.clone() { let addr = SocketAddr::from(([127, 0, 0, 1], port)); - match Server::try_bind(&addr) { - Ok(builder) => return (builder, addr), + match TcpListener::bind(&addr).await { + Ok(listener) => return (listener, addr), Err(e) => { error.get_or_insert(e); } @@ -289,41 +280,83 @@ const TEMPLATE_WITHOUT_PROFILE: &str = r#" "#; +async fn run_server( + listener: TcpListener, + symbol_manager: Arc, + profile_filename: Option, + template_values: Arc>, + path_prefix: String, +) -> Result<(), Box> { + // We start a loop to continuously accept incoming connections + loop { + let (stream, _) = listener.accept().await?; + + // Use an adapter to access something implementing `tokio::io` traits as if they implement + // `hyper::rt` IO traits. + let io = TokioIo::new(stream); + + let symbol_manager = symbol_manager.clone(); + let profile_filename = profile_filename.clone(); + let template_values = template_values.clone(); + let path_prefix = path_prefix.clone(); + + // Spawn a tokio task to serve multiple connections concurrently + tokio::task::spawn(async move { + // Finally, we bind the incoming connection to our service + if let Err(err) = http1::Builder::new() + // `service_fn` converts our function in a `Service` + .serve_connection( + io, + service_fn(move |req| { + symbolication_service( + req, + template_values.clone(), + symbol_manager.clone(), + profile_filename.clone(), + path_prefix.clone(), + ) + }), + ) + .await + { + println!("Error serving connection: {:?}", err); + } + }); + } +} + async fn symbolication_service( - req: Request, + req: Request, template_values: Arc>, symbol_manager: Arc, profile_filename: Option, path_prefix: String, -) -> Result, hyper::Error> { +) -> Result>>, hyper::Error> { let has_profile = profile_filename.is_some(); let method = req.method(); let path = req.uri().path(); - let mut response = Response::new(Body::empty()); - - let path_without_prefix = match path.strip_prefix(&path_prefix) { - None => { - // The secret prefix was not part of the URL. Do not send CORS headers. - match (method, path) { - (&Method::GET, "/") => { - response.headers_mut().insert( - header::CONTENT_TYPE, - header::HeaderValue::from_static("text/html"), - ); - let template = match has_profile { - true => TEMPLATE_WITH_PROFILE, - false => TEMPLATE_WITHOUT_PROFILE, - }; - *response.body_mut() = - Body::from(substitute_template(template, &template_values)); - } - _ => { - *response.status_mut() = StatusCode::NOT_FOUND; - } + let mut response = Response::new(Either::Left(String::new())); + + let Some(path_without_prefix) = path.strip_prefix(&path_prefix) else { + // The secret prefix was not part of the URL. Do not send CORS headers. + match (method, path) { + (&Method::GET, "/") => { + response.headers_mut().insert( + header::CONTENT_TYPE, + header::HeaderValue::from_static("text/html"), + ); + let template = match has_profile { + true => TEMPLATE_WITH_PROFILE, + false => TEMPLATE_WITHOUT_PROFILE, + }; + *response.body_mut() = + Either::Left(substitute_template(template, &template_values)); + } + _ => { + *response.status_mut() = StatusCode::NOT_FOUND; } - return Ok(response); } - Some(path_without_prefix) => path_without_prefix, + return Ok(response); }; // If we get here, then the secret prefix was part of the URL. @@ -379,32 +412,18 @@ async fn symbolication_service( header::CONTENT_TYPE, header::HeaderValue::from_static("application/json; charset=UTF-8"), ); - let (mut sender, body) = Body::channel(); - *response.body_mut() = body; - - // Stream the file out to the response body, asynchronously, after this function has returned. - tokio::spawn(async move { - let mut file = tokio::fs::File::open(&profile_filename) - .await - .expect("couldn't open profile file"); - let mut contents = vec![0; 1024 * 1024]; - loop { - let data_len = file - .read(&mut contents) - .await - .expect("couldn't read profile file"); - if data_len == 0 { - break; - } - let send_result = sender - .send_data(Bytes::copy_from_slice(&contents[..data_len])) - .await; - if send_result.is_err() { - // The other side may have closed the channel. Stop sending. - break; - } - } - }); + + // Stream the file. This follows the send_file example from the hyper repo. + // https://github.com/hyperium/hyper/blob/7206fe30302937075c51c16a69d1eb3bbce6a671/examples/send_file.rs + let file = tokio::fs::File::open(&profile_filename) + .await + .expect("couldn't open profile file"); + + // Wrap in a tokio_util::io::ReaderStream + let reader_stream = ReaderStream::new(file); + + let stream_body = StreamBody::new(reader_stream.map_ok(Frame::data)); + *response.body_mut() = Either::Right(stream_body.boxed()); } (&Method::POST, path, _) => { response.headers_mut().insert( @@ -412,12 +431,14 @@ async fn symbolication_service( header::HeaderValue::from_static("application/json"), ); let path = path.to_string(); - // Await the full body to be concatenated into a single `Bytes`... - let full_body = hyper::body::to_bytes(req.into_body()).await?; - let full_body = String::from_utf8(full_body.to_vec()).expect("invalid utf-8"); + // Await the full body to be concatenated into a `Collected`. + let full_body = req.into_body().collect().await?; + // Convert the `Collected` into a `String`. + let full_body = + String::from_utf8(full_body.to_bytes().to_vec()).expect("invalid utf-8"); let response_json = symbol_manager.query_json_api(&path, &full_body).await; - *response.body_mut() = response_json.into(); + *response.body_mut() = Either::Left(response_json); } _ => { *response.status_mut() = StatusCode::NOT_FOUND;