Skip to content

Commit

Permalink
ext_proc: performance optimization (envoyproxy#29527)
Browse files Browse the repository at this point in the history
Change API call on ext_proc to more efficient new implementation. Following the PR envoyproxy#29199, we are replacing the API call in ext_proc so it's not hashing the whole config proto on each request.

Signed-off-by: AlanDiaz <[email protected]>
  • Loading branch information
DiazAlan authored Sep 11, 2023
1 parent 154219f commit 9e106e5
Show file tree
Hide file tree
Showing 13 changed files with 44 additions and 24 deletions.
11 changes: 9 additions & 2 deletions envoy/grpc/async_client_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ using AsyncClientFactoryPtr = std::unique_ptr<AsyncClientFactory>;

class GrpcServiceConfigWithHashKey {
public:
GrpcServiceConfigWithHashKey() = default;

explicit GrpcServiceConfigWithHashKey(const envoy::config::core::v3::GrpcService& config)
: config_(config), pre_computed_hash_(Envoy::MessageUtil::hash(config)){};

Expand All @@ -53,9 +55,14 @@ class GrpcServiceConfigWithHashKey {

const envoy::config::core::v3::GrpcService& config() const { return config_; }

void setConfig(const envoy::config::core::v3::GrpcService g) {
config_ = g;
pre_computed_hash_ = Envoy::MessageUtil::hash(g);
}

private:
const envoy::config::core::v3::GrpcService config_;
const std::size_t pre_computed_hash_;
envoy::config::core::v3::GrpcService config_;
std::size_t pre_computed_hash_;
};

// Singleton gRPC client manager. Grpc::AsyncClientManager can be used to create per-service
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ envoy_cc_library(
name = "client_interface",
hdrs = ["client.h"],
deps = [
"//envoy/grpc:async_client_manager_interface",
"//envoy/grpc:status",
"//envoy/stream_info:stream_info_interface",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
Expand Down
8 changes: 5 additions & 3 deletions source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "envoy/common/pure.h"
#include "envoy/config/core/v3/grpc_service.pb.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/grpc/status.h"
#include "envoy/service/ext_proc/v3/external_processor.pb.h"
#include "envoy/stream_info/stream_info.h"
Expand Down Expand Up @@ -38,9 +39,10 @@ class ExternalProcessorCallbacks {
class ExternalProcessorClient {
public:
virtual ~ExternalProcessorClient() = default;
virtual ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService& grpc_service,
const StreamInfo::StreamInfo& stream_info) PURE;
virtual ExternalProcessorStreamPtr
start(ExternalProcessorCallbacks& callbacks,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const StreamInfo::StreamInfo& stream_info) PURE;
};

using ExternalProcessorClientPtr = std::unique_ptr<ExternalProcessorClient>;
Expand Down
4 changes: 2 additions & 2 deletions source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ ExternalProcessorClientImpl::ExternalProcessorClientImpl(Grpc::AsyncClientManage

ExternalProcessorStreamPtr
ExternalProcessorClientImpl::start(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService& grpc_service,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const StreamInfo::StreamInfo& stream_info) {
Grpc::AsyncClient<ProcessingRequest, ProcessingResponse> grpcClient(
client_manager_.getOrCreateRawAsyncClient(grpc_service, scope_, true));
client_manager_.getOrCreateRawAsyncClientWithHashKey(config_with_hash_key, scope_, true));
return ExternalProcessorStreamImpl::create(std::move(grpcClient), callbacks, stream_info);
}

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient {
ExternalProcessorClientImpl(Grpc::AsyncClientManager& client_manager, Stats::Scope& scope);

ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService& grpc_service,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const StreamInfo::StreamInfo& stream_info) override;

private:
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Filter::StreamOpenState Filter::openStream() {
}
if (!stream_) {
ENVOY_LOG(debug, "Opening gRPC stream to external processor");
stream_ = client_->start(*this, grpc_service_, decoder_callbacks_->streamInfo());
stream_ = client_->start(*this, config_with_hash_key_, decoder_callbacks_->streamInfo());
if (processing_complete_) {
// Stream failed while starting and either onGrpcError or onGrpcClose was already called
// Asserts that `stream_` is nullptr since it is not valid to be used any further
Expand Down Expand Up @@ -874,6 +874,7 @@ void Filter::mergePerRouteConfig() {
if (merged_config->grpcService()) {
ENVOY_LOG(trace, "Setting new GrpcService from per-route configuration");
grpc_service_ = *merged_config->grpcService();
config_with_hash_key_.setConfig(*merged_config->grpcService());
}
}

Expand Down
4 changes: 3 additions & 1 deletion source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
Filter(const FilterConfigSharedPtr& config, ExternalProcessorClientPtr&& client,
const envoy::config::core::v3::GrpcService& grpc_service)
: config_(config), client_(std::move(client)), stats_(config->stats()),
grpc_service_(grpc_service), decoding_state_(*this, config->processingMode()),
grpc_service_(grpc_service), config_with_hash_key_(grpc_service),
decoding_state_(*this, config->processingMode()),
encoding_state_(*this, config->processingMode()) {}

const FilterConfig& config() const { return *config_; }
Expand Down Expand Up @@ -304,6 +305,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
ExtProcFilterStats stats_;
ExtProcLoggingInfo* logging_info_;
envoy::config::core::v3::GrpcService grpc_service_;
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_;

// The state of the filter on both the encoding and decoding side.
DecodingProcessorState decoding_state_;
Expand Down
16 changes: 9 additions & 7 deletions test/extensions/filters/http/ext_proc/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback
protected:
void SetUp() override {
grpc_service_.mutable_envoy_grpc()->set_cluster_name("test");
config_with_hash_key_.setConfig(grpc_service_);

EXPECT_CALL(client_manager_, getOrCreateRawAsyncClient(_, _, _))
EXPECT_CALL(client_manager_, getOrCreateRawAsyncClientWithHashKey(_, _, _))
.WillOnce(Invoke(this, &ExtProcStreamTest::doFactory));

client_ =
Expand Down Expand Up @@ -67,6 +68,7 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback
bool grpc_closed_ = false;

envoy::config::core::v3::GrpcService grpc_service_;
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_;
ExternalProcessorClientPtr client_;
Grpc::MockAsyncClientManager client_manager_;
Grpc::MockAsyncStream stream_;
Expand All @@ -77,14 +79,14 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback
};

TEST_F(ExtProcStreamTest, OpenCloseStream) {
auto stream = client_->start(*this, grpc_service_, stream_info_);
auto stream = client_->start(*this, config_with_hash_key_, stream_info_);
EXPECT_CALL(stream_, closeStream());
EXPECT_CALL(stream_, resetStream());
stream->close();
}

TEST_F(ExtProcStreamTest, SendToStream) {
auto stream = client_->start(*this, grpc_service_, stream_info_);
auto stream = client_->start(*this, config_with_hash_key_, stream_info_);
// Send something and ensure that we get it. Doesn't really matter what.
EXPECT_CALL(stream_, sendMessageRaw_(_, false));
ProcessingRequest req;
Expand All @@ -95,14 +97,14 @@ TEST_F(ExtProcStreamTest, SendToStream) {
}

TEST_F(ExtProcStreamTest, SendAndClose) {
auto stream = client_->start(*this, grpc_service_, stream_info_);
auto stream = client_->start(*this, config_with_hash_key_, stream_info_);
EXPECT_CALL(stream_, sendMessageRaw_(_, true));
ProcessingRequest req;
stream->send(std::move(req), true);
}

TEST_F(ExtProcStreamTest, ReceiveFromStream) {
auto stream = client_->start(*this, grpc_service_, stream_info_);
auto stream = client_->start(*this, config_with_hash_key_, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
// Send something and ensure that we get it. Doesn't really matter what.
ProcessingResponse resp;
Expand Down Expand Up @@ -132,7 +134,7 @@ TEST_F(ExtProcStreamTest, ReceiveFromStream) {
}

TEST_F(ExtProcStreamTest, StreamClosed) {
auto stream = client_->start(*this, grpc_service_, stream_info_);
auto stream = client_->start(*this, config_with_hash_key_, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
Expand All @@ -145,7 +147,7 @@ TEST_F(ExtProcStreamTest, StreamClosed) {
}

TEST_F(ExtProcStreamTest, StreamError) {
auto stream = client_->start(*this, grpc_service_, stream_info_);
auto stream = client_->start(*this, config_with_hash_key_, stream_info_);
ASSERT_NE(stream_callbacks_, nullptr);
EXPECT_FALSE(last_response_);
EXPECT_FALSE(grpc_closed_);
Expand Down
9 changes: 7 additions & 2 deletions test/extensions/filters/http/ext_proc/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,13 @@ class HttpFilterTest : public testing::Test {
}

ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService& grpc_service,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
testing::Unused) {
if (final_expected_grpc_service_.has_value()) {
EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(), grpc_service));
EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(),
config_with_hash_key.config()));
std::cout << final_expected_grpc_service_.value().DebugString();
std::cout << config_with_hash_key.config().DebugString();
}

stream_callbacks_ = &callbacks;
Expand Down Expand Up @@ -463,6 +466,7 @@ class HttpFilterTest : public testing::Test {
}

absl::optional<envoy::config::core::v3::GrpcService> final_expected_grpc_service_;
Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_;
std::unique_ptr<MockClient> client_;
ExternalProcessorCallbacks* stream_callbacks_ = nullptr;
ProcessingRequest last_request_;
Expand Down Expand Up @@ -2462,6 +2466,7 @@ TEST_F(HttpFilterTest, ProcessingModeResponseHeadersOnlyWithoutCallingDecodeHead
cb(route_config);
}));
final_expected_grpc_service_.emplace(route_proto.overrides().grpc_service());
config_with_hash_key_.setConfig(route_proto.overrides().grpc_service());

response_headers_.addCopy(LowerCaseString(":status"), "200");
response_headers_.addCopy(LowerCaseString("content-type"), "text/plain");
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/filters/http/ext_proc/mock_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class MockClient : public ExternalProcessorClient {
MockClient();
~MockClient() override;
MOCK_METHOD(ExternalProcessorStreamPtr, start,
(ExternalProcessorCallbacks&, const envoy::config::core::v3::GrpcService&,
(ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&,
const StreamInfo::StreamInfo&));
};

Expand Down
4 changes: 2 additions & 2 deletions test/extensions/filters/http/ext_proc/ordering_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class OrderingTest : public testing::Test {

// Called by the "start" method on the stream by the filter
virtual ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService&,
const Grpc::GrpcServiceConfigWithHashKey&,
const StreamInfo::StreamInfo&) {
stream_callbacks_ = &callbacks;
auto stream = std::make_unique<MockStream>();
Expand Down Expand Up @@ -218,7 +218,7 @@ class OrderingTest : public testing::Test {
class FastFailOrderingTest : public OrderingTest {
// All tests using this class have gRPC streams that will fail while being opened.
ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks,
const envoy::config::core::v3::GrpcService&,
const Grpc::GrpcServiceConfigWithHashKey&,
const StreamInfo::StreamInfo&) override {
callbacks.onGrpcError(Grpc::Status::Internal);
// Returns nullptr on start stream failure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ DEFINE_PROTO_FUZZER(
EXPECT_CALL(*client, start(_, _, _))
.WillRepeatedly(Invoke(
[&](ExternalProcessing::ExternalProcessorCallbacks&,
const envoy::config::core::v3::GrpcService&,
const Grpc::GrpcServiceConfigWithHashKey&,
const StreamInfo::StreamInfo&) -> ExternalProcessing::ExternalProcessorStreamPtr {
auto stream = std::make_unique<MockStream>();
EXPECT_CALL(*stream, send(_, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class MockClient : public ExternalProcessing::ExternalProcessorClient {

MOCK_METHOD(ExternalProcessing::ExternalProcessorStreamPtr, start,
(ExternalProcessing::ExternalProcessorCallbacks & callbacks,
const envoy::config::core::v3::GrpcService& grpc_service,
const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key,
const StreamInfo::StreamInfo& stream_info));
};

Expand Down

0 comments on commit 9e106e5

Please sign in to comment.