Skip to content

Commit

Permalink
generic proxy: enhancement the observability of generic proxy (envoyp…
Browse files Browse the repository at this point in the history
…roxy#31558)

* generic proxy: enhancement the observability of generic proxy

This is a big enhancement to the observability of generic proxy. Although we already have full featured access logging support now, but the generic proxy actually doesn't populate some data (like response code, response flag, response code details, various durations). This patch complete this puzzle piece.

And this patch also provide more stats. Like the stats based on the response code or response flag.

Signed-off-by: wbpcode <[email protected]>
  • Loading branch information
code authored Jan 7, 2024
1 parent 12210e5 commit be4b266
Show file tree
Hide file tree
Showing 18 changed files with 617 additions and 156 deletions.
4 changes: 4 additions & 0 deletions contrib/generic_proxy/filters/network/source/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,14 @@ envoy_cc_library(

envoy_cc_library(
name = "stats_lib",
srcs = ["stats.cc"],
hdrs = ["stats.h"],
deps = [
"//envoy/server:factory_context_interface",
"//envoy/stats:stats_interface",
"//envoy/stats:stats_macros",
"//source/common/stats:symbol_table_lib",
"//source/common/stream_info:utility_lib",
],
)

Expand Down
15 changes: 11 additions & 4 deletions contrib/generic_proxy/filters/network/source/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace NetworkFilters {
namespace GenericProxy {

SINGLETON_MANAGER_REGISTRATION(generic_route_config_provider_manager);
SINGLETON_MANAGER_REGISTRATION(generic_proxy_code_or_flag_stats);

std::pair<CodecFactoryPtr, ProxyFactoryPtr>
Factory::factoriesFromProto(const envoy::config::core::v3::TypedExtensionConfig& codec_config,
Expand Down Expand Up @@ -101,6 +102,12 @@ Factory::createFilterFactoryFromProtoTyped(const ProxyConfig& proto_config,
return std::make_shared<RouteConfigProviderManagerImpl>(server_context.admin());
});

// Pinned singleton and we needn't to keep the shared_ptr.
std::shared_ptr<CodeOrFlags> code_or_flags =
server_context.singletonManager().getTyped<CodeOrFlags>(
SINGLETON_MANAGER_REGISTERED_NAME(generic_proxy_code_or_flag_stats),
[&server_context] { return std::make_shared<CodeOrFlags>(server_context); }, true);

auto tracer_manager = Tracing::TracerManagerImpl::singleton(context);

auto factories = factoriesFromProto(proto_config.codec_config(), context);
Expand Down Expand Up @@ -131,18 +138,18 @@ Factory::createFilterFactoryFromProtoTyped(const ProxyConfig& proto_config,
routeConfigProviderFromProto(proto_config, context, *route_config_provider_manager),
filtersFactoryFromProto(proto_config.filters(), proto_config.codec_config(), stat_prefix,
context),
std::move(tracer), std::move(tracing_config), std::move(access_logs), context);
std::move(tracer), std::move(tracing_config), std::move(access_logs), *code_or_flags,
context);

return [route_config_provider_manager, tracer_manager, config, &server_context,
return [route_config_provider_manager, tracer_manager, config, &context,
custom_proxy_factory](Envoy::Network::FilterManager& filter_manager) -> void {
// Create filter by the custom filter factory if the custom filter factory is not null.
if (custom_proxy_factory != nullptr) {
custom_proxy_factory->createProxy(filter_manager, config);
return;
}

filter_manager.addReadFilter(std::make_shared<Filter>(
config, server_context.mainThreadDispatcher().timeSource(), server_context.runtime()));
filter_manager.addReadFilter(std::make_shared<Filter>(config, context));
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ class ServerCodec {
/**
* Create a response frame with specified status and flags.
* @param status status of the response.
* @param short_response_flags short flags of the response.
* @param data any data that generic proxy filter wants to tell the codec.
* @param request origin request that the response is created for.
*/
virtual ResponsePtr respond(Status status, absl::string_view short_response_flags,
const Request& request) PURE;
virtual ResponsePtr respond(Status status, absl::string_view data, const Request& request) PURE;
};

/**
Expand Down
20 changes: 14 additions & 6 deletions contrib/generic_proxy/filters/network/source/interface/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ class StreamFilterCallbacks {
*/
virtual const CodecFactory& downstreamCodec() PURE;

/**
* Reset the underlying stream.
*/
virtual void resetStream() PURE;

/**
* @return const RouteEntry* cached route entry for current request.
*/
Expand Down Expand Up @@ -88,7 +83,20 @@ class StreamFilterCallbacks {

class DecoderFilterCallback : public virtual StreamFilterCallbacks {
public:
virtual void sendLocalReply(Status status, ResponseUpdateFunction&& cb = nullptr) PURE;
/**
* Send local reply directly to the downstream for the current request. Note encoder filters
* will be skipped for the local reply for now.
* @param status supplies the protocol independent response status to the codec to create
* actual response frame or message. Note the actual response code may be different with code
* in the status. For example, if the status is Protocol::Status::Ok, the actual response code
* may be 200 for HTTP/1.1 or 20 for Dubbo.
* The status message will be used as response code details and could be logged.
* @param data supplies the additional data to the codec to create actual response frame or
* message. This could be anything and is optional.
* @param cb supplies the callback to update the response. This is optional and could be nullptr.
*/
virtual void sendLocalReply(Status status, absl::string_view data = {},
ResponseUpdateFunction cb = {}) PURE;

virtual void continueDecoding() PURE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ class FilterConfig : public FilterChainFactory {
*/
virtual GenericFilterStats& stats() PURE;

/**
* @return code or flags stats name to use.
*/
virtual const CodeOrFlags& codeOrFlags() const PURE;

/**
* @return const std::vector<AccessLogInstanceSharedPtr>& access logs.
*/
Expand Down
80 changes: 64 additions & 16 deletions contrib/generic_proxy/filters/network/source/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,36 @@ Tracing::Decision tracingDecision(const Tracing::ConnectionManagerTracingConfig&
return {Tracing::Reason::NotTraceable, false};
}

StreamInfo::ResponseFlag
responseFlagFromDownstreamReasonReason(DownstreamStreamResetReason reason) {
switch (reason) {
case DownstreamStreamResetReason::ConnectionTermination:
return StreamInfo::ResponseFlag::DownstreamConnectionTermination;
case DownstreamStreamResetReason::LocalConnectionTermination:
return StreamInfo::ResponseFlag::LocalReset;
case DownstreamStreamResetReason::ProtocolError:
return StreamInfo::ResponseFlag::DownstreamProtocolError;
}
PANIC("Unknown reset reason");
}

} // namespace

ActiveStream::ActiveStream(Filter& parent, StreamRequestPtr request)
: parent_(parent), request_stream_(std::move(request)),
request_stream_end_(request_stream_->frameFlags().endStream()),
stream_info_(parent_.time_source_,
parent_.callbacks_->connection().connectionInfoProviderSharedPtr()),
request_timer_(new Stats::HistogramCompletableTimespanImpl(parent_.stats_.request_time_ms_,
parent_.time_source_)) {
parent_.callbacks_->connection().connectionInfoProviderSharedPtr()) {
if (!request_stream_end_) {
// If the request is not fully received, register the stream to the frame handler map.
parent_.registerFrameHandler(requestStreamId(), this);
registered_in_frame_handlers_ = true;
} else {
// The request is fully received.
stream_info_.downstreamTiming().onLastDownstreamRxByteReceived(parent_.time_source_);
}

parent_.stats_.request_.inc();
parent_.stats_.request_active_.inc();
parent_.stats_helper_.onRequest();

connection_manager_tracing_config_ = parent_.config_->tracingConfig();

Expand Down Expand Up @@ -95,18 +108,23 @@ Envoy::Event::Dispatcher& ActiveStream::dispatcher() {
return parent_.downstreamConnection().dispatcher();
}
const CodecFactory& ActiveStream::downstreamCodec() { return parent_.config_->codecFactory(); }
void ActiveStream::resetStream() {
void ActiveStream::resetStream(DownstreamStreamResetReason reason) {
if (active_stream_reset_) {
return;
}
active_stream_reset_ = true;

parent_.stats_helper_.onRequestReset();
stream_info_.setResponseFlag(responseFlagFromDownstreamReasonReason(reason));

parent_.deferredStream(*this);
}

void ActiveStream::sendResponseStartToDownstream() {
ASSERT(response_stream_ != nullptr);
response_filter_chain_complete_ = true;

// The first frame of response is sent.
stream_info_.downstreamTiming().onFirstDownstreamTxByteSent(parent_.time_source_);
parent_.sendFrameToDownstream(*response_stream_, *this);
}

Expand Down Expand Up @@ -151,8 +169,9 @@ void ActiveStream::sendRequestFrameToUpstream() {

// TODO(wbpcode): add the short_response_flags support to the sendLocalReply
// method.
void ActiveStream::sendLocalReply(Status status, ResponseUpdateFunction&& func) {
response_stream_ = parent_.server_codec_->respond(status, "", *request_stream_);
void ActiveStream::sendLocalReply(Status status, absl::string_view data,
ResponseUpdateFunction func) {
response_stream_ = parent_.server_codec_->respond(status, data, *request_stream_);
response_stream_frames_.clear();
// Only one frame is allowed in the local reply.
response_stream_end_ = true;
Expand All @@ -163,6 +182,12 @@ void ActiveStream::sendLocalReply(Status status, ResponseUpdateFunction&& func)
func(*response_stream_);
}

local_reply_ = true;
// status message will be used as response code details.
stream_info_.setResponseCodeDetails(status.message());
// Set the response code to the stream info.
stream_info_.setResponseCode(response_stream_->status().code());

sendResponseStartToDownstream();
}

Expand All @@ -173,6 +198,13 @@ void ActiveStream::continueDecoding() {

if (cached_route_entry_ == nullptr) {
cached_route_entry_ = parent_.config_->routeEntry(*request_stream_);
if (cached_route_entry_ != nullptr) {
auto* cluster =
parent_.cluster_manager_.getThreadLocalCluster(cached_route_entry_->clusterName());
if (cluster != nullptr) {
stream_info_.setUpstreamClusterInfo(cluster->info());
}
}
}

ASSERT(request_stream_ != nullptr);
Expand All @@ -197,6 +229,9 @@ void ActiveStream::onRequestFrame(StreamFramePtr frame) {

ASSERT(registered_in_frame_handlers_);
if (request_stream_end_) {
// The request is fully received.
stream_info_.downstreamTiming().onLastDownstreamRxByteReceived(parent_.time_source_);

// If the request is fully received, remove the stream from the
// frame handler map.
parent_.unregisterFrameHandler(requestStreamId());
Expand All @@ -208,9 +243,16 @@ void ActiveStream::onRequestFrame(StreamFramePtr frame) {
}

void ActiveStream::onResponseStart(ResponsePtr response) {
ASSERT(response_stream_ == nullptr);
response_stream_ = std::move(response);
ASSERT(response_stream_ != nullptr);
response_stream_end_ = response_stream_->frameFlags().endStream();
parent_.stream_drain_decision_ = response_stream_->frameFlags().streamFlags().drainClose();

// The response code details always be "via_upstream" for response from upstream.
stream_info_.setResponseCodeDetails("via_upstream");
// Set the response code to the stream info.
stream_info_.setResponseCode(response_stream_->status().code());
continueEncoding();
}

Expand Down Expand Up @@ -260,12 +302,14 @@ void ActiveStream::onEncodingSuccess(Buffer::Instance& buffer, bool end_stream)
return;
}

// The response is fully sent.
stream_info_.downstreamTiming().onLastDownstreamTxByteSent(parent_.time_source_);

ENVOY_LOG(debug, "Generic proxy: downstream response complete");

ASSERT(response_stream_end_);
ASSERT(response_stream_frames_.empty());

parent_.stats_.response_.inc();
parent_.deferredStream(*this);
}

Expand All @@ -284,8 +328,12 @@ void ActiveStream::completeRequest() {

stream_info_.onRequestComplete();

request_timer_->complete();
parent_.stats_.request_active_.dec();
bool error_reply = false;
// This response frame may be nullptr if the request is one-way.
if (response_stream_ != nullptr) {
error_reply = !response_stream_->status().ok();
}
parent_.stats_helper_.onRequestComplete(stream_info_, local_reply_, error_reply);

if (active_span_) {
TraceContextBridge trace_context{*request_stream_};
Expand Down Expand Up @@ -342,9 +390,9 @@ void Filter::onDecodingSuccess(StreamFramePtr request) {
}

void Filter::onDecodingFailure() {
stats_.request_decoding_error_.inc();
stats_helper_.onRequestDecodingError();

resetStreamsForUnexpectedError();
resetDownstreamAllStreams(DownstreamStreamResetReason::ProtocolError);
closeDownstreamConnection();
}

Expand Down Expand Up @@ -402,9 +450,9 @@ void Filter::deferredStream(ActiveStream& stream) {
mayBeDrainClose();
}

void Filter::resetStreamsForUnexpectedError() {
void Filter::resetDownstreamAllStreams(DownstreamStreamResetReason reason) {
while (!active_streams_.empty()) {
active_streams_.front()->resetStream();
active_streams_.front()->resetStream(reason);
}
}

Expand Down
Loading

0 comments on commit be4b266

Please sign in to comment.