Skip to content

Commit

Permalink
ext_proc http bytes tracking support (envoyproxy#37297)
Browse files Browse the repository at this point in the history
This is a follow up PR of:
envoyproxy#35740.

This PR added the bytes tracking support for ext_proc http service,
which basically is counting how many bytes are sent to and received from
ext_proc HTTP service.

It takes the stream_info from the active HTTP request, and saved in the
ext_proc filter. When the external processing completes, the bytes
counters from the stream_info are retrieved and stored into the filter
state.

The existing logging test filter is leveraged to test the byte counters
in the filter state are correctly updated.

---------

Signed-off-by: Yanjun Xiang <[email protected]>
  • Loading branch information
yanjunxiang-google authored Dec 6, 2024
1 parent 10bbd32 commit 57fa0e0
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 37 deletions.
2 changes: 1 addition & 1 deletion source/extensions/filters/http/ext_proc/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ExternalProcessorCallbacks : public RequestCallbacks {
std::unique_ptr<envoy::service::ext_proc::v3::ProcessingResponse>&& response) PURE;
virtual void onGrpcError(Grpc::Status::GrpcStatus error) PURE;
virtual void onGrpcClose() PURE;
virtual void logGrpcStreamInfo() PURE;
virtual void logStreamInfo() PURE;
};

class ExternalProcessorClient : public ClientBase {
Expand Down
2 changes: 2 additions & 0 deletions source/extensions/filters/http/ext_proc/client_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <memory>

#include "envoy/service/ext_proc/v3/external_processor.pb.h"
#include "envoy/stream_info/stream_info.h"

namespace Envoy {
namespace Extensions {
Expand Down Expand Up @@ -37,6 +38,7 @@ class ClientBase {
bool end_stream, const uint64_t stream_id, RequestCallbacks* callbacks,
StreamBase* stream) PURE;
virtual void cancel() PURE;
virtual const Envoy::StreamInfo::StreamInfo* getStreamInfo() const PURE;
};

using ClientBasePtr = std::unique_ptr<ClientBase>;
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/ext_proc/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void ExternalProcessorStreamImpl::onRemoteClose(Grpc::Status::GrpcStatus status,
return;
}

callbacks_->logGrpcStreamInfo();
callbacks_->logStreamInfo();
if (status == Grpc::Status::Ok) {
callbacks_->onGrpcClose();
} else {
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/ext_proc/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ExternalProcessorClientImpl : public ExternalProcessorClient {
const uint64_t stream_id, RequestCallbacks* callbacks,
StreamBase* stream) override;
void cancel() override {}
const Envoy::StreamInfo::StreamInfo* getStreamInfo() const override { return nullptr; }

private:
Grpc::AsyncClientManager& client_manager_;
Expand Down
45 changes: 28 additions & 17 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,6 @@ Filter::StreamOpenState Filter::openStream() {

stream_ = config_->threadLocalStreamManager().store(std::move(stream_object), config_->stats(),
config_->deferredCloseTimeout());
// For custom access logging purposes. Applicable only for Envoy gRPC as Google gRPC does not
// have a proper implementation of streamInfo.
if (grpc_service_.has_envoy_grpc() && logging_info_ != nullptr) {
logging_info_->setClusterInfo(stream_->streamInfo().upstreamClusterInfo());
}
}
return StreamOpenState::Ok;
}
Expand Down Expand Up @@ -1039,21 +1034,37 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers
stats_.stream_msgs_sent_.inc();
}

void Filter::logGrpcStreamInfo() {
void Filter::logStreamInfoBase(const Envoy::StreamInfo::StreamInfo* stream_info) {
if (stream_info == nullptr || logging_info_ == nullptr) {
return;
}

const auto& upstream_meter = stream_info->getUpstreamBytesMeter();
if (upstream_meter != nullptr) {
logging_info_->setBytesSent(upstream_meter->wireBytesSent());
logging_info_->setBytesReceived(upstream_meter->wireBytesReceived());
}
// Only set upstream host in logging info once.
if (logging_info_->upstreamHost() == nullptr) {
logging_info_->setUpstreamHost(stream_info->upstreamInfo()->upstreamHost());
}

// Only set cluster info in logging info once.
if (logging_info_->clusterInfo() == nullptr) {
logging_info_->setClusterInfo(stream_info->upstreamClusterInfo());
}
}

void Filter::logStreamInfo() {
if (!config().grpcService().has_value()) {
// HTTP service
logStreamInfoBase(client_->getStreamInfo());
return;
}

if (stream_ != nullptr && logging_info_ != nullptr && grpc_service_.has_envoy_grpc()) {
const auto& upstream_meter = stream_->streamInfo().getUpstreamBytesMeter();
if (upstream_meter != nullptr) {
logging_info_->setBytesSent(upstream_meter->wireBytesSent());
logging_info_->setBytesReceived(upstream_meter->wireBytesReceived());
}
// Only set upstream host in logging info once.
if (logging_info_->upstreamHost() == nullptr) {
logging_info_->setUpstreamHost(stream_->streamInfo().upstreamInfo()->upstreamHost());
}
if (stream_ != nullptr && grpc_service_.has_envoy_grpc()) {
// Envoy gRPC service
logStreamInfoBase(&stream_->streamInfo());
}
}

Expand Down Expand Up @@ -1368,7 +1379,7 @@ void Filter::onGrpcClose() {

void Filter::onMessageTimeout() {
ENVOY_LOG(debug, "message timeout reached");
logGrpcStreamInfo();
logStreamInfo();
stats_.message_timeouts_.inc();
if (config_->failureModeAllow()) {
// The user would like a timeout to not cause message processing to fail.
Expand Down
5 changes: 3 additions & 2 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,

void encodeComplete() override {
if (config_->observabilityMode()) {
logGrpcStreamInfo();
logStreamInfo();
}
}

Expand All @@ -435,7 +435,8 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
std::unique_ptr<envoy::service::ext_proc::v3::ProcessingResponse>&& response) override;
void onGrpcError(Grpc::Status::GrpcStatus error) override;
void onGrpcClose() override;
void logGrpcStreamInfo() override;
void logStreamInfoBase(const Envoy::StreamInfo::StreamInfo* stream_info);
void logStreamInfo() override;

void onMessageTimeout();
void onNewTimeout(const ProtobufWkt::Duration& override_message_timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ namespace HttpFilters {
namespace ExternalProcessing {

namespace {
Http::RequestMessagePtr buildHttpRequest(absl::string_view uri, const uint64_t stream_id,
absl::string_view req_in_json) {
Http::RequestHeaderMapPtr buildHttpRequestHeaders(absl::string_view uri, const uint64_t stream_id) {
absl::string_view host, path;
Envoy::Http::Utility::extractHostPathFromUri(uri, host, path);
ENVOY_LOG_MISC(debug, " Ext_Proc HTTP client send request to uri {}, host {}, path {}", uri, host,
Expand All @@ -28,10 +27,7 @@ Http::RequestMessagePtr buildHttpRequest(absl::string_view uri, const uint64_t s
{header_values.ContentType, "application/json"},
{header_values.RequestId, std::to_string(stream_id)},
{header_values.Host, std::string(host)}});
Http::RequestMessagePtr message =
std::make_unique<Envoy::Http::RequestMessageImpl>(std::move(headers));
message->body().add(req_in_json);
return message;
return headers;
}

} // namespace
Expand All @@ -47,8 +43,7 @@ void ExtProcHttpClient::sendRequest(envoy::service::ext_proc::v3::ProcessingRequ
auto req_in_json = MessageUtil::getJsonStringFromMessage(req);
if (req_in_json.ok()) {
const auto http_uri = config_.http_service().http_service().http_uri();
Http::RequestMessagePtr message =
buildHttpRequest(http_uri.uri(), stream_id, req_in_json.value());
Http::RequestHeaderMapPtr headers = buildHttpRequestHeaders(http_uri.uri(), stream_id);
auto options = Http::AsyncClient::RequestOptions()
.setTimeout(std::chrono::milliseconds(
DurationUtil::durationToMilliseconds(http_uri.timeout())))
Expand All @@ -58,7 +53,11 @@ void ExtProcHttpClient::sendRequest(envoy::service::ext_proc::v3::ProcessingRequ
const auto thread_local_cluster = context().clusterManager().getThreadLocalCluster(cluster);
if (thread_local_cluster) {
active_request_ =
thread_local_cluster->httpAsyncClient().send(std::move(message), *this, options);
thread_local_cluster->httpAsyncClient().startRequest(std::move(headers), *this, options);
if (active_request_ != nullptr) {
Buffer::OwnedImpl body(req_in_json.value());
active_request_->sendData(body, true);
}
} else {
ENVOY_LOG(error, "ext_proc cluster {} does not exist in the config", cluster);
}
Expand All @@ -68,7 +67,6 @@ void ExtProcHttpClient::sendRequest(envoy::service::ext_proc::v3::ProcessingRequ
void ExtProcHttpClient::onSuccess(const Http::AsyncClient::Request&,
Http::ResponseMessagePtr&& response) {
auto status = Envoy::Http::Utility::getResponseStatusOrNullopt(response->headers());
active_request_ = nullptr;
if (status.has_value()) {
uint64_t status_code = status.value();
if (status_code == Envoy::enumToInt(Envoy::Http::Code::OK)) {
Expand All @@ -93,6 +91,7 @@ void ExtProcHttpClient::onSuccess(const Http::AsyncClient::Request&,
if (callbacks_) {
callbacks_->onComplete(response_msg);
callbacks_ = nullptr;
active_request_ = nullptr;
}
} else {
ENVOY_LOG(error, "Response status is not OK, status: {}", status_code);
Expand All @@ -110,18 +109,24 @@ void ExtProcHttpClient::onFailure(const Http::AsyncClient::Request&,
ASSERT(reason == Http::AsyncClient::FailureReason::Reset ||
reason == Http::AsyncClient::FailureReason::ExceedResponseBufferLimit);
ENVOY_LOG(error, "Request failed: stream has been reset");
active_request_ = nullptr;
onError();
}

const Envoy::StreamInfo::StreamInfo* ExtProcHttpClient::getStreamInfo() const {
if (active_request_ != nullptr) {
return &active_request_->streamInfo();
} else {
return nullptr;
}
}

void ExtProcHttpClient::onError() {
// Cancel if the request is active.
cancel();
ENVOY_LOG(error, "ext_proc HTTP client error condition happens.");
if (callbacks_) {
callbacks_->onError();
callbacks_ = nullptr;
}
active_request_ = nullptr;
}

void ExtProcHttpClient::cancel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ class ExtProcHttpClient : public ClientBase,
void onFailure(const Http::AsyncClient::Request& request,
Http::AsyncClient::FailureReason reason) override;

const Envoy::StreamInfo::StreamInfo* getStreamInfo() const override;

Server::Configuration::ServerFactoryContext& context() const { return context_; }

private:
void onError();
envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor config_;
Server::Configuration::ServerFactoryContext& context_;
Http::AsyncClient::Request* active_request_{};
Http::AsyncClient::OngoingRequest* active_request_{};
RequestCallbacks* callbacks_{};
};

Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/ext_proc/processor_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void ProcessorState::onStartProcessorCall(Event::TimerCb cb, std::chrono::millis
void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status,
CallbackState next_state) {
ENVOY_LOG(debug, "Finish external processing call");
filter_.logGrpcStreamInfo();
filter_.logStreamInfo();

stopMessageTimer();

Expand Down
2 changes: 1 addition & 1 deletion test/extensions/filters/http/ext_proc/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class ExtProcStreamTest : public testing::Test, public ExternalProcessorCallback
void onGrpcError(Grpc::Status::GrpcStatus status) override { grpc_status_ = status; }

void onGrpcClose() override { grpc_closed_ = true; }
void logGrpcStreamInfo() override {}
void logStreamInfo() override {}
void onComplete(envoy::service::ext_proc::v3::ProcessingResponse&) override {}
void onError() override {}

Expand Down
1 change: 1 addition & 0 deletions test/extensions/filters/http/ext_proc/http_client/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ envoy_extension_cc_test(
deps = [
"//source/extensions/filters/http/ext_proc:config",
"//test/common/http:common_lib",
"//test/extensions/filters/http/ext_proc:logging_test_filter_lib",
"//test/extensions/filters/http/ext_proc:utils_lib",
"//test/integration:http_protocol_integration_lib",
"//test/test_common:utility_lib",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "source/extensions/filters/http/ext_proc/ext_proc.h"

#include "test/common/http/common.h"
#include "test/extensions/filters/http/ext_proc/logging_test_filter.pb.h"
#include "test/extensions/filters/http/ext_proc/utils.h"
#include "test/integration/http_protocol_integration.h"
#include "test/test_common/utility.h"
Expand Down Expand Up @@ -42,6 +43,7 @@ struct ConfigOptions {
bool failure_mode_allow = false;
int64_t timeout = 900000000;
std::string cluster = "ext_proc_server_0";
bool add_log_filter = false;
};

struct ExtProcHttpTestParams {
Expand Down Expand Up @@ -122,6 +124,18 @@ class ExtProcHttpClientIntegrationTest : public testing::TestWithParam<ExtProcHt
ext_proc_filter.mutable_typed_config()->PackFrom(proto_config_);
config_helper_.prependFilter(MessageUtil::getJsonStringFromMessageOrError(ext_proc_filter));
}

if (config_option.add_log_filter) {
test::integration::filters::LoggingTestFilterConfig logging_filter_config;
logging_filter_config.set_logging_id(ext_proc_filter_name);
logging_filter_config.set_upstream_cluster_name(config_option.cluster);
logging_filter_config.set_check_received_bytes(true);
envoy::extensions::filters::network::http_connection_manager::v3::HttpFilter logging_filter;
logging_filter.set_name("logging-test-filter");
logging_filter.mutable_typed_config()->PackFrom(logging_filter_config);

config_helper_.prependFilter(MessageUtil::getJsonStringFromMessageOrError(logging_filter));
}
});

setUpstreamProtocol(GetParam().upstream_protocol);
Expand Down Expand Up @@ -486,6 +500,43 @@ TEST_P(ExtProcHttpClientIntegrationTest, WrongClusterConfigWithFailOpen) {
verifyDownstreamResponse(*response, 200);
}

// Using logging filter to test stats in onSuccess case.
TEST_P(ExtProcHttpClientIntegrationTest, StatsTestOnSuccess) {
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
ConfigOptions config_option = {};
config_option.add_log_filter = true;
initializeConfig(config_option);
HttpIntegrationTest::initialize();
auto response = sendDownstreamRequest(
[](Http::HeaderMap& headers) { headers.addCopy(LowerCaseString("foo"), "yes"); });

// The side stream get the request and sends back the response.
processRequestHeadersMessage(http_side_upstreams_[0], true, absl::nullopt);

// The request is sent to the upstream.
handleUpstreamRequest();
EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("foo", "yes"));

upstream_request_->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, true);
verifyDownstreamResponse(*response, 200);
}

// Using logging filter to test stats in onFailure case.
TEST_P(ExtProcHttpClientIntegrationTest, StatsTestOnFailure) {
proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP);
ConfigOptions config_option = {};
config_option.add_log_filter = true;
initializeConfig(config_option);
HttpIntegrationTest::initialize();
auto response = sendDownstreamRequest(absl::nullopt);

processRequestHeadersMessage(
http_side_upstreams_[0], true, [](const HttpHeaders&, HeadersResponse&) { return true; },
true);

verifyDownstreamResponse(*response, 500);
}

} // namespace
} // namespace ExternalProcessing
} // namespace HttpFilters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class LoggingTestFilter : public Http::PassThroughFilter {
}
ASSERT_TRUE(ext_proc_logging_info->upstreamHost() != nullptr);
EXPECT_EQ(ext_proc_logging_info->upstreamHost()->cluster().name(), expected_cluster_name_);
EXPECT_EQ(ext_proc_logging_info->clusterInfo()->name(), expected_cluster_name_);
}
}

Expand Down
2 changes: 2 additions & 0 deletions test/extensions/filters/http/ext_proc/mock_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class MockClient : public ExternalProcessorClient {
(envoy::service::ext_proc::v3::ProcessingRequest&&, bool, const uint64_t,
RequestCallbacks*, StreamBase*));
MOCK_METHOD(void, cancel, ());

MOCK_METHOD(const Envoy::StreamInfo::StreamInfo*, getStreamInfo, (), (const));
};

class MockStream : public ExternalProcessorStream {
Expand Down

0 comments on commit 57fa0e0

Please sign in to comment.