Skip to content

Commit

Permalink
Return remaining bytes from exchange sources (facebookincubator#8758)
Browse files Browse the repository at this point in the history
Summary:

This is the first diff to upgrade the exchange protocol.  This change
only exposes the remaining bytes to buffer manager; it does not change the
existing protocol yet, and is compatible with the current Prestissimo code.

Also added a few statistics to spot skewed exchange.

See prestodb/presto#21926 for details about the
design.

Differential Revision: D53793123
  • Loading branch information
Yuhta authored and facebook-github-bot committed Feb 15, 2024
1 parent 27bbff5 commit 200a8b2
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 45 deletions.
12 changes: 9 additions & 3 deletions velox/exec/ExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,23 @@ class ExchangeSource : public std::enable_shared_from_this<ExchangeSource> {

/// Boolean indicating that there will be no more data.
const bool atEnd;

/// Number of bytes still buffered at the source.
const std::vector<int64_t> remainingBytes;
};

/// Requests the producer to generate up to 'maxBytes' more data and reply
/// within 'maxWaitSeconds'. Returns a future that completes when producer
/// responds either with 'data' or with a message indicating that all data has
/// been already produced or data will take more time to produce.
///
/// When maxBytes is 0, normally not fetching any actual data
/// (i.e. Response::bytes should be 0). However for backward compatibility
/// (e.g. communicating with coordinator), we allow small data to be returned
/// when maxBytes is 0.
virtual folly::SemiFuture<Response> request(
uint32_t /*maxBytes*/,
uint32_t /*maxWaitSeconds*/) {
VELOX_NYI();
}
uint32_t /*maxWaitSeconds*/) = 0;

/// Close the exchange source. May be called before all data
/// has been received and processed. This can happen in case
Expand Down
83 changes: 73 additions & 10 deletions velox/exec/OutputBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ void ArbitraryBuffer::enqueue(std::unique_ptr<SerializedPage> page) {
pages_.push_back(std::shared_ptr<SerializedPage>(page.release()));
}

void ArbitraryBuffer::getAvailablePageSizes(std::vector<int64_t>& out) const {
out.reserve(out.size() + pages_.size());
for (const auto& page : pages_) {
if (page != nullptr) {
out.push_back(page->size());
}
}
}

std::vector<std::shared_ptr<SerializedPage>> ArbitraryBuffer::getPages(
uint64_t maxBytes) {
VELOX_CHECK_GT(maxBytes, 0, "maxBytes can't be zero");
Expand Down Expand Up @@ -90,14 +99,15 @@ void DestinationBuffer::Stats::recordDelete(const SerializedPage& data) {
recordAcknowledge(data);
}

std::vector<std::unique_ptr<folly::IOBuf>> DestinationBuffer::getData(
DestinationBuffer::Data DestinationBuffer::getData(
uint64_t maxBytes,
int64_t sequence,
DataAvailableCallback notify,
DataConsumerActiveCheckCallback activeCheck,
ArbitraryBuffer* arbitraryBuffer) {
VELOX_CHECK_GE(
sequence, sequence_, "Get received for an already acknowledged item");
VELOX_CHECK_GT(maxBytes, 0);
if (arbitraryBuffer != nullptr) {
loadData(arbitraryBuffer, maxBytes);
}
Expand All @@ -121,22 +131,39 @@ std::vector<std::unique_ptr<folly::IOBuf>> DestinationBuffer::getData(
return {};
}

std::vector<std::unique_ptr<folly::IOBuf>> result;
std::vector<std::unique_ptr<folly::IOBuf>> data;
uint64_t resultBytes = 0;
for (auto i = sequence - sequence_; i < data_.size(); ++i) {
auto i = sequence - sequence_;
for (; i < data_.size(); ++i) {
// nullptr is used as end marker
if (data_[i] == nullptr) {
VELOX_CHECK_EQ(i, data_.size() - 1, "null marker found in the middle");
result.push_back(nullptr);
data.push_back(nullptr);
++i;
break;
}
result.push_back(data_[i]->getIOBuf());
data.push_back(data_[i]->getIOBuf());
resultBytes += data_[i]->size();
if (resultBytes >= maxBytes) {
++i;
break;
}
}
return result;
bool atEnd = false;
std::vector<int64_t> remainingBytes;
remainingBytes.reserve(data_.size() - i);
for (; i < data_.size(); ++i) {
if (data_[i] == nullptr) {
VELOX_CHECK_EQ(i, data_.size() - 1, "null marker found in the middle");
atEnd = true;
break;
}
remainingBytes.push_back(data_[i]->size());
}
if (!atEnd && arbitraryBuffer) {
arbitraryBuffer->getAvailablePageSizes(remainingBytes);
}
return {std::move(data), std::move(remainingBytes), true};
}

void DestinationBuffer::enqueue(std::shared_ptr<SerializedPage> data) {
Expand All @@ -159,7 +186,9 @@ DataAvailable DestinationBuffer::getAndClearNotify() {
DataAvailable result;
result.callback = notify_;
result.sequence = notifySequence_;
result.data = getData(notifyMaxBytes_, notifySequence_, nullptr, nullptr);
auto data = getData(notifyMaxBytes_, notifySequence_, nullptr, nullptr);
result.data = std::move(data.data);
result.remainingBytes = std::move(data.remainingBytes);
clearNotify();
return result;
}
Expand Down Expand Up @@ -666,7 +695,7 @@ void OutputBuffer::getData(
int64_t sequence,
DataAvailableCallback notify,
DataConsumerActiveCheckCallback activeCheck) {
std::vector<std::unique_ptr<folly::IOBuf>> data;
DestinationBuffer::Data data;
std::vector<std::shared_ptr<SerializedPage>> freed;
std::vector<ContinuePromise> promises;
{
Expand All @@ -689,8 +718,8 @@ void OutputBuffer::getData(
maxBytes, sequence, notify, activeCheck, arbitraryBuffer_.get());
}
releaseAfterAcknowledge(freed, promises);
if (!data.empty()) {
notify(std::move(data), sequence);
if (data.immediate) {
notify(std::move(data.data), sequence, std::move(data.remainingBytes));
}
}

Expand Down Expand Up @@ -745,6 +774,39 @@ int64_t OutputBuffer::getAverageBufferTimeMsLocked() const {
return 0;
}

namespace {

// Find out how many buffers hold 80% of the data. Useful to identify skew.
int32_t countTopBuffers(
const std::vector<DestinationBuffer::Stats>& bufferStats,
int64_t totalBytes) {
std::vector<int64_t> bufferSizes;
bufferSizes.reserve(bufferStats.size());
for (auto i = 0; i < bufferStats.size(); ++i) {
const auto& stats = bufferStats[i];
bufferSizes.push_back(stats.bytesBuffered + stats.bytesSent);
}

// Sort descending.
std::sort(bufferSizes.begin(), bufferSizes.end(), std::greater<int64_t>());

const auto limit = totalBytes * 0.8;
int32_t numBuffers = 0;
int32_t runningTotal = 0;
for (auto size : bufferSizes) {
runningTotal += size;
numBuffers++;

if (runningTotal >= limit) {
break;
}
}

return numBuffers;
}

} // namespace

OutputBuffer::Stats OutputBuffer::stats() {
std::lock_guard<std::mutex> l(mutex_);
std::vector<DestinationBuffer::Stats> bufferStats;
Expand Down Expand Up @@ -772,6 +834,7 @@ OutputBuffer::Stats OutputBuffer::stats() {
numOutputRows_,
numOutputPages_,
getAverageBufferTimeMsLocked(),
countTopBuffers(bufferStats, numOutputBytes_),
bufferStats);
}

Expand Down
30 changes: 26 additions & 4 deletions velox/exec/OutputBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ namespace facebook::velox::exec {
/// sequence is the same as specified in BufferManager::getData call. The
/// caller is expected to advance sequence by the number of entries in groups
/// and call BufferManager::acknowledge.
using DataAvailableCallback = std::function<
void(std::vector<std::unique_ptr<folly::IOBuf>> pages, int64_t sequence)>;
using DataAvailableCallback = std::function<void(
std::vector<std::unique_ptr<folly::IOBuf>> pages,
int64_t sequence,
std::vector<int64_t> remainingBytes)>;

/// Callback provided to indicate if the consumer of a destination buffer is
/// currently active or not. It is used by arbitrary output buffer to optimize
Expand All @@ -41,10 +43,11 @@ struct DataAvailable {
DataAvailableCallback callback;
int64_t sequence;
std::vector<std::unique_ptr<folly::IOBuf>> data;
std::vector<int64_t> remainingBytes;

void notify() {
if (callback) {
callback(std::move(data), sequence);
callback(std::move(data), sequence, remainingBytes);
}
}
};
Expand Down Expand Up @@ -78,6 +81,9 @@ class ArbitraryBuffer {
/// there are sufficient buffered pages.
std::vector<std::shared_ptr<SerializedPage>> getPages(uint64_t maxBytes);

/// Append the available page sizes to `out'.
void getAvailablePageSizes(std::vector<int64_t>& out) const;

std::string toString() const;

private:
Expand Down Expand Up @@ -125,13 +131,24 @@ class DestinationBuffer {
/// arbitrary buffer on demand.
void loadData(ArbitraryBuffer* buffer, uint64_t maxBytes);

struct Data {
std::vector<std::unique_ptr<folly::IOBuf>> data;
std::vector<int64_t> remainingBytes;
bool immediate;
};

/// Returns a shallow copy (folly::IOBuf::clone) of the data starting at
/// 'sequence', stopping after exceeding 'maxBytes'. If there is no data,
/// 'notify' is installed so that this gets called when data is added. If not
/// null, 'activeCheck' is used to check if the consumer of a destination
/// buffer with 'notify' installed is currently active or not. This only
/// applies for arbitrary output buffer for now.
std::vector<std::unique_ptr<folly::IOBuf>> getData(
///
/// When arbitraryBuffer is provided, and the this buffer is not at end (no
/// null marker received), we append the remaining bytes from arbitraryBuffer
/// in the result, even the arbitraryBuffer could be shared among multiple
/// DestinationBuffers.
Data getData(
uint64_t maxBytes,
int64_t sequence,
DataAvailableCallback notify,
Expand Down Expand Up @@ -191,6 +208,7 @@ class OutputBuffer {
int64_t _totalRowsSent,
int64_t _totalPagesSent,
int64_t _averageBufferTimeMs,
int32_t _numTopBuffers,
const std::vector<DestinationBuffer::Stats>& _buffersStats)
: kind(_kind),
noMoreBuffers(_noMoreBuffers),
Expand All @@ -202,6 +220,7 @@ class OutputBuffer {
totalRowsSent(_totalRowsSent),
totalPagesSent(_totalPagesSent),
averageBufferTimeMs(_averageBufferTimeMs),
numTopBuffers(_numTopBuffers),
buffersStats(_buffersStats) {}

core::PartitionedOutputNode::Kind kind;
Expand All @@ -223,6 +242,9 @@ class OutputBuffer {
/// Average time each piece of data has been buffered for in milliseconds.
int64_t averageBufferTimeMs{0};

/// The number of largest buffers that handle 80% of the total data.
int32_t numTopBuffers{0};

/// Stats of the OutputBuffer's destinations.
std::vector<DestinationBuffer::Stats> buffersStats;

Expand Down
25 changes: 25 additions & 0 deletions velox/exec/OutputBufferManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,31 @@ class OutputBufferManager {
DataAvailableCallback notify,
DataConsumerActiveCheckCallback activeCheck = nullptr);

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
bool getData(
const std::string& taskId,
int destination,
uint64_t maxBytes,
int64_t sequence,
std::function<void(
std::vector<std::unique_ptr<folly::IOBuf>> pages,
int64_t sequence)> notify,
DataConsumerActiveCheckCallback activeCheck = nullptr) {
return getData(
taskId,
destination,
maxBytes,
sequence,
[notify = std::move(notify)](
std::vector<std::unique_ptr<folly::IOBuf>> pages,
int64_t sequence,
std::vector<int64_t> /*remainingBytes*/) mutable {
notify(std::move(pages), sequence);
},
std::move(activeCheck));
}
#endif

void removeTask(const std::string& taskId);

static std::weak_ptr<OutputBufferManager> getInstance();
Expand Down
11 changes: 10 additions & 1 deletion velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1897,6 +1897,13 @@ ContinueFuture Task::terminate(TaskState terminalState) {
void Task::maybeRemoveFromOutputBufferManager() {
if (hasPartitionedOutput()) {
if (auto bufferManager = bufferManager_.lock()) {
// Capture output buffer stats before deleting the buffer.
{
std::lock_guard<std::timed_mutex> l(mutex_);
if (!taskStats_.outputBufferStats.has_value()) {
taskStats_.outputBufferStats = bufferManager->stats(taskId_);
}
}
bufferManager->removeTask(taskId_);
}
}
Expand Down Expand Up @@ -1982,7 +1989,9 @@ TaskStats Task::taskStats() const {
auto bufferManager = bufferManager_.lock();
taskStats.outputBufferUtilization = bufferManager->getUtilization(taskId_);
taskStats.outputBufferOverutilized = bufferManager->isOverutilized(taskId_);
taskStats.outputBufferStats = bufferManager->stats(taskId_);
if (!taskStats.outputBufferStats.has_value()) {
taskStats.outputBufferStats = bufferManager->stats(taskId_);
}
return taskStats;
}

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/LimitTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ TEST_F(LimitTest, partialLimitEagerFlush) {
[numPagesPromise =
std::make_shared<folly::Promise<int>>(std::move(numPagesPromise))](
std::vector<std::unique_ptr<folly::IOBuf>> pages,
int64_t /*sequence*/) {
int64_t /*sequence*/,
std::vector<int64_t> /*remainingBytes*/) {
numPagesPromise->setValue(pages.size());
}));
ASSERT_GE(std::move(numPagesFuture).get(std::chrono::seconds(1)), 10);
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/tests/MultiFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1537,7 +1537,8 @@ TEST_F(MultiFragmentTest, taskTerminateWithPendingOutputBuffers) {
maxBytes,
sequence,
[&](std::vector<std::unique_ptr<folly::IOBuf>> iobufs,
int64_t inSequence) {
int64_t inSequence,
std::vector<int64_t> /*remainingBytes*/) {
for (auto& iobuf : iobufs) {
if (iobuf != nullptr) {
++inSequence;
Expand Down Expand Up @@ -1726,7 +1727,7 @@ class DataFetcher {
destination_,
maxBytes_,
sequence,
[&](auto pages, auto sequence) mutable {
[&](auto pages, auto sequence, auto /*remainingBytes*/) mutable {
const auto nextSequence = sequence + pages.size();
const bool atEnd = processData(std::move(pages), sequence);
bufferManager_->acknowledge(taskId_, destination_, nextSequence);
Expand Down
Loading

0 comments on commit 200a8b2

Please sign in to comment.