Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use heap indirection to manage type signatures #2510

Merged
merged 30 commits into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,6 @@ members = [
"tools",
]

# Debug symbols end up chewing up several GB of disk space, so better to just
# disable them.
[profile.dev]
debug = false

[profile.test]
debug = false

[profile.release]
debug = 1
lto = true
13 changes: 7 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,24 @@ RUN apt-get update && \
fi && \
rm -rf /var/lib/apt/lists/*

ENV CARGO_INCREMENTAL=0
ENV CARGO_NET_RETRY=10
ENV RUSTFLAGS="-D warnings -A deprecated"
ENV RUSTUP_MAX_RETRIES=10

WORKDIR /usr/src/linkerd2-proxy
WORKDIR /src
COPY . .
RUN --mount=type=cache,id=cargo,target=/usr/local/cargo/registry \
just fetch
ENV CARGO_INCREMENTAL=0
ENV RUSTFLAGS="-D warnings -A deprecated"
ARG TARGETARCH="amd64"
ARG PROFILE="release"
ARG LINKERD2_PROXY_VERSION=""
ARG LINKERD2_PROXY_VENDOR=""
RUN --mount=type=cache,id=cargo,target=/usr/local/cargo/registry \
just arch="$TARGETARCH" features="$PROXY_FEATURES" profile="$PROFILE" build && \
mkdir -p /out && \
mv $(just --evaluate profile="$PROFILE" _target_bin) /out/linkerd2-proxy
/usr/bin/time -v just arch="$TARGETARCH" features="$PROXY_FEATURES" profile="$PROFILE" build && \
bin=$(just --evaluate profile="$PROFILE" _target_bin) ; \
du -sh "$bin" "$bin".dbg && \
mkdir -p /out && mv "$bin" /out/linkerd2-proxy

FROM $LINKERD2_IMAGE as linkerd2

Expand Down
21 changes: 17 additions & 4 deletions linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,23 @@ impl Config {

let (ready, latch) = crate::server::Readiness::new();
let admin = crate::server::Admin::new(report, ready, shutdown, trace);
let admin = svc::stack(move |_| admin.clone())
.push(metrics.proxy.http_endpoint.to_layer::<classify::Response, _, Permitted>())
let http = svc::stack(move |_| admin.clone())
.push(
metrics
.proxy
.http_endpoint
.to_layer::<classify::Response, _, Permitted>(),
)
.push(classify::NewClassify::layer_default())
.push_map_target(|(permit, http)| Permitted { permit, http })
.push(inbound::policy::NewHttpPolicy::layer(metrics.http_authz.clone()))
.push(inbound::policy::NewHttpPolicy::layer(
metrics.http_authz.clone(),
))
Comment on lines +100 to +111
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like just Rustfmt, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah

.push(Rescue::layer())
.push_on_service(http::BoxResponse::layer())
.arc_new_clone_http();

let tcp = http
.unlift_new()
.push(http::NewServeHttp::layer(Default::default(), drain.clone()))
.push_filter(
Expand Down Expand Up @@ -147,6 +157,7 @@ impl Config {
}
},
)
.arc_new_tcp()
.lift_new_with_target()
.push(detect::NewDetectService::layer(svc::stack::CloneParam::from(
detect::Config::<http::DetectHttp>::from_timeout(DETECT_TIMEOUT),
Expand All @@ -160,12 +171,14 @@ impl Config {
policy: policy.clone(),
}
})
.arc_new_tcp()
.push(tls::NewDetectTls::<identity::Server, _, _>::layer(TlsParams {
identity,
}))
.arc_new_tcp()
.into_inner();

let serve = Box::pin(serve::serve(listen, admin, drain.signaled()));
let serve = Box::pin(serve::serve(listen, tcp, drain.signaled()));
Ok(Task {
listen_addr,
latch,
Expand Down
61 changes: 61 additions & 0 deletions linkerd/app/core/src/svc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,19 @@ pub type BoxHttp<B = http::BoxBody> =

pub type ArcNewHttp<T, B = http::BoxBody> = ArcNewService<T, BoxHttp<B>>;

pub type BoxCloneHttp<B = http::BoxBody> =
BoxCloneService<http::Request<B>, http::Response<http::BoxBody>, Error>;

pub type ArcNewCloneHttp<T, B = http::BoxBody> = ArcNewService<T, BoxCloneHttp<B>>;

pub type BoxTcp<I> = BoxService<I, (), Error>;

pub type ArcNewTcp<T, I> = ArcNewService<T, BoxTcp<I>>;

pub type BoxCloneTcp<I> = BoxCloneService<I, (), Error>;

pub type ArcNewCloneTcp<T, I> = ArcNewService<T, BoxCloneTcp<I>>;

#[derive(Clone, Debug)]
pub struct Layers<L>(L);

Expand Down Expand Up @@ -276,6 +285,58 @@ impl<S> Stack<S> {
self.push(NewCachedDiscover::layer(discover, idle))
}

pub fn arc_new_http<T, B, Svc>(self) -> Stack<ArcNewHttp<T, B>>
where
T: 'static,
B: 'static,
S: NewService<T, Service = Svc> + Send + Sync + 'static,
Svc: Service<http::Request<B>, Response = http::Response<http::BoxBody>, Error = Error>,
Svc: Send + 'static,
Svc::Future: Send,
{
self.arc_new_box()
}

pub fn arc_new_clone_http<T, B, Svc>(self) -> Stack<ArcNewCloneHttp<T, B>>
where
T: 'static,
B: 'static,
S: NewService<T, Service = Svc> + Send + Sync + 'static,
Svc: Service<http::Request<B>, Response = http::Response<http::BoxBody>, Error = Error>,
Svc: Clone + Send + 'static,
Svc::Future: Send,
{
self.push_on_service(BoxCloneService::layer())
.push(ArcNewService::layer())
}

pub fn arc_new_tcp<T, I, Svc>(self) -> Stack<ArcNewTcp<T, I>>
where
T: 'static,
I: 'static,
S: NewService<T, Service = Svc> + Send + Sync + 'static,
Svc: Service<I, Response = (), Error = Error>,
Svc: Send + 'static,
Svc::Future: Send,
{
self.arc_new_box()
}

pub fn arc_new_box<T, Req, Svc>(
self,
) -> Stack<ArcNewService<T, BoxService<Req, Svc::Response, Error>>>
where
T: 'static,
Req: 'static,
S: NewService<T, Service = Svc> + Send + Sync + 'static,
Svc: Service<Req, Error = Error>,
Svc: Send + 'static,
Svc::Future: Send,
{
self.push_on_service(BoxService::layer())
.push(ArcNewService::layer())
}

/// Validates that this stack serves T-typed targets.
pub fn check_new<T>(self) -> Self
where
Expand Down
30 changes: 12 additions & 18 deletions linkerd/app/gateway/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ impl Gateway {
/// Wrap the provided outbound HTTP client with the inbound HTTP server,
/// inbound authorization, tagged-transport gateway routing, and the
/// outbound router.
pub fn http<T, N, R, NSvc>(
pub fn http<T, R>(
&self,
inner: N,
inner: svc::ArcNewHttp<
outbound::http::concrete::Endpoint<
outbound::http::logical::Concrete<outbound::http::Http<Target>>,
>,
>,
resolve: R,
) -> svc::Stack<
svc::ArcNewService<
Expand Down Expand Up @@ -78,21 +82,6 @@ impl Gateway {
T: Clone + Send + Sync + Unpin + 'static,
// Endpoint resolution.
R: Resolve<ConcreteAddr, Endpoint = Metadata, Error = Error>,
// HTTP outbound stack.
N: svc::NewService<
outbound::http::concrete::Endpoint<
outbound::http::logical::Concrete<outbound::http::Http<Target>>,
>,
Service = NSvc,
>,
N: Clone + Send + Sync + Unpin + 'static,
NSvc: svc::Service<
http::Request<http::BoxBody>,
Response = http::Response<http::BoxBody>,
Error = Error,
>,
NSvc: Send + Unpin + 'static,
NSvc::Future: Send + Unpin + 'static,
{
let http = self
.outbound
Expand All @@ -102,12 +91,14 @@ impl Gateway {
.into_stack()
// Discard `T` and its associated client-specific metadata.
.push_map_target(Target::discard_parent)
.push(svc::ArcNewService::layer())
// Add headers to prevent loops.
.push(NewHttpGateway::layer(
self.inbound.identity().local_id().clone(),
))
.push_on_service(svc::LoadShed::layer())
.lift_new()
.push(svc::ArcNewService::layer())
// After protocol-downgrade, we need to build an inner stack for
// each request-level HTTP version.
.push(svc::NewOneshotRoute::layer_via(|t: &Target<T>| {
Expand All @@ -132,8 +123,10 @@ impl Gateway {
parent,
})
})
.push(svc::ArcNewService::layer())
// Authorize requests to the gateway.
.push(self.inbound.authorize_http());
.push(self.inbound.authorize_http())
.arc_new_clone_http();

self.inbound
.clone()
Expand All @@ -144,6 +137,7 @@ impl Gateway {
// servers.
.push_http_server()
.into_stack()
.arc_new_clone_http()
}
}

Expand Down
5 changes: 4 additions & 1 deletion linkerd/app/gateway/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ async fn upgraded_request_remains_relative_form() {
Metadata::default(),
);
gateway
.http(move |_: _| inner.clone(), resolve)
.http(
svc::ArcNewHttp::new(move |_: _| svc::BoxHttp::new(inner.clone())),
resolve,
)
.new_service(Target)
};

Expand Down
3 changes: 1 addition & 2 deletions linkerd/app/gateway/src/opaq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ impl Gateway {
)
// Authorize connections to the gateway.
.push(self.inbound.authorize_tcp())
.push_on_service(svc::BoxService::layer())
.push(svc::ArcNewService::layer())
.arc_new_tcp()
}
}
3 changes: 1 addition & 2 deletions linkerd/app/gateway/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ impl Gateway {
.with_stack(protocol)
.push_discover(discover)
.into_stack()
.push_on_service(svc::BoxService::layer())
.push(svc::ArcNewService::layer())
.arc_new_tcp()
}
}

Expand Down
3 changes: 0 additions & 3 deletions linkerd/app/inbound/fuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,3 @@ name = "fuzz_target_1"
path = "fuzz_targets/fuzz_target_1.rs"
test = true
doc = false

[patch.crates-io]
webpki = { git = "https://github.com/linkerd/webpki", branch = "cert-dns-names-0.22" }
3 changes: 1 addition & 2 deletions linkerd/app/inbound/src/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ impl<N> Inbound<N> {
let OrigDstAddr(addr) = t.param();
info_span!("server", port = addr.port())
})
.push_on_service(svc::BoxService::layer())
.push(svc::ArcNewService::layer())
.arc_new_tcp()
})
}
}
Expand Down
Loading