From 31aa87894130a01af2ac1efb8442d1aba085a14a Mon Sep 17 00:00:00 2001 From: Alan Frindell Date: Mon, 16 Dec 2024 11:07:09 -0800 Subject: [PATCH] Change WebTransport write API to be always synchronous Summary: Returning a SemiFuture when the stream window was full, and only allowing one was too heavy handed. Instead, just return the state and add a new API (awaitWritable) for applications that don't want to build a buffer. Reviewed By: sharmafb Differential Revision: D66881576 fbshipit-source-id: 38a88929c449df77eac7eb62af65085a4596710b --- .../hq/devious/test/DeviousBatonTests.cpp | 5 +- proxygen/lib/http/session/HQSession.cpp | 23 ++++--- proxygen/lib/http/session/HQSession.h | 8 ++- proxygen/lib/http/session/HTTPTransaction.h | 13 ++-- .../session/test/HQUpstreamSessionTest.cpp | 19 +++--- .../http/session/test/HTTPTransactionMocks.h | 11 ++-- .../test/HTTPTransactionWebTransportTest.cpp | 63 ++++++++----------- .../http/webtransport/QuicWebTransport.cpp | 19 +++--- .../lib/http/webtransport/QuicWebTransport.h | 13 ++-- proxygen/lib/http/webtransport/WebTransport.h | 24 ++++--- .../http/webtransport/WebTransportImpl.cpp | 45 +++++++------ .../lib/http/webtransport/WebTransportImpl.h | 33 ++++++---- .../test/FakeSharedWebTransport.h | 24 +++++-- proxygen/lib/http/webtransport/test/Mocks.h | 10 ++- .../test/QuicWebTransportTest.cpp | 14 +---- 15 files changed, 181 insertions(+), 143 deletions(-) diff --git a/proxygen/httpserver/samples/hq/devious/test/DeviousBatonTests.cpp b/proxygen/httpserver/samples/hq/devious/test/DeviousBatonTests.cpp index 3fb363a4a1..5a34e5de90 100644 --- a/proxygen/httpserver/samples/hq/devious/test/DeviousBatonTests.cpp +++ b/proxygen/httpserver/samples/hq/devious/test/DeviousBatonTests.cpp @@ -28,14 +28,15 @@ folly::SemiFuture expectSendMessage(MockWebTransport& wt) { .WillOnce( [&wt, promise = folly::MoveWrapper(std::move(promise))]( uint64_t id, std::unique_ptr data, bool eof) mutable - -> folly::Expected { + -> folly::Expected { Message m; m.id = id; m.message = std::move(data); promise->setValue(std::move(m)); EXPECT_TRUE(eof); wt.cleanupStream(id); - return folly::unit; + return proxygen::WebTransport::FCState::UNBLOCKED; }) .RetiresOnSaturation(); diff --git a/proxygen/lib/http/session/HQSession.cpp b/proxygen/lib/http/session/HQSession.cpp index 3e7bb1eb4d..6f8f90a60f 100644 --- a/proxygen/lib/http/session/HQSession.cpp +++ b/proxygen/lib/http/session/HQSession.cpp @@ -3865,13 +3865,9 @@ HQSession::HQStreamTransport::newWebTransportUniStream() { return *id; } -folly::Expected +folly::Expected HQSession::HQStreamTransport::sendWebTransportStreamData( - HTTPCodec::StreamID id, - std::unique_ptr data, - bool eof, - quic::StreamWriteCallback* writeCallback) { + HTTPCodec::StreamID id, std::unique_ptr data, bool eof) { auto res = session_.sock_->writeChain(id, std::move(data), eof); if (res.hasError()) { LOG(ERROR) << "Failed to write WT stream data"; @@ -3883,14 +3879,21 @@ HQSession::HQStreamTransport::sendWebTransportStreamData( return folly::makeUnexpected(WebTransport::ErrorCode::SEND_ERROR); } if (!eof && flowControl->sendWindowAvailable == 0) { - session_.sock_->notifyPendingWriteOnStream(id, writeCallback); - VLOG(4) << "Closing fc window"; - return WebTransportImpl::TransportProvider::FCState::BLOCKED; + VLOG(4) << "FC window closed"; + return WebTransport::FCState::BLOCKED; } else { - return WebTransportImpl::TransportProvider::FCState::UNBLOCKED; + return WebTransport::FCState::UNBLOCKED; } } +folly::Expected +HQSession::HQStreamTransport::notifyPendingWriteOnStream( + HTTPCodec::StreamID id, quic::StreamWriteCallback* wcb) { + CHECK(session_.sock_); + session_.sock_->notifyPendingWriteOnStream(id, wcb); + return folly::unit; +} + folly::Expected HQSession::HQStreamTransport::resetWebTransportEgress(HTTPCodec::StreamID id, uint32_t errorCode) { diff --git a/proxygen/lib/http/session/HQSession.h b/proxygen/lib/http/session/HQSession.h index b4d3567de2..2e4c963c5f 100644 --- a/proxygen/lib/http/session/HQSession.h +++ b/proxygen/lib/http/session/HQSession.h @@ -1832,11 +1832,13 @@ class HQSession folly::Expected newWebTransportUniStream() override; - folly::Expected + folly::Expected sendWebTransportStreamData(HTTPCodec::StreamID /*id*/, std::unique_ptr /*data*/, - bool /*eof*/, + bool /*eof*/) override; + + folly::Expected + notifyPendingWriteOnStream(HTTPCodec::StreamID, quic::StreamWriteCallback* wcb) override; folly::Expected diff --git a/proxygen/lib/http/session/HTTPTransaction.h b/proxygen/lib/http/session/HTTPTransaction.h index a897d8d4a9..498b6a03d1 100644 --- a/proxygen/lib/http/session/HTTPTransaction.h +++ b/proxygen/lib/http/session/HTTPTransaction.h @@ -678,12 +678,17 @@ class HTTPTransaction folly::assume_unreachable(); } - folly::Expected + folly::Expected sendWebTransportStreamData(HTTPCodec::StreamID /*id*/, std::unique_ptr /*data*/, - bool /*eof*/, - quic::StreamWriteCallback* /*wcb*/) override { + bool /*eof*/) override { + LOG(FATAL) << __func__ << " not supported"; + folly::assume_unreachable(); + } + + folly::Expected + notifyPendingWriteOnStream(HTTPCodec::StreamID, + quic::StreamWriteCallback*) override { LOG(FATAL) << __func__ << " not supported"; folly::assume_unreachable(); } diff --git a/proxygen/lib/http/session/test/HQUpstreamSessionTest.cpp b/proxygen/lib/http/session/test/HQUpstreamSessionTest.cpp index 0648b3079a..a720b15b8c 100644 --- a/proxygen/lib/http/session/test/HQUpstreamSessionTest.cpp +++ b/proxygen/lib/http/session/test/HQUpstreamSessionTest.cpp @@ -2257,20 +2257,23 @@ TEST_P(HQUpstreamSessionTestWebTransport, BidirectionalStream) { // shrink the fcw to force it to block socketDriver_->setStreamFlowControlWindow(id, 100); bool writeComplete = false; - stream.writeHandle->writeStreamData(makeBuf(65536), false) - .value() - .via(&eventBase_) - .then([&](auto) { - VLOG(4) << "big write complete"; - // after it completes, write FIN - stream.writeHandle->writeStreamData(nullptr, true) + stream.writeHandle->writeStreamData(makeBuf(65536), false); + stream.writeHandle->awaitWritable().value().via(&eventBase_).then([&](auto) { + VLOG(4) << "big write complete"; + // after it completes, write FIN + stream.writeHandle->writeStreamData(nullptr, true); +#if 0 + stream.writeHandle .value() .via(&eventBase_) .then([&](auto) { VLOG(4) << "fin write complete"; writeComplete = true; }); - }); + // ug, can't determine fin write complete; +#endif + writeComplete = true; + }); eventBase_.loopOnce(); // grow the fcw which will complete the big write socketDriver_->setStreamFlowControlWindow(id, 100000); diff --git a/proxygen/lib/http/session/test/HTTPTransactionMocks.h b/proxygen/lib/http/session/test/HTTPTransactionMocks.h index eb16ebde46..77307e82ce 100644 --- a/proxygen/lib/http/session/test/HTTPTransactionMocks.h +++ b/proxygen/lib/http/session/test/HTTPTransactionMocks.h @@ -233,16 +233,15 @@ class MockHTTPTransactionTransport : public HTTPTransaction::Transport { MOCK_METHOD((folly::Expected), newWebTransportUniStream, ()); - MOCK_METHOD((folly::Expected), + MOCK_METHOD((folly::Expected), sendWebTransportStreamData, - (HTTPCodec::StreamID, - std::unique_ptr, - bool, - quic::StreamWriteCallback*)); + (HTTPCodec::StreamID, std::unique_ptr, bool)); MOCK_METHOD((folly::Expected), resetWebTransportEgress, (HTTPCodec::StreamID, uint32_t)); + MOCK_METHOD((folly::Expected), + notifyPendingWriteOnStream, + (HTTPCodec::StreamID, quic::StreamWriteCallback*)); MOCK_METHOD((folly::Expected, bool>, WebTransport::ErrorCode>), diff --git a/proxygen/lib/http/session/test/HTTPTransactionWebTransportTest.cpp b/proxygen/lib/http/session/test/HTTPTransactionWebTransportTest.cpp index 519fe2f282..ddc21f59b7 100644 --- a/proxygen/lib/http/session/test/HTTPTransactionWebTransportTest.cpp +++ b/proxygen/lib/http/session/test/HTTPTransactionWebTransportTest.cpp @@ -12,7 +12,7 @@ #include using namespace testing; -using WTFCState = proxygen::WebTransportImpl::TransportProvider::FCState; +using WTFCState = proxygen::WebTransport::FCState; namespace { constexpr uint32_t WT_APP_ERROR_1 = 19; constexpr uint32_t WT_APP_ERROR_2 = 77; @@ -122,15 +122,9 @@ TEST_F(HTTPTransactionWebTransportTest, CreateStreams) { EXPECT_CALL(transport_, newWebTransportUniStream()).WillOnce(Return(1)); auto res2 = wt_->createUniStream(); EXPECT_TRUE(res2.hasValue()); - EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, true, _)) + EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, true)) .WillOnce(Return(WTFCState::UNBLOCKED)); - res2.value() - ->writeStreamData(nullptr, true) - .value() - .via(&eventBase_) - .thenTry( - [](auto writeReady) { EXPECT_FALSE(writeReady.hasException()); }); - + res2.value()->writeStreamData(nullptr, true); // Try creating streams but fail at transport EXPECT_CALL(transport_, newWebTransportBidiStream()) .WillOnce(Return(folly::makeUnexpected( @@ -245,7 +239,7 @@ TEST_F(HTTPTransactionWebTransportTest, WriteFails) { EXPECT_CALL(transport_, newWebTransportUniStream()).WillOnce(Return(1)); auto res = wt_->createUniStream(); EXPECT_TRUE(res.hasValue()); - EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, false, _)) + EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, false)) .WillOnce( Return(folly::makeUnexpected(WebTransport::ErrorCode::SEND_ERROR))); EXPECT_EQ(res.value()->writeStreamData(makeBuf(10), false).error(), @@ -260,10 +254,14 @@ TEST_F(HTTPTransactionWebTransportTest, WriteStreamPauseStopSending) { // Block write, then resume bool ready = false; quic::StreamWriteCallback* wcb{nullptr}; - EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, false, _)) - .WillOnce(DoAll(SaveArg<3>(&wcb), Return(WTFCState::BLOCKED))); + EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, false)) + .WillOnce(Return(WTFCState::BLOCKED)); + auto res = writeHandle.value()->writeStreamData(makeBuf(10), false); + EXPECT_TRUE(res.hasValue()); + EXPECT_CALL(transport_, notifyPendingWriteOnStream(1, testing::_)) + .WillOnce(DoAll(SaveArg<1>(&wcb), Return(folly::unit))); writeHandle.value() - ->writeStreamData(makeBuf(10), false) + ->awaitWritable() .value() .via(&eventBase_) .thenTry([&ready](auto writeReady) { @@ -277,10 +275,14 @@ TEST_F(HTTPTransactionWebTransportTest, WriteStreamPauseStopSending) { // Block write/stop sending ready = false; - EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, false, _)) - .WillOnce(DoAll(SaveArg<3>(&wcb), Return(WTFCState::BLOCKED))); + EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, false)) + .WillOnce(Return(WTFCState::BLOCKED)); + auto res2 = writeHandle.value()->writeStreamData(makeBuf(10), false); + EXPECT_TRUE(res2.hasValue()); + EXPECT_CALL(transport_, notifyPendingWriteOnStream(1, testing::_)) + .WillOnce(DoAll(SaveArg<1>(&wcb), Return(folly::unit))); writeHandle.value() - ->writeStreamData(makeBuf(10), false) + ->awaitWritable() .value() .via(&eventBase_) .thenTry([&ready, &writeHandle, this](auto writeReady) { @@ -321,25 +323,18 @@ TEST_F(HTTPTransactionWebTransportTest, BidiStreamEdgeCases) { // Cancellation handling folly::CancellationCallback writeCancel( - streamHandle.writeHandle->getCancelToken(), [&streamHandle, this] { + streamHandle.writeHandle->getCancelToken(), [&streamHandle] { // Write cancelled: // We can retrieve the stop sending code from the handle EXPECT_EQ(*streamHandle.writeHandle->stopSendingErrorCode(), WT_APP_ERROR_2); // attempt to write, will error, but don't reset the stream - streamHandle.writeHandle->writeStreamData(makeBuf(10), true) - .value() - .via(&eventBase_) - .thenValue([](auto) {}) - .thenError(folly::tag_t{}, - [](auto const& ex) { - VLOG(4) << "write error"; - EXPECT_EQ(ex.error, WT_APP_ERROR_2); - }); + auto res = streamHandle.writeHandle->writeStreamData(makeBuf(10), true); + EXPECT_TRUE(res.hasError()); + EXPECT_EQ(res.error(), WebTransport::ErrorCode::STOP_SENDING); }); // Deliver SS txn_->onWebTransportStopSending(0, WT_APP_ERROR_2); - eventBase_.loopOnce(); EXPECT_CALL(transport_, resetWebTransportEgress(0, WebTransport::kInternalError)); // Note the egress stream was not reset, will be reset when the txn detaches @@ -424,18 +419,10 @@ TEST_F(HTTPTransactionWebTransportTest, StreamIDAPIs) { wt_->stopSending(id, WT_APP_ERROR_1); // write by ID - bool ready = false; - EXPECT_CALL(transport_, sendWebTransportStreamData(id, testing::_, false, _)) + EXPECT_CALL(transport_, sendWebTransportStreamData(id, testing::_, false)) .WillOnce(Return(WTFCState::UNBLOCKED)); - wt_->writeStreamData(id, makeBuf(10), false) - .value() - .via(&eventBase_) - .thenTry([&ready](auto writeReady) { - EXPECT_FALSE(writeReady.hasException()); - ready = true; - }); - eventBase_.loopOnce(); - EXPECT_TRUE(ready); + auto res2 = wt_->writeStreamData(id, makeBuf(10), false); + EXPECT_TRUE(res2.hasValue()); // resetStream by ID EXPECT_CALL(transport_, resetWebTransportEgress(id, WT_APP_ERROR_2)); diff --git a/proxygen/lib/http/webtransport/QuicWebTransport.cpp b/proxygen/lib/http/webtransport/QuicWebTransport.cpp index 7f8de760e5..68d5a66892 100644 --- a/proxygen/lib/http/webtransport/QuicWebTransport.cpp +++ b/proxygen/lib/http/webtransport/QuicWebTransport.cpp @@ -8,7 +8,7 @@ #include -using FCState = proxygen::WebTransportImpl::TransportProvider::FCState; +using FCState = proxygen::WebTransport::FCState; namespace proxygen { @@ -92,12 +92,10 @@ QuicWebTransport::newWebTransportUniStream() { return id.value(); } -folly::Expected +folly::Expected QuicWebTransport::sendWebTransportStreamData(HTTPCodec::StreamID id, std::unique_ptr data, - bool eof, - quic::StreamWriteCallback* wcb) { + bool eof) { XCHECK(quicSocket_); auto res = quicSocket_->writeChain(id, std::move(data), eof); if (!res) { @@ -109,14 +107,21 @@ QuicWebTransport::sendWebTransportStreamData(HTTPCodec::StreamID id, return folly::makeUnexpected(WebTransport::ErrorCode::SEND_ERROR); } if (!eof && flowControl->sendWindowAvailable == 0) { - quicSocket_->notifyPendingWriteOnStream(id, wcb); - VLOG(4) << "Closing fc window"; + VLOG(4) << "fc window closed"; return FCState::BLOCKED; } else { return FCState::UNBLOCKED; } } +folly::Expected +QuicWebTransport::notifyPendingWriteOnStream(HTTPCodec::StreamID id, + quic::StreamWriteCallback* wcb) { + XCHECK(quicSocket_); + quicSocket_->notifyPendingWriteOnStream(id, wcb); + return folly::unit; +} + folly::Expected QuicWebTransport::resetWebTransportEgress(HTTPCodec::StreamID id, uint32_t errorCode) { diff --git a/proxygen/lib/http/webtransport/QuicWebTransport.h b/proxygen/lib/http/webtransport/QuicWebTransport.h index a5602dcd2a..b5c4f32469 100644 --- a/proxygen/lib/http/webtransport/QuicWebTransport.h +++ b/proxygen/lib/http/webtransport/QuicWebTransport.h @@ -66,11 +66,14 @@ class QuicWebTransport folly::SemiFuture awaitBidiStreamCredit() override; - folly::Expected sendWebTransportStreamData( - HTTPCodec::StreamID /*id*/, - std::unique_ptr /*data*/, - bool /*eof*/, - quic::StreamWriteCallback* /*wcb*/) override; + folly::Expected + sendWebTransportStreamData(HTTPCodec::StreamID /*id*/, + std::unique_ptr /*data*/, + bool /*eof*/) override; + + folly::Expected + notifyPendingWriteOnStream(HTTPCodec::StreamID, + quic::StreamWriteCallback* wcb) override; folly::Expected resetWebTransportEgress( HTTPCodec::StreamID /*id*/, uint32_t /*errorCode*/) override; diff --git a/proxygen/lib/http/webtransport/WebTransport.h b/proxygen/lib/http/webtransport/WebTransport.h index f1cfb12e58..fa12f80323 100644 --- a/proxygen/lib/http/webtransport/WebTransport.h +++ b/proxygen/lib/http/webtransport/WebTransport.h @@ -41,7 +41,8 @@ class WebTransport { GENERIC_ERROR = 0x00, INVALID_STREAM_ID, STREAM_CREATION_ERROR, - SEND_ERROR + SEND_ERROR, + STOP_SENDING }; static bool isConnectMessage(const proxygen::HTTPMessage& msg) { @@ -146,13 +147,13 @@ class WebTransport { uint32_t error) = 0; }; + enum class FCState { BLOCKED, UNBLOCKED }; // Handle for write streams class StreamWriteHandle : public StreamHandleBase { public: ~StreamWriteHandle() override = default; - // Write the data and optional fin to the stream. The returned Future will - // complete when the stream is available for more writes. + // Write the data and optional fin to the stream. // // The StreamWriteHandle becomes invalid after calling writeStreamData with // fin=true or calling resetStream. @@ -163,8 +164,8 @@ class WebTransport { // CancellationCallback. Calling writeStreamData from the callback will // fail with a WebTransport::Exception with the stopSendingErrorCode. // After the cancellation callback, the StreamWriteHandle is invalid. - virtual folly::Expected, ErrorCode> - writeStreamData(std::unique_ptr data, bool fin) = 0; + virtual folly::Expected writeStreamData( + std::unique_ptr data, bool fin) = 0; // Reset the stream with the given error virtual folly::Expected resetStream( @@ -178,6 +179,11 @@ class WebTransport { virtual folly::Expected setPriority( uint8_t level, uint64_t order, bool incremental) = 0; + // The returned Future will complete when the stream is available for more + // writes. + virtual folly::Expected, ErrorCode> + awaitWritable() = 0; + protected: folly::Optional stopSendingErrorCode_; }; @@ -212,14 +218,14 @@ class WebTransport { virtual folly::Expected, WebTransport::ErrorCode> readStreamData(uint64_t id) = 0; - virtual folly::Expected, ErrorCode> - writeStreamData(uint64_t id, - std::unique_ptr data, - bool fin) = 0; + virtual folly::Expected writeStreamData( + uint64_t id, std::unique_ptr data, bool fin) = 0; virtual folly::Expected resetStream( uint64_t streamId, uint32_t error) = 0; virtual folly::Expected setPriority( uint64_t streamId, uint8_t level, uint64_t order, bool incremental) = 0; + virtual folly::Expected, ErrorCode> + awaitWritable(uint64_t streamId) = 0; virtual folly::Expected stopSending( uint64_t streamId, uint32_t error) = 0; diff --git a/proxygen/lib/http/webtransport/WebTransportImpl.cpp b/proxygen/lib/http/webtransport/WebTransportImpl.cpp index c10058ac6d..52c73d3de5 100644 --- a/proxygen/lib/http/webtransport/WebTransportImpl.cpp +++ b/proxygen/lib/http/webtransport/WebTransportImpl.cpp @@ -106,13 +106,12 @@ WebTransportImpl::StreamReadHandle* WebTransportImpl::onWebTransportUniStream( return &ingRes.first->second; } -folly::Expected WebTransportImpl::sendWebTransportStreamData(HTTPCodec::StreamID id, std::unique_ptr data, - bool eof, - quic::StreamWriteCallback* wcb) { - auto res = tp_.sendWebTransportStreamData(id, std::move(data), eof, wcb); + bool eof) { + auto res = tp_.sendWebTransportStreamData(id, std::move(data), eof); if (eof || res.hasError()) { wtEgressStreams_.erase(id); } @@ -138,28 +137,27 @@ WebTransportImpl::stopReadingWebTransportIngress(HTTPCodec::StreamID id, return res; } -folly::Expected, WebTransport::ErrorCode> +folly::Expected WebTransportImpl::StreamWriteHandle::writeStreamData( std::unique_ptr data, bool fin) { - CHECK(!writePromise_) << "Wait for previous write to complete"; if (stopSendingErrorCode_) { - return folly::makeSemiFuture( - folly::make_exception_wrapper( - *stopSendingErrorCode_)); + return folly::makeUnexpected(WebTransport::ErrorCode::STOP_SENDING); } impl_.sp_.refreshTimeout(); - auto fcState = - impl_.sendWebTransportStreamData(id_, std::move(data), fin, this); + auto fcState = impl_.sendWebTransportStreamData(id_, std::move(data), fin); if (fcState.hasError()) { return folly::makeUnexpected(fcState.error()); } - if (*fcState == TransportProvider::FCState::UNBLOCKED) { - return folly::makeSemiFuture(folly::unit); - } else { - auto contract = folly::makePromiseContract(); - writePromise_.emplace(std::move(contract.promise)); - return std::move(contract.future); - } + return *fcState; +} + +folly::Expected, WebTransport::ErrorCode> +WebTransportImpl::StreamWriteHandle::awaitWritable() { + CHECK(!writePromise_) << "awaitWritable already called"; + auto contract = folly::makePromiseContract(); + writePromise_.emplace(std::move(contract.promise)); + impl_.tp_.notifyPendingWriteOnStream(id_, this); + return std::move(contract.future); } void WebTransportImpl::onWebTransportStopSending(HTTPCodec::StreamID id, @@ -233,14 +231,13 @@ void WebTransportImpl::StreamReadHandle::readAvailable( bool eof = readRes.value().second; // deliver data, eof auto state = dataAvailable(std::move(data), eof); - if (state == TransportProvider::FCState::BLOCKED && !eof) { + if (state == WebTransport::FCState::BLOCKED && !eof) { VLOG(4) << __func__ << " pausing reads"; impl_.tp_.pauseWebTransportIngress(id); } } -WebTransportImpl::TransportProvider::FCState -WebTransportImpl::StreamReadHandle::dataAvailable( +WebTransport::FCState WebTransportImpl::StreamReadHandle::dataAvailable( std::unique_ptr data, bool eof) { VLOG(4) << "dataAvailable buflen=" << (data ? data->computeChainDataLength() : 0) @@ -250,7 +247,7 @@ WebTransportImpl::StreamReadHandle::dataAvailable( readPromise_.reset(); if (eof) { impl_.wtIngressStreams_.erase(getID()); - return TransportProvider::FCState::UNBLOCKED; + return WebTransport::FCState::UNBLOCKED; } } else { buf_.append(std::move(data)); @@ -258,8 +255,8 @@ WebTransportImpl::StreamReadHandle::dataAvailable( } VLOG(4) << "dataAvailable buflen=" << buf_.chainLength(); return (eof || buf_.chainLength() < kMaxWTIngressBuf) - ? TransportProvider::FCState::UNBLOCKED - : TransportProvider::FCState::BLOCKED; + ? WebTransport::FCState::UNBLOCKED + : WebTransport::FCState::BLOCKED; } void WebTransportImpl::StreamReadHandle::readError( diff --git a/proxygen/lib/http/webtransport/WebTransportImpl.h b/proxygen/lib/http/webtransport/WebTransportImpl.h index c8483a9d17..5c7899178f 100644 --- a/proxygen/lib/http/webtransport/WebTransportImpl.h +++ b/proxygen/lib/http/webtransport/WebTransportImpl.h @@ -30,12 +30,14 @@ class WebTransportImpl : public WebTransport { virtual folly::SemiFuture awaitBidiStreamCredit() = 0; - enum class FCState { BLOCKED, UNBLOCKED }; virtual folly::Expected sendWebTransportStreamData(HTTPCodec::StreamID /*id*/, std::unique_ptr /*data*/, - bool /*eof*/, - quic::StreamWriteCallback* /*wcb*/) = 0; + bool /*eof*/) = 0; + + virtual folly::Expected + notifyPendingWriteOnStream(HTTPCodec::StreamID, + quic::StreamWriteCallback* wcb) = 0; virtual folly::Expected resetWebTransportEgress(HTTPCodec::StreamID /*id*/, @@ -117,7 +119,7 @@ class WebTransportImpl : public WebTransport { } return it->second.stopSending(error); } - folly::Expected, ErrorCode> writeStreamData( + folly::Expected writeStreamData( uint64_t id, std::unique_ptr data, bool fin) override { auto it = wtEgressStreams_.find(id); if (it == wtEgressStreams_.end()) { @@ -141,6 +143,14 @@ class WebTransportImpl : public WebTransport { } return it->second.setPriority(level, order, incremental); } + folly::Expected, ErrorCode> awaitWritable( + uint64_t streamId) override { + auto it = wtEgressStreams_.find(streamId); + if (it == wtEgressStreams_.end()) { + return folly::makeUnexpected(WebTransport::ErrorCode::INVALID_STREAM_ID); + } + return it->second.awaitWritable(); + } folly::Expected sendDatagram( std::unique_ptr datagram) override { // This can bypass the size and state machine checks in @@ -176,8 +186,8 @@ class WebTransportImpl : public WebTransport { return id_; } - folly::Expected, WebTransport::ErrorCode> - writeStreamData(std::unique_ptr data, bool fin) override; + folly::Expected writeStreamData( + std::unique_ptr data, bool fin) override; folly::Expected resetStream( uint32_t errorCode) override { @@ -189,6 +199,9 @@ class WebTransportImpl : public WebTransport { return impl_.tp_.setWebTransportStreamPriority( getID(), {level, incremental, order}); } + folly::Expected, ErrorCode> awaitWritable() + override; + void onStopSending(uint32_t errorCode); // TODO: what happens to promise_ if this stream is reset or the @@ -228,8 +241,7 @@ class WebTransportImpl : public WebTransport { return impl_.stopReadingWebTransportIngress(id_, error); } - TransportProvider::FCState dataAvailable(std::unique_ptr data, - bool eof); + FCState dataAvailable(std::unique_ptr data, bool eof); void deliverReadError(uint32_t error); [[nodiscard]] bool open() const { return !eof_ && !error_; @@ -250,11 +262,10 @@ class WebTransportImpl : public WebTransport { }; private: - folly::Expected + folly::Expected sendWebTransportStreamData(HTTPCodec::StreamID id, std::unique_ptr data, - bool eof, - quic::StreamWriteCallback* wcb); + bool eof); folly::Expected resetWebTransportEgress( HTTPCodec::StreamID id, uint32_t errorCode); diff --git a/proxygen/lib/http/webtransport/test/FakeSharedWebTransport.h b/proxygen/lib/http/webtransport/test/FakeSharedWebTransport.h index 9ab7cfdf2f..bcfadf5713 100644 --- a/proxygen/lib/http/webtransport/test/FakeSharedWebTransport.h +++ b/proxygen/lib/http/webtransport/test/FakeSharedWebTransport.h @@ -53,13 +53,15 @@ class FakeStreamHandle } } GenericApiRet stopSending(uint32_t code) override { - stopSendingErrorCode_ = code; - cs_.requestCancellation(); + if (!stopSendingErrorCode_) { + stopSendingErrorCode_ = code; + cs_.requestCancellation(); + } return folly::unit; } using WriteStreamDataRet = - folly::Expected, WebTransport::ErrorCode>; + folly::Expected; WriteStreamDataRet writeStreamData(std::unique_ptr data, bool fin) override { buf_.append(std::move(data)); @@ -69,6 +71,11 @@ class FakeStreamHandle promise_.reset(); } else { } + return WebTransport::FCState::UNBLOCKED; + } + + folly::Expected, WebTransport::ErrorCode> + awaitWritable() override { return folly::makeFuture(folly::unit); } @@ -173,7 +180,7 @@ class FakeSharedWebTransport : public WebTransport { return h->second->readStreamData(); } - folly::Expected, ErrorCode> writeStreamData( + folly::Expected writeStreamData( uint64_t id, std::unique_ptr data, bool fin) override { auto h = writeHandles.find(id); if (h == writeHandles.end()) { @@ -182,6 +189,15 @@ class FakeSharedWebTransport : public WebTransport { return h->second->writeStreamData(std::move(data), fin); } + folly::Expected, ErrorCode> awaitWritable( + uint64_t id) override { + auto h = writeHandles.find(id); + if (h == writeHandles.end()) { + return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR); + } + return h->second->awaitWritable(); + } + folly::Expected resetStream(uint64_t streamId, uint32_t error) override { auto h = writeHandles.find(streamId); diff --git a/proxygen/lib/http/webtransport/test/Mocks.h b/proxygen/lib/http/webtransport/test/Mocks.h index b56d356b65..2376e0171a 100644 --- a/proxygen/lib/http/webtransport/test/Mocks.h +++ b/proxygen/lib/http/webtransport/test/Mocks.h @@ -37,10 +37,14 @@ class MockStreamWriteHandle : public WebTransport::StreamWriteHandle { MOCK_METHOD(uint64_t, getID, ()); MOCK_METHOD(folly::CancellationToken, getCancelToken, ()); using WriteStreamDataRet = - folly::Expected, WebTransport::ErrorCode>; + folly::Expected; MOCK_METHOD(WriteStreamDataRet, writeStreamData, (std::unique_ptr, bool)); + MOCK_METHOD((folly::Expected, + WebTransport::ErrorCode>), + awaitWritable, + ()); MOCK_METHOD(GenericApiRet, resetStream, (uint32_t)); MOCK_METHOD(GenericApiRet, setPriority, (uint8_t, uint64_t, bool)); @@ -89,6 +93,10 @@ class MockWebTransport : public WebTransport { MOCK_METHOD(GenericApiRet, stopSending, (uint64_t, uint32_t)); MOCK_METHOD(GenericApiRet, sendDatagram, (std::unique_ptr)); MOCK_METHOD(GenericApiRet, closeSession, (folly::Optional)); + MOCK_METHOD((folly::Expected, + WebTransport::ErrorCode>), + awaitWritable, + (uint64_t)); void cleanupStream(uint64_t id) { auto handleIt = writeHandles_.find(id); diff --git a/proxygen/lib/http/webtransport/test/QuicWebTransportTest.cpp b/proxygen/lib/http/webtransport/test/QuicWebTransportTest.cpp index 806444a958..4a9fe2a81f 100644 --- a/proxygen/lib/http/webtransport/test/QuicWebTransportTest.cpp +++ b/proxygen/lib/http/webtransport/test/QuicWebTransportTest.cpp @@ -122,17 +122,9 @@ TEST_F(QuicWebTransportTest, OnStopSending) { socketDriver_.addStopSending(id, WT_ERROR_1); eventBase_.loopOnce(); auto res = handle.value()->writeStreamData(nullptr, true); - EXPECT_FALSE(res.hasError()); - EXPECT_TRUE(res.value().isReady()); - bool err = false; - auto fut = std::move(res.value()).via(&eventBase_); - std::move(fut).thenError(folly::tag_t{}, - [&](auto const& ex) { - EXPECT_EQ(ex.error, WT_ERROR_1); - err = true; - }); - eventBase_.loop(); - EXPECT_TRUE(err); + EXPECT_TRUE(res.hasError()); + EXPECT_EQ(res.error(), WebTransport::ErrorCode::STOP_SENDING); + EXPECT_EQ(*handle.value()->stopSendingErrorCode(), WT_ERROR_1); } TEST_F(QuicWebTransportTest, ConnectionError) {