diff --git a/api/envoy/extensions/filters/http/ext_proc/v3/processing_mode.proto b/api/envoy/extensions/filters/http/ext_proc/v3/processing_mode.proto index 66c04acc6426..41403211a2bb 100644 --- a/api/envoy/extensions/filters/http/ext_proc/v3/processing_mode.proto +++ b/api/envoy/extensions/filters/http/ext_proc/v3/processing_mode.proto @@ -36,11 +36,12 @@ message ProcessingMode { // Control how the request and response bodies are handled // When body mutation by external processor is enabled, ext_proc filter will always remove - // the content length header in three cases below because content length can not be guaranteed + // the content length header in four cases below because content length can not be guaranteed // to be set correctly: // 1) STREAMED BodySendMode: header processing completes before body mutation comes back. // 2) BUFFERED_PARTIAL BodySendMode: body is buffered and could be injected in different phases. // 3) BUFFERED BodySendMode + SKIP HeaderSendMode: header processing (e.g., update content-length) is skipped. + // 4) FULL_DUPLEX_STREAMED BodySendMode: header processing completes before body mutation comes back. // // In Envoy's http1 codec implementation, removing content length will enable chunked transfer // encoding whenever feasible. The recipient (either client or server) must be able @@ -68,6 +69,37 @@ message ProcessingMode { // chunk. If the body exceeds the configured buffer limit, then the body contents // up to the buffer limit will be sent. BUFFERED_PARTIAL = 3; + + // [#not-implemented-hide:] + // Envoy streams the body to the server in pieces as they arrive. + // + // 1) The server may choose to buffer any number of chunks of data before processing them. + // After it finishes buffering, the server processes the buffered data. Then it splits the processed + // data into any number of chunks, and streams them back to Envoy one by one. + // The server may continuously do so until the complete body is processed. 
+ // The individual response chunk size is recommended to be no greater than 64K bytes, or + // :ref:`max_receive_message_length <envoy_v3_api_field_config.core.v3.GrpcService.EnvoyGrpc.max_receive_message_length>` + // if EnvoyGrpc is used. + // + // 2) The server may also choose to buffer the entire message, including the headers (if header mode is + // ``SEND``), the entire body, and the trailers (if present), before sending back any response. + // The server response has to maintain the headers-body-trailers ordering. + // + // 3) Note that the server might also choose not to buffer data. That is, upon receiving a + // body request, it could process the data and send back a body response immediately. + // + // In this body mode: + // * The corresponding trailer mode has to be set to ``SEND``. + // * Envoy will send body and trailers (if present) to the server as they arrive. + // Sending the trailers (if present) informs the server that the complete body has arrived. + // In case there are no trailers, then Envoy will set + // :ref:`end_of_stream <envoy_v3_api_field_service.ext_proc.v3.HttpBody.end_of_stream>` + // to true as part of the last body chunk request to notify the server that no other data is to be sent. + // * The server needs to send + // :ref:`StreamedBodyResponse <envoy_v3_api_msg_service.ext_proc.v3.StreamedBodyResponse>` + // to Envoy in the body response. + // * Envoy will stream the body chunks in the responses from the server to the upstream/downstream as they arrive. + FULL_DUPLEX_STREAMED = 4; } // How to handle the request header. Default is "SEND". diff --git a/api/envoy/service/ext_proc/v3/external_processor.proto b/api/envoy/service/ext_proc/v3/external_processor.proto index d1e5dcd70e17..ab193f491aae 100644 --- a/api/envoy/service/ext_proc/v3/external_processor.proto +++ b/api/envoy/service/ext_proc/v3/external_processor.proto @@ -377,15 +377,39 @@ message HeaderMutation { repeated string remove_headers = 2; } -// Replace the entire message body chunk received in the corresponding -// HttpBody message with this new body, or clear the body. 
+// [#not-implemented-hide:] +// The body response message corresponding to FULL_DUPLEX_STREAMED body mode. +message StreamedBodyResponse { + // The body response chunk that will be passed to the upstream/downstream by Envoy. + bytes body = 1; + + // The server sets this flag to true if it has received a body request with + // :ref:`end_of_stream <envoy_v3_api_field_service.ext_proc.v3.HttpBody.end_of_stream>` set to true, + // and this is the last chunk of body responses. + bool end_of_stream = 2; +} + +// This message specifies the body mutation the server sends to Envoy. message BodyMutation { // The type of mutation for the body. oneof mutation { // The entire body to replace. + // Should only be used when the corresponding ``BodySendMode`` in the + // :ref:`processing_mode <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.processing_mode>` + // is not set to ``FULL_DUPLEX_STREAMED``. bytes body = 1; + // Clear the corresponding body chunk. + // Should only be used when the corresponding ``BodySendMode`` in the + // :ref:`processing_mode <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.processing_mode>` + // is not set to ``FULL_DUPLEX_STREAMED``. // Clear the corresponding body chunk. bool clear_body = 2; + + // [#not-implemented-hide:] + // Must be used when the corresponding ``BodySendMode`` in the + // :ref:`processing_mode <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.processing_mode>` + // is set to ``FULL_DUPLEX_STREAMED``. + StreamedBodyResponse streamed_response = 3; } } diff --git a/source/extensions/filters/http/ext_proc/ext_proc.cc b/source/extensions/filters/http/ext_proc/ext_proc.cc index d0fd9066fc8a..670938d5b98d 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.cc +++ b/source/extensions/filters/http/ext_proc/ext_proc.cc @@ -72,6 +72,36 @@ initGrpcService(const ExtProcPerRoute& config) { return absl::nullopt; } +// TODO(#37046) Refactoring the exception throwing logic. +void verifyProcessingModeConfig(const ExternalProcessor& config) { + const ProcessingMode& processing_mode = config.processing_mode(); + if (config.has_http_service()) { + // In case http_service is configured, the processing mode can only support sending headers. 
+ if (processing_mode.request_body_mode() != ProcessingMode::NONE || + processing_mode.response_body_mode() != ProcessingMode::NONE || + processing_mode.request_trailer_mode() == ProcessingMode::SEND || + processing_mode.response_trailer_mode() == ProcessingMode::SEND) { + throw EnvoyException( + "If the ext_proc filter is configured with http_service instead of gRPC service, " + "then the processing modes of this filter can not be configured to send body or " + "trailer."); + } + } + + if ((processing_mode.request_body_mode() == ProcessingMode::FULL_DUPLEX_STREAMED) && + (processing_mode.request_trailer_mode() != ProcessingMode::SEND)) { + throw EnvoyException( + "If the ext_proc filter has the request_body_mode set to FULL_DUPLEX_STREAMED, " + "then the request_trailer_mode has to be set to SEND"); + } + if ((processing_mode.response_body_mode() == ProcessingMode::FULL_DUPLEX_STREAMED) && + (processing_mode.response_trailer_mode() != ProcessingMode::SEND)) { + throw EnvoyException( + "If the ext_proc filter has the response_body_mode set to FULL_DUPLEX_STREAMED, " + "then the response_trailer_mode has to be set to SEND"); + } +} + std::vector initNamespaces(const Protobuf::RepeatedPtrField& ns) { std::vector namespaces; for (const auto& single_ns : ns) { @@ -231,16 +261,8 @@ FilterConfig::FilterConfig(const ExternalProcessor& config, config.response_attributes()), immediate_mutation_checker_(context.regexEngine()), thread_local_stream_manager_slot_(context.threadLocal().allocateSlot()) { - if (!grpc_service_.has_value()) { - // In case http_service configured, the processing mode can only support sending headers. 
- if (processing_mode_.request_body_mode() != ProcessingMode::NONE || - processing_mode_.response_body_mode() != ProcessingMode::NONE || - processing_mode_.request_trailer_mode() == ProcessingMode::SEND || - processing_mode_.response_trailer_mode() == ProcessingMode::SEND) { - throw EnvoyException( - "If http_service is configured, processing modes can not send any body or trailer."); - } - } + // Validate processing mode configuration. + verifyProcessingModeConfig(config); if (config.disable_clear_route_cache() && (route_cache_action_ != ExternalProcessor::DEFAULT)) { throw EnvoyException("disable_clear_route_cache and route_cache_action can not " "be set to none-default at the same time."); @@ -562,7 +584,6 @@ FilterDataStatus Filter::handleDataBufferedMode(ProcessorState& state, Buffer::I case StreamOpenState::IgnoreError: return FilterDataStatus::Continue; case StreamOpenState::Ok: - // Fall through break; } @@ -581,37 +602,24 @@ FilterDataStatus Filter::handleDataBufferedMode(ProcessorState& state, Buffer::I return FilterDataStatus::StopIterationAndBuffer; } -FilterDataStatus Filter::handleDataStreamedMode(ProcessorState& state, Buffer::Instance& data, - bool end_stream) { - // STREAMED body mode works as follows: - // - // 1) As data callbacks come in to the filter, it "moves" the data into a new buffer, which it - // dispatches via gRPC message to the external processor, and then keeps in a queue. It - // may request a watermark if the queue is higher than the buffer limit to prevent running - // out of memory. - // 2) As a result, filters farther down the chain see empty buffers in some data callbacks. - // 3) When a response comes back from the external processor, it injects the processor's result - // into the filter chain using "inject**codedData". (The processor may respond indicating that - // there is no change, which means that the original buffer stored in the queue is what gets - // injected.) 
- // - // This way, we pipeline data from the proxy to the external processor, and give the processor - // the ability to modify each chunk, in order. Doing this any other way would have required - // substantial changes to the filter manager. See - // https://github.com/envoyproxy/envoy/issues/16760 for a discussion. +FilterDataStatus Filter::handleDataStreamedModeBase(ProcessorState& state, Buffer::Instance& data, + bool end_stream) { switch (openStream()) { case StreamOpenState::Error: return FilterDataStatus::StopIterationNoBuffer; case StreamOpenState::IgnoreError: return FilterDataStatus::Continue; case StreamOpenState::Ok: - // Fall through break; } - // Need to first enqueue the data into the chunk queue before sending. ProcessingRequest req = setupBodyChunk(state, data, end_stream); - state.enqueueStreamingChunk(data, end_stream); + if (state.bodyMode() != ProcessingMode::FULL_DUPLEX_STREAMED) { + state.enqueueStreamingChunk(data, end_stream); + } else { + // For FULL_DUPLEX_STREAMED mode, just drain the data. + data.drain(data.length()); + } // If the current state is HeadersCallback, stays in that state. if (state.callbackState() == ProcessorState::CallbackState::HeadersCallback) { sendBodyChunk(state, ProcessorState::CallbackState::HeadersCallback, req); @@ -626,6 +634,35 @@ FilterDataStatus Filter::handleDataStreamedMode(ProcessorState& state, Buffer::I } } +FilterDataStatus Filter::handleDataStreamedMode(ProcessorState& state, Buffer::Instance& data, + bool end_stream) { + // STREAMED body mode works as follows: + // + // 1) As data callbacks come in to the filter, it "moves" the data into a new buffer, which it + // dispatches via gRPC message to the external processor, and then keeps in a queue. It + // may request a watermark if the queue is higher than the buffer limit to prevent running + // out of memory. + // 2) As a result, filters farther down the chain see empty buffers in some data callbacks. 
+ // 3) When a response comes back from the external processor, it injects the processor's result + // into the filter chain using "inject**codedData". (The processor may respond indicating that + // there is no change, which means that the original buffer stored in the queue is what gets + // injected.) + // + // This way, we pipeline data from the proxy to the external processor, and give the processor + // the ability to modify each chunk, in order. Doing this any other way would have required + // substantial changes to the filter manager. See + // https://github.com/envoyproxy/envoy/issues/16760 for a discussion. + return handleDataStreamedModeBase(state, data, end_stream); +} + +FilterDataStatus Filter::handleDataFullDuplexStreamedMode(ProcessorState& state, + Buffer::Instance& data, bool end_stream) { + // FULL_DUPLEX_STREAMED body mode works similarly to STREAMED, except that it does not put the + // data into the internal queue, and there is no internal-queue-based flow control. A copy of the + // data is dispatched to the external processor and the original data is drained. 
+ return handleDataStreamedModeBase(state, data, end_stream); +} + FilterDataStatus Filter::handleDataBufferedPartialMode(ProcessorState& state, Buffer::Instance& data, bool end_stream) { // BUFFERED_PARTIAL mode works as follows: @@ -672,6 +709,8 @@ FilterDataStatus Filter::handleDataBufferedPartialMode(ProcessorState& state, } FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, bool end_stream) { + state.setBodyReceived(true); + if (config_->observabilityMode()) { return sendDataInObservabilityMode(data, state, end_stream); } @@ -690,10 +729,13 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b } if (state.callbackState() == ProcessorState::CallbackState::HeadersCallback) { - if (state.bodyMode() == ProcessingMode::STREAMED && - config_->sendBodyWithoutWaitingForHeaderResponse()) { - ENVOY_LOG(trace, "Sending body data even header processing is still in progress as body mode " - "is STREAMED and send_body_without_waiting_for_header_response is enabled"); + if ((state.bodyMode() == ProcessingMode::STREAMED && + config_->sendBodyWithoutWaitingForHeaderResponse()) || + state.bodyMode() == ProcessingMode::FULL_DUPLEX_STREAMED) { + ENVOY_LOG(trace, + "Sending body data even though header processing is still in progress as body mode " + "is FULL_DUPLEX_STREAMED or STREAMED and " + "send_body_without_waiting_for_header_response is enabled"); } else { ENVOY_LOG(trace, "Header processing still in progress -- holding body data"); // We don't know what to do with the body until the response comes back. 
@@ -714,6 +756,9 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b case ProcessingMode::STREAMED: result = handleDataStreamedMode(state, data, end_stream); break; + case ProcessingMode::FULL_DUPLEX_STREAMED: + result = handleDataFullDuplexStreamedMode(state, data, end_stream); + break; case ProcessingMode::BUFFERED_PARTIAL: result = handleDataBufferedPartialMode(state, data, end_stream); break; @@ -723,7 +768,6 @@ FilterDataStatus Filter::onData(ProcessorState& state, Buffer::Instance& data, b result = FilterDataStatus::Continue; break; } - return result; } @@ -847,7 +891,8 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap& state.setTrailersAvailable(true); state.setTrailers(&trailers); - if (state.callbackState() != ProcessorState::CallbackState::Idle) { + if ((state.callbackState() != ProcessorState::CallbackState::Idle) && + (state.bodyMode() != ProcessingMode::FULL_DUPLEX_STREAMED)) { ENVOY_LOG(trace, "Previous callback still executing -- holding header iteration"); state.setPaused(true); return FilterTrailersStatus::StopIteration; @@ -982,6 +1027,12 @@ void Filter::sendBodyChunk(ProcessorState& state, ProcessorState::CallbackState void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers, bool observability_mode) { + // Skip if the trailers have already been sent to the server. 
+ if (state.trailersSentToServer()) { + return; + } + state.setTrailersSentToServer(true); + ProcessingRequest req; req.set_observability_mode(observability_mode); addAttributes(state, req); @@ -989,13 +1040,15 @@ void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers auto* trailers_req = state.mutableTrailers(req); MutationUtils::headersToProto(trailers, config_->allowedHeaders(), config_->disallowedHeaders(), *trailers_req->mutable_trailers()); - if (observability_mode) { ENVOY_LOG(debug, "Sending trailers message in observability mode"); } else { + ProcessorState::CallbackState callback_state = state.callbackState(); + if (callback_state == ProcessorState::CallbackState::Idle) { + callback_state = ProcessorState::CallbackState::TrailersCallback; + } state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), - config_->messageTimeout(), - ProcessorState::CallbackState::TrailersCallback); + config_->messageTimeout(), callback_state); ENVOY_LOG(debug, "Sending trailers message"); } @@ -1181,6 +1234,8 @@ void Filter::onReceiveMessage(std::unique_ptr&& r) { // and filter is waiting for header processing response. // Otherwise, the response mode_override proto field is ignored. 
if (config_->allowModeOverride() && !config_->sendBodyWithoutWaitingForHeaderResponse() && + (config_->processingMode().request_body_mode() != ProcessingMode::FULL_DUPLEX_STREAMED) && + (config_->processingMode().response_body_mode() != ProcessingMode::FULL_DUPLEX_STREAMED) && inHeaderProcessState() && response->has_mode_override()) { bool mode_override_allowed = true; const auto& mode_overide = response->mode_override(); diff --git a/source/extensions/filters/http/ext_proc/ext_proc.h b/source/extensions/filters/http/ext_proc/ext_proc.h index 5a2a1be309a9..0e2ac47acd7e 100644 --- a/source/extensions/filters/http/ext_proc/ext_proc.h +++ b/source/extensions/filters/http/ext_proc/ext_proc.h @@ -472,8 +472,12 @@ class Filter : public Logger::Loggable, Http::FilterDataStatus handleDataBufferedMode(ProcessorState& state, Buffer::Instance& data, bool end_stream); + Http::FilterDataStatus handleDataStreamedModeBase(ProcessorState& state, Buffer::Instance& data, + bool end_stream); Http::FilterDataStatus handleDataStreamedMode(ProcessorState& state, Buffer::Instance& data, bool end_stream); + Http::FilterDataStatus handleDataFullDuplexStreamedMode(ProcessorState& state, + Buffer::Instance& data, bool end_stream); Http::FilterDataStatus handleDataBufferedPartialMode(ProcessorState& state, Buffer::Instance& data, bool end_stream); Http::FilterDataStatus onData(ProcessorState& state, Buffer::Instance& data, bool end_stream); diff --git a/source/extensions/filters/http/ext_proc/processor_state.cc b/source/extensions/filters/http/ext_proc/processor_state.cc index efe972a4fc70..8b2b3298b776 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.cc +++ b/source/extensions/filters/http/ext_proc/processor_state.cc @@ -20,19 +20,26 @@ using envoy::service::ext_proc::v3::TrailersResponse; void ProcessorState::onStartProcessorCall(Event::TimerCb cb, std::chrono::milliseconds timeout, CallbackState callback_state) { + ENVOY_LOG(debug, "Start external processing call"); 
callback_state_ = callback_state; - if (!message_timer_) { - message_timer_ = filter_callbacks_->dispatcher().createTimer(cb); + + // Skip starting the timer for FULL_DUPLEX_STREAMED body mode. + if (bodyMode() != ProcessingMode::FULL_DUPLEX_STREAMED) { + if (message_timer_ == nullptr) { + message_timer_ = filter_callbacks_->dispatcher().createTimer(cb); + } + message_timer_->enableTimer(timeout); + ENVOY_LOG(debug, "Traffic direction {}: {} ms timer enabled", trafficDirectionDebugStr(), + timeout.count()); } - message_timer_->enableTimer(timeout); - ENVOY_LOG(debug, "Traffic direction {}: {} ms timer enabled", trafficDirectionDebugStr(), - timeout.count()); + call_start_time_ = filter_callbacks_->dispatcher().timeSource().monotonicTime(); new_timeout_received_ = false; } void ProcessorState::onFinishProcessorCall(Grpc::Status::GrpcStatus call_status, CallbackState next_state) { + ENVOY_LOG(debug, "Finish external processing call"); filter_.logGrpcStreamInfo(); stopMessageTimer(); @@ -110,14 +117,29 @@ absl::Status ProcessorState::processHeaderMutation(const CommonResponse& common_ ProcessorState::CallbackState ProcessorState::getCallbackStateAfterHeaderResp(const CommonResponse& common_response) const { - if (bodyMode() == ProcessingMode::STREAMED && - filter_.config().sendBodyWithoutWaitingForHeaderResponse() && !chunk_queue_.empty() && - (common_response.status() != CommonResponse::CONTINUE_AND_REPLACE)) { + if (common_response.status() == CommonResponse::CONTINUE_AND_REPLACE) { + return ProcessorState::CallbackState::Idle; + } + + if ((bodyMode() == ProcessingMode::STREAMED && + filter_.config().sendBodyWithoutWaitingForHeaderResponse()) && + !chunk_queue_.empty()) { return ProcessorState::CallbackState::StreamedBodyCallback; } + + if (bodyMode() == ProcessingMode::FULL_DUPLEX_STREAMED) { + if (bodyReceived()) { + return ProcessorState::CallbackState::StreamedBodyCallback; + } + if (trailers_available_) { + return ProcessorState::CallbackState::TrailersCallback; + 
} + } + return ProcessorState::CallbackState::Idle; } +// TODO(#37047) Refactoring this function by adding one helper function for each body mode. absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& response) { if (callback_state_ == CallbackState::HeadersCallback) { ENVOY_LOG(debug, "applying headers response. body mode = {}", @@ -165,7 +187,7 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon // Fall through if there was never a body in the first place. ENVOY_LOG(debug, "The message had no body"); } else if (complete_body_available_ && body_mode_ != ProcessingMode::NONE) { - if (callback_state_ != CallbackState::StreamedBodyCallback) { + if (callback_state_ == CallbackState::Idle) { // If we get here, then all the body data came in before the header message // was complete, and the server wants the body. It doesn't matter whether the // processing mode is buffered, streamed, or partially buffered. @@ -197,6 +219,10 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon sendBufferedDataInStreamedMode(false); continueIfNecessary(); return absl::OkStatus(); + } else if (body_mode_ == ProcessingMode::FULL_DUPLEX_STREAMED) { + // There is no buffered data in this mode. + continueIfNecessary(); + return absl::OkStatus(); } else if (body_mode_ == ProcessingMode::BUFFERED_PARTIAL) { if (hasBufferedData()) { // Put the data buffered so far into the buffer queue. When more data comes in @@ -246,6 +272,7 @@ absl::Status ProcessorState::handleHeadersResponse(const HeadersResponse& respon return absl::FailedPreconditionError("spurious message"); } +// TODO(#37048) Refactoring this function by adding one helper function for each callback state. 
absl::Status ProcessorState::handleBodyResponse(const BodyResponse& response) { bool should_continue = false; const auto& common_response = response.response(); @@ -288,27 +315,11 @@ absl::Status ProcessorState::handleBodyResponse(const BodyResponse& response) { onFinishProcessorCall(Grpc::Status::Ok); should_continue = true; } else if (callback_state_ == CallbackState::StreamedBodyCallback) { - Buffer::OwnedImpl chunk_data; - auto chunk = dequeueStreamingChunk(chunk_data); - ENVOY_BUG(chunk != nullptr, "Bad streamed body callback state"); - if (common_response.has_body_mutation()) { - ENVOY_LOG(debug, "Applying body response to chunk of data. Size = {}", chunk->length); - MutationUtils::applyBodyMutations(common_response.body_mutation(), chunk_data); - } - should_continue = chunk->end_stream; - if (chunk_data.length() > 0) { - ENVOY_LOG(trace, "Injecting {} bytes of data to filter stream", chunk_data.length()); - injectDataToFilterChain(chunk_data, chunk->end_stream); - } - - if (queueBelowLowLimit()) { - clearWatermark(); - } - if (chunk_queue_.empty()) { - onFinishProcessorCall(Grpc::Status::Ok); - } else { - onFinishProcessorCall(Grpc::Status::Ok, callback_state_); + absl::StatusOr result = handleBodyInStreamedState(common_response); + if (!result.ok()) { + return result.status(); } + should_continue = *result; } else if (callback_state_ == CallbackState::BufferedPartialBodyCallback) { // Apply changes to the buffer that we sent to the server Buffer::OwnedImpl chunk_data; @@ -368,9 +379,13 @@ absl::Status ProcessorState::handleBodyResponse(const BodyResponse& response) { return absl::FailedPreconditionError("spurious message"); } +// If the body mode is FULL_DUPLEX_STREAMED, then the trailers response may come back when +// the state is still waiting for body response. 
absl::Status ProcessorState::handleTrailersResponse(const TrailersResponse& response) { - if (callback_state_ == CallbackState::TrailersCallback) { - ENVOY_LOG(debug, "Applying response to buffered trailers"); + if (callback_state_ == CallbackState::TrailersCallback || + bodyMode() == ProcessingMode::FULL_DUPLEX_STREAMED) { + ENVOY_LOG(debug, "Applying response to buffered trailers, body_mode_ {}", + ProcessingMode::BodySendMode_Name(body_mode_)); if (response.has_header_mutation()) { auto mut_status = MutationUtils::applyHeaderMutations( response.header_mutation(), *trailers_, false, filter_.config().mutationChecker(), @@ -394,6 +409,10 @@ void ProcessorState::enqueueStreamingChunk(Buffer::Instance& data, bool end_stre } } +QueuedChunkPtr ProcessorState::dequeueStreamingChunk(Buffer::OwnedImpl& out_data) { + return chunk_queue_.pop(out_data); +} + void ProcessorState::clearAsyncState() { onFinishProcessorCall(Grpc::Status::Aborted); if (chunkQueue().receivedData().length() > 0) { @@ -415,6 +434,84 @@ void ProcessorState::continueIfNecessary() { } } +bool ProcessorState::handleStreamedBodyResponse(const CommonResponse& common_response) { + Buffer::OwnedImpl chunk_data; + QueuedChunkPtr chunk = dequeueStreamingChunk(chunk_data); + ENVOY_BUG(chunk != nullptr, "Bad streamed body callback state"); + if (common_response.has_body_mutation()) { + ENVOY_LOG(debug, "Applying body response to chunk of data. 
Size = {}", chunk->length); + MutationUtils::applyBodyMutations(common_response.body_mutation(), chunk_data); + } + bool should_continue = chunk->end_stream; + if (chunk_data.length() > 0) { + ENVOY_LOG(trace, "Injecting {} bytes of data to filter stream", chunk_data.length()); + injectDataToFilterChain(chunk_data, chunk->end_stream); + } + + if (queueBelowLowLimit()) { + clearWatermark(); + } + if (chunk_queue_.empty()) { + onFinishProcessorCall(Grpc::Status::Ok); + } else { + onFinishProcessorCall(Grpc::Status::Ok, callback_state_); + } + + return should_continue; +} + +bool ProcessorState::handleDuplexStreamedBodyResponse(const CommonResponse& common_response) { + const envoy::service::ext_proc::v3::StreamedBodyResponse& streamed_response = + common_response.body_mutation().streamed_response(); + const std::string& body = streamed_response.body(); + const bool end_of_stream = streamed_response.end_of_stream(); + + if (body.size() > 0) { + Buffer::OwnedImpl buffer; + buffer.add(body); + ENVOY_LOG(trace, + "Injecting {} bytes of data to filter stream in FULL_DUPLEX_STREAMED mode. " + "end_of_stream is {}", + buffer.length(), end_of_stream); + injectDataToFilterChain(buffer, end_of_stream); + } + + if (end_of_stream) { + onFinishProcessorCall(Grpc::Status::Ok); + } else { + // Set the state to CallbackState::StreamedBodyCallback to wait for more bodies. + // However, this could be the last chunk of body, and trailers are right after it. + // The function to handle trailers response needs to consider this. + onFinishProcessorCall(Grpc::Status::Ok, CallbackState::StreamedBodyCallback); + } + // If end_of_stream is true, Envoy should continue the filter chain operations. 
+ return end_of_stream; +} + +absl::StatusOr +ProcessorState::handleBodyInStreamedState(const CommonResponse& common_response) { + if (common_response.has_body_mutation() && + common_response.body_mutation().has_streamed_response()) { + ENVOY_LOG(debug, "FULL_DUPLEX_STREAMED body response is received and body_mode_: {} ", + ProcessingMode::BodySendMode_Name(body_mode_)); + // streamed_response will only be supported if the ext_proc filter has body_mode set to + // FULL_DUPLEX_STREAMED. + if (body_mode_ != ProcessingMode::FULL_DUPLEX_STREAMED) { + return absl::FailedPreconditionError( + "spurious message: streamed_response is received while body_mode_ is not " + "FULL_DUPLEX_STREAMED"); + } + return handleDuplexStreamedBodyResponse(common_response); + } else { + if (body_mode_ == ProcessingMode::FULL_DUPLEX_STREAMED) { + return absl::FailedPreconditionError( + "spurious message: Normal body mutation response is received while body_mode_ is " + "FULL_DUPLEX_STREAMED"); + } + return handleStreamedBodyResponse(common_response); + } +} + void DecodingProcessorState::setProcessingModeInternal(const ProcessingMode& mode) { // Account for the different default behaviors of headers and trailers -- // headers are sent by default and trailers are not. 
diff --git a/source/extensions/filters/http/ext_proc/processor_state.h b/source/extensions/filters/http/ext_proc/processor_state.h index 51d8aa791891..2ca477521ea4 100644 --- a/source/extensions/filters/http/ext_proc/processor_state.h +++ b/source/extensions/filters/http/ext_proc/processor_state.h @@ -6,6 +6,7 @@ #include "envoy/buffer/buffer.h" #include "envoy/config/core/v3/base.pb.h" #include "envoy/event/timer.h" +#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h" #include "envoy/extensions/filters/http/ext_proc/v3/processing_mode.pb.h" #include "envoy/http/filter.h" #include "envoy/http/header_map.h" @@ -80,7 +81,8 @@ class ProcessorState : public Logger::Loggable { const std::vector& typed_forwarding_namespaces, const std::vector& untyped_receiving_namespaces) : filter_(filter), watermark_requested_(false), paused_(false), no_body_(false), - complete_body_available_(false), trailers_available_(false), body_replaced_(false), + complete_body_available_(false), trailers_available_(false), + trailers_sent_to_server_(false), body_replaced_(false), body_received_(false), partial_body_processed_(false), traffic_direction_(traffic_direction), untyped_forwarding_namespaces_(&untyped_forwarding_namespaces), typed_forwarding_namespaces_(&typed_forwarding_namespaces), @@ -104,6 +106,8 @@ class ProcessorState : public Logger::Loggable { void setHasNoBody(bool b) { no_body_ = b; } void setTrailersAvailable(bool d) { trailers_available_ = d; } bool bodyReplaced() const { return body_replaced_; } + bool bodyReceived() const { return body_received_; } + void setBodyReceived(bool b) { body_received_ = b; } bool partialBodyProcessed() const { return partial_body_processed_; } virtual void setProcessingMode( @@ -132,6 +136,9 @@ class ProcessorState : public Logger::Loggable { bool sendHeaders() const { return send_headers_; } bool sendTrailers() const { return send_trailers_; } + bool trailersSentToServer() const { return trailers_sent_to_server_; } + void 
setTrailersSentToServer(bool b) { trailers_sent_to_server_ = b; } + envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode bodyMode() const { return body_mode_; } @@ -170,9 +177,7 @@ class ProcessorState : public Logger::Loggable { // Move the contents of "data" into a QueuedChunk object on the streaming queue. void enqueueStreamingChunk(Buffer::Instance& data, bool end_stream); // If the queue has chunks, return the head of the queue. - QueuedChunkPtr dequeueStreamingChunk(Buffer::OwnedImpl& out_data) { - return chunk_queue_.pop(out_data); - } + QueuedChunkPtr dequeueStreamingChunk(Buffer::OwnedImpl& out_data); // Consolidate all the chunks on the queue into a single one and return a reference. const QueuedChunk& consolidateStreamedChunks() { return chunk_queue_.consolidate(); } bool queueOverHighLimit() const { return chunk_queue_.bytesEnqueued() > bufferLimit(); } @@ -182,11 +187,14 @@ class ProcessorState : public Logger::Loggable { // 1) STREAMED BodySendMode // 2) BUFFERED_PARTIAL BodySendMode // 3) BUFFERED BodySendMode + SKIP HeaderSendMode + // 4) FULL_DUPLEX_STREAMED BodySendMode // In these modes, ext_proc filter can not guarantee to set the content length correctly if // body is mutated by external processor later. // In http1 codec, removing content length will enable chunked encoding whenever feasible. return ( body_mode_ == envoy::extensions::filters::http::ext_proc::v3::ProcessingMode::STREAMED || + body_mode_ == + envoy::extensions::filters::http::ext_proc::v3::ProcessingMode::FULL_DUPLEX_STREAMED || body_mode_ == envoy::extensions::filters::http::ext_proc::v3::ProcessingMode::BUFFERED_PARTIAL || (body_mode_ == envoy::extensions::filters::http::ext_proc::v3::ProcessingMode::BUFFERED && @@ -236,8 +244,12 @@ class ProcessorState : public Logger::Loggable { bool complete_body_available_ : 1; // If true, then the filter received the trailers bool trailers_available_ : 1; + // If true, the trailers have already been sent to the server. 
+ bool trailers_sent_to_server_ : 1; // If true, then a CONTINUE_AND_REPLACE status was used on a response bool body_replaced_ : 1; + // If true, some body data is received. + bool body_received_ : 1; // If true, we are in "buffered partial" mode and we already reached the buffer // limit, sent the body in a message, and got back a reply. bool partial_body_processed_ : 1; @@ -273,6 +285,12 @@ class ProcessorState : public Logger::Loggable { private: virtual void clearRouteCache(const envoy::service::ext_proc::v3::CommonResponse&) {} + bool + handleStreamedBodyResponse(const envoy::service::ext_proc::v3::CommonResponse& common_response); + bool handleDuplexStreamedBodyResponse( + const envoy::service::ext_proc::v3::CommonResponse& common_response); + absl::StatusOr + handleBodyInStreamedState(const envoy::service::ext_proc::v3::CommonResponse& common_response); void sendBufferedDataInStreamedMode(bool end_stream); absl::Status processHeaderMutation(const envoy::service::ext_proc::v3::CommonResponse& common_response); diff --git a/test/extensions/filters/http/ext_proc/BUILD b/test/extensions/filters/http/ext_proc/BUILD index 7487855aa6a4..18af3bb9562d 100644 --- a/test/extensions/filters/http/ext_proc/BUILD +++ b/test/extensions/filters/http/ext_proc/BUILD @@ -39,6 +39,7 @@ envoy_extension_cc_test( }), extension_names = ["envoy.filters.http.ext_proc"], rbe_pool = "2core", + shard_count = 8, tags = ["skip_on_windows"], deps = [ ":mock_server_lib", diff --git a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc index e98acc95bb8a..b85f337b373b 100644 --- a/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc +++ b/test/extensions/filters/http/ext_proc/ext_proc_integration_test.cc @@ -708,6 +708,56 @@ class ExtProcIntegrationTest : public HttpIntegrationTest, verifyDownstreamResponse(*response, 200); } + IntegrationStreamDecoderPtr 
initAndSendDataDuplexStreamedMode(absl::string_view body_sent, + bool end_of_stream) { + config_helper_.setBufferLimits(1024, 1024); + auto* processing_mode = proto_config_.mutable_processing_mode(); + processing_mode->set_request_header_mode(ProcessingMode::SEND); + processing_mode->set_request_body_mode(ProcessingMode::FULL_DUPLEX_STREAMED); + processing_mode->set_request_trailer_mode(ProcessingMode::SEND); + processing_mode->set_response_header_mode(ProcessingMode::SKIP); + + initializeConfig(); + HttpIntegrationTest::initialize(); + codec_client_ = makeHttpConnection(lookupPort("http")); + Http::TestRequestHeaderMapImpl default_headers; + HttpTestUtility::addDefaultHeaders(default_headers); + + std::pair encoder_decoder = + codec_client_->startRequest(default_headers); + request_encoder_ = &encoder_decoder.first; + IntegrationStreamDecoderPtr response = std::move(encoder_decoder.second); + codec_client_->sendData(*request_encoder_, body_sent, end_of_stream); + return response; + } + + void serverReceiveHeaderDuplexStreamed(ProcessingRequest& header_request) { + EXPECT_TRUE(grpc_upstreams_[0]->waitForHttpConnection(*dispatcher_, processor_connection_)); + EXPECT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_)); + EXPECT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, header_request)); + EXPECT_TRUE(header_request.has_request_headers()); + } + + void serverSendHeaderRespDuplexStreamed() { + processor_stream_->startGrpcStream(); + ProcessingResponse response_header; + auto* header_resp = response_header.mutable_request_headers(); + auto* header_mutation = header_resp->mutable_response()->mutable_header_mutation(); + auto* header = header_mutation->add_set_headers()->mutable_header(); + header->set_key("x-new-header"); + header->set_raw_value("new"); + processor_stream_->sendGrpcMessage(response_header); + } + + void serverSendTrailerRespDuplexStreamed() { + ProcessingResponse response_trailer; + auto* trailer_resp = 
response_trailer.mutable_request_trailers()->mutable_header_mutation(); + auto* header = trailer_resp->add_set_headers()->mutable_header(); + header->set_key("x-new-trailer"); + header->set_raw_value("new"); + processor_stream_->sendGrpcMessage(response_trailer); + } + envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor proto_config_{}; uint32_t max_message_timeout_ms_{0}; std::vector grpc_upstreams_; @@ -4936,6 +4986,187 @@ TEST_P(ExtProcIntegrationTest, SendHeaderBodyNotSendTrailerTest) { verifyDownstreamResponse(*response, 200); } +TEST_P(ExtProcIntegrationTest, ServerWaitForBodyBeforeSendsHeaderRespDuplexStreamed) { + const std::string body_sent(64 * 1024, 's'); + IntegrationStreamDecoderPtr response = initAndSendDataDuplexStreamedMode(body_sent, true); + + // The ext_proc server receives the headers. + ProcessingRequest header_request; + serverReceiveHeaderDuplexStreamed(header_request); + + std::string body_received; + bool end_stream = false; + uint32_t total_req_body_msg = 0; + while (!end_stream) { + ProcessingRequest body_request; + EXPECT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, body_request)); + EXPECT_TRUE(body_request.has_request_body()); + body_received = absl::StrCat(body_received, body_request.request_body().body()); + end_stream = body_request.request_body().end_of_stream(); + total_req_body_msg++; + } + EXPECT_TRUE(end_stream); + EXPECT_EQ(body_received, body_sent); + + // The ext_proc server sends back the header response. + serverSendHeaderRespDuplexStreamed(); + + // The ext_proc server sends back the body response. 
+ uint32_t total_resp_body_msg = 2 * total_req_body_msg; + const std::string body_upstream(total_resp_body_msg, 'r'); + for (uint32_t i = 0; i < total_resp_body_msg; i++) { + ProcessingResponse response_body; + auto* body_resp = response_body.mutable_request_body(); + auto* body_mut = body_resp->mutable_response()->mutable_body_mutation(); + auto* streamed_response = body_mut->mutable_streamed_response(); + streamed_response->set_body("r"); + const bool end_of_stream = (i == total_resp_body_msg - 1) ? true : false; + streamed_response->set_end_of_stream(end_of_stream); + processor_stream_->sendGrpcMessage(response_body); + } + + handleUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); + EXPECT_EQ(upstream_request_->body().toString(), body_upstream); + verifyDownstreamResponse(*response, 200); +} + +// Buffer the whole message including header, body and trailer before sending response. +TEST_P(ExtProcIntegrationTest, + ServerWaitForBodyAndTrailerBeforeSendsHeaderRespDuplexStreamedSmallBody) { + const std::string body_sent(128 * 1024, 's'); + IntegrationStreamDecoderPtr response = initAndSendDataDuplexStreamedMode(body_sent, false); + Http::TestRequestTrailerMapImpl request_trailers{{"x-trailer-foo", "yes"}}; + codec_client_->sendTrailers(*request_encoder_, request_trailers); + + // The ext_proc server receives the headers. 
+ ProcessingRequest header_request; + serverReceiveHeaderDuplexStreamed(header_request); + + std::string body_received; + bool end_stream = false; + uint32_t total_req_body_msg = 0; + while (!end_stream) { + ProcessingRequest request; + EXPECT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, request)); + EXPECT_TRUE(request.has_request_body() || request.has_request_trailers()); + if (!request.has_request_trailers()) { + // request_body is received + body_received = absl::StrCat(body_received, request.request_body().body()); + total_req_body_msg++; + } else { + // request_trailer is received. + end_stream = true; + } + } + EXPECT_TRUE(end_stream); + EXPECT_EQ(body_received, body_sent); + + // The ext_proc server sends back the header response. + serverSendHeaderRespDuplexStreamed(); + + // The ext_proc server sends back the body response. + uint32_t total_resp_body_msg = total_req_body_msg / 2; + const std::string body_upstream(total_resp_body_msg, 'r'); + for (uint32_t i = 0; i < total_resp_body_msg; i++) { + ProcessingResponse response_body; + auto* streamed_response = response_body.mutable_request_body() + ->mutable_response() + ->mutable_body_mutation() + ->mutable_streamed_response(); + streamed_response->set_body("r"); + processor_stream_->sendGrpcMessage(response_body); + } + + // The ext_proc server sends back the trailer response. + serverSendTrailerRespDuplexStreamed(); + + handleUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); + EXPECT_EQ(upstream_request_->body().toString(), body_upstream); + verifyDownstreamResponse(*response, 200); +} + +// The body is large. The server sends some body responses after buffering some amount of data. +// The server continuously does so until the entire body processing is done. 
+TEST_P(ExtProcIntegrationTest, ServerSendBodyRespWithouRecvEntireBodyDuplexStreamed) { + const std::string body_sent(256 * 1024, 's'); + IntegrationStreamDecoderPtr response = initAndSendDataDuplexStreamedMode(body_sent, false); + Http::TestRequestTrailerMapImpl request_trailers{{"x-trailer-foo", "yes"}}; + codec_client_->sendTrailers(*request_encoder_, request_trailers); + + // The ext_proc server receives the headers. + ProcessingRequest header_request; + serverReceiveHeaderDuplexStreamed(header_request); + Http::TestRequestHeaderMapImpl expected_request_headers{{":scheme", "http"}, + {":method", "GET"}, + {"host", "host"}, + {":path", "/"}, + {"x-forwarded-proto", "http"}}; + EXPECT_THAT(header_request.request_headers().headers(), + HeaderProtosEqual(expected_request_headers)); + + std::string body_received; + bool end_stream = false; + uint32_t total_req_body_msg = 0; + bool header_resp_sent = false; + std::string body_upstream; + + while (!end_stream) { + ProcessingRequest request; + EXPECT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, request)); + EXPECT_TRUE(request.has_request_body() || request.has_request_trailers()); + if (!request.has_request_trailers()) { + // Buffer the entire body. + body_received = absl::StrCat(body_received, request.request_body().body()); + total_req_body_msg++; + // After receiving every 7 body chunks, the server sends back three body responses. + if (total_req_body_msg % 7 == 0) { + if (!header_resp_sent) { + // Before sending the 1st body response, sends a header response. 
+ serverSendHeaderRespDuplexStreamed(); + header_resp_sent = true; + } + ProcessingResponse response_body; + for (uint32_t i = 0; i < 3; i++) { + body_upstream += std::to_string(i); + auto* streamed_response = response_body.mutable_request_body() + ->mutable_response() + ->mutable_body_mutation() + ->mutable_streamed_response(); + streamed_response->set_body(std::to_string(i)); + processor_stream_->sendGrpcMessage(response_body); + } + } + } else { + // request_trailer is received. + end_stream = true; + Http::TestResponseTrailerMapImpl expected_trailers{{"x-trailer-foo", "yes"}}; + EXPECT_THAT(request.request_trailers().trailers(), HeaderProtosEqual(expected_trailers)); + } + } + EXPECT_TRUE(end_stream); + EXPECT_EQ(body_received, body_sent); + + // Send one more body response at the end. + ProcessingResponse response_body; + auto* streamed_response = response_body.mutable_request_body() + ->mutable_response() + ->mutable_body_mutation() + ->mutable_streamed_response(); + streamed_response->set_body("END"); + processor_stream_->sendGrpcMessage(response_body); + body_upstream += "END"; + + // The ext_proc server sends back the trailer response. 
+ serverSendTrailerRespDuplexStreamed(); + + handleUpstreamRequest(); + EXPECT_THAT(upstream_request_->headers(), SingleHeaderValueIs("x-new-header", "new")); + EXPECT_EQ(upstream_request_->body().toString(), body_upstream); + verifyDownstreamResponse(*response, 200); +} + TEST_P(ExtProcIntegrationTest, ModeOverrideAllowed) { proto_config_.mutable_processing_mode()->set_response_header_mode(ProcessingMode::SKIP); proto_config_.set_allow_mode_override(true); diff --git a/test/extensions/filters/http/ext_proc/filter_test.cc b/test/extensions/filters/http/ext_proc/filter_test.cc index 5193c769b5df..07cb946e1cf8 100644 --- a/test/extensions/filters/http/ext_proc/filter_test.cc +++ b/test/extensions/filters/http/ext_proc/filter_test.cc @@ -309,6 +309,19 @@ class HttpFilterTest : public testing::Test { stream_callbacks_->onReceiveMessage(std::move(response)); } + void processResponseHeadersAfterTrailer( + absl::optional> + cb) { + HttpHeaders headers; + auto response = std::make_unique(); + auto* headers_response = response->mutable_response_headers(); + if (cb) { + (*cb)(headers, *response, *headers_response); + } + test_time_->advanceTimeWait(std::chrono::microseconds(10)); + stream_callbacks_->onReceiveMessage(std::move(response)); + } + // Expect a request_body request, and send back a valid response void processRequestBody( absl::optional> cb, @@ -360,6 +373,31 @@ class HttpFilterTest : public testing::Test { stream_callbacks_->onReceiveMessage(std::move(response)); } + void processResponseBodyHelper(absl::string_view data, Buffer::OwnedImpl& want_response_body, + bool end_of_stream = false, bool should_continue = false) { + processResponseBody( + [&](const HttpBody&, ProcessingResponse&, BodyResponse& resp) { + auto* streamed_response = + resp.mutable_response()->mutable_body_mutation()->mutable_streamed_response(); + streamed_response->set_end_of_stream(end_of_stream); + streamed_response->set_body(data); + want_response_body.add(data); + }, + should_continue); + 
} + + void processResponseBodyStreamedAfterTrailer(absl::string_view data, + Buffer::OwnedImpl& want_response_body) { + auto response = std::make_unique(); + auto* body_response = response->mutable_response_body(); + auto* streamed_response = + body_response->mutable_response()->mutable_body_mutation()->mutable_streamed_response(); + streamed_response->set_body(data); + want_response_body.add(data); + test_time_->advanceTimeWait(std::chrono::microseconds(10)); + stream_callbacks_->onReceiveMessage(std::move(response)); + } + void processRequestTrailers( absl::optional< std::function> @@ -2554,6 +2592,58 @@ TEST_F(HttpFilterTest, ProcessingModeOverrideResponseHeaders) { EXPECT_EQ(1, config_->stats().streams_closed_.value()); } +// Set allow_mode_override in filter config to be true. +// Set request_body_mode: FULL_DUPLEX_STREAMED +// In such case, the mode_override in the response will be ignored. +TEST_F(HttpFilterTest, DisableResponseModeOverrideByStreamedBodyMode) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_header_mode: "SEND" + response_header_mode: "SEND" + request_body_mode: "FULL_DUPLEX_STREAMED" + response_body_mode: "FULL_DUPLEX_STREAMED" + request_trailer_mode: "SEND" + response_trailer_mode: "SEND" + allow_mode_override: true + )EOF"); + + EXPECT_EQ(filter_->config().allowModeOverride(), true); + EXPECT_EQ(filter_->config().sendBodyWithoutWaitingForHeaderResponse(), false); + EXPECT_EQ(filter_->config().processingMode().response_header_mode(), ProcessingMode::SEND); + EXPECT_EQ(filter_->config().processingMode().response_body_mode(), + ProcessingMode::FULL_DUPLEX_STREAMED); + EXPECT_EQ(filter_->config().processingMode().request_body_mode(), + ProcessingMode::FULL_DUPLEX_STREAMED); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, true)); + + // When ext_proc server sends back the request header response, it contains the + // mode_override for the 
response_header_mode to be SKIP. + processRequestHeaders( + false, [](const HttpHeaders&, ProcessingResponse& response, HeadersResponse&) { + response.mutable_mode_override()->set_response_header_mode(ProcessingMode::SKIP); + }); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, true)); + + // Verify such mode_override is ignored. The response header is still sent to the ext_proc server. + processResponseHeaders(false, [](const HttpHeaders& header_resp, ProcessingResponse&, + HeadersResponse&) { + EXPECT_TRUE(header_resp.end_of_stream()); + TestRequestHeaderMapImpl expected_response{{":status", "200"}, {"content-type", "text/plain"}}; + EXPECT_THAT(header_resp.headers(), HeaderProtosEqual(expected_response)); + }); + + TestRequestHeaderMapImpl final_expected_response{{":status", "200"}, + {"content-type", "text/plain"}}; + EXPECT_THAT(&response_headers_, HeaderMapEqualIgnoreOrder(&final_expected_response)); + filter_->onDestroy(); +} + // Set allow_mode_override in filter config to be true. // Set send_body_without_waiting_for_header_response to be true // In such case, the mode_override in the response will be ignored. 
@@ -2791,7 +2881,8 @@ TEST_F(HttpFilterTest, HttpServiceBodyProcessingModeNotNone) { factory_context_); }, EnvoyException, - "If http_service is configured, processing modes can not send any body or trailer."); + "If the ext_proc filter is configured with http_service instead of gRPC service, " + "then the processing modes of this filter can not be configured to send body or trailer."); } TEST_F(HttpFilterTest, HttpServiceTrailerProcessingModeNotSKIP) { @@ -2821,7 +2912,58 @@ TEST_F(HttpFilterTest, HttpServiceTrailerProcessingModeNotSKIP) { factory_context_); }, EnvoyException, - "If http_service is configured, processing modes can not send any body or trailer."); + "If the ext_proc filter is configured with http_service instead of gRPC service, " + "then the processing modes of this filter can not be configured to send body or trailer."); +} + +TEST_F(HttpFilterTest, RequestBodyModeStreamedTrailerModeSKIP) { + std::string yaml = R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + request_body_mode: "FULL_DUPLEX_STREAMED" + request_trailer_mode: "SKIP" + )EOF"; + + envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor proto_config{}; + TestUtility::loadFromYaml(yaml, proto_config); + EXPECT_THROW_WITH_MESSAGE( + { + auto config = std::make_shared( + proto_config, 200ms, 10000, *stats_store_.rootScope(), "", false, + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), + factory_context_); + }, + EnvoyException, + "If the ext_proc filter has the request_body_mode set to FULL_DUPLEX_STREAMED, " + "then the request_trailer_mode has to be set to SEND"); +} + +TEST_F(HttpFilterTest, ResponseBodyModeStreamedTrailerModeSKIP) { + std::string yaml = R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + response_body_mode: "FULL_DUPLEX_STREAMED" + response_trailer_mode: "SKIP" + )EOF"; + + 
envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor proto_config{}; + TestUtility::loadFromYaml(yaml, proto_config); + EXPECT_THROW_WITH_MESSAGE( + { + auto config = std::make_shared( + proto_config, 200ms, 10000, *stats_store_.rootScope(), "", false, + std::make_shared( + Envoy::Extensions::Filters::Common::Expr::createBuilder(nullptr)), + factory_context_); + }, + EnvoyException, + "If the ext_proc filter has the response_body_mode set to FULL_DUPLEX_STREAMED, " + "then the response_trailer_mode has to be set to SEND"); } // Using the default configuration, verify that the "clear_route_cache" flag makes the appropriate @@ -4356,6 +4498,322 @@ TEST_F(HttpFilterTest, StreamedTestInBothDirection) { filter_->onDestroy(); } +TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestNormal) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + response_body_mode: "FULL_DUPLEX_STREAMED" + response_trailer_mode: "SEND" + )EOF"); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, absl::nullopt); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + bool encoding_watermarked = false; + setUpEncodingWatermarking(encoding_watermarked); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + processResponseHeaders(false, absl::nullopt); + + Buffer::OwnedImpl want_response_body; + Buffer::OwnedImpl got_response_body; + EXPECT_CALL(encoder_callbacks_, injectEncodedDataToFilterChain(_, _)) + .WillRepeatedly(Invoke( + [&got_response_body](Buffer::Instance& data, Unused) { got_response_body.move(data); 
})); + + // Test 7x3 streaming. + for (int i = 0; i < 7; i++) { + // 7 request chunks are sent to the ext_proc server. + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_chunk, false)); + } + + processResponseBodyHelper(" AAAAA ", want_response_body); + processResponseBodyHelper(" BBBB ", want_response_body); + processResponseBodyHelper(" CCC ", want_response_body); + + // The two buffers should match. + EXPECT_EQ(want_response_body.toString(), got_response_body.toString()); + EXPECT_FALSE(encoding_watermarked); + + // Now do 1:1 streaming for a few chunks. + for (int i = 0; i < 3; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_chunk, false)); + processResponseBodyHelper(std::to_string(i), want_response_body); + } + + // The two buffers should match. + EXPECT_EQ(want_response_body.toString(), got_response_body.toString()); + EXPECT_FALSE(encoding_watermarked); + + // Now send another 10 chunks. + for (int i = 0; i < 10; i++) { + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 10); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_chunk, false)); + } + // Send the last chunk. + Buffer::OwnedImpl last_resp_chunk; + TestUtility::feedBufferWithRandomCharacters(last_resp_chunk, 10); + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(last_resp_chunk, true)); + + processResponseBodyHelper(" EEEEEEE ", want_response_body); + processResponseBodyHelper(" F ", want_response_body); + processResponseBodyHelper(" GGGGGGGGG ", want_response_body); + processResponseBodyHelper(" HH ", want_response_body, true, true); + + // The two buffers should match. 
+ EXPECT_EQ(want_response_body.toString(), got_response_body.toString()); + EXPECT_FALSE(encoding_watermarked); + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); + filter_->onDestroy(); +} + +TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithTrailer) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + response_body_mode: "FULL_DUPLEX_STREAMED" + response_trailer_mode: "SEND" + )EOF"); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, absl::nullopt); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + bool encoding_watermarked = false; + setUpEncodingWatermarking(encoding_watermarked); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + // Server sending headers response without waiting for body. + processResponseHeaders(false, absl::nullopt); + + Buffer::OwnedImpl want_response_body; + Buffer::OwnedImpl got_response_body; + EXPECT_CALL(encoder_callbacks_, injectEncodedDataToFilterChain(_, _)) + .WillRepeatedly(Invoke( + [&got_response_body](Buffer::Instance& data, Unused) { got_response_body.move(data); })); + + for (int i = 0; i < 7; i++) { + // 7 request chunks are sent to the ext_proc server. 
+ Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_chunk, false)); + } + + EXPECT_EQ(FilterTrailersStatus::StopIteration, filter_->encodeTrailers(response_trailers_)); + + processResponseBodyStreamedAfterTrailer(" AAAAA ", want_response_body); + processResponseBodyStreamedAfterTrailer(" BBBB ", want_response_body); + processResponseTrailers(absl::nullopt, true); + + // The two buffers should match. + EXPECT_EQ(want_response_body.toString(), got_response_body.toString()); + EXPECT_FALSE(encoding_watermarked); + + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); + filter_->onDestroy(); +} + +TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithHeaderAndTrailer) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + response_header_mode: "SEND" + response_body_mode: "FULL_DUPLEX_STREAMED" + response_trailer_mode: "SEND" + )EOF"); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, absl::nullopt); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + // The server buffers the headers, body and trailers before sending the header response.
+ bool encoding_watermarked = false; + setUpEncodingWatermarking(encoding_watermarked); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + + Buffer::OwnedImpl want_response_body; + Buffer::OwnedImpl got_response_body; + EXPECT_CALL(encoder_callbacks_, injectEncodedDataToFilterChain(_, _)) + .WillRepeatedly(Invoke( + [&got_response_body](Buffer::Instance& data, Unused) { got_response_body.move(data); })); + + for (int i = 0; i < 7; i++) { + // 7 request chunks are sent to the ext_proc server. + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_chunk, false)); + } + + EXPECT_EQ(FilterTrailersStatus::StopIteration, filter_->encodeTrailers(response_trailers_)); + + // Server now sends back response. + processResponseHeadersAfterTrailer(absl::nullopt); + processResponseBodyStreamedAfterTrailer(" AAAAA ", want_response_body); + processResponseBodyStreamedAfterTrailer(" BBBB ", want_response_body); + processResponseTrailers(absl::nullopt, true); + + // The two buffers should match. 
+ EXPECT_EQ(want_response_body.toString(), got_response_body.toString()); + EXPECT_FALSE(encoding_watermarked); + + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); + filter_->onDestroy(); +} + +TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithHeaderAndTrailerNoBody) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + response_header_mode: "SEND" + response_body_mode: "FULL_DUPLEX_STREAMED" + response_trailer_mode: "SEND" + )EOF"); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, absl::nullopt); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + // Envoy sends header, body and trailer. + bool encoding_watermarked = false; + setUpEncodingWatermarking(encoding_watermarked); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + EXPECT_EQ(FilterTrailersStatus::StopIteration, filter_->encodeTrailers(response_trailers_)); + + // Server now sends back response. 
+ processResponseHeadersAfterTrailer(absl::nullopt); + processResponseTrailers(absl::nullopt, true); + + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 0); + filter_->onDestroy(); +} + +TEST_F(HttpFilterTest, DuplexStreamedBodyProcessingTestWithFilterConfigMissing) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + response_body_mode: "STREAMED" + )EOF"); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, absl::nullopt); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + bool encoding_watermarked = false; + setUpEncodingWatermarking(encoding_watermarked); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + processResponseHeaders(false, absl::nullopt); + + for (int i = 0; i < 4; i++) { + // 4 request chunks are sent to the ext_proc server. + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::Continue, filter_->encodeData(resp_chunk, false)); + } + + processResponseBody( + [](const HttpBody&, ProcessingResponse&, BodyResponse& resp) { + auto* streamed_response = + resp.mutable_response()->mutable_body_mutation()->mutable_streamed_response(); + streamed_response->set_body("AAA"); + }, + false); + + // Verify spurious message is received. 
+ EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 1); + filter_->onDestroy(); +} + +TEST_F(HttpFilterTest, SendNormalBodyMutationTestWithFilterConfigDuplexStreamed) { + initialize(R"EOF( + grpc_service: + envoy_grpc: + cluster_name: "ext_proc_server" + processing_mode: + response_body_mode: "FULL_DUPLEX_STREAMED" + response_trailer_mode: "SEND" + )EOF"); + + // Create synthetic HTTP request + HttpTestUtility::addDefaultHeaders(request_headers_); + request_headers_.setMethod("POST"); + request_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->decodeHeaders(request_headers_, false)); + processRequestHeaders(false, absl::nullopt); + + response_headers_.addCopy(LowerCaseString(":status"), "200"); + response_headers_.addCopy(LowerCaseString("content-type"), "text/plain"); + + bool encoding_watermarked = false; + setUpEncodingWatermarking(encoding_watermarked); + EXPECT_EQ(FilterHeadersStatus::StopIteration, filter_->encodeHeaders(response_headers_, false)); + processResponseHeaders(false, absl::nullopt); + + Buffer::OwnedImpl resp_chunk; + TestUtility::feedBufferWithRandomCharacters(resp_chunk, 100); + EXPECT_EQ(FilterDataStatus::StopIterationNoBuffer, filter_->encodeData(resp_chunk, true)); + + processResponseBody( + [](const HttpBody&, ProcessingResponse&, BodyResponse& resp) { + auto* body_mut = resp.mutable_response()->mutable_body_mutation(); + body_mut->set_body("AAA"); + }, + true); + + // Verify spurious message is received. + EXPECT_EQ(config_->stats().spurious_msgs_received_.value(), 1); + filter_->onDestroy(); +} + // Verify if ext_proc filter is in the upstream filter chain, and if the ext_proc server // sends back response with clear_route_cache set to true, it is ignored. TEST_F(HttpFilterTest, ClearRouteCacheHeaderMutationUpstreamIgnored) {