Skip to content

Commit

Permalink
Change WebTransport write API to be always synchronous
Browse files Browse the repository at this point in the history
Summary: Returning a SemiFuture when the stream window was full, and only allowing one was too heavy handed.  Instead, just return the state and add a new API (awaitWritable) for applications that don't want to build a buffer.

Reviewed By: sharmafb

Differential Revision: D66881576

fbshipit-source-id: 38a88929c449df77eac7eb62af65085a4596710b
  • Loading branch information
afrind authored and facebook-github-bot committed Dec 16, 2024
1 parent 84a13b4 commit 31aa878
Show file tree
Hide file tree
Showing 15 changed files with 181 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ folly::SemiFuture<Message> expectSendMessage(MockWebTransport& wt) {
.WillOnce(
[&wt, promise = folly::MoveWrapper(std::move(promise))](
uint64_t id, std::unique_ptr<folly::IOBuf> data, bool eof) mutable
-> folly::Expected<folly::Unit, proxygen::WebTransport::ErrorCode> {
-> folly::Expected<proxygen::WebTransport::FCState,
proxygen::WebTransport::ErrorCode> {
Message m;
m.id = id;
m.message = std::move(data);
promise->setValue(std::move(m));
EXPECT_TRUE(eof);
wt.cleanupStream(id);
return folly::unit;
return proxygen::WebTransport::FCState::UNBLOCKED;
})
.RetiresOnSaturation();

Expand Down
23 changes: 13 additions & 10 deletions proxygen/lib/http/session/HQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3865,13 +3865,9 @@ HQSession::HQStreamTransport::newWebTransportUniStream() {
return *id;
}

folly::Expected<WebTransportImpl::TransportProvider::FCState,
WebTransport::ErrorCode>
folly::Expected<WebTransport::FCState, WebTransport::ErrorCode>
HQSession::HQStreamTransport::sendWebTransportStreamData(
HTTPCodec::StreamID id,
std::unique_ptr<folly::IOBuf> data,
bool eof,
quic::StreamWriteCallback* writeCallback) {
HTTPCodec::StreamID id, std::unique_ptr<folly::IOBuf> data, bool eof) {
auto res = session_.sock_->writeChain(id, std::move(data), eof);
if (res.hasError()) {
LOG(ERROR) << "Failed to write WT stream data";
Expand All @@ -3883,14 +3879,21 @@ HQSession::HQStreamTransport::sendWebTransportStreamData(
return folly::makeUnexpected(WebTransport::ErrorCode::SEND_ERROR);
}
if (!eof && flowControl->sendWindowAvailable == 0) {
session_.sock_->notifyPendingWriteOnStream(id, writeCallback);
VLOG(4) << "Closing fc window";
return WebTransportImpl::TransportProvider::FCState::BLOCKED;
VLOG(4) << "FC window closed";
return WebTransport::FCState::BLOCKED;
} else {
return WebTransportImpl::TransportProvider::FCState::UNBLOCKED;
return WebTransport::FCState::UNBLOCKED;
}
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
HQSession::HQStreamTransport::notifyPendingWriteOnStream(
HTTPCodec::StreamID id, quic::StreamWriteCallback* wcb) {
CHECK(session_.sock_);
session_.sock_->notifyPendingWriteOnStream(id, wcb);
return folly::unit;
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
HQSession::HQStreamTransport::resetWebTransportEgress(HTTPCodec::StreamID id,
uint32_t errorCode) {
Expand Down
8 changes: 5 additions & 3 deletions proxygen/lib/http/session/HQSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -1832,11 +1832,13 @@ class HQSession
folly::Expected<HTTPCodec::StreamID, WebTransport::ErrorCode>
newWebTransportUniStream() override;

folly::Expected<WebTransportImpl::TransportProvider::FCState,
WebTransport::ErrorCode>
folly::Expected<WebTransport::FCState, WebTransport::ErrorCode>
sendWebTransportStreamData(HTTPCodec::StreamID /*id*/,
std::unique_ptr<folly::IOBuf> /*data*/,
bool /*eof*/,
bool /*eof*/) override;

folly::Expected<folly::Unit, WebTransport::ErrorCode>
notifyPendingWriteOnStream(HTTPCodec::StreamID,
quic::StreamWriteCallback* wcb) override;

folly::Expected<folly::Unit, WebTransport::ErrorCode>
Expand Down
13 changes: 9 additions & 4 deletions proxygen/lib/http/session/HTTPTransaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -678,12 +678,17 @@ class HTTPTransaction
folly::assume_unreachable();
}

folly::Expected<WebTransportImpl::TransportProvider::FCState,
WebTransport::ErrorCode>
folly::Expected<WebTransport::FCState, WebTransport::ErrorCode>
sendWebTransportStreamData(HTTPCodec::StreamID /*id*/,
std::unique_ptr<folly::IOBuf> /*data*/,
bool /*eof*/,
quic::StreamWriteCallback* /*wcb*/) override {
bool /*eof*/) override {
LOG(FATAL) << __func__ << " not supported";
folly::assume_unreachable();
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
notifyPendingWriteOnStream(HTTPCodec::StreamID,
quic::StreamWriteCallback*) override {
LOG(FATAL) << __func__ << " not supported";
folly::assume_unreachable();
}
Expand Down
19 changes: 11 additions & 8 deletions proxygen/lib/http/session/test/HQUpstreamSessionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2257,20 +2257,23 @@ TEST_P(HQUpstreamSessionTestWebTransport, BidirectionalStream) {
// shrink the fcw to force it to block
socketDriver_->setStreamFlowControlWindow(id, 100);
bool writeComplete = false;
stream.writeHandle->writeStreamData(makeBuf(65536), false)
.value()
.via(&eventBase_)
.then([&](auto) {
VLOG(4) << "big write complete";
// after it completes, write FIN
stream.writeHandle->writeStreamData(nullptr, true)
stream.writeHandle->writeStreamData(makeBuf(65536), false);
stream.writeHandle->awaitWritable().value().via(&eventBase_).then([&](auto) {
VLOG(4) << "big write complete";
// after it completes, write FIN
stream.writeHandle->writeStreamData(nullptr, true);
#if 0
stream.writeHandle
.value()
.via(&eventBase_)
.then([&](auto) {
VLOG(4) << "fin write complete";
writeComplete = true;
});
});
// ug, can't determine fin write complete;
#endif
writeComplete = true;
});
eventBase_.loopOnce();
// grow the fcw which will complete the big write
socketDriver_->setStreamFlowControlWindow(id, 100000);
Expand Down
11 changes: 5 additions & 6 deletions proxygen/lib/http/session/test/HTTPTransactionMocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,16 +233,15 @@ class MockHTTPTransactionTransport : public HTTPTransaction::Transport {
MOCK_METHOD((folly::Expected<HTTPCodec::StreamID, WebTransport::ErrorCode>),
newWebTransportUniStream,
());
MOCK_METHOD((folly::Expected<WebTransportImpl::TransportProvider::FCState,
WebTransport::ErrorCode>),
MOCK_METHOD((folly::Expected<WebTransport::FCState, WebTransport::ErrorCode>),
sendWebTransportStreamData,
(HTTPCodec::StreamID,
std::unique_ptr<folly::IOBuf>,
bool,
quic::StreamWriteCallback*));
(HTTPCodec::StreamID, std::unique_ptr<folly::IOBuf>, bool));
MOCK_METHOD((folly::Expected<folly::Unit, WebTransport::ErrorCode>),
resetWebTransportEgress,
(HTTPCodec::StreamID, uint32_t));
MOCK_METHOD((folly::Expected<folly::Unit, WebTransport::ErrorCode>),
notifyPendingWriteOnStream,
(HTTPCodec::StreamID, quic::StreamWriteCallback*));

MOCK_METHOD((folly::Expected<std::pair<std::unique_ptr<folly::IOBuf>, bool>,
WebTransport::ErrorCode>),
Expand Down
63 changes: 25 additions & 38 deletions proxygen/lib/http/session/test/HTTPTransactionWebTransportTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include <proxygen/lib/http/session/test/HTTPTransactionMocks.h>

using namespace testing;
using WTFCState = proxygen::WebTransportImpl::TransportProvider::FCState;
using WTFCState = proxygen::WebTransport::FCState;
namespace {
constexpr uint32_t WT_APP_ERROR_1 = 19;
constexpr uint32_t WT_APP_ERROR_2 = 77;
Expand Down Expand Up @@ -122,15 +122,9 @@ TEST_F(HTTPTransactionWebTransportTest, CreateStreams) {
EXPECT_CALL(transport_, newWebTransportUniStream()).WillOnce(Return(1));
auto res2 = wt_->createUniStream();
EXPECT_TRUE(res2.hasValue());
EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, true, _))
EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, true))
.WillOnce(Return(WTFCState::UNBLOCKED));
res2.value()
->writeStreamData(nullptr, true)
.value()
.via(&eventBase_)
.thenTry(
[](auto writeReady) { EXPECT_FALSE(writeReady.hasException()); });

res2.value()->writeStreamData(nullptr, true);
// Try creating streams but fail at transport
EXPECT_CALL(transport_, newWebTransportBidiStream())
.WillOnce(Return(folly::makeUnexpected(
Expand Down Expand Up @@ -245,7 +239,7 @@ TEST_F(HTTPTransactionWebTransportTest, WriteFails) {
EXPECT_CALL(transport_, newWebTransportUniStream()).WillOnce(Return(1));
auto res = wt_->createUniStream();
EXPECT_TRUE(res.hasValue());
EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, false, _))
EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, false))
.WillOnce(
Return(folly::makeUnexpected(WebTransport::ErrorCode::SEND_ERROR)));
EXPECT_EQ(res.value()->writeStreamData(makeBuf(10), false).error(),
Expand All @@ -260,10 +254,14 @@ TEST_F(HTTPTransactionWebTransportTest, WriteStreamPauseStopSending) {
// Block write, then resume
bool ready = false;
quic::StreamWriteCallback* wcb{nullptr};
EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, false, _))
.WillOnce(DoAll(SaveArg<3>(&wcb), Return(WTFCState::BLOCKED)));
EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, false))
.WillOnce(Return(WTFCState::BLOCKED));
auto res = writeHandle.value()->writeStreamData(makeBuf(10), false);
EXPECT_TRUE(res.hasValue());
EXPECT_CALL(transport_, notifyPendingWriteOnStream(1, testing::_))
.WillOnce(DoAll(SaveArg<1>(&wcb), Return(folly::unit)));
writeHandle.value()
->writeStreamData(makeBuf(10), false)
->awaitWritable()
.value()
.via(&eventBase_)
.thenTry([&ready](auto writeReady) {
Expand All @@ -277,10 +275,14 @@ TEST_F(HTTPTransactionWebTransportTest, WriteStreamPauseStopSending) {

// Block write/stop sending
ready = false;
EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, false, _))
.WillOnce(DoAll(SaveArg<3>(&wcb), Return(WTFCState::BLOCKED)));
EXPECT_CALL(transport_, sendWebTransportStreamData(1, testing::_, false))
.WillOnce(Return(WTFCState::BLOCKED));
auto res2 = writeHandle.value()->writeStreamData(makeBuf(10), false);
EXPECT_TRUE(res2.hasValue());
EXPECT_CALL(transport_, notifyPendingWriteOnStream(1, testing::_))
.WillOnce(DoAll(SaveArg<1>(&wcb), Return(folly::unit)));
writeHandle.value()
->writeStreamData(makeBuf(10), false)
->awaitWritable()
.value()
.via(&eventBase_)
.thenTry([&ready, &writeHandle, this](auto writeReady) {
Expand Down Expand Up @@ -321,25 +323,18 @@ TEST_F(HTTPTransactionWebTransportTest, BidiStreamEdgeCases) {

// Cancellation handling
folly::CancellationCallback writeCancel(
streamHandle.writeHandle->getCancelToken(), [&streamHandle, this] {
streamHandle.writeHandle->getCancelToken(), [&streamHandle] {
// Write cancelled:
// We can retrieve the stop sending code from the handle
EXPECT_EQ(*streamHandle.writeHandle->stopSendingErrorCode(),
WT_APP_ERROR_2);
// attempt to write, will error, but don't reset the stream
streamHandle.writeHandle->writeStreamData(makeBuf(10), true)
.value()
.via(&eventBase_)
.thenValue([](auto) {})
.thenError(folly::tag_t<const WebTransport::Exception&>{},
[](auto const& ex) {
VLOG(4) << "write error";
EXPECT_EQ(ex.error, WT_APP_ERROR_2);
});
auto res = streamHandle.writeHandle->writeStreamData(makeBuf(10), true);
EXPECT_TRUE(res.hasError());
EXPECT_EQ(res.error(), WebTransport::ErrorCode::STOP_SENDING);
});
// Deliver SS
txn_->onWebTransportStopSending(0, WT_APP_ERROR_2);
eventBase_.loopOnce();
EXPECT_CALL(transport_,
resetWebTransportEgress(0, WebTransport::kInternalError));
// Note the egress stream was not reset, will be reset when the txn detaches
Expand Down Expand Up @@ -424,18 +419,10 @@ TEST_F(HTTPTransactionWebTransportTest, StreamIDAPIs) {
wt_->stopSending(id, WT_APP_ERROR_1);

// write by ID
bool ready = false;
EXPECT_CALL(transport_, sendWebTransportStreamData(id, testing::_, false, _))
EXPECT_CALL(transport_, sendWebTransportStreamData(id, testing::_, false))
.WillOnce(Return(WTFCState::UNBLOCKED));
wt_->writeStreamData(id, makeBuf(10), false)
.value()
.via(&eventBase_)
.thenTry([&ready](auto writeReady) {
EXPECT_FALSE(writeReady.hasException());
ready = true;
});
eventBase_.loopOnce();
EXPECT_TRUE(ready);
auto res2 = wt_->writeStreamData(id, makeBuf(10), false);
EXPECT_TRUE(res2.hasValue());

// resetStream by ID
EXPECT_CALL(transport_, resetWebTransportEgress(id, WT_APP_ERROR_2));
Expand Down
19 changes: 12 additions & 7 deletions proxygen/lib/http/webtransport/QuicWebTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <proxygen/lib/http/webtransport/QuicWebTransport.h>

using FCState = proxygen::WebTransportImpl::TransportProvider::FCState;
using FCState = proxygen::WebTransport::FCState;

namespace proxygen {

Expand Down Expand Up @@ -92,12 +92,10 @@ QuicWebTransport::newWebTransportUniStream() {
return id.value();
}

folly::Expected<WebTransportImpl::TransportProvider::FCState,
WebTransport::ErrorCode>
folly::Expected<WebTransport::FCState, WebTransport::ErrorCode>
QuicWebTransport::sendWebTransportStreamData(HTTPCodec::StreamID id,
std::unique_ptr<folly::IOBuf> data,
bool eof,
quic::StreamWriteCallback* wcb) {
bool eof) {
XCHECK(quicSocket_);
auto res = quicSocket_->writeChain(id, std::move(data), eof);
if (!res) {
Expand All @@ -109,14 +107,21 @@ QuicWebTransport::sendWebTransportStreamData(HTTPCodec::StreamID id,
return folly::makeUnexpected(WebTransport::ErrorCode::SEND_ERROR);
}
if (!eof && flowControl->sendWindowAvailable == 0) {
quicSocket_->notifyPendingWriteOnStream(id, wcb);
VLOG(4) << "Closing fc window";
VLOG(4) << "fc window closed";
return FCState::BLOCKED;
} else {
return FCState::UNBLOCKED;
}
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
QuicWebTransport::notifyPendingWriteOnStream(HTTPCodec::StreamID id,
quic::StreamWriteCallback* wcb) {
XCHECK(quicSocket_);
quicSocket_->notifyPendingWriteOnStream(id, wcb);
return folly::unit;
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
QuicWebTransport::resetWebTransportEgress(HTTPCodec::StreamID id,
uint32_t errorCode) {
Expand Down
13 changes: 8 additions & 5 deletions proxygen/lib/http/webtransport/QuicWebTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,14 @@ class QuicWebTransport

folly::SemiFuture<folly::Unit> awaitBidiStreamCredit() override;

folly::Expected<FCState, WebTransport::ErrorCode> sendWebTransportStreamData(
HTTPCodec::StreamID /*id*/,
std::unique_ptr<folly::IOBuf> /*data*/,
bool /*eof*/,
quic::StreamWriteCallback* /*wcb*/) override;
folly::Expected<WebTransport::FCState, WebTransport::ErrorCode>
sendWebTransportStreamData(HTTPCodec::StreamID /*id*/,
std::unique_ptr<folly::IOBuf> /*data*/,
bool /*eof*/) override;

folly::Expected<folly::Unit, WebTransport::ErrorCode>
notifyPendingWriteOnStream(HTTPCodec::StreamID,
quic::StreamWriteCallback* wcb) override;

folly::Expected<folly::Unit, WebTransport::ErrorCode> resetWebTransportEgress(
HTTPCodec::StreamID /*id*/, uint32_t /*errorCode*/) override;
Expand Down
Loading

0 comments on commit 31aa878

Please sign in to comment.