From 4fa73a88c97c87e43a397cb39e5a68bc966b34b2 Mon Sep 17 00:00:00 2001 From: code Date: Thu, 19 Dec 2024 21:56:04 +0800 Subject: [PATCH] filter chain: prefer vector over list for filter chain (#37665) Commit Message: filter chain: prefer vector over list for filter chain Additional Description: Use vector to store the filter chain rather than list. Note, we never require to insert/remove element in the intermediate node of filter chain container. In this scenario, vector is much more memory-friend than list. Risk Level: mid. Testing: n/a. Docs Changes: n/a. Release Notes: n/a. Platform Specific Features: n/a. --------- Signed-off-by: wangbaiping(wbpcode) --- source/common/http/filter_manager.cc | 58 +++++++------ source/common/http/filter_manager.h | 122 +++++++++++++++------------ 2 files changed, 101 insertions(+), 79 deletions(-) diff --git a/source/common/http/filter_manager.cc b/source/common/http/filter_manager.cc index a253eb3300e5..cc4fb77ec55b 100644 --- a/source/common/http/filter_manager.cc +++ b/source/common/http/filter_manager.cc @@ -20,12 +20,10 @@ namespace Http { namespace { -template using FilterList = std::list>; - // Shared helper for recording the latest filter used. -template -void recordLatestDataFilter(const typename FilterList::iterator current_filter, - T*& latest_filter, const FilterList& filters) { +template +void recordLatestDataFilter(typename Filters::Iterator current_filter, + typename Filters::Element*& latest_filter, Filters& filters) { // If this is the first time we're calling onData, just record the current filter. if (latest_filter == nullptr) { latest_filter = current_filter->get(); @@ -520,8 +518,7 @@ void FilterManager::applyFilterFactoryCb(FilterContext context, FilterFactoryCb& factory(callbacks); } -void FilterManager::maybeContinueDecoding( - const std::list::iterator& continue_data_entry) { +void FilterManager::maybeContinueDecoding(StreamDecoderFilters::Iterator continue_data_entry) { if (continue_data_entry != decoder_filters_.end()) { // We use the continueDecoding() code since it will correctly handle not calling // decodeHeaders() again. Fake setting StopSingleIteration since the continueDecoding() code @@ -536,9 +533,9 @@ void FilterManager::maybeContinueDecoding( void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers, bool end_stream) { // Headers filter iteration should always start with the next filter if available. - std::list::iterator entry = + StreamDecoderFilters::Iterator entry = commonDecodePrefix(filter, FilterIterationStartState::AlwaysStartFromNext); - std::list::iterator continue_data_entry = decoder_filters_.end(); + StreamDecoderFilters::Iterator continue_data_entry = decoder_filters_.end(); bool terminal_filter_decoded_end_stream = false; ASSERT(!state_.decoder_filter_chain_complete_ || entry == decoder_filters_.end() || (*entry)->end_stream_); @@ -649,8 +646,7 @@ void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instan auto trailers_added_entry = decoder_filters_.end(); const bool trailers_exists_at_start = filter_manager_callbacks_.requestTrailers().has_value(); // Filter iteration may start at the current filter. - std::list::iterator entry = - commonDecodePrefix(filter, filter_iteration_start_state); + StreamDecoderFilters::Iterator entry = commonDecodePrefix(filter, filter_iteration_start_state); bool terminal_filter_decoded_end_stream = false; ASSERT(!state_.decoder_filter_chain_complete_ || entry == decoder_filters_.end() || (*entry)->end_stream_); @@ -801,7 +797,7 @@ void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTra } // Filter iteration may start at the current filter. - std::list::iterator entry = + StreamDecoderFilters::Iterator entry = commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent); bool terminal_filter_reached = false; ASSERT(!state_.decoder_filter_chain_complete_ || entry == decoder_filters_.end()); @@ -845,7 +841,7 @@ void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMa filter_manager_callbacks_.resetIdleTimer(); // Filter iteration may start at the current filter. - std::list::iterator entry = + StreamDecoderFilters::Iterator entry = commonDecodePrefix(filter, FilterIterationStartState::CanStartFromCurrent); ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeMetadata)); @@ -897,7 +893,7 @@ void FilterManager::decodeMetadata(ActiveStreamDecoderFilter* filter, MetadataMa void FilterManager::disarmRequestTimeout() { filter_manager_callbacks_.disarmRequestTimeout(); } -std::list::iterator +StreamEncoderFilters::Iterator FilterManager::commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream, FilterIterationStartState filter_iteration_start_state) { // Only do base state setting on the initial call. Subsequent calls for filtering do not touch @@ -931,7 +927,7 @@ FilterManager::commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_st return std::next(filter->entry()); } -std::list::iterator +StreamDecoderFilters::Iterator FilterManager::commonDecodePrefix(ActiveStreamDecoderFilter* filter, FilterIterationStartState filter_iteration_start_state) { if (!filter) { @@ -1184,7 +1180,7 @@ void FilterManager::encode1xxHeaders(ActiveStreamEncoderFilter* filter, // end-stream, and because there are normal headers coming there's no need for // complex continuation logic. // 100-continue filter iteration should always start with the next filter if available. - std::list::iterator entry = + StreamEncoderFilters::Iterator entry = commonEncodePrefix(filter, false, FilterIterationStartState::AlwaysStartFromNext); for (; entry != encoder_filters_.end(); entry++) { ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_); @@ -1210,8 +1206,7 @@ void FilterManager::encode1xxHeaders(ActiveStreamEncoderFilter* filter, filter_manager_callbacks_.encode1xxHeaders(headers); } -void FilterManager::maybeContinueEncoding( - const std::list::iterator& continue_data_entry) { +void FilterManager::maybeContinueEncoding(StreamEncoderFilters::Iterator continue_data_entry) { if (continue_data_entry != encoder_filters_.end()) { // We use the continueEncoding() code since it will correctly handle not calling // encodeHeaders() again. Fake setting StopSingleIteration since the continueEncoding() code @@ -1232,9 +1227,9 @@ void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHea disarmRequestTimeout(); // Headers filter iteration should always start with the next filter if available. - std::list::iterator entry = + StreamEncoderFilters::Iterator entry = commonEncodePrefix(filter, end_stream, FilterIterationStartState::AlwaysStartFromNext); - std::list::iterator continue_data_entry = encoder_filters_.end(); + StreamEncoderFilters::Iterator continue_data_entry = encoder_filters_.end(); for (; entry != encoder_filters_.end(); entry++) { ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_); @@ -1318,7 +1313,7 @@ void FilterManager::encodeMetadata(ActiveStreamEncoderFilter* filter, MetadataMapPtr&& metadata_map_ptr) { filter_manager_callbacks_.resetIdleTimer(); - std::list::iterator entry = + StreamEncoderFilters::Iterator entry = commonEncodePrefix(filter, false, FilterIterationStartState::CanStartFromCurrent); for (; entry != encoder_filters_.end(); entry++) { @@ -1405,9 +1400,9 @@ void FilterManager::encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instan filter_manager_callbacks_.resetIdleTimer(); // Filter iteration may start at the current filter. - std::list::iterator entry = + StreamEncoderFilters::Iterator entry = commonEncodePrefix(filter, end_stream, filter_iteration_start_state); - auto trailers_added_entry = encoder_filters_.end(); + StreamEncoderFilters::Iterator trailers_added_entry = encoder_filters_.end(); const bool trailers_exists_at_start = filter_manager_callbacks_.responseTrailers().has_value(); for (; entry != encoder_filters_.end(); entry++) { @@ -1479,7 +1474,7 @@ void FilterManager::encodeTrailers(ActiveStreamEncoderFilter* filter, filter_manager_callbacks_.resetIdleTimer(); // Filter iteration may start at the current filter. - std::list::iterator entry = + StreamEncoderFilters::Iterator entry = commonEncodePrefix(filter, true, FilterIterationStartState::CanStartFromCurrent); for (; entry != encoder_filters_.end(); entry++) { ENVOY_EXECUTION_SCOPE(trackedStream(), &(*entry)->filter_context_); @@ -1664,6 +1659,19 @@ FilterManager::createFilterChain(const FilterChainFactory& filter_chain_factory) return state_.create_chain_result_; } + // After the filter chain creation is completed, the filter chain containers will not be + // modified. So, we can safely complete the initialization of the filter chain now. + Cleanup initialize_filter_chain([this]() { + for (auto iter = decoder_filters_.begin(); iter != decoder_filters_.end(); ++iter) { + (*iter)->entry_ = iter; + } + for (auto iter = encoder_filters_.begin(); iter != encoder_filters_.end(); ++iter) { + (*iter)->entry_ = iter; + } + }); + + // TODO(wbpcode): reserve memory for filters to avoid frequent reallocation. + OptRef downstream_callbacks = filter_manager_callbacks_.downstreamCallbacks(); @@ -1918,7 +1926,7 @@ void FilterManager::resetStream(StreamResetReason reason, } bool FilterManager::isTerminalDecoderFilter(const ActiveStreamDecoderFilter& filter) const { - return !decoder_filters_.empty() && decoder_filters_.back().get() == &filter; + return !decoder_filters_.entries_.empty() && decoder_filters_.entries_.back().get() == &filter; } void ActiveStreamFilterBase::resetStream(Http::StreamResetReason reset_reason, diff --git a/source/common/http/filter_manager.h b/source/common/http/filter_manager.h index 834b1397a3fc..db1fadb2da5a 100644 --- a/source/common/http/filter_manager.h +++ b/source/common/http/filter_manager.h @@ -35,6 +35,10 @@ class FilterManager; class DownstreamFilterManager; struct ActiveStreamFilterBase; +struct ActiveStreamDecoderFilter; +struct ActiveStreamEncoderFilter; +using ActiveStreamDecoderFilterPtr = std::unique_ptr; +using ActiveStreamEncoderFilterPtr = std::unique_ptr; constexpr absl::string_view LocalReplyFilterStateKey = "envoy.filters.network.http_connection_manager.local_reply_owner"; @@ -55,6 +59,46 @@ class LocalReplyOwnerObject : public StreamInfo::FilterState::Object { const std::string filter_config_name_; }; +// TODO(wbpcode): Rather than allocating every filter with an unique pointer, we could +// construct the filter in place in the vector. This should reduce the heap allocation and +// memory fragmentation. + +// HTTP decoder filters. If filters are configured in the following order (assume all three +// filters are both decoder/encoder filters): +// http_filters: +// - A +// - B +// - C +// The decoder filter chain will iterate through filters A, B, C. +struct StreamDecoderFilters { + using Element = ActiveStreamDecoderFilter; + using Iterator = std::vector::iterator; + + Iterator begin() { return entries_.begin(); } + Iterator end() { return entries_.end(); } + + std::vector entries_; +}; + +// HTTP encoder filters. If filters are configured in the following order (assume all three +// filters are both decoder/encoder filters): +// http_filters: +// - A +// - B +// - C +// Unlike the decoder filter, the encoder filter chain will iterate with the +// reverse order of the configured filters, i.e., C, B, A. This is why we use reverse_iterator +// here. +struct StreamEncoderFilters { + using Element = ActiveStreamEncoderFilter; + using Iterator = std::vector::reverse_iterator; + + Iterator begin() { return entries_.rbegin(); } + Iterator end() { return entries_.rend(); } + + std::vector entries_; +}; + /** * Base class wrapper for both stream encoder and decoder filters. * @@ -164,7 +208,7 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks, std::unique_ptr saved_request_metadata_{nullptr}; std::unique_ptr saved_response_metadata_{nullptr}; // The state of iteration. - enum class IterationState { + enum class IterationState : uint8_t { Continue, // Iteration has not stopped for any frame type. StopSingleIteration, // Iteration has stopped for headers, 100-continue, or data. StopAllBuffer, // Iteration has stopped for all frame types, and following data should @@ -194,8 +238,7 @@ struct ActiveStreamFilterBase : public virtual StreamFilterCallbacks, * Wrapper for a stream decoder filter. */ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase, - public StreamDecoderFilterCallbacks, - LinkedObject { + public StreamDecoderFilterCallbacks { ActiveStreamDecoderFilter(FilterManager& parent, StreamDecoderFilterSharedPtr filter, FilterContext filter_context) : ActiveStreamFilterBase(parent, std::move(filter_context)), handle_(std::move(filter)) { @@ -279,19 +322,18 @@ struct ActiveStreamDecoderFilter : public ActiveStreamFilterBase, // be stopped even if independent half-close is enabled. For simplicity, independent half-close is // enabled only when the terminal filter is encoding the response. void stopDecodingIfNonTerminalFilterEncodedEndStream(bool encoded_end_stream); + StreamDecoderFilters::Iterator entry() const { return entry_; } StreamDecoderFilterSharedPtr handle_; + StreamDecoderFilters::Iterator entry_{}; bool is_grpc_request_{}; }; -using ActiveStreamDecoderFilterPtr = std::unique_ptr; - /** * Wrapper for a stream encoder filter. */ struct ActiveStreamEncoderFilter : public ActiveStreamFilterBase, - public StreamEncoderFilterCallbacks, - LinkedObject { + public StreamEncoderFilterCallbacks { ActiveStreamEncoderFilter(FilterManager& parent, StreamEncoderFilterSharedPtr filter, FilterContext filter_context) : ActiveStreamFilterBase(parent, std::move(filter_context)), handle_(std::move(filter)) { @@ -338,12 +380,12 @@ struct ActiveStreamEncoderFilter : public ActiveStreamFilterBase, void responseDataTooLarge(); void responseDataDrained(); + StreamEncoderFilters::Iterator entry() const { return entry_; } StreamEncoderFilterSharedPtr handle_; + StreamEncoderFilters::Iterator entry_{}; }; -using ActiveStreamEncoderFilterPtr = std::unique_ptr; - /** * Callbacks invoked by the FilterManager to pass filter data/events back to the caller. */ @@ -670,33 +712,6 @@ class FilterManager : public ScopeTrackedObject, DUMP_DETAILS(&streamInfo()); } - void addAccessLogHandler(AccessLog::InstanceSharedPtr handler) { - access_log_handlers_.push_back(std::move(handler)); - } - void addStreamDecoderFilter(ActiveStreamDecoderFilterPtr filter) { - // Note: configured decoder filters are appended to decoder_filters_. - // This means that if filters are configured in the following order (assume all three filters - // are both decoder/encoder filters): - // http_filters: - // - A - // - B - // - C - // The decoder filter chain will iterate through filters A, B, C. - LinkedList::moveIntoListBack(std::move(filter), decoder_filters_); - } - void addStreamEncoderFilter(ActiveStreamEncoderFilterPtr filter) { - // Note: configured encoder filters are prepended to encoder_filters_. - // This means that if filters are configured in the following order (assume all three filters - // are both decoder/encoder filters): - // http_filters: - // - A - // - B - // - C - // The encoder filter chain will iterate through filters C, B, A. - LinkedList::moveIntoList(std::move(filter), encoder_filters_); - } - void addStreamFilterBase(StreamFilterBase* filter) { filters_.push_back(filter); } - // FilterChainManager void applyFilterFactoryCb(FilterContext context, FilterFactoryCb& factory) override; @@ -971,29 +986,30 @@ class FilterManager : public ScopeTrackedObject, : manager_(manager), context_(context) {} void addStreamDecoderFilter(Http::StreamDecoderFilterSharedPtr filter) override { - manager_.addStreamFilterBase(filter.get()); - manager_.addStreamDecoderFilter( + manager_.filters_.push_back(filter.get()); + + manager_.decoder_filters_.entries_.emplace_back( std::make_unique(manager_, std::move(filter), context_)); } void addStreamEncoderFilter(Http::StreamEncoderFilterSharedPtr filter) override { - manager_.addStreamFilterBase(filter.get()); - manager_.addStreamEncoderFilter( + manager_.filters_.push_back(filter.get()); + + manager_.encoder_filters_.entries_.emplace_back( std::make_unique(manager_, std::move(filter), context_)); } void addStreamFilter(Http::StreamFilterSharedPtr filter) override { - StreamDecoderFilter* decoder_filter = filter.get(); - manager_.addStreamFilterBase(decoder_filter); + manager_.filters_.push_back(filter.get()); - manager_.addStreamDecoderFilter( + manager_.decoder_filters_.entries_.emplace_back( std::make_unique(manager_, filter, context_)); - manager_.addStreamEncoderFilter( + manager_.encoder_filters_.entries_.emplace_back( std::make_unique(manager_, std::move(filter), context_)); } void addAccessLogHandler(AccessLog::InstanceSharedPtr handler) override { - manager_.addAccessLogHandler(std::move(handler)); + manager_.access_log_handlers_.push_back(std::move(handler)); } Event::Dispatcher& dispatcher() override { return manager_.dispatcher_; } @@ -1022,11 +1038,11 @@ class FilterManager : public ScopeTrackedObject, const FilterChainOptionsImpl& options); // Returns the encoder filter to start iteration with. - std::list::iterator + StreamEncoderFilters::Iterator commonEncodePrefix(ActiveStreamEncoderFilter* filter, bool end_stream, FilterIterationStartState filter_iteration_start_state); // Returns the decoder filter to start iteration with. - std::list::iterator + StreamDecoderFilters::Iterator commonDecodePrefix(ActiveStreamDecoderFilter* filter, FilterIterationStartState filter_iteration_start_state); void addDecodedData(ActiveStreamDecoderFilter& filter, Buffer::Instance& data, bool streaming); @@ -1034,8 +1050,7 @@ class FilterManager : public ScopeTrackedObject, MetadataMapVector& addDecodedMetadata(); // Helper function for the case where we have a header only request, but a filter adds a body // to it. - void maybeContinueDecoding( - const std::list::iterator& maybe_continue_data_entry); + void maybeContinueDecoding(StreamDecoderFilters::Iterator maybe_continue_data_entry); void decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers, bool end_stream); // Sends data through decoding filter chains. filter_iteration_start_state indicates which // filter to start the iteration with. @@ -1049,8 +1064,7 @@ class FilterManager : public ScopeTrackedObject, // As with most of the encode functions, this runs encodeHeaders on various // filters before calling encodeHeadersInternal which does final header munging and passes the // headers to the encoder. - void maybeContinueEncoding( - const std::list::iterator& maybe_continue_data_entry); + void maybeContinueEncoding(StreamEncoderFilters::Iterator maybe_continue_data_entry); void encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHeaderMap& headers, bool end_stream); // Sends data through encoding filter chains. filter_iteration_start_state indicates which @@ -1090,9 +1104,9 @@ class FilterManager : public ScopeTrackedObject, Buffer::BufferMemoryAccountSharedPtr account_; const bool proxy_100_continue_; - std::list decoder_filters_; - std::list encoder_filters_; - std::list filters_; + StreamDecoderFilters decoder_filters_; + StreamEncoderFilters encoder_filters_; + std::vector filters_; AccessLog::InstanceSharedPtrVector access_log_handlers_; // Stores metadata added in the decoding filter that is being processed. Will be cleared before