Skip to content

Commit

Permalink
Invoke ORCA Load Report callbacks from Router::Filter. (envoyproxy#…
Browse files Browse the repository at this point in the history
…35728)

<!--
!!!ATTENTION!!!

If you are fixing *any* crash or *any* potential security issue, *do
not*
open a pull request in this repo. Please report the issue via emailing
[email protected] where the issue will be triaged
appropriately.
Thank you in advance for helping to keep Envoy secure.

!!!ATTENTION!!!

For an explanation of how to fill out the fields, please see the
relevant section
in
[PULL_REQUESTS.md](https://github.com/envoyproxy/envoy/blob/main/PULL_REQUESTS.md)
-->

Commit Message:
Invoke ORCA Load Report callbacks from `Router::Filter`.

- Add `LoadBalancerContext::setOrcaLoadReportCb(OrcaLoadReportCb
callback)` method.
- Allow LoadBalancer to set the callback to be invoked when ORCA load
report is received.

Risk Level: Low
Release Notes: N/A
Platform Specific Features: N/A
envoyproxy#34777

---------

Signed-off-by: Misha Efimov <[email protected]>
  • Loading branch information
efimki authored Aug 27, 2024
1 parent db05440 commit 45e0325
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 5 deletions.
1 change: 1 addition & 0 deletions envoy/upstream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ envoy_cc_library(
":upstream_interface",
"//envoy/router:router_interface",
"//envoy/upstream:types_interface",
"@com_github_cncf_xds//xds/data/orca/v3:pkg_cc_proto",
],
)

Expand Down
17 changes: 17 additions & 0 deletions envoy/upstream/load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "envoy/upstream/types.h"
#include "envoy/upstream/upstream.h"

#include "xds/data/orca/v3/orca_load_report.pb.h"

namespace Envoy {
namespace Http {
namespace ConnectionPool {
Expand Down Expand Up @@ -103,6 +105,21 @@ class LoadBalancerContext {
* and return the corresponding host directly.
*/
virtual absl::optional<OverrideHost> overrideHostToSelect() const PURE;

// Interface for callbacks when ORCA load reports are received from upstream.
class OrcaLoadReportCallbacks {
public:
virtual ~OrcaLoadReportCallbacks() = default;
// Invoked when a new orca report is received for this LB context.
virtual absl::Status
onOrcaLoadReport(const xds::data::orca::v3::OrcaLoadReport& orca_load_report) PURE;
};

/**
* Install a callback to be invoked when ORCA Load report is received for this
* LB context.
*/
virtual void setOrcaLoadReportCallbacks(OrcaLoadReportCallbacks& callbacks) PURE;
};

/**
Expand Down
19 changes: 14 additions & 5 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2118,7 +2118,7 @@ void Filter::maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or
auto host = upstream_request.upstreamHost();
const bool need_to_send_load_report =
(host != nullptr) && cluster_->lrsReportMetricNames().has_value();
if (!need_to_send_load_report) {
if (!need_to_send_load_report && !orca_load_report_callbacks_.has_value()) {
return;
}

Expand All @@ -2132,10 +2132,19 @@ void Filter::maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or

orca_load_report_received_ = true;

ENVOY_STREAM_LOG(trace, "Adding ORCA load report {} to load metrics", *callbacks_,
orca_load_report->DebugString());
Envoy::Orca::addOrcaLoadReportToLoadMetricStats(
cluster_->lrsReportMetricNames().value(), orca_load_report.value(), host->loadMetricStats());
if (need_to_send_load_report) {
ENVOY_STREAM_LOG(trace, "Adding ORCA load report {} to load metrics", *callbacks_,
orca_load_report->DebugString());
Envoy::Orca::addOrcaLoadReportToLoadMetricStats(cluster_->lrsReportMetricNames().value(),
orca_load_report.value(),
host->loadMetricStats());
}
if (orca_load_report_callbacks_.has_value()) {
const absl::Status status = orca_load_report_callbacks_->onOrcaLoadReport(*orca_load_report);
if (!status.ok()) {
ENVOY_LOG_MISC(error, "Failed to invoke OrcaLoadReportCallbacks: {}", status.message());
}
}
}

RetryStatePtr
Expand Down
5 changes: 5 additions & 0 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,10 @@ class Filter : Logger::Loggable<Logger::Id::router>,
return callbacks_->upstreamOverrideHost();
}

void setOrcaLoadReportCallbacks(OrcaLoadReportCallbacks& callbacks) override {
orca_load_report_callbacks_ = callbacks;
}

/**
* Set a computed cookie to be sent with the downstream headers.
* @param key supplies the size of the cookie
Expand Down Expand Up @@ -604,6 +608,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
Http::Code timeout_response_code_ = Http::Code::GatewayTimeout;
FilterUtility::HedgingParams hedging_params_;
Http::StreamFilterSidestreamWatermarkCallbacks watermark_callbacks_;
OptRef<OrcaLoadReportCallbacks> orca_load_report_callbacks_;
bool grpc_request_ : 1;
bool exclude_http_code_stats_ : 1;
bool downstream_response_started_ : 1;
Expand Down
2 changes: 2 additions & 0 deletions source/common/upstream/load_balancer_context_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class LoadBalancerContextBase : public LoadBalancerContext {
}

absl::optional<OverrideHost> overrideHostToSelect() const override { return {}; }

void setOrcaLoadReportCallbacks(OrcaLoadReportCallbacks&) override {}
};

} // namespace Upstream
Expand Down
4 changes: 4 additions & 0 deletions source/extensions/load_balancing_policies/subset/subset_lb.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ class SubsetLoadBalancer : public LoadBalancer, Logger::Loggable<Logger::Id::ups
return wrapped_->overrideHostToSelect();
}

void setOrcaLoadReportCallbacks(OrcaLoadReportCallbacks& callbacks) override {
wrapped_->setOrcaLoadReportCallbacks(callbacks);
}

private:
LoadBalancerContext* wrapped_;
Router::MetadataMatchCriteriaConstPtr metadata_match_;
Expand Down
122 changes: 122 additions & 0 deletions test/common/router/router_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6745,5 +6745,127 @@ TEST_F(RouterTest, OrcaLoadReport_NoConfiguredMetricNames) {
ASSERT_EQ(load_metric_stats_map, nullptr);
}

class TestOrcaLoadReportCallbacks : public Filter::OrcaLoadReportCallbacks {
public:
MOCK_METHOD(absl::Status, onOrcaLoadReport,
(const xds::data::orca::v3::OrcaLoadReport& orca_load_report), (override));
};

TEST_F(RouterTest, OrcaLoadReportCallbacks) {
EXPECT_CALL(callbacks_.route_->route_entry_, timeout())
.WillOnce(Return(std::chrono::milliseconds(0)));
EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0);

NiceMock<Http::MockRequestEncoder> encoder;
Http::ResponseDecoder* response_decoder = nullptr;
expectNewStreamWithImmediateEncoder(encoder, &response_decoder, Http::Protocol::Http10);

Http::TestRequestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
router_->decodeHeaders(headers, true);

// Configure ORCA callbacks to receive the report.
TestOrcaLoadReportCallbacks callbacks;
xds::data::orca::v3::OrcaLoadReport received_orca_load_report;
EXPECT_CALL(callbacks, onOrcaLoadReport(_))
.WillOnce(Invoke([&](const xds::data::orca::v3::OrcaLoadReport& orca_load_report) {
received_orca_load_report = orca_load_report;
return absl::OkStatus();
}));
router_->setOrcaLoadReportCallbacks(callbacks);

// Send ORCA report in the headers.
xds::data::orca::v3::OrcaLoadReport headers_orca_load_report;
headers_orca_load_report.set_cpu_utilization(0.5);
headers_orca_load_report.mutable_named_metrics()->insert({"good", 0.7});
std::string headers_proto_string =
TestUtility::getProtobufBinaryStringFromMessage(headers_orca_load_report);
std::string headers_orca_load_report_header_bin =
Envoy::Base64::encode(headers_proto_string.c_str(), headers_proto_string.length());
Http::ResponseHeaderMapPtr response_headers(new Http::TestResponseHeaderMapImpl{
{":status", "200"}, {"endpoint-load-metrics-bin", headers_orca_load_report_header_bin}});
response_decoder->decodeHeaders(std::move(response_headers), false);

// Send different ORCA report in the trailers. Expect it to be ignored.
xds::data::orca::v3::OrcaLoadReport trailers_orca_load_report;
trailers_orca_load_report.set_cpu_utilization(1.0);
trailers_orca_load_report.mutable_named_metrics()->insert({"good", 0.1});
std::string trailers_proto_string =
TestUtility::getProtobufBinaryStringFromMessage(trailers_orca_load_report);
std::string trailers_orca_load_report_header_bin =
Envoy::Base64::encode(trailers_proto_string.c_str(), trailers_proto_string.length());
Http::ResponseTrailerMapPtr response_trailers(new Http::TestResponseTrailerMapImpl{
{":status", "200"}, {"endpoint-load-metrics-bin", trailers_orca_load_report_header_bin}});
response_decoder->decodeTrailers(std::move(response_trailers));
// Verify that received load report is set in headers.
EXPECT_EQ(received_orca_load_report.cpu_utilization(),
headers_orca_load_report.cpu_utilization());
}

TEST_F(RouterTest, OrcaLoadReportCallbackReturnsError) {
EXPECT_CALL(callbacks_.route_->route_entry_, timeout())
.WillOnce(Return(std::chrono::milliseconds(0)));
EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0);

NiceMock<Http::MockRequestEncoder> encoder;
Http::ResponseDecoder* response_decoder = nullptr;
expectNewStreamWithImmediateEncoder(encoder, &response_decoder, Http::Protocol::Http10);

Http::TestRequestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
router_->decodeHeaders(headers, true);

// Configure ORCA callbacks to receive the report.
TestOrcaLoadReportCallbacks callbacks;
xds::data::orca::v3::OrcaLoadReport received_orca_load_report;
EXPECT_CALL(callbacks, onOrcaLoadReport(_))
.WillOnce(Invoke([&](const xds::data::orca::v3::OrcaLoadReport& orca_load_report) {
received_orca_load_report = orca_load_report;
// Return an error that gets logged by router filter.
return absl::InvalidArgumentError("Unexpected ORCA load Report");
}));
router_->setOrcaLoadReportCallbacks(callbacks);

// Send metrics in the trailers.
xds::data::orca::v3::OrcaLoadReport orca_load_report;
orca_load_report.set_cpu_utilization(0.5);
orca_load_report.mutable_named_metrics()->insert({"good", 0.7});
std::string proto_string = TestUtility::getProtobufBinaryStringFromMessage(orca_load_report);
std::string orca_load_report_header_bin =
Envoy::Base64::encode(proto_string.c_str(), proto_string.length());
Http::ResponseTrailerMapPtr response_trailers(new Http::TestResponseTrailerMapImpl{
{":status", "200"}, {"endpoint-load-metrics-bin", orca_load_report_header_bin}});
response_decoder->decodeTrailers(std::move(response_trailers));
EXPECT_EQ(received_orca_load_report.named_metrics().at("good"), 0.7);
}

TEST_F(RouterTest, OrcaLoadReportInvalidHeaderValue) {
EXPECT_CALL(callbacks_.route_->route_entry_, timeout())
.WillOnce(Return(std::chrono::milliseconds(0)));
EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0);

NiceMock<Http::MockRequestEncoder> encoder;
Http::ResponseDecoder* response_decoder = nullptr;
expectNewStreamWithImmediateEncoder(encoder, &response_decoder, Http::Protocol::Http10);

Http::TestRequestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
router_->decodeHeaders(headers, true);

// Configure ORCA callbacks to receive the report, but don't expect it to be
// called for invalid orca header.
TestOrcaLoadReportCallbacks callbacks;
EXPECT_CALL(callbacks, onOrcaLoadReport(_)).Times(0);
router_->setOrcaLoadReportCallbacks(callbacks);

// Send report with invalid ORCA proto.
std::string proto_string = "Invalid ORCA proto value";
std::string orca_load_report_header_bin =
Envoy::Base64::encode(proto_string.c_str(), proto_string.length());
Http::ResponseHeaderMapPtr response_headers(new Http::TestResponseHeaderMapImpl{
{":status", "200"}, {"endpoint-load-metrics-bin", orca_load_report_header_bin}});
response_decoder->decodeHeaders(std::move(response_headers), true);
}

} // namespace Router
} // namespace Envoy
1 change: 1 addition & 0 deletions test/mocks/upstream/load_balancer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class MockLoadBalancerContext : public LoadBalancerContext {
MOCK_METHOD(Network::TransportSocketOptionsConstSharedPtr, upstreamTransportSocketOptions, (),
(const));
MOCK_METHOD(absl::optional<OverrideHost>, overrideHostToSelect, (), (const));
MOCK_METHOD(void, setOrcaLoadReportCallbacks, (OrcaLoadReportCallbacks&));

private:
HealthyAndDegradedLoad priority_load_;
Expand Down

0 comments on commit 45e0325

Please sign in to comment.