From e554327a1209fd62b3f3ca3302339bc93474ecc0 Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Thu, 29 Feb 2024 15:23:40 -0800 Subject: [PATCH] New exchange protocol Summary: https://github.com/facebookincubator/velox/pull/8845 Upgrade the exchange protocol. We will poll the remaining data sizes using `ExchangeSource::getDataSizes` from all the producers and schedule actual data fetch according to the memory budget. This reduce the waiting for data time significantly in some cases, for a query that was timing out after 1 hour on 600 nodes cluster, we reduce the wall time to 4.72 minutes on 400 nodes cluster (Java is taking 36.08 minutes on 1000 nodes cluster). See https://github.com/prestodb/presto/issues/21926 Reviewed By: amitkdutta Differential Revision: D54027466 --- velox/exec/ExchangeClient.cpp | 153 ++++++++---------- velox/exec/ExchangeClient.h | 30 ++-- velox/exec/ExchangeQueue.h | 4 +- velox/exec/ExchangeSource.h | 4 +- velox/exec/OutputBuffer.cpp | 76 +++++---- velox/exec/OutputBufferManager.h | 25 --- velox/exec/tests/OutputBufferManagerTest.cpp | 8 +- .../exec/tests/utils/LocalExchangeSource.cpp | 5 + 8 files changed, 143 insertions(+), 162 deletions(-) diff --git a/velox/exec/ExchangeClient.cpp b/velox/exec/ExchangeClient.cpp index 8ecf0d62ea2ac..97193cb8b04cb 100644 --- a/velox/exec/ExchangeClient.cpp +++ b/velox/exec/ExchangeClient.cpp @@ -18,7 +18,7 @@ namespace facebook::velox::exec { void ExchangeClient::addRemoteTaskId(const std::string& taskId) { - RequestSpec requestSpec; + std::vector requestSpecs; std::shared_ptr toClose; { std::lock_guard l(queue_->mutex()); @@ -48,11 +48,8 @@ void ExchangeClient::addRemoteTaskId(const std::string& taskId) { } else { sources_.push_back(source); queue_->addSourceLocked(); - // Put new source into 'producingSources_' queue to prioritise fetching - // from these to find out whether these are productive or not. - producingSources_.push(source); - - requestSpec = pickSourcesToRequestLocked(); + emptySources_.push(source); + requestSpecs = pickSourcesToRequestLocked(); } } @@ -60,7 +57,7 @@ void ExchangeClient::addRemoteTaskId(const std::string& taskId) { if (toClose) { toClose->close(); } else { - request(requestSpec); + request(requestSpecs); } } @@ -116,7 +113,7 @@ folly::F14FastMap ExchangeClient::stats() const { std::vector> ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) { - RequestSpec requestSpec; + std::vector requestSpecs; std::vector> pages; { std::lock_guard l(queue_->mutex()); @@ -130,38 +127,49 @@ ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) { return pages; } - requestSpec = pickSourcesToRequestLocked(); + requestSpecs = pickSourcesToRequestLocked(); } // Outside of lock - request(requestSpec); + request(requestSpecs); return pages; } -void ExchangeClient::request(const RequestSpec& requestSpec) { +void ExchangeClient::request(const std::vector& requestSpecs) { auto self = shared_from_this(); - for (auto& source : requestSpec.sources) { - auto future = source->request(requestSpec.maxBytes, kDefaultMaxWaitSeconds); + for (auto& spec : requestSpecs) { + auto future = folly::SemiFuture::makeEmpty(); + if (spec.maxBytes == 0) { + future = spec.source->requestDataSizes(kDefaultMaxWaitSeconds); + } else { + future = spec.source->request(spec.maxBytes, 1); + } VELOX_CHECK(future.valid()); std::move(future) .via(executor_) - .thenValue([self, requestSource = source](auto&& response) { - RequestSpec requestSpec; + .thenValue([self, spec = std::move(spec)](auto&& response) { + std::vector requestSpecs; { std::lock_guard l(self->queue_->mutex()); if (self->closed_) { return; } if (!response.atEnd) { - if (response.bytes > 0) { - self->producingSources_.push(requestSource); + if (!response.remainingBytes.empty()) { + for (auto bytes : response.remainingBytes) { + VELOX_CHECK_GT(bytes, 0); + } + self->producingSources_.push( + {std::move(spec.source), + std::move(response.remainingBytes)}); } else { - self->emptySources_.push(requestSource); + self->emptySources_.push(std::move(spec.source)); } } - requestSpec = self->pickSourcesToRequestLocked(); + self->totalPendingBytes_ -= spec.maxBytes; + requestSpecs = self->pickSourcesToRequestLocked(); } - self->request(requestSpec); + self->request(requestSpecs); }) .thenError( folly::tag_t{}, [self](const std::exception& e) { @@ -170,74 +178,53 @@ void ExchangeClient::request(const RequestSpec& requestSpec) { } } -int32_t ExchangeClient::countPendingSourcesLocked() { - int32_t numPending = 0; - for (auto& source : sources_) { - if (source->isRequestPendingLocked()) { - ++numPending; - } - } - return numPending; -} - -int64_t ExchangeClient::getAveragePageSize() { - auto averagePageSize = - std::min(maxQueuedBytes_, queue_->averageReceivedPageBytes()); - if (averagePageSize == 0) { - averagePageSize = 1 << 20; // 1 MB. +std::vector +ExchangeClient::pickSourcesToRequestLocked() { + if (closed_) { + return {}; } - - return averagePageSize; -} - -int32_t ExchangeClient::getNumSourcesToRequestLocked(int64_t averagePageSize) { - // Figure out how many more 'averagePageSize' fit into 'maxQueuedBytes_'. - // Make sure to leave room for 'numPending' pages. - const auto numPending = countPendingSourcesLocked(); - - auto numToRequest = std::max( - 1, (maxQueuedBytes_ - queue_->totalBytes()) / averagePageSize); - if (numToRequest <= numPending) { - return 0; + std::vector requestSpecs; + while (!emptySources_.empty()) { + auto& source = emptySources_.front(); + VELOX_CHECK(source->shouldRequestLocked()); + requestSpecs.push_back({std::move(source), 0}); + emptySources_.pop(); } - - return numToRequest - numPending; -} - -void ExchangeClient::pickSourcesToRequestLocked( - RequestSpec& requestSpec, - int32_t numToRequest, - std::queue>& sources) { - while (requestSpec.sources.size() < numToRequest && !sources.empty()) { - auto& source = sources.front(); - if (source->shouldRequestLocked()) { - requestSpec.sources.push_back(source); + int64_t availableSpace = + maxQueuedBytes_ - queue_->totalBytes() - totalPendingBytes_; + while (availableSpace > 0 && !producingSources_.empty()) { + auto& source = producingSources_.front().source; + int64_t requestBytes = 0; + for (auto bytes : producingSources_.front().remainingBytes) { + availableSpace -= bytes; + if (availableSpace < 0) { + break; + } + requestBytes += bytes; } - sources.pop(); - } -} - -ExchangeClient::RequestSpec ExchangeClient::pickSourcesToRequestLocked() { - if (closed_ || queue_->totalBytes() >= maxQueuedBytes_) { - return {}; + if (requestBytes == 0) { + VELOX_CHECK_LT(availableSpace, 0); + break; + } + VELOX_CHECK(source->shouldRequestLocked()); + requestSpecs.push_back({std::move(source), requestBytes}); + producingSources_.pop(); + totalPendingBytes_ += requestBytes; } - - const auto averagePageSize = getAveragePageSize(); - const auto numToRequest = getNumSourcesToRequestLocked(averagePageSize); - - if (numToRequest == 0) { - return {}; + if (queue_->totalBytes() == 0 && totalPendingBytes_ == 0 && + !producingSources_.empty()) { + // We have full capacity but still cannot initiate one single data transfer. + // Let the transfer happen in this case to avoid stuck. + auto& source = producingSources_.front().source; + auto requestBytes = producingSources_.front().remainingBytes.at(0); + LOG(INFO) << "Requesting large single page " << requestBytes + << " bytes, exceeding capacity " << maxQueuedBytes_; + VELOX_CHECK(source->shouldRequestLocked()); + requestSpecs.push_back({std::move(source), requestBytes}); + producingSources_.pop(); + totalPendingBytes_ += requestBytes; } - - RequestSpec requestSpec; - requestSpec.maxBytes = averagePageSize; - - // Pick up to 'numToRequest' next sources to request data from. Prioritize - // sources that return data. - pickSourcesToRequestLocked(requestSpec, numToRequest, producingSources_); - pickSourcesToRequestLocked(requestSpec, numToRequest, emptySources_); - - return requestSpec; + return requestSpecs; } ExchangeClient::~ExchangeClient() { diff --git a/velox/exec/ExchangeClient.h b/velox/exec/ExchangeClient.h index 05eb8dd60ed2e..e740a6ca23fe7 100644 --- a/velox/exec/ExchangeClient.h +++ b/velox/exec/ExchangeClient.h @@ -97,27 +97,22 @@ class ExchangeClient : public std::enable_shared_from_this { folly::dynamic toJson() const; private: - // A list of sources to request data from and how much to request from each - // (in bytes). struct RequestSpec { - std::vector> sources; + std::shared_ptr source; + + // How much bytes to request from this source. 0 bytes means request data + // sizes only. int64_t maxBytes; }; - int64_t getAveragePageSize(); - - int32_t getNumSourcesToRequestLocked(int64_t averagePageSize); - - RequestSpec pickSourcesToRequestLocked(); - - void pickSourcesToRequestLocked( - RequestSpec& requestSpec, - int32_t numToRequest, - std::queue>& sources); + struct ProducingSource { + std::shared_ptr source; + std::vector remainingBytes; + }; - int32_t countPendingSourcesLocked(); + std::vector pickSourcesToRequestLocked(); - void request(const RequestSpec& requestSpec); + void request(const std::vector& requestSpecs); // Handy for ad-hoc logging. const std::string taskId_; @@ -131,9 +126,12 @@ class ExchangeClient : public std::enable_shared_from_this { std::vector> sources_; bool closed_{false}; + // Total number of bytes in flight. + int64_t totalPendingBytes_{0}; + // A queue of sources that have returned non-empty response from the latest // request. - std::queue> producingSources_; + std::queue producingSources_; // A queue of sources that returned empty response from the latest request. std::queue> emptySources_; }; diff --git a/velox/exec/ExchangeQueue.h b/velox/exec/ExchangeQueue.h index d19babf26ce7f..c757f7f8af7e3 100644 --- a/velox/exec/ExchangeQueue.h +++ b/velox/exec/ExchangeQueue.h @@ -123,7 +123,7 @@ class ExchangeQueue { dequeueLocked(uint32_t maxBytes, bool* atEnd, ContinueFuture* future); /// Returns the total bytes held by SerializedPages in 'this'. - uint64_t totalBytes() const { + int64_t totalBytes() const { return totalBytes_; } @@ -197,7 +197,7 @@ class ExchangeQueue { // throw an exception with this message. std::string error_; // Total size of SerializedPages in queue. - uint64_t totalBytes_{0}; + int64_t totalBytes_{0}; // Number of SerializedPages received. int64_t receivedPages_{0}; // Total size of SerializedPages received. Used to calculate an average diff --git a/velox/exec/ExchangeSource.h b/velox/exec/ExchangeSource.h index 9afa5de24d7f9..cccbd829691e5 100644 --- a/velox/exec/ExchangeSource.h +++ b/velox/exec/ExchangeSource.h @@ -86,9 +86,7 @@ class ExchangeSource : public std::enable_shared_from_this { /// backward compatibility (e.g. communicating with coordinator), we allow /// small data (1MB) to be returned. virtual folly::SemiFuture requestDataSizes( - uint32_t /*maxWaitSeconds*/) { - VELOX_NYI(); - } + uint32_t maxWaitSeconds) = 0; /// Close the exchange source. May be called before all data /// has been received and processed. This can happen in case diff --git a/velox/exec/OutputBuffer.cpp b/velox/exec/OutputBuffer.cpp index 57326a96dd337..6ecebe55dc6f7 100644 --- a/velox/exec/OutputBuffer.cpp +++ b/velox/exec/OutputBuffer.cpp @@ -46,8 +46,15 @@ void ArbitraryBuffer::getAvailablePageSizes(std::vector& out) const { std::vector> ArbitraryBuffer::getPages( uint64_t maxBytes) { - VELOX_CHECK_GT(maxBytes, 0, "maxBytes can't be zero"); - + if (maxBytes == 0 && !pages_.empty() && pages_.front() == nullptr) { + // Always give out an end marker when this buffer is finished and fully + // consumed. When multiple `DestinationBuffer' polling the same + // `ArbitraryBuffer', we can simplify the code in + // `DestinationBuffer::getData' since we will always get a null marker and + // not going through the callback path, eliminate the chance of stuck. + VELOX_CHECK_EQ(pages_.size(), 1); + return {nullptr}; + } std::vector> pages; uint64_t bytesRemoved{0}; while (bytesRemoved < maxBytes && !pages_.empty()) { @@ -107,26 +114,32 @@ DestinationBuffer::Data DestinationBuffer::getData( ArbitraryBuffer* arbitraryBuffer) { VELOX_CHECK_GE( sequence, sequence_, "Get received for an already acknowledged item"); - VELOX_CHECK_GT(maxBytes, 0); if (arbitraryBuffer != nullptr) { loadData(arbitraryBuffer, maxBytes); } - if (sequence - sequence_ > data_.size()) { - VLOG(1) << this << " Out of order get: " << sequence << " over " - << sequence_ << " Setting second notify " << notifySequence_ - << " / " << sequence; - notify_ = std::move(notify); - aliveCheck_ = std::move(activeCheck); - notifySequence_ = std::min(notifySequence_, sequence); - notifyMaxBytes_ = maxBytes; - return {}; - } - - if (sequence - sequence_ == data_.size()) { + if (sequence - sequence_ >= data_.size()) { + if (sequence - sequence_ > data_.size()) { + VLOG(1) << this << " Out of order get: " << sequence << " over " + << sequence_ << " Setting second notify " << notifySequence_ + << " / " << sequence; + } + if (maxBytes == 0) { + std::vector remainingBytes; + if (arbitraryBuffer) { + arbitraryBuffer->getAvailablePageSizes(remainingBytes); + } + if (!remainingBytes.empty()) { + return {{}, std::move(remainingBytes), true}; + } + } notify_ = std::move(notify); aliveCheck_ = std::move(activeCheck); - notifySequence_ = sequence; + if (sequence - sequence_ > data_.size()) { + notifySequence_ = std::min(notifySequence_, sequence); + } else { + notifySequence_ = sequence; + } notifyMaxBytes_ = maxBytes; return {}; } @@ -134,19 +147,20 @@ DestinationBuffer::Data DestinationBuffer::getData( std::vector> data; uint64_t resultBytes = 0; auto i = sequence - sequence_; - for (; i < data_.size(); ++i) { - // nullptr is used as end marker - if (data_[i] == nullptr) { - VELOX_CHECK_EQ(i, data_.size() - 1, "null marker found in the middle"); - data.push_back(nullptr); - ++i; - break; - } - data.push_back(data_[i]->getIOBuf()); - resultBytes += data_[i]->size(); - if (resultBytes >= maxBytes) { - ++i; - break; + if (maxBytes > 0) { + for (; i < data_.size(); ++i) { + // nullptr is used as end marker + if (data_[i] == nullptr) { + VELOX_CHECK_EQ(i, data_.size() - 1, "null marker found in the middle"); + data.push_back(nullptr); + break; + } + data.push_back(data_[i]->getIOBuf()); + resultBytes += data_[i]->size(); + if (resultBytes >= maxBytes) { + ++i; + break; + } } } bool atEnd = false; @@ -163,6 +177,9 @@ DestinationBuffer::Data DestinationBuffer::getData( if (!atEnd && arbitraryBuffer) { arbitraryBuffer->getAvailablePageSizes(remainingBytes); } + if (data.empty() && remainingBytes.empty() && atEnd) { + data.push_back(nullptr); + } return {std::move(data), std::move(remainingBytes), true}; } @@ -216,7 +233,6 @@ void DestinationBuffer::maybeLoadData(ArbitraryBuffer* buffer) { clearNotify(); return; } - VELOX_CHECK_GT(notifyMaxBytes_, 0); loadData(buffer, notifyMaxBytes_); } diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h index 7870a1be114d3..410be0bed83ec 100644 --- a/velox/exec/OutputBufferManager.h +++ b/velox/exec/OutputBufferManager.h @@ -82,31 +82,6 @@ class OutputBufferManager { DataAvailableCallback notify, DataConsumerActiveCheckCallback activeCheck = nullptr); -#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY - bool getData( - const std::string& taskId, - int destination, - uint64_t maxBytes, - int64_t sequence, - std::function> pages, - int64_t sequence)> notify, - DataConsumerActiveCheckCallback activeCheck = nullptr) { - return getData( - taskId, - destination, - maxBytes, - sequence, - [notify = std::move(notify)]( - std::vector> pages, - int64_t sequence, - std::vector /*remainingBytes*/) mutable { - notify(std::move(pages), sequence); - }, - std::move(activeCheck)); - } -#endif - void removeTask(const std::string& taskId); static std::weak_ptr getInstance(); diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index 068ac5932b6ee..fa2c259e00d61 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -454,6 +454,7 @@ TEST_F(OutputBufferManagerTest, arbitrayBuffer) { buffer.enqueue(std::move(page3)); ASSERT_EQ( buffer.toString(), "[ARBITRARY_BUFFER PAGES[2] NO MORE DATA[false]]"); + ASSERT_TRUE(buffer.getPages(0).empty()); buffer.noMoreData(); ASSERT_FALSE(buffer.empty()); ASSERT_TRUE(buffer.hasNoMoreData()); @@ -475,7 +476,9 @@ TEST_F(OutputBufferManagerTest, arbitrayBuffer) { ASSERT_EQ( buffer.toString(), "[ARBITRARY_BUFFER PAGES[0] NO MORE DATA[true]]"); buffer.noMoreData(); - VELOX_ASSERT_THROW(buffer.getPages(0), "maxBytes can't be zero"); + pages = buffer.getPages(0); + ASSERT_EQ(pages.size(), 1); + ASSERT_FALSE(pages[0]); // Verify the end marker is persistent. for (int i = 0; i < 3; ++i) { pages = buffer.getPages(100); @@ -508,8 +511,7 @@ TEST_F(OutputBufferManagerTest, destinationBuffer) { { ArbitraryBuffer buffer; DestinationBuffer destinationBuffer; - VELOX_ASSERT_THROW( - destinationBuffer.loadData(&buffer, 0), "maxBytes can't be zero"); + destinationBuffer.loadData(&buffer, 0); destinationBuffer.loadData(&buffer, 100); std::atomic notified{false}; auto buffers = destinationBuffer.getData( diff --git a/velox/exec/tests/utils/LocalExchangeSource.cpp b/velox/exec/tests/utils/LocalExchangeSource.cpp index a451ee1518c78..8cd94b5287cd3 100644 --- a/velox/exec/tests/utils/LocalExchangeSource.cpp +++ b/velox/exec/tests/utils/LocalExchangeSource.cpp @@ -160,6 +160,11 @@ class LocalExchangeSource : public exec::ExchangeSource { return future; } + folly::SemiFuture requestDataSizes( + uint32_t maxWaitSeconds) override { + return request(0, maxWaitSeconds); + } + void close() override { checkSetRequestPromise();