diff --git a/linkerd/app/core/src/control.rs b/linkerd/app/core/src/control.rs index 45b472d221..48633d7e7a 100644 --- a/linkerd/app/core/src/control.rs +++ b/linkerd/app/core/src/control.rs @@ -71,12 +71,7 @@ impl Config { identity: identity::NewClient, ) -> svc::ArcNewService< (), - impl svc::Service< - http::Request, - Response = http::Response, - Error = ControlError, - Future = impl Send, - > + Clone, + svc::BoxCloneSyncService, http::Response>, > { let addr = self.addr; @@ -135,6 +130,7 @@ impl Config { .push(svc::NewMapErr::layer_from_target::()) .instrument(|c: &ControlAddr| info_span!("controller", addr = %c.addr)) .push_map_target(move |()| addr.clone()) + .push_on_service(svc::BoxCloneSyncService::layer()) .push(svc::ArcNewService::layer()) .into_inner() } diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs index 8b075c88c8..bee41fbfb7 100644 --- a/linkerd/app/core/src/svc.rs +++ b/linkerd/app/core/src/svc.rs @@ -34,7 +34,7 @@ pub type BoxHttp = pub type ArcNewHttp = ArcNewService>; pub type BoxCloneHttp = - BoxCloneService, http::Response, Error>; + BoxCloneSyncService, http::Response>; pub type ArcNewCloneHttp = ArcNewService>; @@ -42,7 +42,7 @@ pub type BoxTcp = BoxService; pub type ArcNewTcp = ArcNewService>; -pub type BoxCloneTcp = BoxCloneService; +pub type BoxCloneTcp = BoxCloneSyncService; pub type ArcNewCloneTcp = ArcNewService>; @@ -302,11 +302,25 @@ impl Stack { T: 'static, B: 'static, S: NewService + Send + Sync + 'static, - Svc: Service, Response = http::Response, Error = Error>, - Svc: Clone + Send + 'static, + Svc: Service, Response = http::Response>, + Svc: Clone + Send + Sync + 'static, + Svc::Error: Into, + Svc::Future: Send, + { + self.push_on_service(BoxCloneHttp::layer()) + .push(ArcNewService::layer()) + } + + pub fn arc_new_clone_tcp(self) -> Stack> + where + T: 'static, + I: 'static, + S: NewService + Send + Sync + 'static, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Error: Into, Svc::Future: Send, { - self.push_on_service(BoxCloneService::layer()) + self.push_on_service(BoxCloneTcp::layer()) .push(ArcNewService::layer()) } diff --git a/linkerd/app/inbound/src/http/router.rs b/linkerd/app/inbound/src/http/router.rs index db4b5ed696..76274a4abb 100644 --- a/linkerd/app/inbound/src/http/router.rs +++ b/linkerd/app/inbound/src/http/router.rs @@ -74,7 +74,7 @@ impl Inbound { + Param> + Param + Param, - T: Clone + Send + Unpin + 'static, + T: Clone + Send + Sync + Unpin + 'static, P: profiles::GetProfile, C: svc::MakeConnection + Clone + Send + Sync + Unpin + 'static, C::Connection: Send + Unpin, diff --git a/linkerd/app/inbound/src/http/server.rs b/linkerd/app/inbound/src/http/server.rs index 69bf43349a..fef0173ff3 100644 --- a/linkerd/app/inbound/src/http/server.rs +++ b/linkerd/app/inbound/src/http/server.rs @@ -43,6 +43,7 @@ impl Inbound { HSvc: svc::Service, Response = http::Response> + Clone + Send + + Sync + Unpin + 'static, HSvc::Error: Into, diff --git a/linkerd/app/outbound/src/http/concrete.rs b/linkerd/app/outbound/src/http/concrete.rs index c2fc6c75a6..8f67f50d32 100644 --- a/linkerd/app/outbound/src/http/concrete.rs +++ b/linkerd/app/outbound/src/http/concrete.rs @@ -79,20 +79,7 @@ impl Outbound { /// 'failfast'. While in failfast, buffered requests are failed and the /// service becomes unavailable so callers may choose alternate concrete /// services. - pub fn push_http_concrete( - self, - resolve: R, - ) -> Outbound< - svc::ArcNewService< - T, - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, - > + pub fn push_http_concrete(self, resolve: R) -> Outbound> where // Concrete target type. T: svc::Param, @@ -157,7 +144,7 @@ impl Outbound { // TODO(ver) Configure this queue from the target (i.e. from // discovery). .push(svc::NewQueue::layer_via(config.http_request_queue)) - .push(svc::ArcNewService::layer()) + .arc_new_clone_http() }) } } @@ -182,20 +169,7 @@ where config: &crate::Config, rt: &crate::Runtime, resolve: R, - ) -> impl svc::Layer< - N, - Service = svc::ArcNewService< - Self, - impl svc::Service< - http::Request, - Response = http::Response, - Error = BalanceError, - Future = impl std::future::Future< - Output = Result, BalanceError>, - > + Send, - >, - >, - > + Clone + ) -> impl svc::Layer> + Clone where // Endpoint resolution. R: Resolve + 'static, @@ -271,7 +245,8 @@ where port = %meta.port().map(u16::from).unwrap_or(0), ) }) - .push(svc::ArcNewService::layer()) + .push_on_service(svc::MapErr::layer_boxed()) + .arc_new_http() .into_inner() }) } diff --git a/linkerd/app/outbound/src/http/logical.rs b/linkerd/app/outbound/src/http/logical.rs index fd35ced06b..b1005eaaaa 100644 --- a/linkerd/app/outbound/src/http/logical.rs +++ b/linkerd/app/outbound/src/http/logical.rs @@ -81,19 +81,7 @@ impl Outbound { /// support per-request routing over a set of concrete inner services. /// Only available inner services are used for routing. When there are no /// available backends, requests are failed with a [`svc::stack::LoadShedError`]. - pub fn push_http_logical( - self, - ) -> Outbound< - svc::ArcNewService< - T, - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, - > + pub fn push_http_logical(self) -> Outbound> where // Logical target. T: svc::Param>, @@ -111,16 +99,13 @@ impl Outbound { self.map_stack(|_config, rt, concrete| { // For each `T` target, watch its `Profile`, rebuilding a // router stack. - let watch = concrete + concrete // Share the concrete stack with each router stack. .lift_new() .push_on_service(RouterParams::layer(rt.metrics.clone())) // Rebuild the inner router stack every time the watch changes. - .push(svc::NewSpawnWatch::::layer_into::>()); - - watch - .push_on_service(svc::MapErr::layer_boxed()) - .push(svc::ArcNewService::layer()) + .push(svc::NewSpawnWatch::::layer_into::>()) + .arc_new_clone_http() }) } } @@ -133,18 +118,7 @@ where { fn layer( metrics: OutboundMetrics, - ) -> impl svc::Layer< - N, - Service = svc::ArcNewService< - RouterParams, - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, - > + Clone + ) -> impl svc::Layer>> + Clone where N: svc::NewService, Service = S>, N: Clone + Send + Sync + 'static, @@ -191,8 +165,7 @@ where .into_inner(), ) .push(svc::NewMapErr::layer_from_target::()) - .push_on_service(svc::MapErr::layer_boxed()) - .push(svc::ArcNewService::layer()) + .arc_new_clone_http() .into_inner() }) } diff --git a/linkerd/app/outbound/src/http/logical/policy.rs b/linkerd/app/outbound/src/http/logical/policy.rs index 7a79dc6985..de5c41f675 100644 --- a/linkerd/app/outbound/src/http/logical/policy.rs +++ b/linkerd/app/outbound/src/http/logical/policy.rs @@ -51,18 +51,7 @@ where /// services. pub(super) fn layer( route_backend_metrics: RouteBackendMetrics, - ) -> impl svc::Layer< - N, - Service = svc::ArcNewService< - Self, - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, - > + Clone + ) -> impl svc::Layer> + Clone where // Inner stack. N: svc::NewService, Service = S>, @@ -89,7 +78,7 @@ where }, grpc.into_inner(), ) - .push(svc::ArcNewService::layer()) + .arc_new_clone_http() .into_inner() }) } diff --git a/linkerd/app/outbound/src/http/logical/policy/route.rs b/linkerd/app/outbound/src/http/logical/policy/route.rs index 864e2b3661..803ebfa664 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route.rs @@ -84,18 +84,7 @@ where /// backends are expected to be cached/shared by the inner stack. pub(crate) fn layer( backend_metrics: backend::RouteBackendMetrics, - ) -> impl svc::Layer< - N, - Service = svc::ArcNewService< - Self, - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, - > + Clone + ) -> impl svc::Layer> + Clone where // Inner stack. N: svc::NewService, Service = S>, @@ -126,14 +115,12 @@ where .push(classify::NewClassify::layer()) .push(svc::NewMapErr::layer_with(|rt: &Self| { let route = rt.params.route_ref.clone(); - move |source| { - Error::from(RouteError { - route: route.clone(), - source, - }) + move |source| RouteError { + route: route.clone(), + source, } })) - .push(svc::ArcNewService::layer()) + .arc_new_clone_http() .into_inner() }) } diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend.rs b/linkerd/app/outbound/src/http/logical/policy/route/backend.rs index f790f14b85..0d4bbf0a84 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/backend.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/backend.rs @@ -85,18 +85,7 @@ where /// filters. pub(crate) fn layer( metrics: RouteBackendMetrics, - ) -> impl svc::Layer< - N, - Service = svc::ArcNewService< - Self, - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, - > + Clone + ) -> impl svc::Layer> + Clone where // Inner stack. N: svc::NewService, Service = S>, @@ -131,7 +120,7 @@ where }) } })) - .push(svc::ArcNewService::layer()) + .arc_new_clone_http() .into_inner() }) } diff --git a/linkerd/app/outbound/src/http/logical/policy/router.rs b/linkerd/app/outbound/src/http/logical/policy/router.rs index e2d39d8a6c..2262e9640e 100644 --- a/linkerd/app/outbound/src/http/logical/policy/router.rs +++ b/linkerd/app/outbound/src/http/logical/policy/router.rs @@ -71,18 +71,7 @@ where /// set of inner services so that. pub(super) fn layer( route_backend_metrics: RouteBackendMetrics, - ) -> impl svc::Layer< - N, - Service = svc::ArcNewService< - Self, - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, - > + Clone + ) -> impl svc::Layer> + Clone where // Inner stack. N: svc::NewService, Service = S>, @@ -105,7 +94,7 @@ where // `SelectRoute` impl. .push_on_service(route::MatchedRoute::layer(route_backend_metrics.clone())) .push(svc::NewOneshotRoute::::layer_cached()) - .push(svc::ArcNewService::layer()) + .arc_new_clone_http() .into_inner() }) } diff --git a/linkerd/app/outbound/src/http/logical/profile.rs b/linkerd/app/outbound/src/http/logical/profile.rs index 8bdfa6db96..897e95e0e9 100644 --- a/linkerd/app/outbound/src/http/logical/profile.rs +++ b/linkerd/app/outbound/src/http/logical/profile.rs @@ -69,18 +69,7 @@ where /// we can reuse inner services. pub(super) fn layer( metrics: metrics::Proxy, - ) -> impl svc::Layer< - N, - Service = svc::ArcNewService< - Self, - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, - > + Clone + ) -> impl svc::Layer> + Clone where N: svc::NewService, Service = S> + Clone + Send + Sync + 'static, S: svc::Service< @@ -101,7 +90,7 @@ where // returned from the `SelectRoute` impl. .push_on_service(RouteParams::layer(metrics.clone())) .push(svc::NewOneshotRoute::, _, _>::layer_cached()) - .push(svc::ArcNewService::layer()) + .arc_new_clone_http() .into_inner() }) } @@ -257,18 +246,7 @@ where impl RouteParams { fn layer( metrics: metrics::Proxy, - ) -> impl svc::Layer< - N, - Service = svc::ArcNewService< - Self, - impl svc::Service< - http::Request, - Response = http::Response, - Error = Error, - Future = impl Send, - > + Clone, - >, - > + Clone + ) -> impl svc::Layer> + Clone where T: Clone + Debug + Eq + Hash + Send + Sync + 'static, N: svc::NewService, Service = S> + Clone + Send + Sync + 'static, @@ -329,7 +307,7 @@ impl RouteParams { // extension. .push(classify::NewClassify::layer()) .push_on_service(http::BoxResponse::layer()) - .push(svc::ArcNewService::layer()) + .arc_new_clone_http() .into_inner() }) } diff --git a/linkerd/app/outbound/src/opaq.rs b/linkerd/app/outbound/src/opaq.rs index c3961907dc..bc56f6e444 100644 --- a/linkerd/app/outbound/src/opaq.rs +++ b/linkerd/app/outbound/src/opaq.rs @@ -26,15 +26,7 @@ impl Outbound { /// /// This stack uses caching so that a router/load-balancer may be reused /// across multiple connections. - pub fn push_opaq_cached( - self, - resolve: R, - ) -> Outbound< - svc::ArcNewService< - T, - impl svc::Service + Clone, - >, - > + pub fn push_opaq_cached(self, resolve: R) -> Outbound> where // Opaque target T: svc::Param, @@ -58,7 +50,7 @@ impl Outbound { // Use a dedicated target type to configure parameters for // the opaque stack. It also helps narrow the cache key. .push_map_target(|t: T| Opaq(t.param())) - .push(svc::ArcNewService::layer()) + .arc_new_clone_tcp() }) } } diff --git a/linkerd/app/outbound/src/opaq/logical.rs b/linkerd/app/outbound/src/opaq/logical.rs index 78e980c55f..95cc517cd3 100644 --- a/linkerd/app/outbound/src/opaq/logical.rs +++ b/linkerd/app/outbound/src/opaq/logical.rs @@ -74,14 +74,7 @@ impl Outbound { /// services. Only available inner services are used for routing. When /// there are no available backends, requests are failed with a /// [`svc::stack::LoadShedError`]. - pub fn push_opaq_logical( - self, - ) -> Outbound< - svc::ArcNewService< - T, - impl svc::Service + Clone, - >, - > + pub fn push_opaq_logical(self) -> Outbound> where // Opaque logical target. T: svc::Param, @@ -142,7 +135,7 @@ impl Outbound { }, concrete.into_inner(), ) - .push(svc::ArcNewService::layer()) + .arc_new_clone_tcp() }) } } diff --git a/linkerd/stack/src/box_future.rs b/linkerd/stack/src/box_future.rs index 4285df2dff..a55fef2f4a 100644 --- a/linkerd/stack/src/box_future.rs +++ b/linkerd/stack/src/box_future.rs @@ -17,7 +17,7 @@ impl BoxFuture { Self(inner) } - pub fn layer() -> impl tower::Layer { + pub fn layer() -> impl tower::Layer + Copy { crate::layer::mk(Self::new) } } diff --git a/linkerd/stack/src/box_service.rs b/linkerd/stack/src/box_service.rs index 929a74de5b..9ab096a524 100644 --- a/linkerd/stack/src/box_service.rs +++ b/linkerd/stack/src/box_service.rs @@ -1,37 +1,97 @@ -use std::marker::PhantomData; +use linkerd_error::{Error, Result}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + pub use tower::util::BoxService; -#[derive(Copy, Debug)] -pub struct BoxServiceLayer { - _p: PhantomData, +pub struct BoxCloneSyncService { + inner: Box>, +} + +impl BoxCloneSyncService { + pub fn new(inner: S) -> Self + where + S: crate::Service + Send + Sync + Clone + 'static, + S::Error: Into, + S::Future: Send + 'static, + { + Self { + inner: Box::new(crate::BoxFuture::new(crate::MapErrBoxed::from(inner))), + } + } + + pub fn layer() -> impl crate::layer::Layer + Copy + where + S: crate::Service + Send + Sync + Clone + 'static, + S::Error: Into, + S::Future: Send + 'static, + { + crate::layer::mk(Self::new) + } } -impl tower::Layer for BoxServiceLayer +impl crate::Service for BoxCloneSyncService where - S: tower::Service + Send + 'static, - S::Future: Send + 'static, - S::Error: Send + 'static, + Req: Send + 'static, { - type Service = BoxService; - fn layer(&self, s: S) -> Self::Service { - BoxService::new(s) + type Response = Rsp; + type Error = Error; + type Future = Pin> + Send>>; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) } -} -impl BoxServiceLayer { - pub fn new() -> Self { - Self { _p: PhantomData } + #[inline] + fn call(&mut self, req: Req) -> Self::Future { + self.inner.call(req) } } -impl Clone for BoxServiceLayer { +impl Clone for BoxCloneSyncService { fn clone(&self) -> Self { - Self::new() + Self { + inner: self.inner.box_clone_sync(), + } } } -impl Default for BoxServiceLayer { - fn default() -> Self { - Self::new() +mod sealed { + use crate::Service; + use linkerd_error::{Error, Result}; + use std::{future::Future, pin::Pin}; + + pub trait BoxCloneSyncService: + Service< + Req, + Response = Rsp, + Error = Error, + Future = Pin> + Send>>, + > + Send + + Sync + { + fn box_clone_sync(&self) -> Box>; + } + + // Implement the trait for Box + impl BoxCloneSyncService for S + where + S: Service< + Req, + Error = Error, + Response = Rsp, + Future = Pin> + Send>>, + > + Clone + + Send + + Sync + + 'static, + { + fn box_clone_sync(&self) -> Box> { + Box::new(self.clone()) + } } } diff --git a/linkerd/stack/src/lib.rs b/linkerd/stack/src/lib.rs index b419d03c47..9905420f46 100644 --- a/linkerd/stack/src/lib.rs +++ b/linkerd/stack/src/lib.rs @@ -33,7 +33,7 @@ mod watch; pub use self::{ arc_new_service::ArcNewService, box_future::BoxFuture, - box_service::{BoxService, BoxServiceLayer}, + box_service::{BoxCloneSyncService, BoxService}, connect::{MakeConnection, WithoutConnectionMetadata}, either::{Either, NewEither}, fail::Fail, @@ -59,7 +59,7 @@ pub use self::{ }; pub use tower::{ service_fn, - util::{self, future_service, BoxCloneService, FutureService, Oneshot, ServiceExt}, + util::{self, future_service, FutureService, Oneshot, ServiceExt}, Service, };