Skip to content

Commit

Permalink
http: Parameterize NewServeHttp (#2696)
Browse files Browse the repository at this point in the history
We plan to add defensive timeouts to the HTTP server to limit idle streams and
connections. In preparation for this, we need to parameterize the server
to accept these additonal configurations.

This change updates NewServeHttp to use an ExtractParam to build a Params struct
for each server. In follow-up changes, the timeout configuration will be
instrumented through this Params struct.

There are no functional changes in this commit.
  • Loading branch information
olix0r authored Feb 5, 2024
1 parent 28b2d83 commit 96124bc
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 86 deletions.
11 changes: 10 additions & 1 deletion linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,16 @@ impl Config {

let tcp = http
.unlift_new()
.push(http::NewServeHttp::layer(Default::default(), drain.clone()))
.push(http::NewServeHttp::layer({
let drain = drain.clone();
move |t: &Http| {
http::ServerParams {
version: t.version,
h2: Default::default(),
drain: drain.clone(),
}
}
}))
.push_filter(
|(http, tcp): (
Result<Option<http::Version>, detect::DetectTimeoutError<_>>,
Expand Down
14 changes: 8 additions & 6 deletions linkerd/app/inbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::set_identity_header::NewSetIdentityHeader;
use crate::{policy, Inbound};
pub use linkerd_app_core::proxy::http::{normalize_uri, Version};
use linkerd_app_core::{
config::{ProxyConfig, ServerConfig},
config::ProxyConfig,
errors, http_tracing, io,
metrics::ServerLabel,
proxy::http,
Expand Down Expand Up @@ -112,16 +112,18 @@ impl<H> Inbound<H> {
HSvc::Future: Send,
{
self.map_stack(|config, rt, http| {
let ProxyConfig {
server: ServerConfig { h2_settings, .. },
..
} = config.proxy;
let h2 = config.proxy.server.h2_settings;
let drain = rt.drain.clone();

http.push_on_service(http::BoxRequest::layer())
.check_new_service::<T, http::Request<_>>()
.unlift_new()
.check_new_new_service::<T, http::ClientHandle, http::Request<_>>()
.push(http::NewServeHttp::layer(h2_settings, rt.drain.clone()))
.push(http::NewServeHttp::layer(move |t: &T| http::ServerParams {
version: t.param(),
h2,
drain: drain.clone(),
}))
.check_new_service::<T, I>()
.arc_new_tcp()
})
Expand Down
44 changes: 32 additions & 12 deletions linkerd/app/outbound/src/http/server.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
use super::IdentityRequired;
use crate::{http, trace_labels, Outbound};
use linkerd_app_core::{
errors, http_tracing, io,
svc::{self, ExtractParam},
Error, Result,
};
use linkerd_app_core::{drain, errors, http_tracing, io, svc, Error, Result};

#[derive(Copy, Clone, Debug)]
pub(crate) struct ServerRescue {
emit_headers: bool,
}

#[derive(Clone, Debug)]
pub struct ExtractServerParams {
h2: http::h2::Settings,
drain: drain::Watch,
}

impl<T> Outbound<svc::ArcNewCloneHttp<T>> {
/// Builds a [`svc::NewService`] stack that prepares HTTP requests to be
/// proxied.
Expand Down Expand Up @@ -61,7 +63,9 @@ impl<T> Outbound<svc::ArcNewCloneHttp<T>> {
impl<N> Outbound<N> {
pub fn push_tcp_http_server<T, I, NSvc>(
self,
) -> Outbound<http::NewServeHttp<svc::ArcNewService<T, svc::NewCloneService<NSvc>>>>
) -> Outbound<
http::NewServeHttp<ExtractServerParams, svc::ArcNewService<T, svc::NewCloneService<NSvc>>>,
>
where
// Target
T: svc::Param<http::Version>,
Expand All @@ -81,10 +85,10 @@ impl<N> Outbound<N> {
self.map_stack(|config, rt, http| {
http.unlift_new()
.push(svc::ArcNewService::layer())
.push(http::NewServeHttp::layer(
config.proxy.server.h2_settings,
rt.drain.clone(),
))
.push(http::NewServeHttp::layer(ExtractServerParams {
h2: config.proxy.server.h2_settings,
drain: rt.drain.clone(),
}))
})
}
}
Expand All @@ -99,14 +103,14 @@ impl ServerRescue {
}
}

impl<T> ExtractParam<Self, T> for ServerRescue {
impl<T> svc::ExtractParam<Self, T> for ServerRescue {
#[inline]
fn extract_param(&self, _: &T) -> Self {
*self
}
}

impl<T> ExtractParam<errors::respond::EmitHeaders, T> for ServerRescue {
impl<T> svc::ExtractParam<errors::respond::EmitHeaders, T> for ServerRescue {
#[inline]
fn extract_param(&self, _: &T) -> errors::respond::EmitHeaders {
errors::respond::EmitHeaders(self.emit_headers)
Expand Down Expand Up @@ -179,3 +183,19 @@ impl errors::HttpRescue<Error> for ServerRescue {
Ok(errors::SyntheticHttpResponse::unexpected_error())
}
}

// === impl ExtractServerParams ===

impl<T> svc::ExtractParam<http::ServerParams, T> for ExtractServerParams
where
T: svc::Param<http::Version>,
{
#[inline]
fn extract_param(&self, t: &T) -> http::ServerParams {
http::ServerParams {
version: t.param(),
h2: self.h2,
drain: self.drain.clone(),
}
}
}
21 changes: 10 additions & 11 deletions linkerd/app/outbound/src/ingress.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{http, opaq, policy, Config, Discovery, Outbound, ParentRef};
use crate::{http, opaq, policy, Discovery, Outbound, ParentRef};
use linkerd_app_core::{
config::{ProxyConfig, ServerConfig},
detect, errors, io,
metrics::prom,
profiles,
Expand Down Expand Up @@ -278,14 +277,6 @@ impl<N> Outbound<N> {
{
self.map_stack(|config, rt, inner| {
let detect_http = config.proxy.detect_http();
let Config {
proxy:
ProxyConfig {
server: ServerConfig { h2_settings, .. },
..
},
..
} = config;

// Route requests with destinations that can be discovered via the
// `l5d-dst-override` header through the (load balanced) logical
Expand Down Expand Up @@ -314,7 +305,15 @@ impl<N> Outbound<N> {
// destination address.
http.check_new_service::<Http<T>, http::Request<_>>()
.unlift_new()
.push(http::NewServeHttp::layer(*h2_settings, rt.drain.clone()))
.push(http::NewServeHttp::layer({
let h2 = config.proxy.server.h2_settings;
let drain = rt.drain.clone();
move |http: &Http<T>| http::ServerParams {
version: http.version,
h2,
drain: drain.clone()
}
}))
.check_new_service::<Http<T>, I>()
.push_switch(
|(detected, target): (detect::Result<http::Version>, T)| -> Result<_, Infallible> {
Expand Down
13 changes: 9 additions & 4 deletions linkerd/app/outbound/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,17 @@ impl<N> Outbound<N> {
let opaq = self.clone().into_stack();

let http = self.with_stack(http).map_stack(|config, rt, stk| {
let h2 = config.proxy.server.h2_settings;
let drain = rt.drain.clone();
stk.push_on_service(http::BoxRequest::layer())
.unlift_new()
.push(http::NewServeHttp::layer(
config.proxy.server.h2_settings,
rt.drain.clone(),
))
.push(http::NewServeHttp::layer(move |t: &Http<T>| {
http::ServerParams {
version: t.version,
h2,
drain: drain.clone(),
}
}))
.arc_new_tcp()
});

Expand Down
2 changes: 1 addition & 1 deletion linkerd/proxy/http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub use self::{
normalize_uri::{MarkAbsoluteForm, NewNormalizeUri},
override_authority::{AuthorityOverride, NewOverrideAuthority},
retain::Retain,
server::{NewServeHttp, ServeHttp},
server::{NewServeHttp, Params as ServerParams, ServeHttp},
strip_header::StripHeader,
timeout::{NewTimeout, ResponseTimeout, ResponseTimeoutError},
version::Version,
Expand Down
101 changes: 50 additions & 51 deletions linkerd/proxy/http/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
};
use linkerd_error::Error;
use linkerd_io::{self as io, PeerAddr};
use linkerd_stack::{layer, NewService, Param};
use linkerd_stack::{layer, ExtractParam, NewService};
use std::{
future::Future,
pin::Pin,
Expand All @@ -18,13 +18,22 @@ use tracing::{debug, Instrument};

type Server = hyper::server::conn::Http<trace::Executor>;

/// Configures HTTP server behavior.
#[derive(Clone, Debug)]
pub struct NewServeHttp<N> {
pub struct Params {
pub version: Version,
pub h2: H2Settings,
pub drain: drain::Watch,
}

// A stack that builds HTTP servers.
#[derive(Clone, Debug)]
pub struct NewServeHttp<X, N> {
inner: N,
server: Server,
drain: drain::Watch,
params: X,
}

/// Serves HTTP connectionswith an inner service.
#[derive(Clone, Debug)]
pub struct ServeHttp<N> {
version: Version,
Expand All @@ -35,55 +44,43 @@ pub struct ServeHttp<N> {

// === impl NewServeHttp ===

impl<N> NewServeHttp<N> {
pub fn layer(
h2: H2Settings,
drain: drain::Watch,
) -> impl layer::Layer<N, Service = Self> + Clone {
layer::mk(move |inner| Self::new(h2, inner, drain.clone()))
impl<X: Clone, N> NewServeHttp<X, N> {
pub fn layer(params: X) -> impl layer::Layer<N, Service = Self> + Clone {
layer::mk(move |inner| Self::new(params.clone(), inner))
}

/// Creates a new `ServeHttp`.
fn new(h2: H2Settings, inner: N, drain: drain::Watch) -> Self {
let mut server = hyper::server::conn::Http::new().with_executor(trace::Executor::new());
server
.http2_initial_stream_window_size(h2.initial_stream_window_size)
.http2_initial_connection_window_size(h2.initial_connection_window_size);

// Configure HTTP/2 PING frames
if let Some(timeout) = h2.keepalive_timeout {
// XXX(eliza): is this a reasonable interval between
// PING frames?
let interval = timeout / 4;
server
.http2_keep_alive_timeout(timeout)
.http2_keep_alive_interval(interval);
}

Self {
inner,
server,
drain,
}
fn new(params: X, inner: N) -> Self {
Self { inner, params }
}
}

impl<T, N> NewService<T> for NewServeHttp<N>
impl<T, X, N> NewService<T> for NewServeHttp<X, N>
where
T: Param<Version>,
X: ExtractParam<Params, T>,
N: NewService<T> + Clone,
{
type Service = ServeHttp<N::Service>;

fn new_service(&self, target: T) -> Self::Service {
let version = target.param();
let Params { version, h2, drain } = self.params.extract_param(&target);

let mut srv = hyper::server::conn::Http::new().with_executor(trace::Executor::new());
srv.http2_initial_stream_window_size(h2.initial_stream_window_size)
.http2_initial_connection_window_size(h2.initial_connection_window_size);
// Configure HTTP/2 PING frames
if let Some(timeout) = h2.keepalive_timeout {
srv.http2_keep_alive_timeout(timeout)
.http2_keep_alive_interval(timeout / 4);
}

debug!(?version, "Creating HTTP service");
let inner = self.inner.new_service(target);
ServeHttp {
inner,
version,
server: self.server.clone(),
drain: self.drain.clone(),
drain,
server: srv,
}
}
}
Expand All @@ -93,7 +90,7 @@ where
impl<I, N, S> Service<I> for ServeHttp<N>
where
I: io::AsyncRead + io::AsyncWrite + PeerAddr + Send + Unpin + 'static,
N: NewService<ClientHandle, Service = S> + Clone + Send + 'static,
N: NewService<ClientHandle, Service = S> + Send + 'static,
S: Service<http::Request<UpgradeBody>, Response = http::Response<http::BoxBody>, Error = Error>
+ Unpin
+ Send
Expand All @@ -109,29 +106,30 @@ where
}

fn call(&mut self, io: I) -> Self::Future {
let Self {
version,
inner,
drain,
mut server,
} = self.clone();
debug!(?version, "Handling as HTTP");
let version = self.version;
let drain = self.drain.clone();
let mut server = self.server.clone();

let res = io.peer_addr().map(|pa| {
let (handle, closed) = ClientHandle::new(pa);
let svc = self.inner.new_service(handle.clone());
let svc = SetClientHandle::new(handle, svc);
(svc, closed)
});

Box::pin(
async move {
let client_addr = io.peer_addr()?;
let (client_handle, closed) = ClientHandle::new(client_addr);
// TODO(ver): Move this into the inner stack.
let svc =
SetClientHandle::new(client_handle.clone(), inner.new_service(client_handle));

let (svc, closed) = res?;
debug!(?version, "Handling as HTTP");
match version {
Version::Http1 => {
// Enable support for HTTP upgrades (CONNECT and websockets).
let svc = upgrade::Service::new(svc, drain.clone());
let mut conn = server
.http1_only(true)
.serve_connection(io, upgrade::Service::new(svc, drain.clone()))
.serve_connection(io, svc)
.with_upgrades();

tokio::select! {
res = &mut conn => {
debug!(?res, "The client is shutting down the connection");
Expand All @@ -154,6 +152,7 @@ where
let mut conn = server
.http2_only(true)
.serve_connection(io, HyperServerSvc::new(svc));

tokio::select! {
res = &mut conn => {
debug!(?res, "The client is shutting down the connection");
Expand Down

0 comments on commit 96124bc

Please sign in to comment.