diff --git a/.github/workflows/check-each.yml b/.github/workflows/check-each.yml index 40b828d461..934fe79e70 100644 --- a/.github/workflows/check-each.yml +++ b/.github/workflows/check-each.yml @@ -30,7 +30,7 @@ jobs: steps: - uses: actions/checkout@8f4b7f84864484a7bf31766abe9204da3cbe65b3 - run: git config --global --add safe.directory "$PWD" # actions/runner#2033 - - uses: tj-actions/changed-files@db5dd7c176cf59a19ef6561bf1936f059dee4b74 + - uses: tj-actions/changed-files@c9124514c375de5dbb9697afa6f2e36a236ee58c id: changed-files with: files: | diff --git a/.github/workflows/fuzzers.yml b/.github/workflows/fuzzers.yml index ac2893114b..adc55fefe1 100644 --- a/.github/workflows/fuzzers.yml +++ b/.github/workflows/fuzzers.yml @@ -32,7 +32,7 @@ jobs: - run: apt update && apt install -y jo - uses: actions/checkout@8f4b7f84864484a7bf31766abe9204da3cbe65b3 - run: git config --global --add safe.directory "$PWD" # actions/runner#2033 - - uses: tj-actions/changed-files@db5dd7c176cf59a19ef6561bf1936f059dee4b74 + - uses: tj-actions/changed-files@c9124514c375de5dbb9697afa6f2e36a236ee58c id: changed-files - name: list changed crates id: list-changed diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e5caa046c2..468fd46c80 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -37,11 +37,11 @@ jobs: exit 1 fi ( echo publish=true - echo version="$ver" + echo version="${ver#v}" ) >> "$GITHUB_OUTPUT" else sha="${{ github.sha }}" - echo version="test-${sha:0:7}" >> "$GITHUB_OUTPUT" + echo version="0.0.0-test.${sha:0:7}" >> "$GITHUB_OUTPUT" fi outputs: publish: ${{ steps.meta.outputs.publish }} @@ -60,13 +60,16 @@ jobs: runs-on: ubuntu-latest timeout-minutes: 40 container: docker://ghcr.io/linkerd/dev:v40-rust-musl + env: + LINKERD2_PROXY_VENDOR: ${{ github.repository_owner }} + LINKERD2_PROXY_VERSION: ${{ needs.meta.outputs.version }} steps: - uses: actions/checkout@8f4b7f84864484a7bf31766abe9204da3cbe65b3 - run: git config --global --add safe.directory "$PWD" # actions/runner#2033 - run: just fetch - run: just arch=${{ matrix.arch }} libc=${{ matrix.libc }} rustup - run: just arch=${{ matrix.arch }} libc=${{ matrix.libc }} profile=release build - - run: just arch=${{ matrix.arch }} libc=${{ matrix.libc }} profile=release package_version=${{ needs.meta.outputs.version }} package + - run: just arch=${{ matrix.arch }} libc=${{ matrix.libc }} profile=release package - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce with: name: ${{ matrix.arch }}-artifacts @@ -84,6 +87,6 @@ jobs: - if: needs.meta.outputs.publish uses: softprops/action-gh-release@de2c0eb89ae2a093876385947365aca7b0e5f844 with: - name: ${{ needs.meta.outputs.version }} + name: v${{ needs.meta.outputs.version }} files: artifacts/**/* generate_release_notes: true diff --git a/Cargo.lock b/Cargo.lock index 001c63f606..f30a76372d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -938,6 +938,7 @@ dependencies = [ "pin-project", "quickcheck", "regex", + "semver 1.0.17", "serde_json", "thiserror", "tokio", @@ -1892,9 +1893,9 @@ dependencies = [ [[package]] name = "linkerd2-proxy-api" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c5191a6b6a0d97519b4746c09a5e92cb9f586cb808d1828f6d7f9889e9ba24d" +checksum = "597facef5c3f12aece4d18a5e3dbba88288837b0b5d8276681d063e4c9b98a14" dependencies = [ "h2", "http", @@ -2396,7 +2397,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" dependencies = [ - "semver", + "semver 0.9.0", ] [[package]] @@ -2471,6 +2472,12 @@ dependencies = [ "semver-parser", ] +[[package]] +name = "semver" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" + [[package]] name = "semver-parser" version = "0.7.0" diff --git a/Dockerfile b/Dockerfile index e5ec830aed..8f060d56ef 100644 --- a/Dockerfile +++ b/Dockerfile @@ -31,8 +31,9 @@ RUN --mount=type=cache,id=cargo,target=/usr/local/cargo/registry \ just fetch ARG TARGETARCH="amd64" ARG PROFILE="release" -RUN --mount=type=cache,id=target,target=target \ - --mount=type=cache,id=cargo,target=/usr/local/cargo/registry \ +ARG LINKERD2_PROXY_VERSION="" +ARG LINKERD2_PROXY_VENDOR="" +RUN --mount=type=cache,id=cargo,target=/usr/local/cargo/registry \ just arch="$TARGETARCH" features="$PROXY_FEATURES" profile="$PROFILE" build && \ mkdir -p /out && \ mv $(just --evaluate profile="$PROFILE" _target_bin) /out/linkerd2-proxy diff --git a/justfile b/justfile index 3e1de4f528..aa4ee39ffb 100644 --- a/justfile +++ b/justfile @@ -15,8 +15,11 @@ toolchain := "" features := "" +export LINKERD2_PROXY_VERSION := env_var_or_default("LINKERD2_PROXY_VERSION", "0.0.0-dev." + `git rev-parse --short HEAD`) +export LINKERD2_PROXY_VENDOR := env_var_or_default("LINKERD2_PROXY_VENDOR", `whoami` + "@" + `hostname`) + # The version name to use for packages. -package_version := `git rev-parse --short HEAD` +package_version := "v" + LINKERD2_PROXY_VERSION # Docker image name & tag. docker-repo := "localhost/linkerd/proxy" @@ -175,6 +178,9 @@ docker *args='--output=type=docker': && _clean-cache --pull \ --tag={{ docker-image }} \ --build-arg PROFILE='{{ profile }}' \ + --build-arg LINKERD2_PROXY_VENDOR='{{ LINKERD2_PROXY_VENDOR }}' \ + --build-arg LINKERD2_PROXY_VERSION='{{ LINKERD2_PROXY_VERSION }}' \ + --no-cache-filter=runtime \ {{ if linkerd-tag == '' { '' } else { '--build-arg=RUNTIME_IMAGE=ghcr.io/linkerd/proxy:' + linkerd-tag } }} \ {{ if features != "" { "--build-arg PROXY_FEATURES=" + features } else { "" } }} \ {{ if DOCKER_BUILDX_CACHE_DIR == '' { '' } else { '--cache-from=type=local,src=' + DOCKER_BUILDX_CACHE_DIR + ' --cache-to=type=local,dest=' + DOCKER_BUILDX_CACHE_DIR } }} \ diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml index b60764a652..1a98d081fe 100644 --- a/linkerd/app/core/Cargo.toml +++ b/linkerd/app/core/Cargo.toml @@ -76,5 +76,8 @@ features = ["make", "spawn-ready", "timeout", "util", "limit"] [target.'cfg(target_os = "linux")'.dependencies] linkerd-system = { path = "../../system" } +[build-dependencies] +semver = "1" + [dev-dependencies] quickcheck = { version = "1", default-features = false } diff --git a/linkerd/app/core/build.rs b/linkerd/app/core/build.rs index f1c2a3c202..e1db4f96de 100644 --- a/linkerd/app/core/build.rs +++ b/linkerd/app/core/build.rs @@ -1,5 +1,4 @@ use std::process::Command; -use std::string::String; fn set_env(name: &str, cmd: &mut Command) { let value = match cmd.output() { @@ -12,21 +11,38 @@ fn set_env(name: &str, cmd: &mut Command) { println!("cargo:rustc-env={}={}", name, value); } +fn version() -> String { + if let Ok(v) = std::env::var("LINKERD2_PROXY_VERSION") { + if !v.is_empty() { + if semver::Version::parse(&v).is_err() { + panic!("LINKERD2_PROXY_VERSION must be semver"); + } + return v; + } + } + + "0.0.0-dev".to_string() +} + +fn vendor() -> String { + std::env::var("LINKERD2_PROXY_VENDOR").unwrap_or_default() +} + fn main() { - set_env( - "GIT_BRANCH", - Command::new("git").args(["rev-parse", "--abbrev-ref", "HEAD"]), - ); set_env( "GIT_SHA", Command::new("git").args(["rev-parse", "--short", "HEAD"]), ); + + // Capture the ISO 8601 formatted UTC time. set_env( - "GIT_VERSION", - Command::new("git").args(["describe", "--always", "HEAD"]), + "LINKERD2_PROXY_BUILD_DATE", + Command::new("date").args(["-u", "+%Y-%m-%dT%H:%M:%SZ"]), ); - set_env("RUST_VERSION", Command::new("rustc").arg("--version")); - let profile = std::env::var("PROFILE").unwrap(); - println!("cargo:rustc-env=PROFILE={}", profile); + println!("cargo:rustc-env=LINKERD2_PROXY_VERSION={}", version()); + println!("cargo:rustc-env=LINKERD2_PROXY_VENDOR={}", vendor()); + + let profile = std::env::var("PROFILE").expect("PROFILE must be set"); + println!("cargo:rustc-env=PROFILE={profile}"); } diff --git a/linkerd/app/core/src/metrics.rs b/linkerd/app/core/src/metrics.rs index 831a6a7361..baee0aa785 100644 --- a/linkerd/app/core/src/metrics.rs +++ b/linkerd/app/core/src/metrics.rs @@ -149,7 +149,7 @@ impl Metrics { ) -> (Self, impl FmtMetrics + Clone + Send + 'static) { let process = telemetry::process::Report::new(start_time); - let build_info = telemetry::build_info::Report::new(); + let build_info = telemetry::build_info::Report::default(); let (control, control_report) = { let m = metrics::Requests::::default(); diff --git a/linkerd/app/core/src/telemetry/build_info.rs b/linkerd/app/core/src/telemetry/build_info.rs index d6f7928bef..5e96de5aad 100644 --- a/linkerd/app/core/src/telemetry/build_info.rs +++ b/linkerd/app/core/src/telemetry/build_info.rs @@ -1,14 +1,11 @@ use linkerd_metrics::{metrics, FmtLabels, FmtMetric, FmtMetrics, Gauge}; -use std::env; -use std::fmt; -use std::string::String; -use std::sync::Arc; +use std::{env, fmt}; -const GIT_BRANCH: &str = env!("GIT_BRANCH"); -const GIT_SHA: &str = env!("GIT_SHA"); -const GIT_VERSION: &str = env!("GIT_VERSION"); -const PROFILE: &str = env!("PROFILE"); -const RUST_VERSION: &str = env!("RUST_VERSION"); +pub const VERSION: &str = env!("LINKERD2_PROXY_VERSION"); +pub const DATE: &str = env!("LINKERD2_PROXY_BUILD_DATE"); +pub const VENDOR: &str = env!("LINKERD2_PROXY_VENDOR"); +pub const GIT_SHA: &str = env!("GIT_SHA"); +pub const PROFILE: &str = env!("PROFILE"); metrics! { proxy_build_info: Gauge { @@ -17,56 +14,25 @@ metrics! { } #[derive(Clone, Debug, Default)] -pub struct Report { - name: String, - // `value` remains constant over the lifetime of the proxy so that - // build information in `labels` remains accurate - value: Arc, - labels: Arc, -} - -#[derive(Clone, Debug, Default)] -struct BuildInfoLabels { - git_branch: String, - git_sha: String, - git_version: String, - profile: String, - rust_version: String, -} +pub struct Report(()); -impl Report { - pub fn new() -> Self { - let labels = Arc::new(BuildInfoLabels { - git_branch: GIT_BRANCH.to_string(), - git_sha: GIT_SHA.to_string(), - git_version: GIT_VERSION.to_string(), - profile: PROFILE.to_string(), - rust_version: RUST_VERSION.to_string(), - }); - Self { - name: "proxy_build_info".to_string(), - value: Arc::new(1.into()), - labels, - } - } -} +struct Labels; impl FmtMetrics for Report { fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { proxy_build_info.fmt_help(f)?; - self.value - .fmt_metric_labeled(f, self.name.as_str(), self.labels.as_ref())?; + Gauge::from(1).fmt_metric_labeled(f, "proxy_build_info", &Labels)?; Ok(()) } } -impl FmtLabels for BuildInfoLabels { +impl FmtLabels for Labels { fn fmt_labels(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "git_branch=\"{}\"", self.git_branch)?; - write!(f, ",git_sha=\"{}\"", self.git_sha)?; - write!(f, ",git_version=\"{}\"", self.git_version)?; - write!(f, ",profile=\"{}\"", self.profile)?; - write!(f, ",rust_version=\"{}\"", self.rust_version)?; + write!(f, "version=\"{VERSION}\"")?; + write!(f, ",git_sha=\"{GIT_SHA}\"")?; + write!(f, ",profile=\"{PROFILE}\"")?; + write!(f, ",date=\"{DATE}\"")?; + write!(f, ",vendor=\"{VENDOR}\"")?; Ok(()) } } diff --git a/linkerd/app/inbound/Cargo.toml b/linkerd/app/inbound/Cargo.toml index c996eff9b5..69f93f1ee7 100644 --- a/linkerd/app/inbound/Cargo.toml +++ b/linkerd/app/inbound/Cargo.toml @@ -29,7 +29,7 @@ linkerd-meshtls = { path = "../../meshtls", optional = true } linkerd-meshtls-rustls = { path = "../../meshtls/rustls", optional = true } linkerd-proxy-client-policy = { path = "../../proxy/client-policy" } linkerd-tonic-watch = { path = "../../tonic-watch" } -linkerd2-proxy-api = { version = "0.9", features = ["inbound"] } +linkerd2-proxy-api = { version = "0.10", features = ["inbound"] } once_cell = "1" parking_lot = "0.12" rangemap = "1" diff --git a/linkerd/app/integration/Cargo.toml b/linkerd/app/integration/Cargo.toml index 39080a0621..7785de83a8 100644 --- a/linkerd/app/integration/Cargo.toml +++ b/linkerd/app/integration/Cargo.toml @@ -34,7 +34,7 @@ ipnet = "2" linkerd-app = { path = "..", features = ["allow-loopback"] } linkerd-app-core = { path = "../core" } linkerd-metrics = { path = "../../metrics", features = ["test_util"] } -linkerd2-proxy-api = { version = "0.9", features = [ +linkerd2-proxy-api = { version = "0.10", features = [ "destination", "arbitrary", ] } diff --git a/linkerd/app/integration/src/policy.rs b/linkerd/app/integration/src/policy.rs index 9ba0b339e4..004baa2efc 100644 --- a/linkerd/app/integration/src/policy.rs +++ b/linkerd/app/integration/src/policy.rs @@ -151,6 +151,7 @@ pub fn outbound_default_http_route(dst: impl ToString) -> outbound::HttpRoute { }], filters: Vec::new(), backends: Some(http_first_available(std::iter::once(backend(dst)))), + request_timeout: None, }], } } @@ -214,6 +215,7 @@ pub fn http_first_available( .map(|backend| http_route::RouteBackend { backend: Some(backend), filters: Vec::new(), + request_timeout: None, }) .collect(), }, diff --git a/linkerd/app/integration/src/tests/client_policy.rs b/linkerd/app/integration/src/tests/client_policy.rs index e035390ec2..50de7fa640 100644 --- a/linkerd/app/integration/src/tests/client_policy.rs +++ b/linkerd/app/integration/src/tests/client_policy.rs @@ -223,6 +223,7 @@ async fn header_based_routing() { backends: Some(policy::http_first_available(std::iter::once( policy::backend(dst), ))), + request_timeout: None, }; let route = outbound::HttpRoute { @@ -236,6 +237,7 @@ async fn header_based_routing() { backends: Some(policy::http_first_available(std::iter::once( policy::backend(&dst_world), ))), + request_timeout: None, }, // x-hello-city: sf | x-hello-city: san francisco mk_header_rule( @@ -398,6 +400,8 @@ async fn path_based_routing() { backends: Some(policy::http_first_available(std::iter::once( policy::backend(dst), ))), + + request_timeout: None, }; let route = outbound::HttpRoute { @@ -411,6 +415,7 @@ async fn path_based_routing() { backends: Some(policy::http_first_available(std::iter::once( policy::backend(&dst_world), ))), + request_timeout: None, }, // /goodbye/* mk_path_rule( diff --git a/linkerd/app/outbound/Cargo.toml b/linkerd/app/outbound/Cargo.toml index 5d8696652c..dc075e5052 100644 --- a/linkerd/app/outbound/Cargo.toml +++ b/linkerd/app/outbound/Cargo.toml @@ -20,7 +20,7 @@ ahash = "0.8" bytes = "1" http = "0.2" futures = { version = "0.3", default-features = false } -linkerd2-proxy-api = { version = "0.9", features = ["outbound"] } +linkerd2-proxy-api = { version = "0.10", features = ["outbound"] } linkerd-app-core = { path = "../core" } linkerd-app-test = { path = "../test", optional = true } linkerd-distribute = { path = "../../distribute" } diff --git a/linkerd/app/outbound/src/discover.rs b/linkerd/app/outbound/src/discover.rs index 0b5e32f728..7d99e442b0 100644 --- a/linkerd/app/outbound/src/discover.rs +++ b/linkerd/app/outbound/src/discover.rs @@ -191,25 +191,60 @@ pub fn synthesize_forward_policy( queue: policy::Queue, addr: SocketAddr, metadata: policy::EndpointMetadata, +) -> ClientPolicy { + policy_for_backend( + meta, + timeout, + policy::Backend { + meta: meta.clone(), + queue, + dispatcher: policy::BackendDispatcher::Forward(addr, metadata), + }, + ) +} + +pub fn synthesize_balance_policy( + meta: &Arc, + timeout: Duration, + queue: policy::Queue, + load: policy::Load, + addr: impl ToString, +) -> ClientPolicy { + policy_for_backend( + meta, + timeout, + policy::Backend { + meta: meta.clone(), + queue, + dispatcher: policy::BackendDispatcher::BalanceP2c( + load, + policy::EndpointDiscovery::DestinationGet { + path: addr.to_string(), + }, + ), + }, + ) +} + +fn policy_for_backend( + meta: &Arc, + timeout: Duration, + backend: policy::Backend, ) -> ClientPolicy { static NO_HTTP_FILTERS: Lazy> = Lazy::new(|| Arc::new([])); static NO_OPAQ_FILTERS: Lazy> = Lazy::new(|| Arc::new([])); - let backend = policy::Backend { - meta: meta.clone(), - queue, - dispatcher: policy::BackendDispatcher::Forward(addr, metadata), - }; - let opaque = policy::opaq::Opaque { policy: Some(policy::opaq::Policy { meta: meta.clone(), filters: NO_OPAQ_FILTERS.clone(), failure_policy: Default::default(), + request_timeout: None, distribution: policy::RouteDistribution::FirstAvailable(Arc::new([ policy::RouteBackend { filters: NO_OPAQ_FILTERS.clone(), backend: backend.clone(), + request_timeout: None, }, ])), }), @@ -223,10 +258,12 @@ pub fn synthesize_forward_policy( meta: meta.clone(), filters: NO_HTTP_FILTERS.clone(), failure_policy: Default::default(), + request_timeout: None, distribution: policy::RouteDistribution::FirstAvailable(Arc::new([ policy::RouteBackend { filters: NO_HTTP_FILTERS.clone(), backend: backend.clone(), + request_timeout: None, }, ])), }, diff --git a/linkerd/app/outbound/src/http/concrete.rs b/linkerd/app/outbound/src/http/concrete.rs index 0fbfcca0cd..e690530f6e 100644 --- a/linkerd/app/outbound/src/http/concrete.rs +++ b/linkerd/app/outbound/src/http/concrete.rs @@ -42,7 +42,7 @@ pub struct DispatcherFailed(Arc); /// Wraps errors encountered in this module. #[derive(Debug, thiserror::Error)] -#[error("{} {}.{}: {source}", backend.0.kind(), backend.0.name(), backend.0.namespace())] +#[error("{}: {source}", backend.0)] pub struct BalanceError { backend: BackendRef, #[source] diff --git a/linkerd/app/outbound/src/http/logical/policy/route.rs b/linkerd/app/outbound/src/http/logical/policy/route.rs index 45a5596acb..3ef64224cc 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route.rs @@ -30,6 +30,7 @@ pub(crate) struct Route { pub(super) filters: Arc<[F]>, pub(super) distribution: BackendDistribution, pub(super) failure_policy: E, + pub(super) request_timeout: Option, } pub(crate) type MatchedRoute = Matched>; @@ -49,6 +50,15 @@ pub(crate) type Grpc = MatchedRoute< pub(crate) type BackendDistribution = distribute::Distribution>; pub(crate) type NewDistribute = distribute::NewDistribute, (), N>; +/// Wraps errors with route metadata. +#[derive(Debug, thiserror::Error)] +#[error("route {}: {source}", route.0)] +struct RouteError { + route: RouteRef, + #[source] + source: Error, +} + // === impl MatchedRoute === impl MatchedRoute @@ -111,7 +121,18 @@ where .push_on_service(svc::LoadShed::layer()) // TODO(ver) attach the `E` typed failure policy to requests. .push(filters::NewApplyFilters::::layer()) + // Sets an optional request timeout. + .push(http::NewTimeout::layer()) .push(classify::NewClassify::layer()) + .push(svc::NewMapErr::layer_with(|rt: &Self| { + let route = rt.params.route_ref.clone(); + move |source| { + Error::from(RouteError { + route: route.clone(), + source, + }) + } + })) .push(svc::ArcNewService::layer()) .into_inner() }) @@ -124,6 +145,12 @@ impl svc::Param> for MatchedRoute svc::Param for MatchedRoute { + fn param(&self) -> http::timeout::ResponseTimeout { + http::timeout::ResponseTimeout(self.params.request_timeout) + } +} + impl filters::Apply for Http { #[inline] fn apply(&self, req: &mut ::http::Request) -> Result<()> { diff --git a/linkerd/app/outbound/src/http/logical/policy/route/backend.rs b/linkerd/app/outbound/src/http/logical/policy/route/backend.rs index a0d335c000..51594fa257 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/backend.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/backend.rs @@ -1,5 +1,5 @@ use super::{super::Concrete, filters}; -use crate::RouteRef; +use crate::{BackendRef, RouteRef}; use linkerd_app_core::{proxy::http, svc, Error, Result}; use linkerd_http_route as http_route; use linkerd_proxy_client_policy as policy; @@ -16,6 +16,7 @@ pub(crate) struct Backend { pub(crate) route_ref: RouteRef, pub(crate) concrete: Concrete, pub(crate) filters: Arc<[F]>, + pub(crate) request_timeout: Option, } pub(crate) type MatchedBackend = super::Matched>; @@ -29,6 +30,15 @@ pub struct ExtractMetrics { metrics: RouteBackendMetrics, } +/// Wraps errors with backend metadata. +#[derive(Debug, thiserror::Error)] +#[error("backend {}: {source}", backend.0)] +struct BackendError { + backend: BackendRef, + #[source] + source: Error, +} + // === impl Backend === impl Clone for Backend { @@ -37,6 +47,7 @@ impl Clone for Backend { route_ref: self.route_ref.clone(), filters: self.filters.clone(), concrete: self.concrete.clone(), + request_timeout: self.request_timeout, } } } @@ -107,15 +118,31 @@ where }| concrete, ) .push(filters::NewApplyFilters::::layer()) + .push(http::NewTimeout::layer()) .push(count_reqs::NewCountRequests::layer_via(ExtractMetrics { metrics: metrics.clone(), })) + .push(svc::NewMapErr::layer_with(|t: &Self| { + let backend = t.params.concrete.backend_ref.clone(); + move |source| { + Error::from(BackendError { + backend: backend.clone(), + source, + }) + } + })) .push(svc::ArcNewService::layer()) .into_inner() }) } } +impl svc::Param for MatchedBackend { + fn param(&self) -> http::ResponseTimeout { + http::ResponseTimeout(self.params.request_timeout) + } +} + impl filters::Apply for Http { #[inline] fn apply(&self, req: &mut ::http::Request) -> Result<()> { diff --git a/linkerd/app/outbound/src/http/logical/policy/router.rs b/linkerd/app/outbound/src/http/logical/policy/router.rs index 9e6786a129..e2d39d8a6c 100644 --- a/linkerd/app/outbound/src/http/logical/policy/router.rs +++ b/linkerd/app/outbound/src/http/logical/policy/router.rs @@ -179,6 +179,7 @@ where route_ref: route_ref.clone(), filters, concrete, + request_timeout: rb.request_timeout, } }; @@ -204,6 +205,7 @@ where filters, distribution, failure_policy, + request_timeout, }| { let route_ref = RouteRef(meta); let distribution = mk_distribution(&route_ref, &distribution); @@ -214,6 +216,7 @@ where filters, failure_policy, distribution, + request_timeout, } }; diff --git a/linkerd/app/outbound/src/http/logical/policy/tests.rs b/linkerd/app/outbound/src/http/logical/policy/tests.rs index 2d47830864..866a6ee2f4 100644 --- a/linkerd/app/outbound/src/http/logical/policy/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/tests.rs @@ -48,9 +48,11 @@ async fn header_based_route() { }), filters: Arc::new([]), failure_policy: Default::default(), + request_timeout: None, distribution: policy::RouteDistribution::FirstAvailable(Arc::new([policy::RouteBackend { filters: Arc::new([]), backend, + request_timeout: None, }])), }; @@ -197,6 +199,7 @@ async fn http_filter_request_headers() { policy: policy::RoutePolicy { meta: policy::Meta::new_default("turtles"), failure_policy: Default::default(), + request_timeout: None, filters: Arc::new([policy::http::Filter::RequestHeaders( policy::http::filter::ModifyHeader { add: vec![(PIZZA.clone(), TUBULAR.clone())], @@ -212,6 +215,7 @@ async fn http_filter_request_headers() { ..Default::default() }, )]), + request_timeout: None, }, ])), }, diff --git a/linkerd/app/outbound/src/http/logical/tests.rs b/linkerd/app/outbound/src/http/logical/tests.rs index d38d6f86e9..429b9e8390 100644 --- a/linkerd/app/outbound/src/http/logical/tests.rs +++ b/linkerd/app/outbound/src/http/logical/tests.rs @@ -285,6 +285,148 @@ async fn balancer_doesnt_select_tripped_breakers() { } } +#[tokio::test(flavor = "current_thread")] +async fn route_request_timeout() { + tokio::time::pause(); + let _trace = trace::test::trace_init(); + const REQUEST_TIMEOUT: Duration = std::time::Duration::from_secs(2); + + let addr = SocketAddr::new([192, 0, 2, 41].into(), PORT); + let dest: NameAddr = format!("{AUTHORITY}:{PORT}") + .parse::() + .expect("dest addr is valid"); + let (svc, mut handle) = tower_test::mock::pair(); + let connect = HttpConnect::default().service(addr, svc); + let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default()); + let (rt, _shutdown) = runtime(); + let stack = Outbound::new(default_config(), rt) + .with_stack(connect) + .push_http_cached(resolve) + .into_inner(); + + let (_route_tx, routes) = { + let backend = default_backend(&dest); + // Set a request timeout for the route, and no backend request timeout + // on the backend. + let route = timeout_route(backend.clone(), Some(REQUEST_TIMEOUT), None); + watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams { + addr: dest.into(), + meta: ParentRef(client_policy::Meta::new_default("parent")), + backends: Arc::new([backend]), + routes: Arc::new([route]), + failure_accrual: client_policy::FailureAccrual::None, + }))) + }; + let target = Target { + num: 1, + version: http::Version::H2, + routes, + }; + let svc = stack.new_service(target); + + handle.allow(1); + let rsp = send_req(svc.clone(), http::Request::get("/")); + serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await; + assert_eq!( + rsp.await.expect("request must succeed").status(), + http::StatusCode::OK + ); + + // now, time out... + let rsp = send_req(svc.clone(), http::Request::get("/")); + tokio::time::sleep(REQUEST_TIMEOUT).await; + let error = rsp.await.expect_err("request must fail with a timeout"); + assert!( + error.is::(), + "error must originate in the logical stack" + ); + assert!(errors::is_caused_by::( + error.as_ref() + )); +} + +#[tokio::test(flavor = "current_thread")] +async fn backend_request_timeout() { + tokio::time::pause(); + let _trace = trace::test::trace_init(); + // must be less than the `default_config` failfast timeout, or we'll hit + // that instead. + const ROUTE_REQUEST_TIMEOUT: Duration = std::time::Duration::from_secs(2); + const BACKEND_REQUEST_TIMEOUT: Duration = std::time::Duration::from_secs(1); + + let addr = SocketAddr::new([192, 0, 2, 41].into(), PORT); + let dest: NameAddr = format!("{AUTHORITY}:{PORT}") + .parse::() + .expect("dest addr is valid"); + let (svc, mut handle) = tower_test::mock::pair(); + let connect = HttpConnect::default().service(addr, svc); + let resolve = support::resolver().endpoint_exists(dest.clone(), addr, Default::default()); + let (rt, _shutdown) = runtime(); + let stack = Outbound::new(default_config(), rt) + .with_stack(connect) + .push_http_cached(resolve) + .into_inner(); + + let (_route_tx, routes) = { + let backend = default_backend(&dest); + // Set both a route request timeout and a backend request timeout. + let route = timeout_route( + backend.clone(), + Some(ROUTE_REQUEST_TIMEOUT), + Some(BACKEND_REQUEST_TIMEOUT), + ); + watch::channel(Routes::Policy(policy::Params::Http(policy::HttpParams { + addr: dest.into(), + meta: ParentRef(client_policy::Meta::new_default("parent")), + backends: Arc::new([backend]), + routes: Arc::new([route]), + failure_accrual: client_policy::FailureAccrual::None, + }))) + }; + let target = Target { + num: 1, + version: http::Version::H2, + routes, + }; + let svc = stack.new_service(target); + + handle.allow(1); + let rsp = send_req(svc.clone(), http::Request::get("/")); + serve_req(&mut handle, mk_rsp(StatusCode::OK, "good")).await; + assert_eq!( + rsp.await.expect("request must succeed").status(), + http::StatusCode::OK + ); + + // Now, time out... + let rsp = send_req(svc.clone(), http::Request::get("/")); + // Wait until we actually get the request --- this timeout only starts once + // the service has been acquired. + handle.allow(1); + let (_, send_rsp) = handle + .next_request() + .await + .expect("service must receive request"); + tokio::time::sleep(BACKEND_REQUEST_TIMEOUT + Duration::from_millis(1)).await; + // Still send a response, so that if we didn't hit the backend timeout + // timeout, we don't hit the route timeout and succeed incorrectly. + send_rsp.send_response(mk_rsp(StatusCode::OK, "good")); + let error = rsp.await.expect_err("request must fail with a timeout"); + assert!(errors::is_caused_by::( + error.as_ref() + )); + + // The route request timeout should still apply to time spent before + // the backend is acquired. + let rsp = send_req(svc.clone(), http::Request::get("/")); + tokio::time::sleep(ROUTE_REQUEST_TIMEOUT + Duration::from_millis(1)).await; + handle.allow(1); + let error = rsp.await.expect_err("request must fail with a timeout"); + assert!(errors::is_caused_by::( + error.as_ref() + )); +} + #[derive(Clone, Debug)] struct Target { num: usize, @@ -448,9 +590,41 @@ fn default_route(backend: client_policy::Backend) -> client_policy::http::Route meta: Meta::new_default("test_route"), filters: NO_FILTERS.clone(), failure_policy: Default::default(), + request_timeout: None, + distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { + filters: NO_FILTERS.clone(), + backend, + request_timeout: None, + }])), + }, + }], + } +} + +fn timeout_route( + backend: client_policy::Backend, + route_timeout: Option, + backend_timeout: Option, +) -> client_policy::http::Route { + use client_policy::{ + http::{self, Filter, Policy, Route, Rule}, + Meta, RouteBackend, RouteDistribution, + }; + use once_cell::sync::Lazy; + static NO_FILTERS: Lazy> = Lazy::new(|| Arc::new([])); + Route { + hosts: vec![], + rules: vec![Rule { + matches: vec![http::r#match::MatchRequest::default()], + policy: Policy { + meta: Meta::new_default("test_route"), + filters: NO_FILTERS.clone(), + failure_policy: Default::default(), + request_timeout: route_timeout, distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { filters: NO_FILTERS.clone(), backend, + request_timeout: backend_timeout, }])), }, }], diff --git a/linkerd/app/outbound/src/ingress.rs b/linkerd/app/outbound/src/ingress.rs index c409b628d1..0c2453f337 100644 --- a/linkerd/app/outbound/src/ingress.rs +++ b/linkerd/app/outbound/src/ingress.rs @@ -1,18 +1,21 @@ use crate::{http, opaq, policy, Config, Discovery, Outbound, ParentRef}; use linkerd_app_core::{ config::{ProxyConfig, ServerConfig}, - detect, io, profiles, + detect, errors, io, profiles, proxy::{ api_resolve::{ConcreteAddr, Metadata}, core::Resolve, + http::balance, }, svc::{self, ServiceExt}, transport::addrs::*, Addr, Error, Infallible, NameAddr, Result, }; -use std::{fmt::Debug, hash::Hash}; +use once_cell::sync::Lazy; +use std::{fmt::Debug, hash::Hash, sync::Arc}; use thiserror::Error; use tokio::sync::watch; +use tracing::Instrument; #[derive(Clone, Debug, PartialEq, Eq, Hash)] struct Http { @@ -79,20 +82,7 @@ impl Outbound<()> { // Endpoint resolver. R: Resolve, { - let discover = svc::mk(move |DiscoverAddr(addr)| { - let profile = profiles - .clone() - .get_profile(profiles::LookupAddr(addr.clone())); - let policy = policies.get_policy(addr); - Box::pin(async move { - let (profile, policy) = tokio::join!(profile, policy); - let profile = profile.unwrap_or_else(|error| { - tracing::warn!(%error, "Failed to resolve profile"); - None - }); - Ok((profile, policy?)) - }) - }); + let discover = self.ingress_resolver(profiles, policies); // The fallback stack is the same thing as the normal proxy stack, but // it doesn't include TCP metrics, since they are already instrumented @@ -124,6 +114,128 @@ impl Outbound<()> { .push_tcp_instrument(|t: &T| tracing::info_span!("ingress", addr = %t.param())) .into_inner() } + + fn ingress_resolver( + &self, + profiles: impl profiles::GetProfile, + policies: impl policy::GetPolicy, + ) -> impl svc::Service< + DiscoverAddr, + Error = Error, + Response = (Option, policy::Receiver), + Future = impl Send, + > + Clone + + Send + + Sync + + 'static { + let detect_timeout = self.config.proxy.detect_protocol_timeout; + let queue = { + let queue = self.config.tcp_connection_queue; + policy::Queue { + capacity: queue.capacity, + failfast_timeout: queue.failfast_timeout, + } + }; + let load = { + let balance::EwmaConfig { decay, default_rtt } = + crate::http::logical::profile::DEFAULT_EWMA; + policy::Load::PeakEwma(policy::PeakEwma { decay, default_rtt }) + }; + svc::mk(move |DiscoverAddr(addr)| { + tracing::debug!(%addr, "Discover"); + + let profile = profiles + .clone() + .get_profile(profiles::LookupAddr(addr.clone())) + .instrument(tracing::debug_span!("profiles")); + let policy = policies + .get_policy(addr) + .instrument(tracing::debug_span!("policy")); + + Box::pin(async move { + let (profile, policy) = tokio::join!(profile, policy); + tracing::debug!("Discovered"); + + let profile = profile.unwrap_or_else(|error| { + tracing::warn!(%error, "Error resolving ServiceProfile"); + None + }); + + // If there was a policy resolution, return it with the profile so + // the stack can determine how to switch on them. + let policy_error = match policy { + Ok(policy) => return Ok((profile, policy)), + // XXX(ver) The policy controller may (for the time being) reject + // our lookups, since it doesn't yet serve endpoint metadata for + // forwarding. + Err(error) if errors::has_grpc_status(&error, tonic::Code::NotFound) => { + tracing::debug!("Policy not found"); + error + } + // Earlier versions of the Linkerd control plane (e.g. + // 2.12.x) will return `Unimplemented` for requests to the + // OutboundPolicy API. Log a warning and synthesize a policy + // for backwards compatibility. + Err(error) if errors::has_grpc_status(&error, tonic::Code::Unimplemented) => { + tracing::warn!("Policy controller returned `Unimplemented`, the control plane may be out of date."); + error + } + Err(error) => return Err(error), + }; + + // If there was a profile resolution, try to use it to synthesize a + // policy. Otherwise, return an error, as we have neither a + // policy nor a ServiceProfile and can't synthesize a policy. + let profile = match profile { + Some(profile) => profile, + None => return Err(policy_error), + }; + + let policy = crate::discover::spawn_synthesized_profile_policy( + profile.clone().into(), + move |profile: &profiles::Profile| { + static ENDPOINT_META: Lazy> = Lazy::new(|| { + Arc::new(policy::Meta::Default { + name: "endpoint".into(), + }) + }); + static PROFILE_META: Lazy> = Lazy::new(|| { + Arc::new(policy::Meta::Default { + name: "profile".into(), + }) + }); + + if let Some((addr, meta)) = profile.endpoint.clone() { + return crate::discover::synthesize_forward_policy( + &ENDPOINT_META, + detect_timeout, + queue, + addr, + meta, + ); + } + + if let Some(ref logical) = profile.addr { + return crate::discover::synthesize_balance_policy( + &PROFILE_META, + detect_timeout, + queue, + load, + logical, + ); + } + + // Return an empty policy so that a + // `DiscoveryRejected` error is returned if + // selecting the policy. + policy::ClientPolicy::empty(detect_timeout) + }, + ); + + Ok((Some(profile), policy)) + }) + }) + } } impl Outbound { diff --git a/linkerd/app/test/src/resolver/client_policy.rs b/linkerd/app/test/src/resolver/client_policy.rs index 1b561f38e4..c8860425cf 100644 --- a/linkerd/app/test/src/resolver/client_policy.rs +++ b/linkerd/app/test/src/resolver/client_policy.rs @@ -73,9 +73,11 @@ impl ClientPolicies { meta: Meta::new_default("default"), filters: Arc::new([]), failure_policy: Default::default(), + request_timeout: None, distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { filters: Arc::new([]), backend: backend.clone(), + request_timeout: None, }])), }, }], @@ -96,9 +98,11 @@ impl ClientPolicies { meta: Meta::new_default("default"), filters: Arc::new([]), failure_policy: Default::default(), + request_timeout: None, distribution: RouteDistribution::FirstAvailable(Arc::new([RouteBackend { filters: Arc::new([]), backend: backend.clone(), + request_timeout: None, }])), }), }, diff --git a/linkerd/http-access-log/src/lib.rs b/linkerd/http-access-log/src/lib.rs index 8a785d64fd..c0ec70d379 100644 --- a/linkerd/http-access-log/src/lib.rs +++ b/linkerd/http-access-log/src/lib.rs @@ -133,6 +133,7 @@ where processing_ns = field::Empty, user_agent = get_header(http::header::USER_AGENT), host = get_header(http::header::HOST), + x_forwarded_for = get_header(http::header::HeaderName::from_static("x-forwarded-for")) ); // The access log span is only enabled by the `tracing` subscriber if diff --git a/linkerd/http-route/Cargo.toml b/linkerd/http-route/Cargo.toml index 7a2739587a..614e727baa 100644 --- a/linkerd/http-route/Cargo.toml +++ b/linkerd/http-route/Cargo.toml @@ -17,7 +17,7 @@ tracing = "0.1" url = "2" [dependencies.linkerd2-proxy-api] -version = "0.9" +version = "0.10" features = ["http-route", "grpc-route"] optional = true diff --git a/linkerd/proxy/api-resolve/Cargo.toml b/linkerd/proxy/api-resolve/Cargo.toml index 11b49d5130..5d64a2893d 100644 --- a/linkerd/proxy/api-resolve/Cargo.toml +++ b/linkerd/proxy/api-resolve/Cargo.toml @@ -14,7 +14,7 @@ async-stream = "0.3" futures = { version = "0.3", default-features = false } linkerd-addr = { path = "../../addr" } linkerd-error = { path = "../../error" } -linkerd2-proxy-api = { version = "0.9", features = ["destination"] } +linkerd2-proxy-api = { version = "0.10", features = ["destination"] } linkerd-proxy-core = { path = "../core" } linkerd-stack = { path = "../../stack" } linkerd-tls = { path = "../../tls" } diff --git a/linkerd/proxy/client-policy/Cargo.toml b/linkerd/proxy/client-policy/Cargo.toml index cc364bb559..8216131965 100644 --- a/linkerd/proxy/client-policy/Cargo.toml +++ b/linkerd/proxy/client-policy/Cargo.toml @@ -18,7 +18,7 @@ proto = [ ahash = "0.8" ipnet = "2" http = "0.2" -linkerd2-proxy-api = { version = "0.9", optional = true, features = [ +linkerd2-proxy-api = { version = "0.10", optional = true, features = [ "outbound", ] } linkerd-error = { path = "../../error" } diff --git a/linkerd/proxy/client-policy/src/grpc.rs b/linkerd/proxy/client-policy/src/grpc.rs index 69df0dd04d..7f6a05fc2c 100644 --- a/linkerd/proxy/client-policy/src/grpc.rs +++ b/linkerd/proxy/client-policy/src/grpc.rs @@ -37,6 +37,7 @@ pub fn default(distribution: crate::RouteDistribution) -> Route { filters: Arc::new([]), distribution, failure_policy: Codes::default(), + request_timeout: None, }, }], } @@ -101,6 +102,7 @@ pub mod proto { r#match::host::{proto::InvalidHostMatch, MatchHost}, }, }; + use std::time::Duration; #[derive(Debug, thiserror::Error)] pub enum InvalidGrpcRoute { @@ -124,6 +126,9 @@ pub mod proto { #[error("invalid failure accrual policy: {0}")] Breaker(#[from] InvalidFailureAccrual), + + #[error("invalid duration: {0}")] + Duration(#[from] prost_types::DurationError), } #[derive(Debug, thiserror::Error)] @@ -198,6 +203,7 @@ pub mod proto { matches, backends, filters, + request_timeout, } = proto; let matches = matches @@ -214,6 +220,8 @@ pub mod proto { .ok_or(InvalidGrpcRoute::Missing("distribution"))? .try_into()?; + let request_timeout = request_timeout.map(Duration::try_from).transpose()?; + Ok(Rule { matches, policy: Policy { @@ -221,6 +229,7 @@ pub mod proto { filters, distribution, failure_policy: Codes::default(), + request_timeout, }, }) } @@ -270,10 +279,14 @@ pub mod proto { impl TryFrom for RouteBackend { type Error = InvalidBackend; fn try_from( - grpc_route::RouteBackend { backend, filters }: grpc_route::RouteBackend, + grpc_route::RouteBackend { + backend, + filters, + request_timeout, + }: grpc_route::RouteBackend, ) -> Result, InvalidBackend> { let backend = backend.ok_or(InvalidBackend::Missing("backend"))?; - RouteBackend::try_from_proto(backend, filters) + RouteBackend::try_from_proto(backend, filters, request_timeout) } } diff --git a/linkerd/proxy/client-policy/src/http.rs b/linkerd/proxy/client-policy/src/http.rs index 8edd8b7e2d..8e0f38b6eb 100644 --- a/linkerd/proxy/client-policy/src/http.rs +++ b/linkerd/proxy/client-policy/src/http.rs @@ -47,6 +47,7 @@ pub fn default(distribution: crate::RouteDistribution) -> Route { filters: Arc::new([]), distribution, failure_policy: StatusRanges::default(), + request_timeout: None, }, }], } @@ -130,6 +131,9 @@ pub mod proto { #[error("missing {0}")] Missing(&'static str), + + #[error("invalid request timeout: {0}")] + Timeout(#[from] prost_types::DurationError), } #[derive(Debug, thiserror::Error)] @@ -217,6 +221,7 @@ pub mod proto { matches, backends, filters, + request_timeout, } = proto; let matches = matches @@ -233,6 +238,10 @@ pub mod proto { .ok_or(InvalidHttpRoute::Missing("distribution"))? .try_into()?; + let request_timeout = request_timeout + .map(std::time::Duration::try_from) + .transpose()?; + Ok(Rule { matches, policy: Policy { @@ -240,6 +249,7 @@ pub mod proto { filters, distribution, failure_policy: StatusRanges::default(), + request_timeout, }, }) } @@ -289,10 +299,14 @@ pub mod proto { impl TryFrom for RouteBackend { type Error = InvalidBackend; fn try_from( - http_route::RouteBackend { backend, filters }: http_route::RouteBackend, + http_route::RouteBackend { + backend, + filters, + request_timeout, + }: http_route::RouteBackend, ) -> Result { let backend = backend.ok_or(InvalidBackend::Missing("backend"))?; - RouteBackend::try_from_proto(backend, filters) + RouteBackend::try_from_proto(backend, filters, request_timeout) } } diff --git a/linkerd/proxy/client-policy/src/lib.rs b/linkerd/proxy/client-policy/src/lib.rs index 4e3f4f596c..ccbfa27be1 100644 --- a/linkerd/proxy/client-policy/src/lib.rs +++ b/linkerd/proxy/client-policy/src/lib.rs @@ -2,7 +2,7 @@ #![forbid(unsafe_code)] use once_cell::sync::Lazy; -use std::{borrow::Cow, hash::Hash, net::SocketAddr, num::NonZeroU16, sync::Arc, time}; +use std::{borrow::Cow, fmt, hash::Hash, net::SocketAddr, num::NonZeroU16, sync::Arc, time}; pub mod grpc; pub mod http; @@ -58,6 +58,18 @@ pub struct RoutePolicy { pub meta: Arc, pub filters: Arc<[T]>, pub distribution: RouteDistribution, + /// Request timeout applied to HTTP and gRPC routes. + /// + /// Opaque routes are proxied as opaque TCP, and therefore, we have no + /// concept of a "request", so this field is ignored by opaque routes. + /// It's somewhat unfortunate that this field is part of the `RoutePolicy` + /// struct, which is used to represent routes for all protocols, rather than + /// as a filter, which are a generic type that depends on the protocol in + /// use. However, this can't be easily modeled as a filter using the current + /// design for filters, as filters synchronously modify a request or return + /// an error --- a filter cannot wrap the response future in order to add a + /// timeout. + pub request_timeout: Option, /// Configures what responses are classified as failures. pub failure_policy: F, @@ -78,6 +90,7 @@ pub enum RouteDistribution { pub struct RouteBackend { pub filters: Arc<[T]>, pub backend: Backend, + pub request_timeout: Option, } // TODO(ver) how does configuration like failure accrual fit in here? What about @@ -155,6 +168,7 @@ impl ClientPolicy { .collect(), distribution: RouteDistribution::Empty, failure_policy: http::StatusRanges::default(), + request_timeout: None, }, }], }]) @@ -182,6 +196,37 @@ impl ClientPolicy { backends: BACKENDS.clone(), } } + + pub fn empty(timeout: time::Duration) -> Self { + static META: Lazy> = Lazy::new(|| { + Arc::new(Meta::Default { + name: "empty".into(), + }) + }); + static NO_HTTP_ROUTES: Lazy> = Lazy::new(|| Arc::new([])); + static NO_BACKENDS: Lazy> = Lazy::new(|| Arc::new([])); + + Self { + parent: META.clone(), + protocol: Protocol::Detect { + timeout, + http1: http::Http1 { + routes: NO_HTTP_ROUTES.clone(), + failure_accrual: Default::default(), + }, + http2: http::Http2 { + routes: NO_HTTP_ROUTES.clone(), + failure_accrual: Default::default(), + }, + opaque: opaq::Opaque { + // TODO(eliza): eventually, can we configure the opaque + // policy to fail conns? + policy: None, + }, + }, + backends: NO_BACKENDS.clone(), + } + } } // === impl Meta === @@ -250,6 +295,27 @@ impl std::hash::Hash for Meta { } } +impl fmt::Display for Meta { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Default { name } => write!(f, "default.{name}"), + Self::Resource { + kind, + name, + namespace, + port, + .. + } => { + write!(f, "{kind}.{namespace}.{name}")?; + if let Some(port) = port { + write!(f, ":{port}")? + } + Ok(()) + } + } + } +} + // === impl FailureAccrual === impl Default for FailureAccrual { @@ -532,6 +598,7 @@ pub mod proto { pub(crate) fn try_from_proto( backend: outbound::Backend, filters: impl IntoIterator, + request_timeout: Option, ) -> Result where T: TryFrom, @@ -543,8 +610,20 @@ pub mod proto { .map(T::try_from) .collect::, _>>() .map_err(|error| InvalidBackend::Filter(error.into()))?; - - Ok(RouteBackend { filters, backend }) + let request_timeout = + request_timeout + .map(|d| d.try_into()) + .transpose() + .map_err(|error| InvalidBackend::Duration { + field: "backend request timeout", + error, + })?; + + Ok(RouteBackend { + filters, + backend, + request_timeout, + }) } } diff --git a/linkerd/proxy/client-policy/src/opaq.rs b/linkerd/proxy/client-policy/src/opaq.rs index b86dce5f6e..f708b8a2c0 100644 --- a/linkerd/proxy/client-policy/src/opaq.rs +++ b/linkerd/proxy/client-policy/src/opaq.rs @@ -127,6 +127,8 @@ pub(crate) mod proto { filters: NO_FILTERS.clone(), failure_policy: NonIoErrors::default(), distribution, + // Request timeouts are ignored on opaque routes. + request_timeout: None, }) } @@ -178,7 +180,7 @@ pub(crate) mod proto { opaque_route::RouteBackend { backend }: opaque_route::RouteBackend, ) -> Result { let backend = backend.ok_or(InvalidBackend::Missing("backend"))?; - RouteBackend::try_from_proto(backend, std::iter::empty::<()>()) + RouteBackend::try_from_proto(backend, std::iter::empty::<()>(), None) } } diff --git a/linkerd/proxy/identity-client/Cargo.toml b/linkerd/proxy/identity-client/Cargo.toml index 8bcdcd18c5..f9445eb2b6 100644 --- a/linkerd/proxy/identity-client/Cargo.toml +++ b/linkerd/proxy/identity-client/Cargo.toml @@ -8,7 +8,7 @@ publish = false [dependencies] futures = { version = "0.3", default-features = false } -linkerd2-proxy-api = { version = "0.9", features = ["identity"] } +linkerd2-proxy-api = { version = "0.10", features = ["identity"] } linkerd-error = { path = "../../error" } linkerd-identity = { path = "../../identity" } linkerd-metrics = { path = "../../metrics" } diff --git a/linkerd/proxy/resolve/src/recover.rs b/linkerd/proxy/resolve/src/recover.rs index a8d6b0c797..fb0f980e68 100644 --- a/linkerd/proxy/resolve/src/recover.rs +++ b/linkerd/proxy/resolve/src/recover.rs @@ -147,7 +147,7 @@ where type Item = Result, Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); + let this = self.project(); loop { // XXX(eliza): note that this match was originally an `if let`, // but that doesn't work with `#[project]` for some kinda reason diff --git a/linkerd/proxy/server-policy/Cargo.toml b/linkerd/proxy/server-policy/Cargo.toml index 53f55f8a06..d5abef9080 100644 --- a/linkerd/proxy/server-policy/Cargo.toml +++ b/linkerd/proxy/server-policy/Cargo.toml @@ -17,7 +17,7 @@ prost-types = { version = "0.11", optional = true } thiserror = "1" [dependencies.linkerd2-proxy-api] -version = "0.9" +version = "0.10" features = ["inbound"] optional = true diff --git a/linkerd/proxy/tap/Cargo.toml b/linkerd/proxy/tap/Cargo.toml index b78b44fed3..29a50ed85b 100644 --- a/linkerd/proxy/tap/Cargo.toml +++ b/linkerd/proxy/tap/Cargo.toml @@ -11,7 +11,7 @@ http = "0.2" hyper = { version = "0.14", features = ["http1", "http2"] } futures = { version = "0.3", default-features = false } ipnet = "2.7" -linkerd2-proxy-api = { version = "0.9", features = ["tap"] } +linkerd2-proxy-api = { version = "0.10", features = ["tap"] } linkerd-conditional = { path = "../../conditional" } linkerd-error = { path = "../../error" } linkerd-meshtls = { path = "../../meshtls" } @@ -30,5 +30,5 @@ tracing = "0.1" pin-project = "1" [dev-dependencies] -linkerd2-proxy-api = { version = "0.9", features = ["arbitrary"] } +linkerd2-proxy-api = { version = "0.10", features = ["arbitrary"] } quickcheck = { version = "1", default-features = false } diff --git a/linkerd/service-profiles/Cargo.toml b/linkerd/service-profiles/Cargo.toml index 6f1338fd74..b7e5a221cc 100644 --- a/linkerd/service-profiles/Cargo.toml +++ b/linkerd/service-profiles/Cargo.toml @@ -21,7 +21,7 @@ linkerd-http-box = { path = "../http-box" } linkerd-proxy-api-resolve = { path = "../proxy/api-resolve" } linkerd-stack = { path = "../stack" } linkerd-tonic-watch = { path = "../tonic-watch" } -linkerd2-proxy-api = { version = "0.9", features = ["destination"] } +linkerd2-proxy-api = { version = "0.10", features = ["destination"] } once_cell = "1.17" prost-types = "0.11" regex = "1" @@ -33,5 +33,5 @@ thiserror = "1" tracing = "0.1" [dev-dependencies] -linkerd2-proxy-api = { version = "0.9", features = ["arbitrary"] } +linkerd2-proxy-api = { version = "0.10", features = ["arbitrary"] } quickcheck = { version = "1", default-features = false } diff --git a/linkerd2-proxy/src/main.rs b/linkerd2-proxy/src/main.rs index d106ab910d..0733cd48c8 100644 --- a/linkerd2-proxy/src/main.rs +++ b/linkerd2-proxy/src/main.rs @@ -12,7 +12,10 @@ compile_error!( ); use linkerd_app::{ - core::{telemetry::StartTime, transport::BindTcp}, + core::{ + telemetry::{build_info, StartTime}, + transport::BindTcp, + }, trace, Config, }; use linkerd_signal as signal; @@ -37,6 +40,15 @@ fn main() { } }; + info!( + "{profile} {version} ({sha}) by {vendor} on {date}", + date = build_info::DATE, + sha = build_info::GIT_SHA, + version = build_info::VERSION, + profile = build_info::PROFILE, + vendor = build_info::VENDOR, + ); + // Load configuration from the environment without binding ports. let config = match Config::try_from_env() { Ok(config) => config,