diff --git a/source/extensions/filters/http/ext_proc/client.h b/source/extensions/filters/http/ext_proc/client.h index 4bcb6134185e..fdb0e3755883 100644 --- a/source/extensions/filters/http/ext_proc/client.h +++ b/source/extensions/filters/http/ext_proc/client.h @@ -44,7 +44,7 @@ class ExternalProcessorCallbacks : public RequestCallbacks { std::unique_ptr&& response) PURE; virtual void onGrpcError(Grpc::Status::GrpcStatus error) PURE; virtual void onGrpcClose() PURE; - virtual void logGrpcStreamInfo() PURE; + virtual void logStreamInfo() PURE; }; class ExternalProcessorClient : public ClientBase { diff --git a/source/extensions/filters/http/ext_proc/client_base.h b/source/extensions/filters/http/ext_proc/client_base.h index d37fd0c1f512..e331f7c5b383 100644 --- a/source/extensions/filters/http/ext_proc/client_base.h +++ b/source/extensions/filters/http/ext_proc/client_base.h @@ -3,6 +3,7 @@ #include #include "envoy/service/ext_proc/v3/external_processor.pb.h" +#include "envoy/stream_info/stream_info.h" namespace Envoy { namespace Extensions { @@ -37,6 +38,7 @@ class ClientBase { bool end_stream, const uint64_t stream_id, RequestCallbacks* callbacks, StreamBase* stream) PURE; virtual void cancel() PURE; + virtual const Envoy::StreamInfo::StreamInfo* getStreamInfo() const PURE; }; using ClientBasePtr = std::unique_ptr; diff --git a/source/extensions/filters/http/ext_proc/client_impl.cc b/source/extensions/filters/http/ext_proc/client_impl.cc index fc1af7c88281..fbab59ecd943 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.cc +++ b/source/extensions/filters/http/ext_proc/client_impl.cc @@ -104,7 +104,7 @@ void ExternalProcessorStreamImpl::onRemoteClose(Grpc::Status::GrpcStatus status, return; } - callbacks_->logGrpcStreamInfo(); + callbacks_->logStreamInfo(); if (status == Grpc::Status::Ok) { callbacks_->onGrpcClose(); } else { diff --git a/source/extensions/filters/http/ext_proc/client_impl.h b/source/extensions/filters/http/ext_proc/client_impl.h index 789a05fef47e..25a13ee5ae83 100644 --- a/source/extensions/filters/http/ext_proc/client_impl.h +++ b/source/extensions/filters/http/ext_proc/client_impl.h @@ -36,6 +36,7 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient { const uint64_t stream_id, RequestCallbacks* callbacks, StreamBase* stream) override; void cancel() override {} + const Envoy::StreamInfo::StreamInfo* getStreamInfo() const override { return nullptr; } private: Grpc::AsyncClientManager& client_manager_; diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index c3011626ef87..b396a9fd9cb8 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -416,11 +416,6 @@ Filter::StreamOpenState Filter::openStream() { stream_ = config_->threadLocalStreamManager().store(std::move(stream_object), config_->stats(), config_->deferredCloseTimeout()); - // For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not - // have a proper implementation of streamInfo. - if (grpc_service_.has_envoy_grpc() && logging_info_ != nullptr) { - logging_info_->setClusterInfo(stream_->streamInfo().upstreamClusterInfo()); - } } return StreamOpenState::Ok; } @@ -1039,21 +1034,37 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers stats_.stream_msgs_sent_.inc(); } -void Filter::logGrpcStreamInfo() { +void Filter::logStreamInfoBase(const Envoy::StreamInfo::StreamInfo* stream_info) { + if (stream_info == nullptr || logging_info_ == nullptr) { + return; + } + + const auto& upstream_meter = stream_info->getUpstreamBytesMeter(); + if (upstream_meter != nullptr) { + logging_info_->setBytesSent(upstream_meter->wireBytesSent()); + logging_info_->setBytesReceived(upstream_meter->wireBytesReceived()); + } + // Only set upstream host in logging info once. + if (logging_info_->upstreamHost() == nullptr) { + logging_info_->setUpstreamHost(stream_info->upstreamInfo()->upstreamHost()); + } + + // Only set cluster info in logging info once. + if (logging_info_->clusterInfo() == nullptr) { + logging_info_->setClusterInfo(stream_info->upstreamClusterInfo()); + } +} + +void Filter::logStreamInfo() { if (!config().grpcService().has_value()) { + // HTTP service + logStreamInfoBase(client_->getStreamInfo()); return; } - if (stream_ != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) { - const auto& upstream_meter = stream_->streamInfo().getUpstreamBytesMeter(); - if (upstream_meter != nullptr) { - logging_info_->setBytesSent(upstream_meter->wireBytesSent()); - logging_info_->setBytesReceived(upstream_meter->wireBytesReceived()); - } - // Only set upstream host in logging info once. - if (logging_info_->upstreamHost() == nullptr) { - logging_info_->setUpstreamHost(stream_->streamInfo().upstreamInfo()->upstreamHost()); - } + if (stream_ != nullptr && grpc_service_.has_envoy_grpc()) { + // Envoy gRPC service + logStreamInfoBase(&stream_->streamInfo()); } } @@ -1368,7 +1379,7 @@ void Filter::onGrpcClose() { void Filter::onMessageTimeout() { ENVOY_LOG(debug, "message timeout reached"); - logGrpcStreamInfo(); + logStreamInfo(); stats_.message_timeouts_.inc(); if (config_->failureModeAllow()) { // The user would like a timeout to not cause message processing to fail. diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 1260bb0a2040..783f56da040d 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -426,7 +426,7 @@ class Filter : public Logger::Loggable, void encodeComplete() override { if (config_->observabilityMode()) { - logGrpcStreamInfo(); + logStreamInfo(); } } @@ -435,7 +435,8 @@ class Filter : public Logger::Loggable, std::unique_ptr&& response) override; void onGrpcError(Grpc::Status::GrpcStatus error) override; void onGrpcClose() override; - void logGrpcStreamInfo() override; + void logStreamInfoBase(const Envoy::StreamInfo::StreamInfo* stream_info); + void logStreamInfo() override; void onMessageTimeout(); void onNewTimeout(const ProtobufWkt::Duration& override_message_timeout); diff --git a/source/extensions/filters/http/ext_proc/http_client/http_client_impl.cc b/source/extensions/filters/http/ext_proc/http_client/http_client_impl.cc index 39de7d371e10..9b7e0ff18b8c 100644 --- a/source/extensions/filters/http/ext_proc/http_client/http_client_impl.cc +++ b/source/extensions/filters/http/ext_proc/http_client/http_client_impl.cc @@ -11,8 +11,7 @@ namespace HttpFilters { namespace ExternalProcessing { namespace { -Http::RequestMessagePtr buildHttpRequest(absl::string_view uri, const uint64_t stream_id, - absl::string_view req_in_json) { +Http::RequestHeaderMapPtr buildHttpRequestHeaders(absl::string_view uri, const uint64_t stream_id) { absl::string_view host, path; Envoy::Http::Utility::extractHostPathFromUri(uri, host, path); ENVOY_LOG_MISC(debug, " Ext_Proc HTTP client send request to uri {}, host {}, path {}", uri, host, @@ -28,10 +27,7 @@ Http::RequestMessagePtr buildHttpRequest(absl::string_view uri, const uint64_t s {header_values.ContentType, "application/json"}, {header_values.RequestId, std::to_string(stream_id)}, {header_values.Host, std::string(host)}}); - Http::RequestMessagePtr message = - std::make_unique(std::move(headers)); - message->body().add(req_in_json); - return message; + return headers; } } // namespace @@ -47,8 +43,7 @@ void ExtProcHttpClient::sendRequest(envoy::service::ext_proc::v3::ProcessingRequ auto req_in_json = MessageUtil::getJsonStringFromMessage(req); if (req_in_json.ok()) { const auto http_uri = config_.http_service().http_service().http_uri(); - Http::RequestMessagePtr message = - buildHttpRequest(http_uri.uri(), stream_id, req_in_json.value()); + Http::RequestHeaderMapPtr headers = buildHttpRequestHeaders(http_uri.uri(), stream_id); auto options = Http::AsyncClient::RequestOptions() .setTimeout(std::chrono::milliseconds( DurationUtil::durationToMilliseconds(http_uri.timeout()))) @@ -58,7 +53,11 @@ void ExtProcHttpClient::sendRequest(envoy::service::ext_proc::v3::ProcessingRequ const auto thread_local_cluster = context().clusterManager().getThreadLocalCluster(cluster); if (thread_local_cluster) { active_request_ = - thread_local_cluster->httpAsyncClient().send(std::move(message), *this, options); + thread_local_cluster->httpAsyncClient().startRequest(std::move(headers), *this, options); + if (active_request_ != nullptr) { + Buffer::OwnedImpl body(req_in_json.value()); + active_request_->sendData(body, true); + } } else { ENVOY_LOG(error, "ext_proc cluster {} does not exist in the config", cluster); } @@ -68,7 +67,6 @@ void ExtProcHttpClient::sendRequest(envoy::service::ext_proc::v3::ProcessingRequ void ExtProcHttpClient::onSuccess(const Http::AsyncClient::Request&, Http::ResponseMessagePtr&& response) { auto status = Envoy::Http::Utility::getResponseStatusOrNullopt(response->headers()); - active_request_ = nullptr; if (status.has_value()) { uint64_t status_code = status.value(); if (status_code == Envoy::enumToInt(Envoy::Http::Code::OK)) { @@ -93,6 +91,7 @@ void ExtProcHttpClient::onSuccess(const Http::AsyncClient::Request&, if (callbacks_) { callbacks_->onComplete(response_msg); callbacks_ = nullptr; + active_request_ = nullptr; } } else { ENVOY_LOG(error, "Response status is not OK, status: {}", status_code); @@ -110,18 +109,24 @@ void ExtProcHttpClient::onFailure(const Http::AsyncClient::Request&, ASSERT(reason == Http::AsyncClient::FailureReason::Reset || reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit); ENVOY_LOG(error, "Request failed: stream has been reset"); - active_request_ = nullptr; onError(); } +const Envoy::StreamInfo::StreamInfo* ExtProcHttpClient::getStreamInfo() const { + if (active_request_ != nullptr) { + return &active_request_->streamInfo(); + } else { + return nullptr; + } +} + void ExtProcHttpClient::onError() { - // Cancel if the request is active. - cancel(); ENVOY_LOG(error, "ext_proc HTTP client error condition happens."); if (callbacks_) { callbacks_->onError(); callbacks_ = nullptr; } + active_request_ = nullptr; } void ExtProcHttpClient::cancel() { diff --git a/source/extensions/filters/http/ext_proc/http_client/http_client_impl.h b/source/extensions/filters/http/ext_proc/http_client/http_client_impl.h index a0b5977e0269..a456a3caf84e 100644 --- a/source/extensions/filters/http/ext_proc/http_client/http_client_impl.h +++ b/source/extensions/filters/http/ext_proc/http_client/http_client_impl.h @@ -36,13 +36,15 @@ class ExtProcHttpClient : public ClientBase, void onFailure(const Http::AsyncClient::Request& request, Http::AsyncClient::FailureReason reason) override; + const Envoy::StreamInfo::StreamInfo* getStreamInfo() const override; + Server::Configuration::ServerFactoryContext& context() const { return context_; } private: void onError(); envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor config_; Server::Configuration::ServerFactoryContext& context_; - Http::AsyncClient::Request* active_request_{}; + Http::AsyncClient::OngoingRequest* active_request_{}; RequestCallbacks* callbacks_{}; }; diff --git a/source/extensions/filters/http/ext_proc/processor_state.cc b/source/extensions/filters/http/ext_proc/processor_state.cc index 76a73e5e508e..365814a151c7 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.cc +++ b/source/extensions/filters/http/ext_proc/processor_state.cc @@ -40,7 +40,7 @@ void ProcessorState::onStartProcessorCall(Event::TimerCb cb, std::chrono::millis void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status, CallbackState next_state) { ENVOY_LOG(debug, "Finish external processing call"); - filter_.logGrpcStreamInfo(); + filter_.logStreamInfo(); stopMessageTimer(); diff --git a/test/extensions/filters/http/ext_proc/client_test.cc b/test/extensions/filters/http/ext_proc/client_test.cc index e0cd216872dc..3010109d4a24 100644 --- a/test/extensions/filters/http/ext_proc/client_test.cc +++ b/test/extensions/filters/http/ext_proc/client_test.cc @@ -65,7 +65,7 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback void onGrpcError(Grpc::Status::GrpcStatus status) override { grpc_status_ = status; } void onGrpcClose() override { grpc_closed_ = true; } - void logGrpcStreamInfo() override {} + void logStreamInfo() override {} void onComplete(envoy::service::ext_proc::v3::ProcessingResponse&) override {} void onError() override {} diff --git a/test/extensions/filters/http/ext_proc/http_client/BUILD b/test/extensions/filters/http/ext_proc/http_client/BUILD index 550720ea63df..45e2d0588ecd 100644 --- a/test/extensions/filters/http/ext_proc/http_client/BUILD +++ b/test/extensions/filters/http/ext_proc/http_client/BUILD @@ -39,6 +39,7 @@ envoy_extension_cc_test( deps = [ "//source/extensions/filters/http/ext_proc:config", "//test/common/http:common_lib", + "//test/extensions/filters/http/ext_proc:logging_test_filter_lib", "//test/extensions/filters/http/ext_proc:utils_lib", "//test/integration:http_protocol_integration_lib", "//test/test_common:utility_lib", diff --git a/test/extensions/filters/http/ext_proc/http_client/ext_proc_http_integration_test.cc b/test/extensions/filters/http/ext_proc/http_client/ext_proc_http_integration_test.cc index 1b8e9899fc27..4b21a3b5ae2b 100644 --- a/test/extensions/filters/http/ext_proc/http_client/ext_proc_http_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/http_client/ext_proc_http_integration_test.cc @@ -8,6 +8,7 @@ #include "source/extensions/filters/http/ext_proc/ext_proc.h" #include "test/common/http/common.h" +#include "test/extensions/filters/http/ext_proc/logging_test_filter.pb.h" #include "test/extensions/filters/http/ext_proc/utils.h" #include "test/integration/http_protocol_integration.h" #include "test/test_common/utility.h" @@ -42,6 +43,7 @@ struct ConfigOptions { bool failure_mode_allow = false; int64_t timeout = 900000000; std::string cluster = "ext_proc_server_0"; + bool add_log_filter = false; }; struct ExtProcHttpTestParams { @@ -122,6 +124,18 @@ class ExtProcHttpClientIntegrationTest : public testing::TestWithParamPackFrom(proto_config_); config_helper_.prependFilter(MessageUtil::getJsonStringFromMessageOrError(ext_proc_filter)); } + + if (config_option.add_log_filter) { + test::integration::filters::LoggingTestFilterConfig logging_filter_config; + logging_filter_config.set_logging_id(ext_proc_filter_name); + logging_filter_config.set_upstream_cluster_name(config_option.cluster); + logging_filter_config.set_check_received_bytes(true); + envoy::extensions::filters::network::http_connection_manager::v3::HttpFilter logging_filter; + logging_filter.set_name("logging-test-filter"); + logging_filter.mutable_typed_config()->PackFrom(logging_filter_config); + + config_helper_.prependFilter(MessageUtil::getJsonStringFromMessageOrError(logging_filter)); + } }); setUpstreamProtocol(GetParam().upstream_protocol); @@ -486,6 +500,43 @@ TEST_P(ExtProcHttpClientIntegrationTest, WrongClusterConfigWithFailOpen) { verifyDownstreamResponse(*response, 200); } +// Using logging filter to test stats in onSuccess case. +TEST_P(ExtProcHttpClientIntegrationTest, StatsTestOnSuccess) { + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + ConfigOptions config_option = {}; + config_option.add_log_filter = true; + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest( + [](Http::HeaderMap& headers) { headers.addCopy(LowerCaseString("foo"), "yes"); }); + + // The side stream get the request and sends back the response. + processRequestHeadersMessage(http_side_upstreams_[0], true, absl::nullopt); + + // The request is sent to the upstream. + handleUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("foo", "yes")); + + upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true); + verifyDownstreamResponse(*response, 200); +} + +// Using logging filter to test stats in onFailure case. +TEST_P(ExtProcHttpClientIntegrationTest, StatsTestOnFailure) { + proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); + ConfigOptions config_option = {}; + config_option.add_log_filter = true; + initializeConfig(config_option); + HttpIntegrationTest::initialize(); + auto response = sendDownstreamRequest(absl::nullopt); + + processRequestHeadersMessage( + http_side_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse&) { return true; }, + true); + + verifyDownstreamResponse(*response, 500); +} + } // namespace } // namespace ExternalProcessing } // namespace HttpFilters diff --git a/test/extensions/filters/http/ext_proc/logging_test_filter.cc b/test/extensions/filters/http/ext_proc/logging_test_filter.cc index 702bd61804d0..b3bb9a7ebdd2 100644 --- a/test/extensions/filters/http/ext_proc/logging_test_filter.cc +++ b/test/extensions/filters/http/ext_proc/logging_test_filter.cc @@ -39,6 +39,7 @@ class LoggingTestFilter : public Http::PassThroughFilter { } ASSERT_TRUE(ext_proc_logging_info->upstreamHost() != nullptr); EXPECT_EQ(ext_proc_logging_info->upstreamHost()->cluster().name(), expected_cluster_name_); + EXPECT_EQ(ext_proc_logging_info->clusterInfo()->name(), expected_cluster_name_); } } diff --git a/test/extensions/filters/http/ext_proc/mock_server.h b/test/extensions/filters/http/ext_proc/mock_server.h index 1adebbdf39c5..918b6d4dcc0d 100644 --- a/test/extensions/filters/http/ext_proc/mock_server.h +++ b/test/extensions/filters/http/ext_proc/mock_server.h @@ -22,6 +22,8 @@ class MockClient : public ExternalProcessorClient { (envoy::service::ext_proc::v3::ProcessingRequest&&, bool, const uint64_t, RequestCallbacks*, StreamBase*)); MOCK_METHOD(void, cancel, ()); + + MOCK_METHOD(const Envoy::StreamInfo::StreamInfo*, getStreamInfo, (), (const)); }; class MockStream : public ExternalProcessorStream {