-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Ambiguous `ChannelClosed` errors due to a race in hyper-util
#2649
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
Here's another set of reproduction steps, which shows the error. Here we have 3 blocking requests, then 3 async ones, then 3 blocking ones again. Only the 3 async ones fail, and it matters which error they fail with. Output when not patched: (click me to expand)
You saw those 3 failures. Output when patched: (click me to expand)
You saw those 3 failures. The code used for the above is this:
# Manifest for the `requester_tokio` reproduction client.
[package]
name = "requester_tokio"
version = "0.1.0"
edition = "2024"
[dependencies]
# `blocking` enables reqwest::blocking::Client, which run_blocking() uses.
reqwest={ version="0.12.15", features=["blocking"] }
# The client uses tokio::spawn, spawn_blocking, tokio::time, tokio::process and #[tokio::main].
tokio={ version="1.44.2", features=["full"] }
# NOTE(review): `http` and `tracing` are not referenced directly by the visible src/main.rs — confirm they are still needed.
http = "1.1"
tracing = "0.1"
# `env-filter` provides tracing_subscriber::EnvFilter, used to configure logging in main().
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
# Unoptimised debug build with limited debuginfo (debug=1) and runtime assertions/overflow checks enabled.
[profile.dev]
debug=1
debug-assertions = true
overflow-checks = true
codegen-units = 256
opt-level=0
# Build against a local hyper-util checkout (presumably the patched one under test) instead of crates.io.
# NOTE(review): absolute, machine-specific path — adjust before building elsewhere.
[patch.crates-io]
hyper-util = { path = "/home/user/SOURCE/my/channelclosed.reqwest/hyper-util" }
use reqwest;
use tokio;
use std::error::Error;
use std::io;
//use reqwest::Client;
//use tokio::process::Command;
//use std::process::Output;
//use tokio::io::AsyncWriteExt;
use std::io::Write;
use std::time::Duration;
use tokio::task::JoinHandle;
/// Pushes any buffered stderr and stdout output to the underlying handles,
/// stderr first, so messages from `main` and from the spawned server task
/// show up promptly and in order.
///
/// # Panics
///
/// Panics if either stream fails to flush.
fn flush() {
    Write::flush(&mut io::stderr()).unwrap();
    Write::flush(&mut io::stdout()).unwrap();
}
/// Runs a batch of three sequential GET requests against
/// `http://localhost:8080/` using reqwest's *blocking* client.
///
/// The blocking client is built and driven inside
/// [`tokio::task::spawn_blocking`], so its synchronous calls and
/// `std::thread::sleep`s do not stall the async runtime. Every attempt is
/// made even after a success, and the body of the final attempt is printed.
/// A failed send *or* a failed body read on the final attempt ends the batch
/// with an error, so the loop always stops after `max_attempts`.
///
/// # Errors
///
/// Returns an error if the client cannot be built, if the final attempt
/// fails, or if the blocking task panics or is cancelled.
async fn run_blocking() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let task = tokio::task::spawn_blocking(
        move || -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
            let bclient = reqwest::blocking::Client::builder()
                .tcp_keepalive(Duration::from_secs(30))
                .connection_verbose(true)
                .timeout(Duration::from_millis(500)) // Quick timeout to fail fast
                .build()?;
            std::thread::sleep(Duration::from_millis(100));

            let max_attempts = 3;
            let mut attempts = 0;
            let final_response = loop {
                attempts += 1;
                eprintln!("---------- req: {} ", attempts);
                match bclient.get("http://localhost:8080/").send() {
                    Ok(response) => match response.text() {
                        Ok(text) => {
                            eprintln!("Got '{}'", text);
                            if attempts >= max_attempts {
                                break text;
                            }
                        }
                        Err(e) => {
                            eprintln!("!!!!!!!!!! Failed to read response text: {}", e);
                            // Body-read failures must also respect the attempt
                            // budget; otherwise a body that never reads cleanly
                            // keeps this loop running forever.
                            if attempts >= max_attempts {
                                eprintln!("Giving up after {} attempts", max_attempts);
                                return Err(io::Error::other(format!(
                                    "!!! Failed to read response after {} attempts: {}",
                                    max_attempts, e
                                ))
                                .into());
                            }
                        }
                    },
                    Err(e) => {
                        eprintln!(
                            "!!!!!!! Request attempt {}/{} failed: {}",
                            attempts, max_attempts, e
                        );
                        if attempts >= max_attempts {
                            eprintln!("Giving up after {} attempts", max_attempts);
                            return Err(io::Error::other(format!(
                                "!!! Failed to connect after {} attempts: {}",
                                max_attempts, e
                            ))
                            .into());
                        }
                    }
                }
                std::thread::sleep(Duration::from_millis(1000));
            };
            Ok(final_response)
        },
    );

    // The first `?` surfaces a JoinError (panic/cancellation); the second
    // surfaces the error returned by the closure itself.
    let final_response = task.await??;
    println!("========== Final (blocking) request, response: {}", final_response);
    Ok(())
}
#[tokio::main]
//async fn main() {
//async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
//env_logger::init();
use tracing_subscriber;
tracing_subscriber::fmt()
//.with_env_filter("hyper=trace,reqwest=trace,h2=trace")
.with_env_filter("trace")
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
// // Print the current working directory
// match std::env::current_dir() {
// Ok(cwd) => eprintln!("Current working directory: {}", cwd.display()),
// Err(e) => eprintln!("Failed to get current working directory: {}", e),
// }
let handle: JoinHandle<Result<_, _>> = tokio::spawn(async {
//tokio::spawn(
eprintln!("spawned");
let cwd = std::env::current_dir()?;
// script is in ../server.sh not in current dir, assuming cwd is project dir!
let script_path = cwd.join("..").join("server.sh");
// let script_path = cwd.join("..").join("server.py");
let output = tokio::process::Command::new(script_path)
.kill_on_drop(true)
.output().await;
eprintln!("returned");
if let Err(ref e) = output {
eprintln!("Server error: {}", e);
}
flush();
output
// ,);
});
// Allow some time for the server to start
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
eprintln!("main, after waited");
flush();
run_blocking().await?;
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
// Create a Reqwest client using ClientBuilder
let client = reqwest::Client::builder()
.tcp_keepalive(std::time::Duration::from_secs(30))
//.pool_max_idle_per_host(0) //no effect, https://github.com/getsentry/uptime-checker/pull/204
.pool_max_idle_per_host(1000)
.connection_verbose(true)
.http1_only()
.timeout(Duration::from_millis(500))
// .no_keep_alive() //not a thing
.build()?;
// // Replace the original reqwest::get with client.get
// //XXX: good:
// let final_response = client
// .get("http://localhost:8080/")
// .send()
// .await?
// .text()
// .await?;
// Retry the final request with delays
let max_attempts = 3;
let mut attempts = 0;
let final_response = loop {
attempts += 1;
println!("---------- async req {}", attempts);
match client.get("http://localhost:8080/").send().await {
Ok(response) => {
match response.text().await {
Ok(text) => {
eprintln!("+++++++++++++ Got '{}'", text);
if attempts>=max_attempts {
break text;
}
},
// Ok(text) => {
// break text
// }, // Success, exit loop with the response text
Err(e) => eprintln!("!!!!!!!!!!!!! Failed to read response text: {}", e),
}
}
Err(e) => {
eprintln!("!!!!!!!!!! Request attempt {}/{} failed: {}, {:#?}", attempts, max_attempts, e, e.source());
if attempts >= max_attempts {
eprintln!("Giving up after {} attempts", max_attempts);
break "<failed>".to_string();//Err(format!("Failed to connect after {} attempts: {}", max_attempts, e).into());
}
}
}
// Wait before retrying
tokio::time::sleep(Duration::from_millis(1000)).await;
};
println!("Final (async) request, respone: {}", final_response);
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
run_blocking().await?;
//reqwest::get("http://localhost:8080/").await.unwrap();
// Check the result of the spawned task
match handle.await? {
Ok(output) => {
if !output.status.success() {
eprintln!("Server failed with status: {}", output.status);
if !output.stderr.is_empty() {
eprintln!("Server stderr: {}", String::from_utf8_lossy(&output.stderr));
}
}
}
Err(e) => eprintln!("Server error: {}", e),
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
Ok(())
}
#!/bin/sh
# Minimal misbehaving HTTP server for the reproduction. For 9 consecutive
# connections on port 8080 it pipes a canned "Connection: Close" response
# into nc, which sends it without waiting for the client's request.
# Only POSIX sh features are used, so this also works where /bin/sh is dash.

# Print the canned response. printf replaces `echo -e`: POSIX sh (e.g. dash)
# does not support -e and would emit a literal "-e " before the status line.
# The trailing "\n" reproduces echo's newline. Note that it makes the body
# 13 bytes, although Content-Length declares 12.
hello() {
    printf 'HTTP/1.1 200 OK\r\nConnection: Close\r\nContent-Length: 12\r\nContent-Type: text/plain\r\n\r\nHello World!\n'
}

# `let` is a bash builtin; POSIX arithmetic expansion is used instead.
count=0
while true; do
    # Alternative: hello | busybox nc -l -p 8080
    echo "started $0"
    # XXX: this sends the reply before it receives the GET request!
    hello | nc -l -p 8080
    echo "done"
    count=$((count + 1))
    sleep 0.2
    if test "$count" -ge 9; then
        break
    fi
done
# This is inspired by the OP of #1808. Run like: export RUST_LOG=trace
cargo run (you don't have to run
#!/usr/bin/python3
import socket
import time
HOST = "127.0.0.1"
PORT = 8080
# Set to False to send the response immediately on connect (mimics netcat); True to require a valid GET request first.
# XXX: set to True to get rid of the issue in OP aka https://github.com/seanmonstar/reqwest/issues/1808
# True makes it align with HTTP/1.1 and makes the async requests (in the rust client) succeed.
# Run the client with: $ cargo run --example client --features=full -- http://127.0.0.1:8080
# ^ in hyper's repo! Run it repeatedly to see different (racy) errors (add a delay below if you keep hitting the same error).
REQUIRE_GET = False
#REQUIRE_GET = True
# Delay in milliseconds before closing the connection after the response; 0 for immediate close.
#DELAY_CLOSE_MS = 250
DELAY_CLOSE_MS = 0
# With a 250 ms delay, the rust program can log: TRACE hyper::proto::h1::conn: shut down IO complete
# With a 0 ms delay, though, it yields:
#2025-04-18T19:29:21.872159Z DEBUG hyper::proto::h1::conn: error shutting down IO: Transport endpoint is not connected (os error 107)
#!!! recv_msg: no callback or queued request, propagating error err=hyper::Error(Shutdown, Os { code: 107, kind: NotConnected, message: "Transport endpoint is not connected" })
#2025-04-18T19:29:21.872204Z DEBUG hyper_util::client::legacy::client: client connection error: hyper::Error(Shutdown, Os { code: 107, kind: NotConnected, message: "Transport endpoint is not connected" })
# Delay in milliseconds before sending RESPONSE; only applies when REQUIRE_GET is False. 0 sends immediately.
#DELAY_SEND_MS = 600
DELAY_SEND_MS = 0
# Canned reply sent to each client.
# NOTE(review): the body b"Hello World!\n" is 13 bytes while Content-Length declares 12,
# so one byte follows the declared body — confirm whether that is intentional for the repro.
RESPONSE = (
    b"HTTP/1.1 200 OK\r\n"
    b"Connection: Close\r\n"
    b"Content-Length: 12\r\n"
    b"Content-Type: text/plain\r\n"
    b"\r\n"
    b"Hello World!\n"
)
def is_valid_get_request(data):
    """Return True if ``data`` starts with an HTTP/1.x ``GET /`` request line.

    Only the first line (up to the first CRLF) is inspected; headers are
    ignored. Input that is not valid UTF-8, or a request line that does not
    split into exactly three space-separated parts, is reported on stdout and
    treated as invalid.

    Args:
        data: Raw bytes received from the client socket.

    Returns:
        bool: True when the request line is ``GET / HTTP/1.<x>``.
    """
    try:
        # Decode the first line of the request
        first_line = data.decode("utf-8").split("\r\n")[0]
        method, path, version = first_line.split(" ")
    except (UnicodeDecodeError, ValueError, IndexError) as e:
        # `e` must be bound here; an unbound name would raise NameError
        # instead of rejecting the request.
        print(f"Validation failed: {e}")
        return False
    return method == "GET" and path == "/" and version.startswith("HTTP/1.")
def main():
    """Serve canned HTTP responses on HOST:PORT until interrupted with Ctrl-C.

    For each accepted connection, one of two modes applies:

    * REQUIRE_GET true: read one request, then answer RESPONSE if it is a
      valid ``GET /``, otherwise answer 400.
    * REQUIRE_GET false (default): send RESPONSE right away without reading
      anything, optionally after DELAY_SEND_MS.

    The connection is then closed, optionally after DELAY_CLOSE_MS. An error
    on one connection is printed, and the server moves on to the next.
    """
    # `with` closes the listening socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((HOST, PORT))
        server_socket.listen(1)
        mode = "require GET request" if REQUIRE_GET else "send response on connect"
        print(f"Server started on {HOST}:{PORT} (mode: {mode})")
        while True:
            try:
                # Accept client connection
                client_socket, addr = server_socket.accept()
                print(f"Connection from {addr}")
                # `with` guarantees the client socket is closed even if
                # sending fails, instead of leaking it on the error path.
                with client_socket:
                    if REQUIRE_GET:
                        # Receive request (up to 1024 bytes)
                        data = client_socket.recv(1024)
                        if is_valid_get_request(data):
                            print("Received valid GET request, sending response")
                            client_socket.sendall(RESPONSE)
                        else:
                            print("Received invalid request, closing connection")
                            bad_request = (
                                b"HTTP/1.1 400 Bad Request\r\n"
                                b"Connection: Close\r\n"
                                b"Content-Length: 0\r\n"
                                b"\r\n"
                            )
                            client_socket.sendall(bad_request)
                    else:
                        # Send response immediately on connect. The client's
                        # request is never read; NOTE(review): closing with
                        # unread data may make the kernel send RST instead of
                        # FIN, which appears to be part of what this repro exercises.
                        if DELAY_SEND_MS > 0:
                            print(f"Delaying send by {DELAY_SEND_MS}ms")
                            time.sleep(DELAY_SEND_MS / 1000.0)  # Convert ms to seconds
                        print("Sending response on connect")
                        client_socket.sendall(RESPONSE)
                    # Close connection immediately or delay first:
                    if DELAY_CLOSE_MS > 0:
                        print(f"Delaying connection close by {DELAY_CLOSE_MS}ms")
                        time.sleep(DELAY_CLOSE_MS / 1000.0)  # Convert ms to seconds
                # Leaving the `with` block above closed the client socket.
                print("Connection closed")
                # Small delay to ensure client processes response
                time.sleep(0.2)
            except KeyboardInterrupt:
                print("\nShutting down server")
                break
            except Exception as e:
                print(f"Error: {e}")
if __name__ == "__main__":
main() EDIT: Actually, you might get less tracing output unless you do additionally: export RUSTFLAGS='--cfg hyper_unstable_tracing' before [patch.crates-io]
reqwest = { path = "/home/user/SOURCE/my/channelclosed.reqwest/reqwest" }
hyper = { path = "/home/user/SOURCE/my/channelclosed.reqwest/hyper" } in addition to the
which is very telling. In those paths you just |
Fix a race condition in the legacy HTTP client's connection setup where connection errors (e.g., TLS failures, unexpected server responses) were discarded, resulting in vague ChannelClosed errors. seanmonstar/reqwest#2649
Fix a race condition in the legacy HTTP client's connection setup where connection errors (e.g., TLS failures, unexpected server responses) were discarded, resulting in vague ChannelClosed errors. seanmonstar/reqwest#2649
Fix a race condition in the legacy HTTP client's connection setup where connection errors (e.g., TLS failures, unexpected server responses) were discarded, resulting in vague ChannelClosed errors. seanmonstar/reqwest#2649
Fix a race condition in the legacy HTTP client's connection setup where connection errors (e.g., TLS failures, unexpected server responses) were discarded, resulting in vague ChannelClosed errors. seanmonstar/reqwest#2649
Fix a race condition in the legacy HTTP client's connection setup where connection errors (e.g., TLS failures, unexpected server responses) were discarded, resulting in vague ChannelClosed errors. seanmonstar/reqwest#2649
Fix a race condition in the legacy HTTP client's connection setup where connection errors (e.g., TLS failures, unexpected server responses) were discarded, resulting in vague ChannelClosed errors. seanmonstar/reqwest#2649
Fix a race condition in the legacy HTTP client's connection setup where connection errors (e.g., TLS failures, unexpected server responses) were discarded, resulting in vague ChannelClosed errors. seanmonstar/reqwest#2649
hyper-util v0.1.13 has been released with the fix for this, thanks!
Uh oh!
There was an error while loading. Please reload this page.
When using reqwest (v0.12) with hyper-util’s legacy HTTP client (backed by hyper v1.6.0), connection errors during HTTP/1.1 connection setup (e.g., TLS handshake failures, unexpected server responses) are sometimes masked as vague `hyper::Error(ChannelClosed)` errors. This occurs due to a race condition in hyper-util’s connection handling, where errors from the background connection task are discarded if the connection channel closes before readiness is confirmed. This makes debugging challenging, especially in scenarios like mTLS setups or servers sending unsolicited responses.
Code to reproduce the issue (click me to expand)
Generate certs:
openssl req -x509 -newkey rsa:2048 -nodes -days 365 -keyout server.key -out server.crt -subj "/CN=localhost"
Now you have `server.crt` and `server.key` in the current directory. Run this `mtls_server.py` Python server, which will serve errors (so to speak).
Make a Rust project (
cargo new mtls_test && cd mtls_test
); this will be a TLS client that doesn't present client certificates.
Replace the two files `Cargo.toml` and `src/main.rs`.
Outputs (click me to expand):
cargo run
Those `Ok` values are placeholders for the correct error, which is this:
reqwest::Error { kind: Request, url: "https://localhost:8443/", source: hyper_util::client::legacy::Error(SendRequest, hyper::Error(Io, Custom { kind: Other, error: Error { code: ErrorCode(1), cause: Some(Ssl(ErrorStack([Error { code: 167773276, library: "SSL routines", function: "ssl3_read_bytes", reason: "tlsv13 alert certificate required", file: "ssl/record/rec_layer_s3.c", line: 911, data: "SSL alert number 116" }]))) } })) }
instead of the wrong error, which is:
reqwest::Error { kind: Request, url: "https://localhost:8443/", source: hyper_util::client::legacy::Error(SendRequest, hyper::Error(ChannelClosed)) }
re #1808 (comment)
PR is here hyperium/hyper-util#184
Thanks to Grok 3[^1] (created by xAI) for helping me debug the `ChannelClosed` issue, develop this patch, and test it with reqwest and custom clients.
Footnotes
It literally would've been impossible without an AI/Grok3 because I would've given up long ago, heck I didn't even know what futures are and all that polling and async (not that I know now, but it's better than nothing). ↩
The text was updated successfully, but these errors were encountered: