diff --git a/e2e/src/mock/async_backend.rs b/e2e/src/mock/async_backend.rs index 56c452b62..b0ff7d25c 100644 --- a/e2e/src/mock/async_backend.rs +++ b/e2e/src/mock/async_backend.rs @@ -35,7 +35,7 @@ impl BackendHandle { let name = name.into(); let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1); let (mut aggregator_tx, aggregator_rx) = mpsc::channel::(1); - let listener = TcpListener::bind(address).expect("could not bind"); + let listener = TcpListener::bind(address).expect(&format!("could not bind on: {address}")); let mut clients = Vec::new(); let thread_name = name.to_owned(); diff --git a/e2e/src/mock/client.rs b/e2e/src/mock/client.rs index 2a1adae60..4b89bcc81 100644 --- a/e2e/src/mock/client.rs +++ b/e2e/src/mock/client.rs @@ -39,7 +39,7 @@ impl Client { /// Establish a TCP connection with its address, /// register the yielded TCP stream, apply timeouts pub fn connect(&mut self) { - let stream = TcpStream::connect(self.address).expect("could not connect"); + let stream = TcpStream::connect(self.address).expect(&format!("could not connect to: {}", self.address)); stream .set_read_timeout(Some(Duration::from_millis(100))) .expect("could not set read timeout"); diff --git a/e2e/src/mock/sync_backend.rs b/e2e/src/mock/sync_backend.rs index 14712bb7f..ecd770149 100644 --- a/e2e/src/mock/sync_backend.rs +++ b/e2e/src/mock/sync_backend.rs @@ -44,7 +44,7 @@ impl Backend { /// Binds itself to its address, stores the yielded TCP listener pub fn connect(&mut self) { - let listener = TcpListener::bind(self.address).expect("could not bind"); + let listener = TcpListener::bind(self.address).expect(&format!("could not bind on: {}", self.address)); let timeout = Duration::from_millis(100); let timeout = libc::timeval { tv_sec: 0, diff --git a/lib/src/protocol/mux/h1.rs b/lib/src/protocol/mux/h1.rs index 189f39b5e..0ed80a03f 100644 --- a/lib/src/protocol/mux/h1.rs +++ b/lib/src/protocol/mux/h1.rs @@ -39,6 +39,7 @@ impl ConnectionH1 { E: Endpoint, { println_!("======= MUX H1 READABLE {:?}", self.position); + self.timeout_container.reset(); let stream = &mut context.streams[self.stream]; let parts = stream.split(&self.position); let kawa = parts.rbuffer; @@ -67,6 +68,7 @@ impl ConnectionH1 { return MuxResult::Continue; } if kawa.is_terminated() { + self.timeout_container.cancel(); self.readiness.interest.remove(Ready::READABLE); } if kawa.is_main_phase() { @@ -95,6 +97,7 @@ impl ConnectionH1 { E: Endpoint, { println_!("======= MUX H1 WRITABLE {:?}", self.position); + self.timeout_container.reset(); let stream = &mut context.streams[self.stream]; let kawa = stream.wbuffer(&self.position); kawa.prepare(&mut kawa::h1::BlockConverter); @@ -121,12 +124,12 @@ impl ConnectionH1 { match kawa.detached.status_line { kawa::StatusLine::Response { code: 101, .. } => { println!("============== HANDLE UPGRADE!"); - // unimplemented!(); return MuxResult::Upgrade; } kawa::StatusLine::Response { code: 100, .. } => { println!("============== HANDLE CONTINUE!"); // after a 100 continue, we expect the client to continue with its request + self.timeout_container.reset(); self.readiness.interest.insert(Ready::READABLE); kawa.clear(); return MuxResult::Continue; @@ -134,7 +137,7 @@ impl ConnectionH1 { kawa::StatusLine::Response { code: 103, .. } => { println!("============== HANDLE EARLY HINT!"); if let StreamState::Linked(token) = stream.state { - // after a 103 early hints, we expect the server to send its response + // after a 103 early hints, we expect the backend to send its response endpoint .readiness_mut(token) .interest @@ -149,6 +152,7 @@ impl ConnectionH1 { } let old_state = std::mem::replace(&mut stream.state, StreamState::Unlinked); if stream.context.keep_alive_frontend { + self.timeout_container.reset(); println!("{old_state:?} {:?}", self.readiness); if let StreamState::Linked(token) = old_state { println!("{:?}", endpoint.readiness(token)); @@ -160,7 +164,7 @@ impl ConnectionH1 { stream.back.clear(); stream.back.storage.clear(); stream.front.clear(); - // do not clear stream.front.storage because of H1 pipelining + // do not stream.front.storage.clear() because of H1 pipelining stream.attempts = 0; } else { return MuxResult::CloseSession; diff --git a/lib/src/protocol/mux/h2.rs b/lib/src/protocol/mux/h2.rs index 15ca91c8f..1f40274dc 100644 --- a/lib/src/protocol/mux/h2.rs +++ b/lib/src/protocol/mux/h2.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, str::from_utf8_unchecked}; +use std::collections::HashMap; use rusty_ulid::Ulid; use sozu_command::ready::Ready; @@ -60,7 +60,7 @@ impl Default for H2Settings { Self { settings_header_table_size: 4096, settings_enable_push: true, - settings_max_concurrent_streams: 256, + settings_max_concurrent_streams: 100, settings_initial_window_size: (1 << 16) - 1, settings_max_frame_size: 1 << 14, settings_max_header_list_size: u32::MAX, @@ -128,6 +128,7 @@ impl ConnectionH2 { E: Endpoint, { println_!("======= MUX H2 READABLE {:?}", self.position); + self.timeout_container.reset(); let (stream_id, kawa) = if let Some((stream_id, amount)) = self.expect_read { let kawa = match stream_id { H2StreamId::Zero => &mut self.zero, @@ -269,7 +270,7 @@ impl ConnectionH2 { self.expect_read = Some((stream_id, header.payload_len as usize)); self.state = H2State::Frame(header); } - Err(e) => panic!("stream error: {:?}", error_nom_to_h2(e)), + Err(_) => return self.goaway(H2Error::ProtocolError), }; } (H2State::ContinuationHeader(headers), _) => { @@ -287,7 +288,7 @@ impl ConnectionH2 { headers.header_block_fragment.len += header.payload_len; self.state = H2State::ContinuationFrame(headers); } - Err(e) => panic!("stream error: {:?}", error_nom_to_h2(e)), + Err(_) => return self.goaway(H2Error::ProtocolError), }; } (H2State::Frame(header), _) => { @@ -325,6 +326,7 @@ impl ConnectionH2 { E: Endpoint, { println_!("======= MUX H2 WRITABLE {:?}", self.position); + self.timeout_container.reset(); if let Some(H2StreamId::Zero) = self.expect_write { let kawa = &mut self.zero; println_!("{:?}", kawa.storage.data()); @@ -607,7 +609,19 @@ impl ConnectionH2 { rst_stream.error_code, error_code_to_str(rst_stream.error_code) ); - self.streams.remove(&rst_stream.stream_id); + if let Some(stream_id) = self.streams.remove(&rst_stream.stream_id) { + let stream = &mut context.streams[stream_id]; + if let StreamState::Linked(token) = stream.state { + endpoint.end_stream(token, stream_id, context); + } + let stream = &mut context.streams[stream_id]; + match self.position { + Position::Client(_) => {} + Position::Server => { + stream.state = StreamState::Recycle; + } + } + } } Frame::Settings(settings) => { if settings.ack { diff --git a/lib/src/protocol/mux/mod.rs b/lib/src/protocol/mux/mod.rs index 4ac1df974..33a57c771 100644 --- a/lib/src/protocol/mux/mod.rs +++ b/lib/src/protocol/mux/mod.rs @@ -1,8 +1,8 @@ use std::{ cell::RefCell, collections::HashMap, - io::Write, - net::SocketAddr, + io::{ErrorKind, Write}, + net::{Shutdown, SocketAddr}, rc::{Rc, Weak}, }; @@ -47,7 +47,7 @@ macro_rules! println_ { }; } fn debug_kawa(_kawa: &GenericHttpStream) { - kawa::debug_kawa(_kawa); + // kawa::debug_kawa(_kawa); } /// Generic Http representation using the Kawa crate using the Checkout of Sozu as buffer @@ -299,18 +299,24 @@ impl Connection { Connection::H2(c) => &mut c.position, } } - pub fn timeout_container(&mut self) -> &mut TimeoutContainer { - match self { - Connection::H1(c) => &mut c.timeout_container, - Connection::H2(c) => &mut c.timeout_container, - } - } pub fn socket(&self) -> &TcpStream { match self { Connection::H1(c) => c.socket.socket_ref(), Connection::H2(c) => c.socket.socket_ref(), } } + pub fn socket_mut(&mut self) -> &mut TcpStream { + match self { + Connection::H1(c) => c.socket.socket_mut(), + Connection::H2(c) => c.socket.socket_mut(), + } + } + pub fn timeout_container(&mut self) -> &mut TimeoutContainer { + match self { + Connection::H1(c) => &mut c.timeout_container, + Connection::H2(c) => &mut c.timeout_container, + } + } fn force_disconnect(&mut self) -> MuxResult { match self { Connection::H1(c) => c.force_disconnect(), @@ -949,6 +955,9 @@ impl SessionState for Mux { *position = Position::Client(BackendStatus::Connected( std::mem::take(cluster_id), )); + backend + .timeout_container() + .set_duration(self.router.configured_backend_timeout); } _ => {} } @@ -979,7 +988,29 @@ impl SessionState for Mux { } if !dead_backends.is_empty() { for token in &dead_backends { - self.router.backends.remove(token); + let proxy_borrow = proxy.borrow(); + if let Some(mut backend) = self.router.backends.remove(token) { + backend.timeout_container().cancel(); + let socket = backend.socket_mut(); + if let Err(e) = proxy_borrow.deregister_socket(socket) { + error!("error deregistering back socket({:?}): {:?}", socket, e); + } + if let Err(e) = socket.shutdown(Shutdown::Both) { + if e.kind() != ErrorKind::NotConnected { + error!( + "error shutting down back socket({:?}): {:?}", + socket, e + ); + } + } + } else { + error!("session {:?} has no backend!", token); + } + if !proxy_borrow.remove_session(*token) { + error!("session {:?} was already removed!", token); + } else { + println!("SUCCESS: session {token:?} was removed!"); + } } println_!("FRONTEND: {:#?}", self.frontend); println_!("BACKENDS: {:#?}", self.router.backends); @@ -1011,10 +1042,15 @@ impl SessionState for Mux { } let context = &mut self.context; - let front_readiness = self.frontend.readiness_mut(); let mut dirty = false; for stream_id in 0..context.streams.len() { if context.streams[stream_id].state == StreamState::Link { + // Before the first request triggers a stream Link, the frontend timeout is set + // to a shorter request_timeout, here we switch to the longer nominal timeout + self.frontend + .timeout_container() + .set_duration(self.configured_frontend_timeout); + let front_readiness = self.frontend.readiness_mut(); dirty = true; match self.router.connect( stream_id, @@ -1103,9 +1139,9 @@ impl SessionState for Mux { should_write = true; } StreamState::Linked(_) => { - // A stream Linked to a backend is waiting for the response, not the answer. + // A stream Linked to a backend is waiting for the response, not the request. // For streaming or malformed requests, it is possible that the request is not - // terminated at this point. For now, we do nothing and + // terminated at this point. For now, we do nothing should_close = false; } StreamState::Unlinked => { @@ -1200,27 +1236,48 @@ impl SessionState for Mux { } } - fn close(&mut self, _proxy: Rc>, _metrics: &mut SessionMetrics) { - let s = match &mut self.frontend { - Connection::H1(c) => &mut c.socket, - Connection::H2(c) => &mut c.socket, - }; - let mut b = [0; 1024]; - let (size, status) = s.socket_read(&mut b); - println_!("{size} {status:?} {:?}", &b[..size]); - for stream in &mut self.context.streams { - for kawa in [&mut stream.front, &mut stream.back] { - debug_kawa(kawa); - kawa.prepare(&mut kawa::h1::BlockConverter); - let out = kawa.as_io_slice(); - let mut writer = std::io::BufWriter::new(Vec::new()); - let amount = writer.write_vectored(&out).unwrap(); - println_!( - "amount: {amount}\n{}", - String::from_utf8_lossy(writer.buffer()) - ); + fn close(&mut self, proxy: Rc>, _metrics: &mut SessionMetrics) { + println_!("FRONTEND: {:#?}", self.frontend); + println_!("BACKENDS: {:#?}", self.router.backends); + + for (token, backend) in &mut self.router.backends { + let proxy_borrow = proxy.borrow(); + backend.timeout_container().cancel(); + let socket = backend.socket_mut(); + if let Err(e) = proxy_borrow.deregister_socket(socket) { + error!("error deregistering back socket({:?}): {:?}", socket, e); + } + if let Err(e) = socket.shutdown(Shutdown::Both) { + if e.kind() != ErrorKind::NotConnected { + error!("error shutting down back socket({:?}): {:?}", socket, e); + } + } + if !proxy_borrow.remove_session(*token) { + error!("session {:?} was already removed!", token); + } else { + println!("SUCCESS: session {token:?} was removed!"); } } + // let s = match &mut self.frontend { + // Connection::H1(c) => &mut c.socket, + // Connection::H2(c) => &mut c.socket, + // }; + // let mut b = [0; 1024]; + // let (size, status) = s.socket_read(&mut b); + // println_!("{size} {status:?} {:?}", &b[..size]); + // for stream in &mut self.context.streams { + // for kawa in [&mut stream.front, &mut stream.back] { + // debug_kawa(kawa); + // kawa.prepare(&mut kawa::h1::BlockConverter); + // let out = kawa.as_io_slice(); + // let mut writer = std::io::BufWriter::new(Vec::new()); + // let amount = writer.write_vectored(&out).unwrap(); + // println_!( + // "amount: {amount}\n{}", + // String::from_utf8_lossy(writer.buffer()) + // ); + // } + // } } fn shutting_down(&mut self) -> SessionIsToBeClosed { diff --git a/lib/src/protocol/mux/pkawa.rs b/lib/src/protocol/mux/pkawa.rs index fbb49e5da..d00a430ef 100644 --- a/lib/src/protocol/mux/pkawa.rs +++ b/lib/src/protocol/mux/pkawa.rs @@ -117,7 +117,7 @@ pub fn handle_header( version: Version::V20, code, status, - reason: Store::Static(b"Default"), + reason: Store::Static(b"FromH2"), } } };