Skip to content

Commit

Permalink
Introduce timeouts in Mux State
Browse files Browse the repository at this point in the history
Signed-off-by: Eloi DEMOLIS <[email protected]>
  • Loading branch information
Wonshtrum committed Sep 29, 2023
1 parent 8c9057d commit 2366951
Show file tree
Hide file tree
Showing 5 changed files with 356 additions and 136 deletions.
191 changes: 106 additions & 85 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,12 @@ use crate::{
},
mux::{self, Mux},
proxy_protocol::expect::ExpectProxyProtocol,
Http, Pipe, SessionState,
Pipe, SessionState,
},
router::{Route, Router},
server::{ListenSession, ListenToken, ProxyChannel, Server, SessionManager},
socket::server_bind,
timer::TimeoutContainer,
util::UnwrapLog,
AcceptError, CachedTags, FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerError,
ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, SessionIsToBeClosed,
SessionMetrics, SessionResult, StateMachineBuilder, StateResult,
Expand Down Expand Up @@ -127,15 +126,21 @@ impl HttpSession {
gauge_add!("protocol.http", 1);
let session_address = sock.peer_addr().ok();

let frontend = mux::Connection::new_h1_server(sock, container_frontend_timeout);
let router = mux::Router::new(
configured_backend_timeout,
configured_connect_timeout,
listener.clone(),
);
let mut context = mux::Context::new(pool.clone());
context
.create_stream(request_id, 1 << 16)
.ok_or(AcceptError::BufferCapacityReached)?;
let frontend = mux::Connection::new_h1_server(sock);
HttpStateMachine::Mux(Mux {
configured_frontend_timeout,
frontend_token: token,
frontend,
router: mux::Router::new(listener.clone()),
router,
public_address,
peer_address: session_address,
sticky_name: sticky_name.clone(),
Expand Down Expand Up @@ -208,115 +213,131 @@ impl HttpSession {
.map(|add| (add.destination(), add.source()))
{
Some((Some(public_address), Some(session_address))) => {
let mut http = Http::new(
self.answers.clone(),
let frontend = mux::Connection::new_h1_server(
expect.frontend,
expect.container_frontend_timeout,
);
let router = mux::Router::new(
self.configured_backend_timeout,
self.configured_connect_timeout,
self.configured_frontend_timeout,
expect.container_frontend_timeout,
expect.frontend,
expect.frontend_token,
self.listener.clone(),
self.pool.clone(),
Protocol::HTTP,
);
let mut context = mux::Context::new(self.pool.clone());
context.create_stream(expect.request_id, 1 << 16)?;
let mut mux = Mux {
configured_frontend_timeout: self.configured_frontend_timeout,
frontend_token: self.frontend_token,
frontend,
router,
public_address,
expect.request_id,
Some(session_address),
self.sticky_name.clone(),
)
.ok()?;
http.frontend_readiness.event = expect.frontend_readiness.event;
peer_address: Some(session_address),
sticky_name: self.sticky_name.clone(),
context,
};
mux.frontend.readiness_mut().event = expect.frontend_readiness.event;

gauge_add!("protocol.proxy.expect", -1);
gauge_add!("protocol.http", 1);
unimplemented!();
// Some(HttpStateMachine::Http(http))
Some(HttpStateMachine::Mux(mux))
}
_ => None,
}
}

fn upgrade_http(&mut self, http: Http<TcpStream, HttpListener>) -> Option<HttpStateMachine> {
debug!("http switching to ws");
let front_token = self.frontend_token;
let back_token = unwrap_msg!(http.backend_token);
let ws_context = http.context.websocket_context();

let mut container_frontend_timeout = http.container_frontend_timeout;
let mut container_backend_timeout = http.container_backend_timeout;
container_frontend_timeout.reset();
container_backend_timeout.reset();

let mut pipe = Pipe::new(
http.response_stream.storage.buffer,
http.backend_id,
http.backend_socket,
http.backend,
Some(container_backend_timeout),
Some(container_frontend_timeout),
http.cluster_id,
http.request_stream.storage.buffer,
front_token,
http.frontend_socket,
self.listener.clone(),
Protocol::HTTP,
http.context.id,
http.context.session_address,
Some(ws_context),
);

pipe.frontend_readiness.event = http.frontend_readiness.event;
pipe.backend_readiness.event = http.backend_readiness.event;
pipe.set_back_token(back_token);

gauge_add!("protocol.http", -1);
gauge_add!("protocol.ws", 1);
gauge_add!("http.active_requests", -1);
gauge_add!("websocket.active_requests", 1);
Some(HttpStateMachine::WebSocket(pipe))
}
// fn upgrade_http(&mut self, http: Http<TcpStream, HttpListener>) -> Option<HttpStateMachine> {
// debug!("http switching to ws");
// let front_token = self.frontend_token;
// let back_token = unwrap_msg!(http.backend_token);
// let ws_context = http.context.websocket_context();

// let mut container_frontend_timeout = http.container_frontend_timeout;
// let mut container_backend_timeout = http.container_backend_timeout;
// container_frontend_timeout.reset();
// container_backend_timeout.reset();

// let mut pipe = Pipe::new(
// http.response_stream.storage.buffer,
// http.backend_id,
// http.backend_socket,
// http.backend,
// Some(container_backend_timeout),
// Some(container_frontend_timeout),
// http.cluster_id,
// http.request_stream.storage.buffer,
// front_token,
// http.frontend_socket,
// self.listener.clone(),
// Protocol::HTTP,
// http.context.id,
// http.context.session_address,
// Some(ws_context),
// );

// pipe.frontend_readiness.event = http.frontend_readiness.event;
// pipe.backend_readiness.event = http.backend_readiness.event;
// pipe.set_back_token(back_token);

// gauge_add!("protocol.http", -1);
// gauge_add!("protocol.ws", 1);
// gauge_add!("http.active_requests", -1);
// gauge_add!("websocket.active_requests", 1);
// Some(HttpStateMachine::WebSocket(pipe))
// }

fn upgrade_mux(&mut self, mut mux: Mux<TcpStream>) -> Option<HttpStateMachine> {
debug!("mux switching to ws");
let stream = mux.context.streams.pop().unwrap();

let (frontend_readiness, frontend_socket) = match mux.frontend {
mux::Connection::H1(mux::ConnectionH1 {
readiness, socket, ..
}) => (readiness, socket),
// only h1<->h1 connections can upgrade to websocket
mux::Connection::H2(_) => unreachable!(),
};
let (frontend_readiness, frontend_socket, mut container_frontend_timeout) =
match mux.frontend {
mux::Connection::H1(mux::ConnectionH1 {
readiness,
socket,
timeout_container,
..
}) => (readiness, socket, timeout_container),
mux::Connection::H2(_) => {
error!("Only h1<->h1 connections can upgrade to websocket");
return None;
}
};

let mux::StreamState::Linked(back_token) = stream.state else { unreachable!() };
let backend = mux.router.backends.remove(&back_token).unwrap();
let (cluster_id, backend_readiness, backend_socket) = match backend {
mux::Connection::H1(mux::ConnectionH1 {
position: mux::Position::Client(mux::BackendStatus::Connected(cluster_id)),
readiness,
socket,
..
}) => (cluster_id, readiness, socket),
// the backend disconnected just after upgrade, abort
mux::Connection::H1(_) => return None,
// only h1<->h1 connections can upgrade to websocket
mux::Connection::H2(_) => unreachable!(),
let mux::StreamState::Linked(back_token) = stream.state else {
error!("Upgrading stream should be linked to a backend");
return None;
};
let backend = mux.router.backends.remove(&back_token).unwrap();
let (cluster_id, backend_readiness, backend_socket, mut container_backend_timeout) =
match backend {
mux::Connection::H1(mux::ConnectionH1 {
position: mux::Position::Client(mux::BackendStatus::Connected(cluster_id)),
readiness,
socket,
timeout_container,
..
}) => (cluster_id, readiness, socket, timeout_container),
mux::Connection::H1(_) => {
error!("The backend disconnected just after upgrade, abort");
return None;
}
mux::Connection::H2(_) => {
error!("Only h1<->h1 connections can upgrade to websocket");
return None;
}
};

let ws_context = stream.context.websocket_context();

// let mut container_frontend_timeout = http.container_frontend_timeout;
// let mut container_backend_timeout = http.container_backend_timeout;
// container_frontend_timeout.reset();
// container_backend_timeout.reset();
container_frontend_timeout.reset();
container_backend_timeout.reset();

let mut pipe = Pipe::new(
stream.back.storage.buffer,
None,
Some(backend_socket),
None,
None,
None,
Some(container_backend_timeout),
Some(container_frontend_timeout),
Some(cluster_id),
stream.front.storage.buffer,
self.frontend_token,
Expand Down
Loading

0 comments on commit 2366951

Please sign in to comment.