From 23669519a00ff34d7fd2d17f3caf5d843873c51d Mon Sep 17 00:00:00 2001 From: Eloi DEMOLIS Date: Fri, 29 Sep 2023 15:18:25 +0200 Subject: [PATCH] Introduce timeouts in Mux State Signed-off-by: Eloi DEMOLIS --- lib/src/http.rs | 191 ++++++++++++++++++++---------------- lib/src/https.rs | 160 ++++++++++++++++++++++-------- lib/src/protocol/mux/h1.rs | 4 +- lib/src/protocol/mux/h2.rs | 2 + lib/src/protocol/mux/mod.rs | 135 +++++++++++++++++++++++-- 5 files changed, 356 insertions(+), 136 deletions(-) diff --git a/lib/src/http.rs b/lib/src/http.rs index 73ff18bb4..0c53de322 100644 --- a/lib/src/http.rs +++ b/lib/src/http.rs @@ -41,13 +41,12 @@ use crate::{ }, mux::{self, Mux}, proxy_protocol::expect::ExpectProxyProtocol, - Http, Pipe, SessionState, + Pipe, SessionState, }, router::{Route, Router}, server::{ListenSession, ListenToken, ProxyChannel, Server, SessionManager}, socket::server_bind, timer::TimeoutContainer, - util::UnwrapLog, AcceptError, CachedTags, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError, ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder, StateResult, @@ -127,15 +126,21 @@ impl HttpSession { gauge_add!("protocol.http", 1); let session_address = sock.peer_addr().ok(); + let frontend = mux::Connection::new_h1_server(sock, container_frontend_timeout); + let router = mux::Router::new( + configured_backend_timeout, + configured_connect_timeout, + listener.clone(), + ); let mut context = mux::Context::new(pool.clone()); context .create_stream(request_id, 1 << 16) .ok_or(AcceptError::BufferCapacityReached)?; - let frontend = mux::Connection::new_h1_server(sock); HttpStateMachine::Mux(Mux { + configured_frontend_timeout, frontend_token: token, frontend, - router: mux::Router::new(listener.clone()), + router, public_address, peer_address: session_address, sticky_name: sticky_name.clone(), @@ -208,115 +213,131 @@ impl HttpSession { .map(|add| (add.destination(), add.source())) { Some((Some(public_address), Some(session_address))) => { - let mut http = Http::new( - self.answers.clone(), + let frontend = mux::Connection::new_h1_server( + expect.frontend, + expect.container_frontend_timeout, + ); + let router = mux::Router::new( self.configured_backend_timeout, self.configured_connect_timeout, - self.configured_frontend_timeout, - expect.container_frontend_timeout, - expect.frontend, - expect.frontend_token, self.listener.clone(), - self.pool.clone(), - Protocol::HTTP, + ); + let mut context = mux::Context::new(self.pool.clone()); + context.create_stream(expect.request_id, 1 << 16)?; + let mut mux = Mux { + configured_frontend_timeout: self.configured_frontend_timeout, + frontend_token: self.frontend_token, + frontend, + router, public_address, - expect.request_id, - Some(session_address), - self.sticky_name.clone(), - ) - .ok()?; - http.frontend_readiness.event = expect.frontend_readiness.event; + peer_address: Some(session_address), + sticky_name: self.sticky_name.clone(), + context, + }; + mux.frontend.readiness_mut().event = expect.frontend_readiness.event; gauge_add!("protocol.proxy.expect", -1); gauge_add!("protocol.http", 1); - unimplemented!(); - // Some(HttpStateMachine::Http(http)) + Some(HttpStateMachine::Mux(mux)) } _ => None, } } - fn upgrade_http(&mut self, http: Http) -> Option { - debug!("http switching to ws"); - let front_token = self.frontend_token; - let back_token = unwrap_msg!(http.backend_token); - let ws_context = http.context.websocket_context(); - - let mut container_frontend_timeout = http.container_frontend_timeout; - let mut container_backend_timeout = http.container_backend_timeout; - container_frontend_timeout.reset(); - container_backend_timeout.reset(); - - let mut pipe = Pipe::new( - http.response_stream.storage.buffer, - http.backend_id, - http.backend_socket, - http.backend, - Some(container_backend_timeout), - Some(container_frontend_timeout), - http.cluster_id, - http.request_stream.storage.buffer, - front_token, - http.frontend_socket, - self.listener.clone(), - Protocol::HTTP, - http.context.id, - http.context.session_address, - Some(ws_context), - ); - - pipe.frontend_readiness.event = http.frontend_readiness.event; - pipe.backend_readiness.event = http.backend_readiness.event; - pipe.set_back_token(back_token); - - gauge_add!("protocol.http", -1); - gauge_add!("protocol.ws", 1); - gauge_add!("http.active_requests", -1); - gauge_add!("websocket.active_requests", 1); - Some(HttpStateMachine::WebSocket(pipe)) - } + // fn upgrade_http(&mut self, http: Http) -> Option { + // debug!("http switching to ws"); + // let front_token = self.frontend_token; + // let back_token = unwrap_msg!(http.backend_token); + // let ws_context = http.context.websocket_context(); + + // let mut container_frontend_timeout = http.container_frontend_timeout; + // let mut container_backend_timeout = http.container_backend_timeout; + // container_frontend_timeout.reset(); + // container_backend_timeout.reset(); + + // let mut pipe = Pipe::new( + // http.response_stream.storage.buffer, + // http.backend_id, + // http.backend_socket, + // http.backend, + // Some(container_backend_timeout), + // Some(container_frontend_timeout), + // http.cluster_id, + // http.request_stream.storage.buffer, + // front_token, + // http.frontend_socket, + // self.listener.clone(), + // Protocol::HTTP, + // http.context.id, + // http.context.session_address, + // Some(ws_context), + // ); + + // pipe.frontend_readiness.event = http.frontend_readiness.event; + // pipe.backend_readiness.event = http.backend_readiness.event; + // pipe.set_back_token(back_token); + + // gauge_add!("protocol.http", -1); + // gauge_add!("protocol.ws", 1); + // gauge_add!("http.active_requests", -1); + // gauge_add!("websocket.active_requests", 1); + // Some(HttpStateMachine::WebSocket(pipe)) + // } fn upgrade_mux(&mut self, mut mux: Mux) -> Option { debug!("mux switching to ws"); let stream = mux.context.streams.pop().unwrap(); - let (frontend_readiness, frontend_socket) = match mux.frontend { - mux::Connection::H1(mux::ConnectionH1 { - readiness, socket, .. - }) => (readiness, socket), - // only h1<->h1 connections can upgrade to websocket - mux::Connection::H2(_) => unreachable!(), - }; + let (frontend_readiness, frontend_socket, mut container_frontend_timeout) = + match mux.frontend { + mux::Connection::H1(mux::ConnectionH1 { + readiness, + socket, + timeout_container, + .. + }) => (readiness, socket, timeout_container), + mux::Connection::H2(_) => { + error!("Only h1<->h1 connections can upgrade to websocket"); + return None; + } + }; - let mux::StreamState::Linked(back_token) = stream.state else { unreachable!() }; - let backend = mux.router.backends.remove(&back_token).unwrap(); - let (cluster_id, backend_readiness, backend_socket) = match backend { - mux::Connection::H1(mux::ConnectionH1 { - position: mux::Position::Client(mux::BackendStatus::Connected(cluster_id)), - readiness, - socket, - .. - }) => (cluster_id, readiness, socket), - // the backend disconnected just after upgrade, abort - mux::Connection::H1(_) => return None, - // only h1<->h1 connections can upgrade to websocket - mux::Connection::H2(_) => unreachable!(), + let mux::StreamState::Linked(back_token) = stream.state else { + error!("Upgrading stream should be linked to a backend"); + return None; }; + let backend = mux.router.backends.remove(&back_token).unwrap(); + let (cluster_id, backend_readiness, backend_socket, mut container_backend_timeout) = + match backend { + mux::Connection::H1(mux::ConnectionH1 { + position: mux::Position::Client(mux::BackendStatus::Connected(cluster_id)), + readiness, + socket, + timeout_container, + .. + }) => (cluster_id, readiness, socket, timeout_container), + mux::Connection::H1(_) => { + error!("The backend disconnected just after upgrade, abort"); + return None; + } + mux::Connection::H2(_) => { + error!("Only h1<->h1 connections can upgrade to websocket"); + return None; + } + }; let ws_context = stream.context.websocket_context(); - // let mut container_frontend_timeout = http.container_frontend_timeout; - // let mut container_backend_timeout = http.container_backend_timeout; - // container_frontend_timeout.reset(); - // container_backend_timeout.reset(); + container_frontend_timeout.reset(); + container_backend_timeout.reset(); let mut pipe = Pipe::new( stream.back.storage.buffer, None, Some(backend_socket), None, - None, - None, + Some(container_backend_timeout), + Some(container_frontend_timeout), Some(cluster_id), stream.front.storage.buffer, self.frontend_token, diff --git a/lib/src/https.rs b/lib/src/https.rs index 2877a7112..904304a23 100644 --- a/lib/src/https.rs +++ b/lib/src/https.rs @@ -49,7 +49,6 @@ use crate::{ backends::BackendMap, pool::Pool, protocol::{ - h2::Http2, http::{ answers::HttpAnswers, parser::{hostname_and_port, Method}, @@ -57,7 +56,7 @@ use crate::{ mux::{self, Mux}, proxy_protocol::expect::ExpectProxyProtocol, rustls::TlsHandshake, - Http, Pipe, SessionState, + Pipe, SessionState, }, router::{Route, Router}, server::{ListenSession, ListenToken, ProxyChannel, Server, SessionManager, SessionToken}, @@ -88,9 +87,9 @@ StateMachineBuilder! { Expect(ExpectProxyProtocol, ServerConnection), Handshake(TlsHandshake), Mux(Mux), - Http(Http), + // Http(Http), WebSocket(Pipe), - Http2(Http2) -> todo!("H2"), + // Http2(Http2) -> todo!("H2"), } } @@ -184,9 +183,9 @@ impl HttpsSession { let new_state = match self.state.take() { HttpsStateMachine::Expect(expect, ssl) => self.upgrade_expect(expect, ssl), HttpsStateMachine::Handshake(handshake) => self.upgrade_handshake(handshake), - HttpsStateMachine::Http(http) => self.upgrade_http(http), - HttpsStateMachine::Mux(_) => unimplemented!(), - HttpsStateMachine::Http2(_) => self.upgrade_http2(), + // HttpsStateMachine::Http(http) => self.upgrade_http(http), + HttpsStateMachine::Mux(mux) => self.upgrade_mux(mux), + // HttpsStateMachine::Http2(_) => self.upgrade_http2(), HttpsStateMachine::WebSocket(wss) => self.upgrade_websocket(wss), HttpsStateMachine::FailedUpgrade(_) => unreachable!(), }; @@ -279,57 +278,143 @@ impl HttpsSession { gauge_add!("protocol.tls.handshake", -1); + let router = mux::Router::new( + self.configured_backend_timeout, + self.configured_connect_timeout, + self.listener.clone(), + ); let mut context = mux::Context::new(self.pool.clone()); let mut frontend = match alpn { AlpnProtocol::Http11 => { context.create_stream(handshake.request_id, 1 << 16)?; - mux::Connection::new_h1_server(front_stream) + mux::Connection::new_h1_server(front_stream, handshake.container_frontend_timeout) } - AlpnProtocol::H2 => mux::Connection::new_h2_server(front_stream, self.pool.clone())?, + AlpnProtocol::H2 => mux::Connection::new_h2_server( + front_stream, + self.pool.clone(), + handshake.container_frontend_timeout, + )?, }; frontend.readiness_mut().event = handshake.frontend_readiness.event; Some(HttpsStateMachine::Mux(Mux { + configured_frontend_timeout: self.configured_frontend_timeout, frontend_token: self.frontend_token, frontend, context, - router: mux::Router::new(self.listener.clone()), + router, public_address: self.public_address, peer_address: self.peer_address, sticky_name: self.sticky_name.clone(), })) } - fn upgrade_http(&self, http: Http) -> Option { - debug!("https switching to wss"); - let front_token = self.frontend_token; - let back_token = unwrap_msg!(http.backend_token); - let ws_context = http.context.websocket_context(); + // fn upgrade_http(&self, http: Http) -> Option { + // debug!("https switching to wss"); + // let front_token = self.frontend_token; + // let back_token = unwrap_msg!(http.backend_token); + // let ws_context = http.context.websocket_context(); + + // let mut container_frontend_timeout = http.container_frontend_timeout; + // let mut container_backend_timeout = http.container_backend_timeout; + // container_frontend_timeout.reset(); + // container_backend_timeout.reset(); + + // let mut pipe = Pipe::new( + // http.response_stream.storage.buffer, + // http.backend_id, + // http.backend_socket, + // http.backend, + // Some(container_backend_timeout), + // Some(container_frontend_timeout), + // http.cluster_id, + // http.request_stream.storage.buffer, + // front_token, + // http.frontend_socket, + // self.listener.clone(), + // Protocol::HTTP, + // http.context.id, + // http.context.session_address, + // Some(ws_context), + // ); + + // pipe.frontend_readiness.event = http.frontend_readiness.event; + // pipe.backend_readiness.event = http.backend_readiness.event; + // pipe.set_back_token(back_token); + + // gauge_add!("protocol.https", -1); + // gauge_add!("protocol.wss", 1); + // gauge_add!("http.active_requests", -1); + // gauge_add!("websocket.active_requests", 1); + // Some(HttpsStateMachine::WebSocket(pipe)) + // } + + fn upgrade_mux(&self, mut mux: Mux) -> Option { + debug!("mux switching to wss"); + let stream = mux.context.streams.pop().unwrap(); + + let (frontend_readiness, frontend_socket, mut container_frontend_timeout) = + match mux.frontend { + mux::Connection::H1(mux::ConnectionH1 { + readiness, + socket, + timeout_container, + .. + }) => (readiness, socket, timeout_container), + mux::Connection::H2(_) => { + error!("Only h1<->h1 connections can upgrade to websocket"); + return None; + } + }; + + let mux::StreamState::Linked(back_token) = stream.state else { + error!("Upgrading stream should be linked to a backend"); + return None; + }; + let backend = mux.router.backends.remove(&back_token).unwrap(); + let (cluster_id, backend_readiness, backend_socket, mut container_backend_timeout) = + match backend { + mux::Connection::H1(mux::ConnectionH1 { + position: mux::Position::Client(mux::BackendStatus::Connected(cluster_id)), + readiness, + socket, + timeout_container, + .. + }) => (cluster_id, readiness, socket, timeout_container), + mux::Connection::H1(_) => { + error!("The backend disconnected just after upgrade, abort"); + return None; + } + mux::Connection::H2(_) => { + error!("Only h1<->h1 connections can upgrade to websocket"); + return None; + } + }; + + let ws_context = stream.context.websocket_context(); - let mut container_frontend_timeout = http.container_frontend_timeout; - let mut container_backend_timeout = http.container_backend_timeout; container_frontend_timeout.reset(); container_backend_timeout.reset(); let mut pipe = Pipe::new( - http.response_stream.storage.buffer, - http.backend_id, - http.backend_socket, - http.backend, + stream.back.storage.buffer, + None, + Some(backend_socket), + None, Some(container_backend_timeout), Some(container_frontend_timeout), - http.cluster_id, - http.request_stream.storage.buffer, - front_token, - http.frontend_socket, + Some(cluster_id), + stream.front.storage.buffer, + self.frontend_token, + frontend_socket, self.listener.clone(), - Protocol::HTTP, - http.context.id, - http.context.session_address, + Protocol::HTTPS, + stream.context.id, + stream.context.session_address, Some(ws_context), ); - pipe.frontend_readiness.event = http.frontend_readiness.event; - pipe.backend_readiness.event = http.backend_readiness.event; + pipe.frontend_readiness.event = frontend_readiness.event; + pipe.backend_readiness.event = backend_readiness.event; pipe.set_back_token(back_token); gauge_add!("protocol.https", -1); @@ -339,10 +424,6 @@ impl HttpsSession { Some(HttpsStateMachine::WebSocket(pipe)) } - fn upgrade_http2(&self) -> Option { - todo!() - } - fn upgrade_websocket( &self, wss: Pipe, @@ -366,13 +447,12 @@ impl ProxySession for HttpsSession { match self.state.marker() { StateMarker::Expect => gauge_add!("protocol.proxy.expect", -1), StateMarker::Handshake => gauge_add!("protocol.tls.handshake", -1), - StateMarker::Http => gauge_add!("protocol.https", -1), + // StateMarker::Http => gauge_add!("protocol.https", -1), StateMarker::Mux => gauge_add!("protocol.https", -1), StateMarker::WebSocket => { gauge_add!("protocol.wss", -1); gauge_add!("websocket.active_requests", -1); - } - StateMarker::Http2 => gauge_add!("protocol.http2", -1), + } // StateMarker::Http2 => gauge_add!("protocol.http2", -1), } if self.state.failed() { @@ -727,9 +807,9 @@ impl HttpsListener { let protocols = config .alpn .iter() - .filter_map(|protocol| match AlpnProtocol::from_i32(*protocol) { - Some(AlpnProtocol::Http11) => Some("http/1.1"), - Some(AlpnProtocol::H2) => Some("h2"), + .filter_map(|protocol| match AlpnProtocol::try_from(*protocol) { + Ok(AlpnProtocol::Http11) => Some("http/1.1"), + Ok(AlpnProtocol::H2) => Some("h2"), other_protocol => { error!("unsupported ALPN protocol: {:?}", other_protocol); None diff --git a/lib/src/protocol/mux/h1.rs b/lib/src/protocol/mux/h1.rs index 33c806e31..753cb3517 100644 --- a/lib/src/protocol/mux/h1.rs +++ b/lib/src/protocol/mux/h1.rs @@ -8,16 +8,18 @@ use crate::{ Position, StreamState, }, socket::SocketHandler, + timer::TimeoutContainer, Readiness, }; pub struct ConnectionH1 { pub position: Position, pub readiness: Readiness, + pub requests: usize, pub socket: Front, /// note: a Server H1 will always reference stream 0, but a client can reference any stream pub stream: GlobalStreamId, - pub requests: usize, + pub timeout_container: TimeoutContainer, } impl std::fmt::Debug for ConnectionH1 { diff --git a/lib/src/protocol/mux/h2.rs b/lib/src/protocol/mux/h2.rs index 51e87acb8..a2977d43c 100644 --- a/lib/src/protocol/mux/h2.rs +++ b/lib/src/protocol/mux/h2.rs @@ -13,6 +13,7 @@ use crate::{ GlobalStreamId, MuxResult, Position, StreamId, StreamState, }, socket::SocketHandler, + timer::TimeoutContainer, Readiness, }; @@ -94,6 +95,7 @@ pub struct ConnectionH2 { pub socket: Front, pub state: H2State, pub streams: HashMap, + pub timeout_container: TimeoutContainer, pub window: u32, pub zero: GenericHttpStream, } diff --git a/lib/src/protocol/mux/mod.rs b/lib/src/protocol/mux/mod.rs index 53b259133..1ef8946d7 100644 --- a/lib/src/protocol/mux/mod.rs +++ b/lib/src/protocol/mux/mod.rs @@ -9,6 +9,7 @@ use std::{ use mio::{net::TcpStream, Interest, Token}; use rusty_ulid::Ulid; use sozu_command::{proto::command::ListenerType, ready::Ready}; +use time::Duration; mod converter; mod h1; @@ -28,6 +29,7 @@ use crate::{ router::Route, server::CONN_RETRIES, socket::{SocketHandler, SocketResult}, + timer::TimeoutContainer, BackendConnectionError, L7ListenerHandler, L7Proxy, ProxySession, Readiness, RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult, StateResult, }; @@ -177,19 +179,27 @@ pub enum Connection { } impl Connection { - pub fn new_h1_server(front_stream: Front) -> Connection { + pub fn new_h1_server( + front_stream: Front, + timeout_container: TimeoutContainer, + ) -> Connection { Connection::H1(ConnectionH1 { - socket: front_stream, position: Position::Server, readiness: Readiness { interest: Ready::READABLE | Ready::HUP | Ready::ERROR, event: Ready::EMPTY, }, - stream: 0, requests: 0, + socket: front_stream, + stream: 0, + timeout_container, }) } - pub fn new_h1_client(front_stream: Front, cluster_id: String) -> Connection { + pub fn new_h1_client( + front_stream: Front, + cluster_id: String, + timeout_container: TimeoutContainer, + ) -> Connection { Connection::H1(ConnectionH1 { socket: front_stream, position: Position::Client(BackendStatus::Connecting(cluster_id)), @@ -199,12 +209,14 @@ impl Connection { }, stream: 0, requests: 0, + timeout_container, }) } pub fn new_h2_server( front_stream: Front, pool: Weak>, + timeout_container: TimeoutContainer, ) -> Option> { let buffer = pool .upgrade() @@ -226,6 +238,7 @@ impl Connection { socket: front_stream, state: H2State::ClientPreface, streams: HashMap::new(), + timeout_container, window: 1 << 16, zero: kawa::Kawa::new(kawa::Kind::Request, kawa::Buffer::new(buffer)), })) @@ -234,6 +247,7 @@ impl Connection { front_stream: Front, cluster_id: String, pool: Weak>, + timeout_container: TimeoutContainer, ) -> Option> { let buffer = pool .upgrade() @@ -255,6 +269,7 @@ impl Connection { socket: front_stream, state: H2State::ClientPreface, streams: HashMap::new(), + timeout_container, window: 1 << 16, zero: kawa::Kawa::new(kawa::Kind::Request, kawa::Buffer::new(buffer)), })) @@ -284,6 +299,12 @@ impl Connection { Connection::H2(c) => &mut c.position, } } + pub fn timeout_container(&mut self) -> &mut TimeoutContainer { + match self { + Connection::H1(c) => &mut c.timeout_container, + Connection::H2(c) => &mut c.timeout_container, + } + } pub fn socket(&self) -> &TcpStream { match self { Connection::H1(c) => c.socket.socket_ref(), @@ -594,15 +615,23 @@ impl Context { } pub struct Router { - pub listener: Rc>, pub backends: HashMap>, + pub configured_backend_timeout: Duration, + pub configured_connect_timeout: Duration, + pub listener: Rc>, } impl Router { - pub fn new(listener: Rc>) -> Self { + pub fn new( + configured_backend_timeout: Duration, + configured_connect_timeout: Duration, + listener: Rc>, + ) -> Self { Self { - listener, backends: HashMap::new(), + configured_backend_timeout, + configured_connect_timeout, + listener, } } @@ -715,13 +744,19 @@ impl Router { error!("error registering back socket({:?}): {:?}", socket, e); } + let timeout_container = TimeoutContainer::new(self.configured_connect_timeout, token); let connection = if h2 { - match Connection::new_h2_client(socket, cluster_id, context.pool.clone()) { + match Connection::new_h2_client( + socket, + cluster_id, + context.pool.clone(), + timeout_container, + ) { Some(connection) => connection, None => return Err(BackendConnectionError::MaxBuffers), } } else { - Connection::new_h1_client(socket, cluster_id) + Connection::new_h1_client(socket, cluster_id, timeout_container) }; self.backends.insert(token, connection); token @@ -842,6 +877,7 @@ impl Router { } pub struct Mux { + pub configured_frontend_timeout: Duration, pub frontend_token: Token, pub frontend: Connection, pub router: Router, @@ -1033,11 +1069,90 @@ impl SessionState for Mux { fn timeout(&mut self, token: Token, _metrics: &mut SessionMetrics) -> StateResult { println_!("MuxState::timeout({token:?})"); - StateResult::CloseSession + let front_is_h2 = match self.frontend { + Connection::H1(_) => false, + Connection::H2(_) => true, + }; + let mut is_to_be_closed = true; + if self.frontend_token == token { + self.frontend.timeout_container().triggered(); + let front_readiness = self.frontend.readiness_mut(); + for stream in &mut self.context.streams { + match stream.state { + StreamState::Idle => { + // In h1 an Idle stream is always the first request, so we can send a 408 + // In h2 an Idle stream doesn't necessarily hold a request yet, + // in most cases it was just reserved, so we can just ignore them. + if !front_is_h2 { + set_default_answer(stream, front_readiness, 408); + is_to_be_closed = false; + } + } + StreamState::Link => { + // This is an unusual case, as we have both a complete request and no + // available backend yet. For now, we answer with 503 + set_default_answer(stream, front_readiness, 503); + is_to_be_closed = false; + } + StreamState::Linked(_) => { + // A stream Linked to a backend is waiting for the response, not the answer. + // For streaming or malformed requests, it is possible that the request is not + // terminated at this point. For now, we do nothing and + is_to_be_closed = false; + } + StreamState::Unlinked => { + // A stream Unlinked already has a response and its backend closed. + // In case it hasn't finished proxying we wait. Otherwise it is a stream + // kept alive for a new request, which can be killed. + if !stream.back.is_completed() { + is_to_be_closed = false; + } + } + StreamState::Recycle => { + // A recycled stream is an h2 stream which doesn't hold a request anymore. + // We can ignore it. + } + } + } + } else if let Some(backend) = self.router.backends.get_mut(&token) { + backend.timeout_container().triggered(); + let front_readiness = self.frontend.readiness_mut(); + for stream_id in 0..self.context.streams.len() { + let stream = &mut self.context.streams[stream_id]; + if let StreamState::Linked(back_token) = stream.state { + if token == back_token { + // This stream is linked to the backend that timedout. + if stream.back.is_terminated() || stream.back.is_error() { + // Nothing to do, simply wait for the remaining bytes to be proxied + if !stream.back.is_completed() { + is_to_be_closed = false; + } + } else if stream.back.is_initial() { + // The response has not started yet + set_default_answer(stream, front_readiness, 504); + is_to_be_closed = false; + } else { + forcefully_terminate_answer(stream, front_readiness); + is_to_be_closed = false; + } + backend.end_stream(0, &mut self.context); + } + } + } + } + if is_to_be_closed { + StateResult::CloseSession + } else { + StateResult::Continue + } } fn cancel_timeouts(&mut self) { println_!("MuxState::cancel_timeouts"); + self.frontend.timeout_container().cancel(); + for backend in self.router.backends.values_mut() { + backend.timeout_container().cancel(); + } } fn print_state(&self, context: &str) {