diff --git a/envoy/grpc/async_client_manager.h b/envoy/grpc/async_client_manager.h index 629873e9f16d..aa99f2c23a6e 100644 --- a/envoy/grpc/async_client_manager.h +++ b/envoy/grpc/async_client_manager.h @@ -34,6 +34,8 @@ using AsyncClientFactoryPtr = std::unique_ptr; class GrpcServiceConfigWithHashKey { public: + GrpcServiceConfigWithHashKey() = default; + explicit GrpcServiceConfigWithHashKey(const envoy::config::core::v3::GrpcService& config) : config_(config), pre_computed_hash_(Envoy::MessageUtil::hash(config)){}; @@ -53,9 +55,14 @@ class GrpcServiceConfigWithHashKey { const envoy::config::core::v3::GrpcService& config() const { return config_; } + void setConfig(const envoy::config::core::v3::GrpcService g) { + config_ = g; + pre_computed_hash_ = Envoy::MessageUtil::hash(g); + } + private: - const envoy::config::core::v3::GrpcService config_; - const std::size_t pre_computed_hash_; + envoy::config::core::v3::GrpcService config_; + std::size_t pre_computed_hash_; }; // Singleton gRPC client manager. Grpc::AsyncClientManager can be used to create per-service diff --git a/source/extensions/filters/http/ext_proc/BUILD b/source/extensions/filters/http/ext_proc/BUILD index 2c2397d6d304..484395f170d3 100644 --- a/source/extensions/filters/http/ext_proc/BUILD +++ b/source/extensions/filters/http/ext_proc/BUILD @@ -56,6 +56,7 @@ envoy_cc_library( name = "client_interface", hdrs = ["client.h"], deps = [ + "//envoy/grpc:async_client_manager_interface", "//envoy/grpc:status", "//envoy/stream_info:stream_info_interface", "@envoy_api//envoy/config/core/v3:pkg_cc_proto", diff --git a/source/extensions/filters/http/ext_proc/client.h b/source/extensions/filters/http/ext_proc/client.h index e4c7dba2fef3..25594b3748e4 100644 --- a/source/extensions/filters/http/ext_proc/client.h +++ b/source/extensions/filters/http/ext_proc/client.h @@ -4,6 +4,7 @@ #include "envoy/common/pure.h" #include "envoy/config/core/v3/grpc_service.pb.h" +#include "envoy/grpc/async_client_manager.h" #include "envoy/grpc/status.h" #include "envoy/service/ext_proc/v3/external_processor.pb.h" #include "envoy/stream_info/stream_info.h" @@ -38,9 +39,10 @@ class ExternalProcessorCallbacks { class ExternalProcessorClient { public: virtual ~ExternalProcessorClient() = default; - virtual ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks, - const envoy::config::core::v3::GrpcService& grpc_service, - const StreamInfo::StreamInfo& stream_info) PURE; + virtual ExternalProcessorStreamPtr + start(ExternalProcessorCallbacks& callbacks, + const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, + const StreamInfo::StreamInfo& stream_info) PURE; }; using ExternalProcessorClientPtr = std::unique_ptr; diff --git a/source/extensions/filters/http/ext_proc/client_impl.cc b/source/extensions/filters/http/ext_proc/client_impl.cc index 22d970929b2d..80ccd31189db 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.cc +++ b/source/extensions/filters/http/ext_proc/client_impl.cc @@ -13,10 +13,10 @@ ExternalProcessorClientImpl::ExternalProcessorClientImpl(Grpc::AsyncClientManage ExternalProcessorStreamPtr ExternalProcessorClientImpl::start(ExternalProcessorCallbacks& callbacks, - const envoy::config::core::v3::GrpcService& grpc_service, + const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const StreamInfo::StreamInfo& stream_info) { Grpc::AsyncClient grpcClient( - client_manager_.getOrCreateRawAsyncClient(grpc_service, scope_, true)); + client_manager_.getOrCreateRawAsyncClientWithHashKey(config_with_hash_key, scope_, true)); return ExternalProcessorStreamImpl::create(std::move(grpcClient), callbacks, stream_info); } diff --git a/source/extensions/filters/http/ext_proc/client_impl.h b/source/extensions/filters/http/ext_proc/client_impl.h index 90f00fa0baa9..45c86fc9da5c 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.h +++ b/source/extensions/filters/http/ext_proc/client_impl.h @@ -27,7 +27,7 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient { ExternalProcessorClientImpl(Grpc::AsyncClientManager& client_manager, Stats::Scope& scope); ExternalProcessorStreamPtr start(ExternalProcessorCallbacks& callbacks, - const envoy::config::core::v3::GrpcService& grpc_service, + const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const StreamInfo::StreamInfo& stream_info) override; private: diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index f4b56b93a515..b0e16cba79bb 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -160,7 +160,7 @@ Filter::StreamOpenState Filter::openStream() { } if (!stream_) { ENVOY_LOG(debug, "Opening gRPC stream to external processor"); - stream_ = client_->start(*this, grpc_service_, decoder_callbacks_->streamInfo()); + stream_ = client_->start(*this, config_with_hash_key_, decoder_callbacks_->streamInfo()); if (processing_complete_) { // Stream failed while starting and either onGrpcError or onGrpcClose was already called // Asserts that `stream_` is nullptr since it is not valid to be used any further @@ -874,6 +874,7 @@ void Filter::mergePerRouteConfig() { if (merged_config->grpcService()) { ENVOY_LOG(trace, "Setting new GrpcService from per-route configuration"); grpc_service_ = *merged_config->grpcService(); + config_with_hash_key_.setConfig(*merged_config->grpcService()); } } diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 9a2467c27989..9b74867aba47 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -246,7 +246,8 @@ class Filter : public Logger::Loggable, Filter(const FilterConfigSharedPtr& config, ExternalProcessorClientPtr&& client, const envoy::config::core::v3::GrpcService& grpc_service) : config_(config), client_(std::move(client)), stats_(config->stats()), - grpc_service_(grpc_service), decoding_state_(*this, config->processingMode()), + grpc_service_(grpc_service), config_with_hash_key_(grpc_service), + decoding_state_(*this, config->processingMode()), encoding_state_(*this, config->processingMode()) {} const FilterConfig& config() const { return *config_; } @@ -304,6 +305,7 @@ class Filter : public Logger::Loggable, ExtProcFilterStats stats_; ExtProcLoggingInfo* logging_info_; envoy::config::core::v3::GrpcService grpc_service_; + Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_; // The state of the filter on both the encoding and decoding side. DecodingProcessorState decoding_state_; diff --git a/test/extensions/filters/http/ext_proc/client_test.cc b/test/extensions/filters/http/ext_proc/client_test.cc index 1099ae88fc54..63bd95f2b1aa 100644 --- a/test/extensions/filters/http/ext_proc/client_test.cc +++ b/test/extensions/filters/http/ext_proc/client_test.cc @@ -30,8 +30,9 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback protected: void SetUp() override { grpc_service_.mutable_envoy_grpc()->set_cluster_name("test"); + config_with_hash_key_.setConfig(grpc_service_); - EXPECT_CALL(client_manager_, getOrCreateRawAsyncClient(_, _, _)) + EXPECT_CALL(client_manager_, getOrCreateRawAsyncClientWithHashKey(_, _, _)) .WillOnce(Invoke(this, &ExtProcStreamTest::doFactory)); client_ = @@ -67,6 +68,7 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback bool grpc_closed_ = false; envoy::config::core::v3::GrpcService grpc_service_; + Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_; ExternalProcessorClientPtr client_; Grpc::MockAsyncClientManager client_manager_; Grpc::MockAsyncStream stream_; @@ -77,14 +79,14 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback }; TEST_F(ExtProcStreamTest, OpenCloseStream) { - auto stream = client_->start(*this, grpc_service_, stream_info_); + auto stream = client_->start(*this, config_with_hash_key_, stream_info_); EXPECT_CALL(stream_, closeStream()); EXPECT_CALL(stream_, resetStream()); stream->close(); } TEST_F(ExtProcStreamTest, SendToStream) { - auto stream = client_->start(*this, grpc_service_, stream_info_); + auto stream = client_->start(*this, config_with_hash_key_, stream_info_); // Send something and ensure that we get it. Doesn't really matter what. EXPECT_CALL(stream_, sendMessageRaw_(_, false)); ProcessingRequest req; @@ -95,14 +97,14 @@ TEST_F(ExtProcStreamTest, SendToStream) { } TEST_F(ExtProcStreamTest, SendAndClose) { - auto stream = client_->start(*this, grpc_service_, stream_info_); + auto stream = client_->start(*this, config_with_hash_key_, stream_info_); EXPECT_CALL(stream_, sendMessageRaw_(_, true)); ProcessingRequest req; stream->send(std::move(req), true); } TEST_F(ExtProcStreamTest, ReceiveFromStream) { - auto stream = client_->start(*this, grpc_service_, stream_info_); + auto stream = client_->start(*this, config_with_hash_key_, stream_info_); ASSERT_NE(stream_callbacks_, nullptr); // Send something and ensure that we get it. Doesn't really matter what. ProcessingResponse resp; @@ -132,7 +134,7 @@ TEST_F(ExtProcStreamTest, ReceiveFromStream) { } TEST_F(ExtProcStreamTest, StreamClosed) { - auto stream = client_->start(*this, grpc_service_, stream_info_); + auto stream = client_->start(*this, config_with_hash_key_, stream_info_); ASSERT_NE(stream_callbacks_, nullptr); EXPECT_FALSE(last_response_); EXPECT_FALSE(grpc_closed_); @@ -145,7 +147,7 @@ TEST_F(ExtProcStreamTest, StreamClosed) { } TEST_F(ExtProcStreamTest, StreamError) { - auto stream = client_->start(*this, grpc_service_, stream_info_); + auto stream = client_->start(*this, config_with_hash_key_, stream_info_); ASSERT_NE(stream_callbacks_, nullptr); EXPECT_FALSE(last_response_); EXPECT_FALSE(grpc_closed_); diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index befcfc7df45b..ecf2893003cd 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -163,10 +163,13 @@ class HttpFilterTest : public testing::Test { } ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, - const envoy::config::core::v3::GrpcService& grpc_service, + const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, testing::Unused) { if (final_expected_grpc_service_.has_value()) { - EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(), grpc_service)); + EXPECT_TRUE(TestUtility::protoEqual(final_expected_grpc_service_.value(), + config_with_hash_key.config())); + std::cout << final_expected_grpc_service_.value().DebugString(); + std::cout << config_with_hash_key.config().DebugString(); } stream_callbacks_ = &callbacks; @@ -463,6 +466,7 @@ class HttpFilterTest : public testing::Test { } absl::optional final_expected_grpc_service_; + Grpc::GrpcServiceConfigWithHashKey config_with_hash_key_; std::unique_ptr client_; ExternalProcessorCallbacks* stream_callbacks_ = nullptr; ProcessingRequest last_request_; @@ -2462,6 +2466,7 @@ TEST_F(HttpFilterTest, ProcessingModeResponseHeadersOnlyWithoutCallingDecodeHead cb(route_config); })); final_expected_grpc_service_.emplace(route_proto.overrides().grpc_service()); + config_with_hash_key_.setConfig(route_proto.overrides().grpc_service()); response_headers_.addCopy(LowerCaseString(":status"), "200"); response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h index f655e977a2e4..5f4dedb90173 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.h +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -14,7 +14,7 @@ class MockClient : public ExternalProcessorClient { MockClient(); ~MockClient() override; MOCK_METHOD(ExternalProcessorStreamPtr, start, - (ExternalProcessorCallbacks&, const envoy::config::core::v3::GrpcService&, + (ExternalProcessorCallbacks&, const Grpc::GrpcServiceConfigWithHashKey&, const StreamInfo::StreamInfo&)); }; diff --git a/test/extensions/filters/http/ext_proc/ordering_test.cc b/test/extensions/filters/http/ext_proc/ordering_test.cc index ace797afd5fb..035be9801616 100644 --- a/test/extensions/filters/http/ext_proc/ordering_test.cc +++ b/test/extensions/filters/http/ext_proc/ordering_test.cc @@ -81,7 +81,7 @@ class OrderingTest : public testing::Test { // Called by the "start" method on the stream by the filter virtual ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, - const envoy::config::core::v3::GrpcService&, + const Grpc::GrpcServiceConfigWithHashKey&, const StreamInfo::StreamInfo&) { stream_callbacks_ = &callbacks; auto stream = std::make_unique(); @@ -218,7 +218,7 @@ class OrderingTest : public testing::Test { class FastFailOrderingTest : public OrderingTest { // All tests using this class have gRPC streams that will fail while being opened. ExternalProcessorStreamPtr doStart(ExternalProcessorCallbacks& callbacks, - const envoy::config::core::v3::GrpcService&, + const Grpc::GrpcServiceConfigWithHashKey&, const StreamInfo::StreamInfo&) override { callbacks.onGrpcError(Grpc::Status::Internal); // Returns nullptr on start stream failure. diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc index 6bb5e89d272f..984b828e0e02 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/ext_proc_unit_test_fuzz.cc @@ -87,7 +87,7 @@ DEFINE_PROTO_FUZZER( EXPECT_CALL(*client, start(_, _, _)) .WillRepeatedly(Invoke( [&](ExternalProcessing::ExternalProcessorCallbacks&, - const envoy::config::core::v3::GrpcService&, + const Grpc::GrpcServiceConfigWithHashKey&, const StreamInfo::StreamInfo&) -> ExternalProcessing::ExternalProcessorStreamPtr { auto stream = std::make_unique(); EXPECT_CALL(*stream, send(_, _)) diff --git a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h index cbfafb5f2473..207b820e21df 100644 --- a/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h +++ b/test/extensions/filters/http/ext_proc/unit_test_fuzz/mocks.h @@ -30,7 +30,7 @@ class MockClient : public ExternalProcessing::ExternalProcessorClient { MOCK_METHOD(ExternalProcessing::ExternalProcessorStreamPtr, start, (ExternalProcessing::ExternalProcessorCallbacks & callbacks, - const envoy::config::core::v3::GrpcService& grpc_service, + const Grpc::GrpcServiceConfigWithHashKey& config_with_hash_key, const StreamInfo::StreamInfo& stream_info)); };