Skip to content

Commit

Permalink
Return remaining bytes from exchange sources (facebookincubator#8758)
Browse files Browse the repository at this point in the history
Summary:

This is the first diff to upgrade the exchange protocol. This change
only exposes the remaining bytes to the buffer manager; it does not change
the existing protocol yet, and is compatible with the current Prestissimo code.

See prestodb/presto#21926 for details about the
design.

Reviewed By: mbasmanova

Differential Revision: D53793123
  • Loading branch information
Yuhta authored and facebook-github-bot committed Feb 15, 2024
1 parent ec6741c commit 00f46e8
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 42 deletions.
14 changes: 13 additions & 1 deletion velox/exec/ExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,26 @@ class ExchangeSource : public std::enable_shared_from_this<ExchangeSource> {

/// Boolean indicating that there will be no more data.
const bool atEnd;

/// Number of bytes still buffered at the source. Each element represents
/// one page, and the consumer can choose to fetch a prefix of them
/// according to the memory restriction.
const std::vector<int64_t> remainingBytes;
};

/// Requests the producer to generate up to 'maxBytes' more data and reply
/// within 'maxWaitSeconds'. Returns a future that completes when producer
/// responds either with 'data' or with a message indicating that all data has
/// been already produced or data will take more time to produce.
virtual folly::SemiFuture<Response> request(
uint32_t /*maxBytes*/,
uint32_t maxBytes,
uint32_t maxWaitSeconds) = 0;

/// Ask for available data sizes that can be fetched. Normally this should not
/// fetch any actual data (i.e. Response::bytes should be 0). However, for
/// backward compatibility (e.g. communicating with coordinator), we allow
/// small data (1MB) to be returned.
virtual folly::SemiFuture<Response> requestDataSizes(
uint32_t /*maxWaitSeconds*/) {
VELOX_NYI();
}
Expand Down
49 changes: 39 additions & 10 deletions velox/exec/OutputBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ void ArbitraryBuffer::enqueue(std::unique_ptr<SerializedPage> page) {
pages_.push_back(std::shared_ptr<SerializedPage>(page.release()));
}

/// Appends the size of every non-null page held in 'pages_' to 'out', in the
/// iteration order of 'pages_'. Elements already present in 'out' are kept.
void ArbitraryBuffer::getAvailablePageSizes(std::vector<int64_t>& out) const {
  // Reserve room for the worst case (no null entries) in a single step.
  const auto maxNewEntries = pages_.size();
  out.reserve(out.size() + maxNewEntries);
  for (const auto& entry : pages_) {
    // Null entries carry no bytes and are skipped.
    if (entry == nullptr) {
      continue;
    }
    out.push_back(entry->size());
  }
}

std::vector<std::shared_ptr<SerializedPage>> ArbitraryBuffer::getPages(
uint64_t maxBytes) {
VELOX_CHECK_GT(maxBytes, 0, "maxBytes can't be zero");
Expand Down Expand Up @@ -90,14 +99,15 @@ void DestinationBuffer::Stats::recordDelete(const SerializedPage& data) {
recordAcknowledge(data);
}

std::vector<std::unique_ptr<folly::IOBuf>> DestinationBuffer::getData(
DestinationBuffer::Data DestinationBuffer::getData(
uint64_t maxBytes,
int64_t sequence,
DataAvailableCallback notify,
DataConsumerActiveCheckCallback activeCheck,
ArbitraryBuffer* arbitraryBuffer) {
VELOX_CHECK_GE(
sequence, sequence_, "Get received for an already acknowledged item");
VELOX_CHECK_GT(maxBytes, 0);
if (arbitraryBuffer != nullptr) {
loadData(arbitraryBuffer, maxBytes);
}
Expand All @@ -121,22 +131,39 @@ std::vector<std::unique_ptr<folly::IOBuf>> DestinationBuffer::getData(
return {};
}

std::vector<std::unique_ptr<folly::IOBuf>> result;
std::vector<std::unique_ptr<folly::IOBuf>> data;
uint64_t resultBytes = 0;
for (auto i = sequence - sequence_; i < data_.size(); ++i) {
auto i = sequence - sequence_;
for (; i < data_.size(); ++i) {
// nullptr is used as end marker
if (data_[i] == nullptr) {
VELOX_CHECK_EQ(i, data_.size() - 1, "null marker found in the middle");
result.push_back(nullptr);
data.push_back(nullptr);
++i;
break;
}
result.push_back(data_[i]->getIOBuf());
data.push_back(data_[i]->getIOBuf());
resultBytes += data_[i]->size();
if (resultBytes >= maxBytes) {
++i;
break;
}
}
return result;
bool atEnd = false;
std::vector<int64_t> remainingBytes;
remainingBytes.reserve(data_.size() - i);
for (; i < data_.size(); ++i) {
if (data_[i] == nullptr) {
VELOX_CHECK_EQ(i, data_.size() - 1, "null marker found in the middle");
atEnd = true;
break;
}
remainingBytes.push_back(data_[i]->size());
}
if (!atEnd && arbitraryBuffer) {
arbitraryBuffer->getAvailablePageSizes(remainingBytes);
}
return {std::move(data), std::move(remainingBytes), true};
}

void DestinationBuffer::enqueue(std::shared_ptr<SerializedPage> data) {
Expand All @@ -159,7 +186,9 @@ DataAvailable DestinationBuffer::getAndClearNotify() {
DataAvailable result;
result.callback = notify_;
result.sequence = notifySequence_;
result.data = getData(notifyMaxBytes_, notifySequence_, nullptr, nullptr);
auto data = getData(notifyMaxBytes_, notifySequence_, nullptr, nullptr);
result.data = std::move(data.data);
result.remainingBytes = std::move(data.remainingBytes);
clearNotify();
return result;
}
Expand Down Expand Up @@ -666,7 +695,7 @@ void OutputBuffer::getData(
int64_t sequence,
DataAvailableCallback notify,
DataConsumerActiveCheckCallback activeCheck) {
std::vector<std::unique_ptr<folly::IOBuf>> data;
DestinationBuffer::Data data;
std::vector<std::shared_ptr<SerializedPage>> freed;
std::vector<ContinuePromise> promises;
{
Expand All @@ -689,8 +718,8 @@ void OutputBuffer::getData(
maxBytes, sequence, notify, activeCheck, arbitraryBuffer_.get());
}
releaseAfterAcknowledge(freed, promises);
if (!data.empty()) {
notify(std::move(data), sequence);
if (data.immediate) {
notify(std::move(data.data), sequence, std::move(data.remainingBytes));
}
}

Expand Down
31 changes: 27 additions & 4 deletions velox/exec/OutputBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ namespace facebook::velox::exec {
/// sequence is the same as specified in BufferManager::getData call. The
/// caller is expected to advance sequence by the number of entries in groups
/// and call BufferManager::acknowledge.
using DataAvailableCallback = std::function<
void(std::vector<std::unique_ptr<folly::IOBuf>> pages, int64_t sequence)>;
using DataAvailableCallback = std::function<void(
std::vector<std::unique_ptr<folly::IOBuf>> pages,
int64_t sequence,
std::vector<int64_t> remainingBytes)>;

/// Callback provided to indicate if the consumer of a destination buffer is
/// currently active or not. It is used by arbitrary output buffer to optimize
Expand All @@ -41,10 +43,11 @@ struct DataAvailable {
DataAvailableCallback callback;
int64_t sequence;
std::vector<std::unique_ptr<folly::IOBuf>> data;
std::vector<int64_t> remainingBytes;

void notify() {
if (callback) {
callback(std::move(data), sequence);
callback(std::move(data), sequence, remainingBytes);
}
}
};
Expand Down Expand Up @@ -78,6 +81,9 @@ class ArbitraryBuffer {
/// there are sufficient buffered pages.
std::vector<std::shared_ptr<SerializedPage>> getPages(uint64_t maxBytes);

/// Append the available page sizes to `out'.
void getAvailablePageSizes(std::vector<int64_t>& out) const;

std::string toString() const;

private:
Expand Down Expand Up @@ -125,13 +131,30 @@ class DestinationBuffer {
/// arbitrary buffer on demand.
void loadData(ArbitraryBuffer* buffer, uint64_t maxBytes);

/// Result returned by DestinationBuffer::getData().
struct Data {
  /// The actual data available at this buffer.
  std::vector<std::unique_ptr<folly::IOBuf>> data;

  /// The byte sizes of pages that can be fetched.
  std::vector<int64_t> remainingBytes;

  /// Whether the result is returned immediately without invoking the `notify'
  /// callback. Defaults to false so that a Data object declared without an
  /// initializer never exposes an indeterminate value to code that branches
  /// on this flag.
  bool immediate{false};
};

/// Returns a shallow copy (folly::IOBuf::clone) of the data starting at
/// 'sequence', stopping after exceeding 'maxBytes'. If there is no data,
/// 'notify' is installed so that this gets called when data is added. If not
/// null, 'activeCheck' is used to check if the consumer of a destination
/// buffer with 'notify' installed is currently active or not. This only
/// applies for arbitrary output buffer for now.
std::vector<std::unique_ptr<folly::IOBuf>> getData(
///
/// When arbitraryBuffer is provided, and this buffer is not at end (no null
/// marker received), we append the remaining bytes from arbitraryBuffer to
/// the result, even though the arbitraryBuffer could be shared among multiple
/// DestinationBuffers.
Data getData(
uint64_t maxBytes,
int64_t sequence,
DataAvailableCallback notify,
Expand Down
25 changes: 25 additions & 0 deletions velox/exec/OutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,31 @@ class OutputBufferManager {
DataAvailableCallback notify,
DataConsumerActiveCheckCallback activeCheck = nullptr);

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
bool getData(
const std::string& taskId,
int destination,
uint64_t maxBytes,
int64_t sequence,
std::function<void(
std::vector<std::unique_ptr<folly::IOBuf>> pages,
int64_t sequence)> notify,
DataConsumerActiveCheckCallback activeCheck = nullptr) {
return getData(
taskId,
destination,
maxBytes,
sequence,
[notify = std::move(notify)](
std::vector<std::unique_ptr<folly::IOBuf>> pages,
int64_t sequence,
std::vector<int64_t> /*remainingBytes*/) mutable {
notify(std::move(pages), sequence);
},
std::move(activeCheck));
}
#endif

void removeTask(const std::string& taskId);

static std::weak_ptr<OutputBufferManager> getInstance();
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/LimitTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ TEST_F(LimitTest, partialLimitEagerFlush) {
[numPagesPromise =
std::make_shared<folly::Promise<int>>(std::move(numPagesPromise))](
std::vector<std::unique_ptr<folly::IOBuf>> pages,
int64_t /*sequence*/) {
int64_t /*sequence*/,
std::vector<int64_t> /*remainingBytes*/) {
numPagesPromise->setValue(pages.size());
}));
ASSERT_GE(std::move(numPagesFuture).get(std::chrono::seconds(1)), 10);
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1537,7 +1537,8 @@ TEST_F(MultiFragmentTest, taskTerminateWithPendingOutputBuffers) {
maxBytes,
sequence,
[&](std::vector<std::unique_ptr<folly::IOBuf>> iobufs,
int64_t inSequence) {
int64_t inSequence,
std::vector<int64_t> /*remainingBytes*/) {
for (auto& iobuf : iobufs) {
if (iobuf != nullptr) {
++inSequence;
Expand Down Expand Up @@ -1726,7 +1727,7 @@ class DataFetcher {
destination_,
maxBytes_,
sequence,
[&](auto pages, auto sequence) mutable {
[&](auto pages, auto sequence, auto /*remainingBytes*/) mutable {
const auto nextSequence = sequence + pages.size();
const bool atEnd = processData(std::move(pages), sequence);
bufferManager_->acknowledge(taskId_, destination_, nextSequence);
Expand Down
Loading

0 comments on commit 00f46e8

Please sign in to comment.