diff --git a/velox/exec/ExchangeSource.h b/velox/exec/ExchangeSource.h index 8cee990d8c027..03901ab28dead 100644 --- a/velox/exec/ExchangeSource.h +++ b/velox/exec/ExchangeSource.h @@ -66,17 +66,23 @@ class ExchangeSource : public std::enable_shared_from_this { /// Boolean indicating that there will be no more data. const bool atEnd; + + /// Number of bytes still buffered at the source. + const std::vector remainingBytes; }; /// Requests the producer to generate up to 'maxBytes' more data and reply /// within 'maxWaitSeconds'. Returns a future that completes when producer /// responds either with 'data' or with a message indicating that all data has /// been already produced or data will take more time to produce. + /// + /// When maxBytes is 0, normally not fetching any actual data + /// (i.e. Response::bytes should be 0). However for backward compatibility + /// (e.g. communicating with coordinator), we allow small data to be returned + /// when maxBytes is 0. virtual folly::SemiFuture request( uint32_t /*maxBytes*/, - uint32_t /*maxWaitSeconds*/) { - VELOX_NYI(); - } + uint32_t /*maxWaitSeconds*/) = 0; /// Close the exchange source. May be called before all data /// has been received and processed. This can happen in case diff --git a/velox/exec/OutputBuffer.cpp b/velox/exec/OutputBuffer.cpp index 1a9e01dc3bbcd..57326a96dd337 100644 --- a/velox/exec/OutputBuffer.cpp +++ b/velox/exec/OutputBuffer.cpp @@ -35,6 +35,15 @@ void ArbitraryBuffer::enqueue(std::unique_ptr page) { pages_.push_back(std::shared_ptr(page.release())); } +void ArbitraryBuffer::getAvailablePageSizes(std::vector& out) const { + out.reserve(out.size() + pages_.size()); + for (const auto& page : pages_) { + if (page != nullptr) { + out.push_back(page->size()); + } + } +} + std::vector> ArbitraryBuffer::getPages( uint64_t maxBytes) { VELOX_CHECK_GT(maxBytes, 0, "maxBytes can't be zero"); @@ -90,7 +99,7 @@ void DestinationBuffer::Stats::recordDelete(const SerializedPage& data) { recordAcknowledge(data); } -std::vector> DestinationBuffer::getData( +DestinationBuffer::Data DestinationBuffer::getData( uint64_t maxBytes, int64_t sequence, DataAvailableCallback notify, @@ -98,6 +107,7 @@ std::vector> DestinationBuffer::getData( ArbitraryBuffer* arbitraryBuffer) { VELOX_CHECK_GE( sequence, sequence_, "Get received for an already acknowledged item"); + VELOX_CHECK_GT(maxBytes, 0); if (arbitraryBuffer != nullptr) { loadData(arbitraryBuffer, maxBytes); } @@ -121,22 +131,39 @@ std::vector> DestinationBuffer::getData( return {}; } - std::vector> result; + std::vector> data; uint64_t resultBytes = 0; - for (auto i = sequence - sequence_; i < data_.size(); ++i) { + auto i = sequence - sequence_; + for (; i < data_.size(); ++i) { // nullptr is used as end marker if (data_[i] == nullptr) { VELOX_CHECK_EQ(i, data_.size() - 1, "null marker found in the middle"); - result.push_back(nullptr); + data.push_back(nullptr); + ++i; break; } - result.push_back(data_[i]->getIOBuf()); + data.push_back(data_[i]->getIOBuf()); resultBytes += data_[i]->size(); if (resultBytes >= maxBytes) { + ++i; break; } } - return result; + bool atEnd = false; + std::vector remainingBytes; + remainingBytes.reserve(data_.size() - i); + for (; i < data_.size(); ++i) { + if (data_[i] == nullptr) { + VELOX_CHECK_EQ(i, data_.size() - 1, "null marker found in the middle"); + atEnd = true; + break; + } + remainingBytes.push_back(data_[i]->size()); + } + if (!atEnd && arbitraryBuffer) { + arbitraryBuffer->getAvailablePageSizes(remainingBytes); + } + return {std::move(data), std::move(remainingBytes), true}; } void DestinationBuffer::enqueue(std::shared_ptr data) { @@ -159,7 +186,9 @@ DataAvailable DestinationBuffer::getAndClearNotify() { DataAvailable result; result.callback = notify_; result.sequence = notifySequence_; - result.data = getData(notifyMaxBytes_, notifySequence_, nullptr, nullptr); + auto data = getData(notifyMaxBytes_, notifySequence_, nullptr, nullptr); + result.data = std::move(data.data); + result.remainingBytes = std::move(data.remainingBytes); clearNotify(); return result; } @@ -666,7 +695,7 @@ void OutputBuffer::getData( int64_t sequence, DataAvailableCallback notify, DataConsumerActiveCheckCallback activeCheck) { - std::vector> data; + DestinationBuffer::Data data; std::vector> freed; std::vector promises; { @@ -689,8 +718,8 @@ void OutputBuffer::getData( maxBytes, sequence, notify, activeCheck, arbitraryBuffer_.get()); } releaseAfterAcknowledge(freed, promises); - if (!data.empty()) { - notify(std::move(data), sequence); + if (data.immediate) { + notify(std::move(data.data), sequence, std::move(data.remainingBytes)); } } @@ -745,6 +774,39 @@ int64_t OutputBuffer::getAverageBufferTimeMsLocked() const { return 0; } +namespace { + +// Find out how many buffers hold 80% of the data. Useful to identify skew. +int32_t countTopBuffers( + const std::vector& bufferStats, + int64_t totalBytes) { + std::vector bufferSizes; + bufferSizes.reserve(bufferStats.size()); + for (auto i = 0; i < bufferStats.size(); ++i) { + const auto& stats = bufferStats[i]; + bufferSizes.push_back(stats.bytesBuffered + stats.bytesSent); + } + + // Sort descending. + std::sort(bufferSizes.begin(), bufferSizes.end(), std::greater()); + + const auto limit = totalBytes * 0.8; + int32_t numBuffers = 0; + int32_t runningTotal = 0; + for (auto size : bufferSizes) { + runningTotal += size; + numBuffers++; + + if (runningTotal >= limit) { + break; + } + } + + return numBuffers; +} + +} // namespace + OutputBuffer::Stats OutputBuffer::stats() { std::lock_guard l(mutex_); std::vector bufferStats; @@ -772,6 +834,7 @@ OutputBuffer::Stats OutputBuffer::stats() { numOutputRows_, numOutputPages_, getAverageBufferTimeMsLocked(), + countTopBuffers(bufferStats, numOutputBytes_), bufferStats); } diff --git a/velox/exec/OutputBuffer.h b/velox/exec/OutputBuffer.h index 1ff07bcce97be..925d66f392d6b 100644 --- a/velox/exec/OutputBuffer.h +++ b/velox/exec/OutputBuffer.h @@ -24,8 +24,10 @@ namespace facebook::velox::exec { /// sequence is the same as specified in BufferManager::getData call. The /// caller is expected to advance sequence by the number of entries in groups /// and call BufferManager::acknowledge. -using DataAvailableCallback = std::function< - void(std::vector> pages, int64_t sequence)>; +using DataAvailableCallback = std::function> pages, + int64_t sequence, + std::vector remainingBytes)>; /// Callback provided to indicate if the consumer of a destination buffer is /// currently active or not. It is used by arbitrary output buffer to optimize @@ -41,10 +43,11 @@ struct DataAvailable { DataAvailableCallback callback; int64_t sequence; std::vector> data; + std::vector remainingBytes; void notify() { if (callback) { - callback(std::move(data), sequence); + callback(std::move(data), sequence, remainingBytes); } } }; @@ -78,6 +81,9 @@ class ArbitraryBuffer { /// there are sufficient buffered pages. std::vector> getPages(uint64_t maxBytes); + /// Append the available page sizes to `out'. + void getAvailablePageSizes(std::vector& out) const; + std::string toString() const; private: @@ -125,13 +131,24 @@ class DestinationBuffer { /// arbitrary buffer on demand. void loadData(ArbitraryBuffer* buffer, uint64_t maxBytes); + struct Data { + std::vector> data; + std::vector remainingBytes; + bool immediate; + }; + /// Returns a shallow copy (folly::IOBuf::clone) of the data starting at /// 'sequence', stopping after exceeding 'maxBytes'. If there is no data, /// 'notify' is installed so that this gets called when data is added. If not /// null, 'activeCheck' is used to check if the consumer of a destination /// buffer with 'notify' installed is currently active or not. This only /// applies for arbitrary output buffer for now. - std::vector> getData( + /// + /// When arbitraryBuffer is provided, and the this buffer is not at end (no + /// null marker received), we append the remaining bytes from arbitraryBuffer + /// in the result, even the arbitraryBuffer could be shared among multiple + /// DestinationBuffers. + Data getData( uint64_t maxBytes, int64_t sequence, DataAvailableCallback notify, @@ -191,6 +208,7 @@ class OutputBuffer { int64_t _totalRowsSent, int64_t _totalPagesSent, int64_t _averageBufferTimeMs, + int32_t _numTopBuffers, const std::vector& _buffersStats) : kind(_kind), noMoreBuffers(_noMoreBuffers), @@ -202,6 +220,7 @@ class OutputBuffer { totalRowsSent(_totalRowsSent), totalPagesSent(_totalPagesSent), averageBufferTimeMs(_averageBufferTimeMs), + numTopBuffers(_numTopBuffers), buffersStats(_buffersStats) {} core::PartitionedOutputNode::Kind kind; @@ -223,6 +242,9 @@ class OutputBuffer { /// Average time each piece of data has been buffered for in milliseconds. int64_t averageBufferTimeMs{0}; + /// The number of largest buffers that handle 80% of the total data. + int32_t numTopBuffers{0}; + /// Stats of the OutputBuffer's destinations. std::vector buffersStats; diff --git a/velox/exec/OutputBufferManager.h b/velox/exec/OutputBufferManager.h index 410be0bed83ec..7870a1be114d3 100644 --- a/velox/exec/OutputBufferManager.h +++ b/velox/exec/OutputBufferManager.h @@ -82,6 +82,31 @@ class OutputBufferManager { DataAvailableCallback notify, DataConsumerActiveCheckCallback activeCheck = nullptr); +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY + bool getData( + const std::string& taskId, + int destination, + uint64_t maxBytes, + int64_t sequence, + std::function> pages, + int64_t sequence)> notify, + DataConsumerActiveCheckCallback activeCheck = nullptr) { + return getData( + taskId, + destination, + maxBytes, + sequence, + [notify = std::move(notify)]( + std::vector> pages, + int64_t sequence, + std::vector /*remainingBytes*/) mutable { + notify(std::move(pages), sequence); + }, + std::move(activeCheck)); + } +#endif + void removeTask(const std::string& taskId); static std::weak_ptr getInstance(); diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 364ba829fb4eb..c7d503060933a 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -1897,6 +1897,13 @@ ContinueFuture Task::terminate(TaskState terminalState) { void Task::maybeRemoveFromOutputBufferManager() { if (hasPartitionedOutput()) { if (auto bufferManager = bufferManager_.lock()) { + // Capture output buffer stats before deleting the buffer. + { + std::lock_guard l(mutex_); + if (!taskStats_.outputBufferStats.has_value()) { + taskStats_.outputBufferStats = bufferManager->stats(taskId_); + } + } bufferManager->removeTask(taskId_); } } @@ -1982,7 +1989,9 @@ TaskStats Task::taskStats() const { auto bufferManager = bufferManager_.lock(); taskStats.outputBufferUtilization = bufferManager->getUtilization(taskId_); taskStats.outputBufferOverutilized = bufferManager->isOverutilized(taskId_); - taskStats.outputBufferStats = bufferManager->stats(taskId_); + if (!taskStats.outputBufferStats.has_value()) { + taskStats.outputBufferStats = bufferManager->stats(taskId_); + } return taskStats; } diff --git a/velox/exec/tests/LimitTest.cpp b/velox/exec/tests/LimitTest.cpp index fa37779e61de8..70249fb944d9d 100644 --- a/velox/exec/tests/LimitTest.cpp +++ b/velox/exec/tests/LimitTest.cpp @@ -131,7 +131,8 @@ TEST_F(LimitTest, partialLimitEagerFlush) { [numPagesPromise = std::make_shared>(std::move(numPagesPromise))]( std::vector> pages, - int64_t /*sequence*/) { + int64_t /*sequence*/, + std::vector /*remainingBytes*/) { numPagesPromise->setValue(pages.size()); })); ASSERT_GE(std::move(numPagesFuture).get(std::chrono::seconds(1)), 10); diff --git a/velox/exec/tests/MultiFragmentTest.cpp b/velox/exec/tests/MultiFragmentTest.cpp index 32957ad8154ed..126c77deccd83 100644 --- a/velox/exec/tests/MultiFragmentTest.cpp +++ b/velox/exec/tests/MultiFragmentTest.cpp @@ -1537,7 +1537,8 @@ TEST_F(MultiFragmentTest, taskTerminateWithPendingOutputBuffers) { maxBytes, sequence, [&](std::vector> iobufs, - int64_t inSequence) { + int64_t inSequence, + std::vector /*remainingBytes*/) { for (auto& iobuf : iobufs) { if (iobuf != nullptr) { ++inSequence; @@ -1726,7 +1727,7 @@ class DataFetcher { destination_, maxBytes_, sequence, - [&](auto pages, auto sequence) mutable { + [&](auto pages, auto sequence, auto /*remainingBytes*/) mutable { const auto nextSequence = sequence + pages.size(); const bool atEnd = processData(std::move(pages), sequence); bufferManager_->acknowledge(taskId_, destination_, nextSequence); diff --git a/velox/exec/tests/OutputBufferManagerTest.cpp b/velox/exec/tests/OutputBufferManagerTest.cpp index f1e014abe50fa..068ac5932b6ee 100644 --- a/velox/exec/tests/OutputBufferManagerTest.cpp +++ b/velox/exec/tests/OutputBufferManagerTest.cpp @@ -161,7 +161,8 @@ class OutputBufferManagerTest : public testing::Test { expectedEndMarker, &receivedData]( std::vector> pages, - int64_t inSequence) { + int64_t inSequence, + std::vector /*remainingBytes*/) { ASSERT_FALSE(receivedData) << "for destination " << destination; ASSERT_EQ(pages.size(), expectedGroups) << "for destination " << destination; @@ -213,11 +214,13 @@ class OutputBufferManagerTest : public testing::Test { receiveEndMarker(int destination, int64_t sequence, bool& receivedEndMarker) { return [destination, sequence, &receivedEndMarker]( std::vector> pages, - int64_t inSequence) { + int64_t inSequence, + std::vector remainingBytes) { EXPECT_FALSE(receivedEndMarker) << "for destination " << destination; EXPECT_EQ(pages.size(), 1) << "for destination " << destination; EXPECT_TRUE(pages[0] == nullptr) << "for destination " << destination; EXPECT_EQ(inSequence, sequence) << "for destination " << destination; + EXPECT_TRUE(remainingBytes.empty()); receivedEndMarker = true; }; } @@ -262,7 +265,8 @@ class OutputBufferManagerTest : public testing::Test { receivedData = false; return [destination, sequence, expectedGroups, &receivedData]( std::vector> pages, - int64_t inSequence) { + int64_t inSequence, + std::vector /*remainingBytes*/) { EXPECT_FALSE(receivedData) << "for destination " << destination; EXPECT_EQ(pages.size(), expectedGroups) << "for destination " << destination; @@ -314,7 +318,8 @@ class OutputBufferManagerTest : public testing::Test { maxBytes, nextSequence, [&](std::vector> pages, - int64_t inSequence) { + int64_t inSequence, + std::vector /*remainingBytes*/) { ASSERT_EQ(inSequence, nextSequence); for (int i = 0; i < pages.size(); ++i) { if (pages[i] != nullptr) { @@ -507,16 +512,20 @@ TEST_F(OutputBufferManagerTest, destinationBuffer) { destinationBuffer.loadData(&buffer, 0), "maxBytes can't be zero"); destinationBuffer.loadData(&buffer, 100); std::atomic notified{false}; - destinationBuffer.getData( + auto buffers = destinationBuffer.getData( 1'000'000, 0, [&](std::vector> buffers, - int64_t sequence) { + int64_t sequence, + std::vector remainingBytes) { ASSERT_EQ(buffers.size(), 1); ASSERT_TRUE(buffers[0].get() == nullptr); + ASSERT_EQ(sequence, 0); + ASSERT_TRUE(remainingBytes.empty()); notified = true; }, nullptr); + ASSERT_FALSE(buffers.immediate); ASSERT_TRUE(buffer.empty()); ASSERT_FALSE(buffer.hasNoMoreData()); ASSERT_FALSE(notified); @@ -545,10 +554,13 @@ TEST_F(OutputBufferManagerTest, destinationBuffer) { 1'000'000'000, 0, [&](std::vector> /*unused*/, - int64_t /*unused*/) { notified = true; }, + int64_t /*unused*/, + std::vector /*remainingBytes*/) { notified = true; }, []() { return true; }); - for (const auto& buffer : buffers) { - numBytes += buffer->length(); + ASSERT_TRUE(buffers.immediate); + ASSERT_TRUE(buffers.remainingBytes.empty()); + for (const auto& iobuf : buffers.data) { + numBytes += iobuf->length(); } ASSERT_GT(numBytes, 0); ASSERT_FALSE(notified); @@ -565,16 +577,19 @@ TEST_F(OutputBufferManagerTest, destinationBuffer) { 1'000'000, 1, [&](std::vector> buffers, - int64_t sequence) { + int64_t sequence, + std::vector remainingBytes) { ASSERT_EQ(sequence, 1); ASSERT_EQ(buffers.size(), 9); + ASSERT_TRUE(remainingBytes.empty()); for (const auto& buffer : buffers) { numBytes += buffer->length(); } notified = true; }, []() { return true; }); - ASSERT_TRUE(buffers.empty()); + ASSERT_FALSE(buffers.immediate); + ASSERT_TRUE(buffers.data.empty()); ASSERT_FALSE(notified); destinationBuffer.maybeLoadData(&buffer); @@ -584,6 +599,99 @@ TEST_F(OutputBufferManagerTest, destinationBuffer) { ASSERT_FALSE(buffer.hasNoMoreData()); ASSERT_EQ(numBytes, expectedNumBytes); } + + auto noNotify = [](std::vector> /*buffers*/, + int64_t /*sequence*/, + std::vector /*remainingBytes*/) { FAIL(); }; + + { + ArbitraryBuffer buffer; + for (int i = 0; i < 10; ++i) { + buffer.enqueue(makeSerializedPage(rowType_, 100)); + } + DestinationBuffer destinationBuffer; + destinationBuffer.loadData(&buffer, 1e9); + ASSERT_TRUE(buffer.empty()); + int64_t sequence = 0; + + auto buffers = + destinationBuffer.getData(1, sequence, noNotify, [] { return true; }); + ASSERT_TRUE(buffers.immediate); + ASSERT_EQ(buffers.data.size(), 1); + ASSERT_GT(buffers.data[0]->length(), 0); + ASSERT_EQ(buffers.remainingBytes.size(), 9); + ++sequence; + ASSERT_EQ(destinationBuffer.acknowledge(sequence, false).size(), 1); + + auto bytes = buffers.remainingBytes[0]; + buffers = destinationBuffer.getData( + bytes, sequence, noNotify, [] { return true; }); + ASSERT_TRUE(buffers.immediate); + ASSERT_EQ(buffers.data.size(), 1); + ASSERT_EQ(buffers.data[0]->length(), bytes); + ASSERT_EQ(buffers.remainingBytes.size(), 8); + ++sequence; + ASSERT_EQ(destinationBuffer.acknowledge(sequence, false).size(), 1); + + bytes = buffers.remainingBytes[0]; + auto bytes2 = buffers.remainingBytes[1]; + buffers = destinationBuffer.getData( + bytes + 1, sequence, noNotify, [] { return true; }); + ASSERT_TRUE(buffers.immediate); + ASSERT_EQ(buffers.data.size(), 2); + ASSERT_EQ(buffers.data[0]->length(), bytes); + ASSERT_EQ(buffers.data[1]->length(), bytes2); + ASSERT_EQ(buffers.remainingBytes.size(), 6); + sequence += 2; + ASSERT_EQ(destinationBuffer.acknowledge(sequence, false).size(), 2); + + bytes = std::accumulate( + buffers.remainingBytes.begin(), buffers.remainingBytes.end(), 0ll); + buffers = destinationBuffer.getData( + bytes, sequence, noNotify, [] { return true; }); + ASSERT_TRUE(buffers.immediate); + ASSERT_EQ(buffers.data.size(), 6); + ASSERT_EQ(buffers.remainingBytes.size(), 0); + sequence += 6; + ASSERT_EQ(destinationBuffer.acknowledge(sequence, false).size(), 6); + + bool notified = false; + buffers = destinationBuffer.getData( + 1, + sequence, + [&](std::vector> buffers, + int64_t sequence2, + std::vector remainingBytes) { + ASSERT_EQ(buffers.size(), 1); + ASSERT_TRUE(buffers[0]); + ASSERT_EQ(sequence2, sequence); + ASSERT_TRUE(remainingBytes.empty()); + notified = true; + }, + [] { return true; }); + ASSERT_FALSE(buffers.immediate); + ASSERT_FALSE(notified); + for (int i = 0; i < 10; ++i) { + buffer.enqueue(makeSerializedPage(rowType_, 100)); + } + destinationBuffer.maybeLoadData(&buffer); + destinationBuffer.getAndClearNotify().notify(); + ASSERT_TRUE(notified); + } + + { + ArbitraryBuffer buffer; + for (int i = 0; i < 10; ++i) { + buffer.enqueue(makeSerializedPage(rowType_, 100)); + } + DestinationBuffer destinationBuffer; + auto buffers = destinationBuffer.getData( + 1, 0, noNotify, [] { return true; }, &buffer); + ASSERT_TRUE(buffers.immediate); + ASSERT_EQ(buffers.data.size(), 1); + ASSERT_GT(buffers.data[0]->length(), 0); + ASSERT_EQ(buffers.remainingBytes.size(), 9); + } } TEST_F(OutputBufferManagerTest, basicPartitioned) { @@ -858,7 +966,8 @@ TEST_F(OutputBufferManagerTest, inactiveDestinationBuffer) { /*sequence=*/sequences[destination], [&, destination]( std::vector> pages, - int64_t sequence) { + int64_t sequence, + std::vector /*remainingBytes*/) { notifyCb(destination, std::move(pages), sequence); }, [&, destination]() { return actives[destination].load(); })); @@ -895,7 +1004,9 @@ TEST_F(OutputBufferManagerTest, inactiveDestinationBuffer) { /*destination=*/0, maxBytes, /*sequence=*/sequences[0], - [&](std::vector> pages, int64_t sequence) { + [&](std::vector> pages, + int64_t sequence, + std::vector /*remainingBytes*/) { notifyCb(0, std::move(pages), sequence); })); ASSERT_EQ(sequences[0], 2); @@ -916,7 +1027,9 @@ TEST_F(OutputBufferManagerTest, inactiveDestinationBuffer) { /*destination=*/1, maxBytes, /*sequence=*/sequences[1], - [&](std::vector> pages, int64_t sequence) { + [&](std::vector> pages, + int64_t sequence, + std::vector /*remainingBytes*/) { notifyCb(1, std::move(pages), sequence); }, [&]() { return actives[1].load(); })); @@ -1296,9 +1409,9 @@ TEST_F(OutputBufferManagerTest, getDataOnFailedTask) { 1, 10, 1, - [](std::vector> pages, int64_t sequence) { - VELOX_UNREACHABLE(); - })); + [](std::vector> /*pages*/, + int64_t /*sequence*/, + std::vector /*remainingBytes*/) { VELOX_UNREACHABLE(); })); // Missing tasks should be ignored in this call. ASSERT_FALSE(bufferManager_->updateNumDrivers("test.0.2", 1)); diff --git a/velox/exec/tests/utils/LocalExchangeSource.cpp b/velox/exec/tests/utils/LocalExchangeSource.cpp index 848bb03ba9e86..a451ee1518c78 100644 --- a/velox/exec/tests/utils/LocalExchangeSource.cpp +++ b/velox/exec/tests/utils/LocalExchangeSource.cpp @@ -61,7 +61,8 @@ class LocalExchangeSource : public exec::ExchangeSource { // shared_ptr to the current object (self). auto resultCallback = [self, requestedSequence, buffers, this]( std::vector> data, - int64_t sequence) { + int64_t sequence, + std::vector remainingBytes) { { std::lock_guard l(timeoutMutex_); // This function is called either for a result or timeout. Only the @@ -103,7 +104,6 @@ class LocalExchangeSource : public exec::ExchangeSource { numPages_ += pages.size(); totalBytes_ += totalBytes; if (data.empty()) { - LOG(INFO) << "adjust timeout"; common::testutil::TestValue::adjust( "facebook::velox::exec::test::LocalExchangeSource::timeout", this); } @@ -148,7 +148,7 @@ class LocalExchangeSource : public exec::ExchangeSource { } if (!requestPromise.isFulfilled()) { - requestPromise.setValue(Response{totalBytes, atEnd_}); + requestPromise.setValue(Response{totalBytes, atEnd_, remainingBytes}); } }; @@ -189,8 +189,10 @@ class LocalExchangeSource : public exec::ExchangeSource { } private: - using ResultCallback = std::function< - void(std::vector> data, int64_t sequence)>; + using ResultCallback = std::function> data, + int64_t sequence, + std::vector remainingBytes)>; static void registerTimeout( const std::shared_ptr& self, ResultCallback callback, @@ -218,7 +220,7 @@ class LocalExchangeSource : public exec::ExchangeSource { } if (callback) { // Outside of mutex. - callback({}, 0); + callback({}, 0, {}); continue; } std::this_thread::sleep_for(std::chrono::seconds(1)); @@ -235,7 +237,7 @@ class LocalExchangeSource : public exec::ExchangeSource { promise = std::move(promise_); } if (promise.valid() && !promise.isFulfilled()) { - promise.setValue(Response{0, false}); + promise.setValue(Response{0, false, {}}); return true; }