Skip to content

Commit

Permalink
Properly deregister backends from slab and mio
Browse files Browse the repository at this point in the history
Signed-off-by: Eloi DEMOLIS <[email protected]>
  • Loading branch information
Wonshtrum committed Oct 18, 2023
1 parent b135cac commit fd40346
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 44 deletions.
2 changes: 1 addition & 1 deletion e2e/src/mock/async_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl<A: Aggregator + Send + Sync + 'static> BackendHandle<A> {
let name = name.into();
let (stop_tx, mut stop_rx) = mpsc::channel::<()>(1);
let (mut aggregator_tx, aggregator_rx) = mpsc::channel::<A>(1);
let listener = TcpListener::bind(address).expect("could not bind");
let listener = TcpListener::bind(address).expect(&format!("could not bind on: {address}"));
let mut clients = Vec::new();
let thread_name = name.to_owned();

Expand Down
2 changes: 1 addition & 1 deletion e2e/src/mock/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Client {
/// Establish a TCP connection with its address,
/// register the yielded TCP stream, apply timeouts
pub fn connect(&mut self) {
let stream = TcpStream::connect(self.address).expect("could not connect");
let stream = TcpStream::connect(self.address).expect(&format!("could not connect to: {}", self.address));
stream
.set_read_timeout(Some(Duration::from_millis(100)))
.expect("could not set read timeout");
Expand Down
2 changes: 1 addition & 1 deletion e2e/src/mock/sync_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Backend {

/// Binds itself to its address, stores the yielded TCP listener
pub fn connect(&mut self) {
let listener = TcpListener::bind(self.address).expect("could not bind");
let listener = TcpListener::bind(self.address).expect(&format!("could not bind on: {}", self.address));
let timeout = Duration::from_millis(100);
let timeout = libc::timeval {
tv_sec: 0,
Expand Down
10 changes: 7 additions & 3 deletions lib/src/protocol/mux/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
E: Endpoint,
{
println_!("======= MUX H1 READABLE {:?}", self.position);
self.timeout_container.reset();
let stream = &mut context.streams[self.stream];
let parts = stream.split(&self.position);
let kawa = parts.rbuffer;
Expand Down Expand Up @@ -67,6 +68,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
return MuxResult::Continue;
}
if kawa.is_terminated() {
self.timeout_container.cancel();
self.readiness.interest.remove(Ready::READABLE);
}
if kawa.is_main_phase() {
Expand Down Expand Up @@ -95,6 +97,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
E: Endpoint,
{
println_!("======= MUX H1 WRITABLE {:?}", self.position);
self.timeout_container.reset();
let stream = &mut context.streams[self.stream];
let kawa = stream.wbuffer(&self.position);
kawa.prepare(&mut kawa::h1::BlockConverter);
Expand All @@ -121,20 +124,20 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
match kawa.detached.status_line {
kawa::StatusLine::Response { code: 101, .. } => {
println!("============== HANDLE UPGRADE!");
// unimplemented!();
return MuxResult::Upgrade;
}
kawa::StatusLine::Response { code: 100, .. } => {
println!("============== HANDLE CONTINUE!");
// after a 100 continue, we expect the client to continue with its request
self.timeout_container.reset();
self.readiness.interest.insert(Ready::READABLE);
kawa.clear();
return MuxResult::Continue;
}
kawa::StatusLine::Response { code: 103, .. } => {
println!("============== HANDLE EARLY HINT!");
if let StreamState::Linked(token) = stream.state {
// after a 103 early hints, we expect the server to send its response
// after a 103 early hints, we expect the backend to send its response
endpoint
.readiness_mut(token)
.interest
Expand All @@ -149,6 +152,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
}
let old_state = std::mem::replace(&mut stream.state, StreamState::Unlinked);
if stream.context.keep_alive_frontend {
self.timeout_container.reset();
println!("{old_state:?} {:?}", self.readiness);
if let StreamState::Linked(token) = old_state {
println!("{:?}", endpoint.readiness(token));
Expand All @@ -160,7 +164,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
stream.back.clear();
stream.back.storage.clear();
stream.front.clear();
// do not clear stream.front.storage because of H1 pipelining
// do not stream.front.storage.clear() because of H1 pipelining
stream.attempts = 0;
} else {
return MuxResult::CloseSession;
Expand Down
24 changes: 19 additions & 5 deletions lib/src/protocol/mux/h2.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, str::from_utf8_unchecked};
use std::collections::HashMap;

use rusty_ulid::Ulid;
use sozu_command::ready::Ready;
Expand Down Expand Up @@ -60,7 +60,7 @@ impl Default for H2Settings {
Self {
settings_header_table_size: 4096,
settings_enable_push: true,
settings_max_concurrent_streams: 256,
settings_max_concurrent_streams: 100,
settings_initial_window_size: (1 << 16) - 1,
settings_max_frame_size: 1 << 14,
settings_max_header_list_size: u32::MAX,
Expand Down Expand Up @@ -128,6 +128,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
E: Endpoint,
{
println_!("======= MUX H2 READABLE {:?}", self.position);
self.timeout_container.reset();
let (stream_id, kawa) = if let Some((stream_id, amount)) = self.expect_read {
let kawa = match stream_id {
H2StreamId::Zero => &mut self.zero,
Expand Down Expand Up @@ -269,7 +270,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
self.expect_read = Some((stream_id, header.payload_len as usize));
self.state = H2State::Frame(header);
}
Err(e) => panic!("stream error: {:?}", error_nom_to_h2(e)),
Err(_) => return self.goaway(H2Error::ProtocolError),
};
}
(H2State::ContinuationHeader(headers), _) => {
Expand All @@ -287,7 +288,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
headers.header_block_fragment.len += header.payload_len;
self.state = H2State::ContinuationFrame(headers);
}
Err(e) => panic!("stream error: {:?}", error_nom_to_h2(e)),
Err(_) => return self.goaway(H2Error::ProtocolError),
};
}
(H2State::Frame(header), _) => {
Expand Down Expand Up @@ -325,6 +326,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
E: Endpoint,
{
println_!("======= MUX H2 WRITABLE {:?}", self.position);
self.timeout_container.reset();
if let Some(H2StreamId::Zero) = self.expect_write {
let kawa = &mut self.zero;
println_!("{:?}", kawa.storage.data());
Expand Down Expand Up @@ -607,7 +609,19 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
rst_stream.error_code,
error_code_to_str(rst_stream.error_code)
);
self.streams.remove(&rst_stream.stream_id);
if let Some(stream_id) = self.streams.remove(&rst_stream.stream_id) {
let stream = &mut context.streams[stream_id];
if let StreamState::Linked(token) = stream.state {
endpoint.end_stream(token, stream_id, context);
}
let stream = &mut context.streams[stream_id];
match self.position {
Position::Client(_) => {}
Position::Server => {
stream.state = StreamState::Recycle;
}
}
}
}
Frame::Settings(settings) => {
if settings.ack {
Expand Down
121 changes: 89 additions & 32 deletions lib/src/protocol/mux/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::{
cell::RefCell,
collections::HashMap,
io::Write,
net::SocketAddr,
io::{ErrorKind, Write},

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Build documentation

unused import: `Write`

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Build documentation

unused import: `Write`

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

unused import: `Write`

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

unused import: `Write`

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

unused import: `Write`

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

unused import: `Write`

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

unused import: `Write`

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

unused import: `Write`

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

unused import: `Write`

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

unused import: `Write`

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

unused import: `Write`

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

unused import: `Write`

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

unused import: `Write`

Check warning on line 4 in lib/src/protocol/mux/mod.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

unused import: `Write`
net::{Shutdown, SocketAddr},
rc::{Rc, Weak},
};

Expand Down Expand Up @@ -47,7 +47,7 @@ macro_rules! println_ {
};
}
fn debug_kawa(_kawa: &GenericHttpStream) {
kawa::debug_kawa(_kawa);
// kawa::debug_kawa(_kawa);
}

/// Generic Http representation using the Kawa crate using the Checkout of Sozu as buffer
Expand Down Expand Up @@ -299,18 +299,24 @@ impl<Front: SocketHandler> Connection<Front> {
Connection::H2(c) => &mut c.position,
}
}
pub fn timeout_container(&mut self) -> &mut TimeoutContainer {
match self {
Connection::H1(c) => &mut c.timeout_container,
Connection::H2(c) => &mut c.timeout_container,
}
}
pub fn socket(&self) -> &TcpStream {
match self {
Connection::H1(c) => c.socket.socket_ref(),
Connection::H2(c) => c.socket.socket_ref(),
}
}
pub fn socket_mut(&mut self) -> &mut TcpStream {
match self {
Connection::H1(c) => c.socket.socket_mut(),
Connection::H2(c) => c.socket.socket_mut(),
}
}
pub fn timeout_container(&mut self) -> &mut TimeoutContainer {
match self {
Connection::H1(c) => &mut c.timeout_container,
Connection::H2(c) => &mut c.timeout_container,
}
}
fn force_disconnect(&mut self) -> MuxResult {
match self {
Connection::H1(c) => c.force_disconnect(),
Expand Down Expand Up @@ -949,6 +955,9 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
*position = Position::Client(BackendStatus::Connected(
std::mem::take(cluster_id),
));
backend
.timeout_container()
.set_duration(self.router.configured_backend_timeout);
}
_ => {}
}
Expand Down Expand Up @@ -979,7 +988,29 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
}
if !dead_backends.is_empty() {
for token in &dead_backends {
self.router.backends.remove(token);
let proxy_borrow = proxy.borrow();
if let Some(mut backend) = self.router.backends.remove(token) {
backend.timeout_container().cancel();
let socket = backend.socket_mut();
if let Err(e) = proxy_borrow.deregister_socket(socket) {
error!("error deregistering back socket({:?}): {:?}", socket, e);
}
if let Err(e) = socket.shutdown(Shutdown::Both) {
if e.kind() != ErrorKind::NotConnected {
error!(
"error shutting down back socket({:?}): {:?}",
socket, e
);
}
}
} else {
error!("session {:?} has no backend!", token);
}
if !proxy_borrow.remove_session(*token) {
error!("session {:?} was already removed!", token);
} else {
println!("SUCCESS: session {token:?} was removed!");
}
}
println_!("FRONTEND: {:#?}", self.frontend);
println_!("BACKENDS: {:#?}", self.router.backends);
Expand Down Expand Up @@ -1011,10 +1042,15 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
}

let context = &mut self.context;
let front_readiness = self.frontend.readiness_mut();
let mut dirty = false;
for stream_id in 0..context.streams.len() {
if context.streams[stream_id].state == StreamState::Link {
// Before the first request triggers a stream Link, the frontend timeout is set
// to a shorter request_timeout, here we switch to the longer nominal timeout
self.frontend
.timeout_container()
.set_duration(self.configured_frontend_timeout);
let front_readiness = self.frontend.readiness_mut();
dirty = true;
match self.router.connect(
stream_id,
Expand Down Expand Up @@ -1103,9 +1139,9 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
should_write = true;
}
StreamState::Linked(_) => {
// A stream Linked to a backend is waiting for the response, not the answer.
// A stream Linked to a backend is waiting for the response, not the request.
// For streaming or malformed requests, it is possible that the request is not
// terminated at this point. For now, we do nothing and
// terminated at this point. For now, we do nothing
should_close = false;
}
StreamState::Unlinked => {
Expand Down Expand Up @@ -1200,27 +1236,48 @@ impl<Front: SocketHandler + std::fmt::Debug> SessionState for Mux<Front> {
}
}

fn close(&mut self, _proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
let s = match &mut self.frontend {
Connection::H1(c) => &mut c.socket,
Connection::H2(c) => &mut c.socket,
};
let mut b = [0; 1024];
let (size, status) = s.socket_read(&mut b);
println_!("{size} {status:?} {:?}", &b[..size]);
for stream in &mut self.context.streams {
for kawa in [&mut stream.front, &mut stream.back] {
debug_kawa(kawa);
kawa.prepare(&mut kawa::h1::BlockConverter);
let out = kawa.as_io_slice();
let mut writer = std::io::BufWriter::new(Vec::new());
let amount = writer.write_vectored(&out).unwrap();
println_!(
"amount: {amount}\n{}",
String::from_utf8_lossy(writer.buffer())
);
fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
println_!("FRONTEND: {:#?}", self.frontend);
println_!("BACKENDS: {:#?}", self.router.backends);

for (token, backend) in &mut self.router.backends {
let proxy_borrow = proxy.borrow();
backend.timeout_container().cancel();
let socket = backend.socket_mut();
if let Err(e) = proxy_borrow.deregister_socket(socket) {
error!("error deregistering back socket({:?}): {:?}", socket, e);
}
if let Err(e) = socket.shutdown(Shutdown::Both) {
if e.kind() != ErrorKind::NotConnected {
error!("error shutting down back socket({:?}): {:?}", socket, e);
}
}
if !proxy_borrow.remove_session(*token) {
error!("session {:?} was already removed!", token);
} else {
println!("SUCCESS: session {token:?} was removed!");
}
}
// let s = match &mut self.frontend {
// Connection::H1(c) => &mut c.socket,
// Connection::H2(c) => &mut c.socket,
// };
// let mut b = [0; 1024];
// let (size, status) = s.socket_read(&mut b);
// println_!("{size} {status:?} {:?}", &b[..size]);
// for stream in &mut self.context.streams {
// for kawa in [&mut stream.front, &mut stream.back] {
// debug_kawa(kawa);
// kawa.prepare(&mut kawa::h1::BlockConverter);
// let out = kawa.as_io_slice();
// let mut writer = std::io::BufWriter::new(Vec::new());
// let amount = writer.write_vectored(&out).unwrap();
// println_!(
// "amount: {amount}\n{}",
// String::from_utf8_lossy(writer.buffer())
// );
// }
// }
}

fn shutting_down(&mut self) -> SessionIsToBeClosed {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/protocol/mux/pkawa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub fn handle_header<C>(
version: Version::V20,
code,
status,
reason: Store::Static(b"Default"),
reason: Store::Static(b"FromH2"),
}
}
};
Expand Down

0 comments on commit fd40346

Please sign in to comment.