Skip to content

Commit

Permalink
mobile: fixing a flow control bug for multiple large uploads (envoypr…
Browse files Browse the repository at this point in the history
…oxy#36474)

Fixing two bugs in the send window code.
One where the callback could be called late, after stream completion.  
One where the callback was per-client not per-stream so streams could
overwrite each others upcalls.

Risk Level: low
Testing: first is tested by existing tests no longer flaking. second
TBTested
Docs Changes: n/a
Release Notes: n/a
Fixes envoyproxy#36493

Signed-off-by: Alyssa Wilk <[email protected]>
  • Loading branch information
alyssawilk authored Oct 8, 2024
1 parent 80b39e4 commit a9ce686
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 5 deletions.
6 changes: 3 additions & 3 deletions mobile/library/common/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -656,9 +656,9 @@ void Client::sendData(envoy_stream_t stream, Buffer::InstancePtr buffer, bool en
direct_stream->wants_write_notification_ = false;
// A new callback must be scheduled each time to capture any changes to the
// DirectStream's callbacks from call to call.
scheduled_callback_ = dispatcher_.createSchedulableCallback(
direct_stream->scheduled_callback_ = dispatcher_.createSchedulableCallback(
[direct_stream] { direct_stream->callbacks_->onSendWindowAvailable(); });
scheduled_callback_->scheduleCallbackNextIteration();
direct_stream->scheduled_callback_->scheduleCallbackNextIteration();
} else {
// Otherwise, make sure the stack will send a notification when the
// buffers are drained.
Expand Down Expand Up @@ -699,7 +699,6 @@ void Client::cancelStream(envoy_stream_t stream) {
// whether it was closed or not.
Client::DirectStreamSharedPtr direct_stream =
getStream(stream, GetStreamFilters::AllowForAllStreams);
scheduled_callback_ = nullptr;
if (direct_stream) {
// Attempt to latch the latest stream info. This will be a no-op if the stream
// is already complete.
Expand Down Expand Up @@ -759,6 +758,7 @@ void Client::removeStream(envoy_stream_t stream_handle) {
"[S{}] removeStream is a private method that is only called with stream ids that exist",
stream_handle));

direct_stream->scheduled_callback_ = nullptr;
// The DirectStream should live through synchronous code that already has a reference to it.
// Hence why it is scheduled for deferred deletion. If this was all that was needed then it
// would be sufficient to return a shared_ptr in getStream. However, deferred deletion is still
Expand Down
2 changes: 1 addition & 1 deletion mobile/library/common/http/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ class Client : public Logger::Loggable<Logger::Id::http> {
// Set true in explicit flow control mode if the library has sent body data and may want to
// send more when buffer is available.
bool wants_write_notification_{};
Event::SchedulableCallbackPtr scheduled_callback_;
// True if the bridge should operate in explicit flow control mode.
//
// In this mode only one callback can be sent to the bridge until more is
Expand Down Expand Up @@ -383,7 +384,6 @@ class Client : public Logger::Loggable<Logger::Id::http> {

ApiListenerPtr api_listener_;
Event::ProvisionalDispatcher& dispatcher_;
Event::SchedulableCallbackPtr scheduled_callback_;
HttpClientStats stats_;
// The set of open streams, which can safely have request data sent on them
// or response data received.
Expand Down
79 changes: 79 additions & 0 deletions mobile/test/common/http/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,85 @@ TEST_P(ClientTest, MultipleStreams) {
ASSERT_EQ(callbacks_called1.on_complete_calls_, 1);
}

TEST_P(ClientTest, MultipleUploads) {
envoy_stream_t stream1 = 1;
envoy_stream_t stream2 = 2;
auto request_data1 = std::make_unique<Buffer::OwnedImpl>("request body1");
auto request_data2 = std::make_unique<Buffer::OwnedImpl>("request body2");

// Create a stream, and set up request_decoder_ and response_encoder_
StreamCallbacksCalled callbacks_called1;
auto stream_callbacks1 = createDefaultStreamCallbacks(callbacks_called1);
createStream(std::move(stream_callbacks1));

// Send request headers.
EXPECT_CALL(*request_decoder_, decodeHeaders_(_, false));
http_client_.sendHeaders(stream1, createDefaultRequestHeaders(), false);
http_client_.sendData(stream1, std::move(request_data1), true);

// Start stream2.
// Setup EnvoyStreamCallbacks to handle the response headers.
NiceMock<MockRequestDecoder> request_decoder2;
ON_CALL(request_decoder2, streamInfo()).WillByDefault(ReturnRef(stream_info_));
ResponseEncoder* response_encoder2{};
StreamCallbacksCalled callbacks_called2;
auto stream_callbacks2 = createDefaultStreamCallbacks(callbacks_called1);
stream_callbacks2.on_headers_ = [&](const ResponseHeaderMap& headers, bool end_stream,
envoy_stream_intel) -> void {
EXPECT_TRUE(end_stream);
EXPECT_EQ(headers.Status()->value().getStringView(), "200");
callbacks_called2.on_headers_calls_ = true;
};
stream_callbacks2.on_complete_ = [&](envoy_stream_intel, envoy_final_stream_intel) -> void {
callbacks_called2.on_complete_calls_++;
};

std::vector<Event::SchedulableCallback*> window_callbacks;
ON_CALL(dispatcher_, createSchedulableCallback).WillByDefault([&](std::function<void()> cb) {
Event::SchedulableCallbackPtr scheduler =
dispatcher_.Event::ProvisionalDispatcher::createSchedulableCallback(cb);
window_callbacks.push_back(scheduler.get());
return scheduler;
});

// Create a stream.
ON_CALL(dispatcher_, isThreadSafe()).WillByDefault(Return(true));

// Grab the response encoder in order to dispatch responses on the stream.
// Return the request decoder to make sure calls are dispatched to the decoder via the dispatcher
// API.
EXPECT_CALL(*api_listener_, newStreamHandle(_, _))
.WillOnce(Invoke([&](ResponseEncoder& encoder, bool) -> RequestDecoderHandlePtr {
response_encoder2 = &encoder;
return std::make_unique<TestHandle>(request_decoder2);
}));
http_client_.startStream(stream2, std::move(stream_callbacks2), explicit_flow_control_);

// Send request headers.
EXPECT_CALL(request_decoder2, decodeHeaders_(_, false));
http_client_.sendHeaders(stream2, createDefaultRequestHeaders(), false);
http_client_.sendData(stream2, std::move(request_data2), true);

for (auto* callback : window_callbacks) {
EXPECT_TRUE(callback->enabled());
}

// Finish stream 2.
EXPECT_CALL(dispatcher_, deferredDelete_(_));
TestResponseHeaderMapImpl response_headers2{{":status", "200"}};
response_encoder2->encodeHeaders(response_headers2, true);
ASSERT_EQ(callbacks_called2.on_headers_calls_, 1);
// Ensure that the on_headers on the EnvoyStreamCallbacks was called.
ASSERT_EQ(callbacks_called2.on_complete_calls_, 1);

// Finish stream 1.
EXPECT_CALL(dispatcher_, deferredDelete_(_));
TestResponseHeaderMapImpl response_headers{{":status", "200"}};
response_encoder_->encodeHeaders(response_headers, true);
ASSERT_EQ(callbacks_called1.on_headers_calls_, 1);
ASSERT_EQ(callbacks_called1.on_complete_calls_, 1);
}

TEST_P(ClientTest, EnvoyLocalError) {
// Override the on_error default with some custom checks.
StreamCallbacksCalled callbacks_called;
Expand Down
1 change: 0 additions & 1 deletion mobile/test/java/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ envoy_mobile_android_test(
srcs = [
"AndroidEngineExplicitFlowTest.java",
],
flaky = True,
native_deps = [
"//test/jni:libenvoy_jni_with_test_extensions.so",
] + select({
Expand Down

0 comments on commit a9ce686

Please sign in to comment.