Skip to content

Commit

Permalink
New exchange protocol
Browse files Browse the repository at this point in the history
Summary:
facebookincubator#8845

Upgrade the exchange protocol.  We will poll the remaining data sizes using `ExchangeSource::getDataSizes` from all the producers and schedule actual data fetch according to the memory budget.  This reduce the waiting for data time significantly in some cases, for a query that was timing out after 1 hour on 600 nodes cluster, we reduce the wall time to 4.72 minutes on 400 nodes cluster (Java is taking 36.08 minutes on 1000 nodes cluster).

See prestodb/presto#21926

Reviewed By: amitkdutta

Differential Revision: D54027466
  • Loading branch information
Yuhta authored and facebook-github-bot committed Feb 29, 2024
1 parent 2745069 commit e554327
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 162 deletions.
153 changes: 70 additions & 83 deletions velox/exec/ExchangeClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
namespace facebook::velox::exec {

void ExchangeClient::addRemoteTaskId(const std::string& taskId) {
RequestSpec requestSpec;
std::vector<RequestSpec> requestSpecs;
std::shared_ptr<ExchangeSource> toClose;
{
std::lock_guard<std::mutex> l(queue_->mutex());
Expand Down Expand Up @@ -48,19 +48,16 @@ void ExchangeClient::addRemoteTaskId(const std::string& taskId) {
} else {
sources_.push_back(source);
queue_->addSourceLocked();
// Put new source into 'producingSources_' queue to prioritise fetching
// from these to find out whether these are productive or not.
producingSources_.push(source);

requestSpec = pickSourcesToRequestLocked();
emptySources_.push(source);
requestSpecs = pickSourcesToRequestLocked();
}
}

// Outside of lock.
if (toClose) {
toClose->close();
} else {
request(requestSpec);
request(requestSpecs);
}
}

Expand Down Expand Up @@ -116,7 +113,7 @@ folly::F14FastMap<std::string, RuntimeMetric> ExchangeClient::stats() const {

std::vector<std::unique_ptr<SerializedPage>>
ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
RequestSpec requestSpec;
std::vector<RequestSpec> requestSpecs;
std::vector<std::unique_ptr<SerializedPage>> pages;
{
std::lock_guard<std::mutex> l(queue_->mutex());
Expand All @@ -130,38 +127,49 @@ ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
return pages;
}

requestSpec = pickSourcesToRequestLocked();
requestSpecs = pickSourcesToRequestLocked();
}

// Outside of lock
request(requestSpec);
request(requestSpecs);
return pages;
}

void ExchangeClient::request(const RequestSpec& requestSpec) {
void ExchangeClient::request(const std::vector<RequestSpec>& requestSpecs) {
auto self = shared_from_this();
for (auto& source : requestSpec.sources) {
auto future = source->request(requestSpec.maxBytes, kDefaultMaxWaitSeconds);
for (auto& spec : requestSpecs) {
auto future = folly::SemiFuture<ExchangeSource::Response>::makeEmpty();
if (spec.maxBytes == 0) {
future = spec.source->requestDataSizes(kDefaultMaxWaitSeconds);
} else {
future = spec.source->request(spec.maxBytes, 1);
}
VELOX_CHECK(future.valid());
std::move(future)
.via(executor_)
.thenValue([self, requestSource = source](auto&& response) {
RequestSpec requestSpec;
.thenValue([self, spec = std::move(spec)](auto&& response) {
std::vector<RequestSpec> requestSpecs;
{
std::lock_guard<std::mutex> l(self->queue_->mutex());
if (self->closed_) {
return;
}
if (!response.atEnd) {
if (response.bytes > 0) {
self->producingSources_.push(requestSource);
if (!response.remainingBytes.empty()) {
for (auto bytes : response.remainingBytes) {
VELOX_CHECK_GT(bytes, 0);
}
self->producingSources_.push(
{std::move(spec.source),
std::move(response.remainingBytes)});
} else {
self->emptySources_.push(requestSource);
self->emptySources_.push(std::move(spec.source));
}
}
requestSpec = self->pickSourcesToRequestLocked();
self->totalPendingBytes_ -= spec.maxBytes;
requestSpecs = self->pickSourcesToRequestLocked();
}
self->request(requestSpec);
self->request(requestSpecs);
})
.thenError(
folly::tag_t<std::exception>{}, [self](const std::exception& e) {
Expand All @@ -170,74 +178,53 @@ void ExchangeClient::request(const RequestSpec& requestSpec) {
}
}

int32_t ExchangeClient::countPendingSourcesLocked() {
int32_t numPending = 0;
for (auto& source : sources_) {
if (source->isRequestPendingLocked()) {
++numPending;
}
}
return numPending;
}

int64_t ExchangeClient::getAveragePageSize() {
auto averagePageSize =
std::min<int64_t>(maxQueuedBytes_, queue_->averageReceivedPageBytes());
if (averagePageSize == 0) {
averagePageSize = 1 << 20; // 1 MB.
std::vector<ExchangeClient::RequestSpec>
ExchangeClient::pickSourcesToRequestLocked() {
if (closed_) {
return {};
}

return averagePageSize;
}

int32_t ExchangeClient::getNumSourcesToRequestLocked(int64_t averagePageSize) {
// Figure out how many more 'averagePageSize' fit into 'maxQueuedBytes_'.
// Make sure to leave room for 'numPending' pages.
const auto numPending = countPendingSourcesLocked();

auto numToRequest = std::max<int32_t>(
1, (maxQueuedBytes_ - queue_->totalBytes()) / averagePageSize);
if (numToRequest <= numPending) {
return 0;
std::vector<RequestSpec> requestSpecs;
while (!emptySources_.empty()) {
auto& source = emptySources_.front();
VELOX_CHECK(source->shouldRequestLocked());
requestSpecs.push_back({std::move(source), 0});
emptySources_.pop();
}

return numToRequest - numPending;
}

void ExchangeClient::pickSourcesToRequestLocked(
RequestSpec& requestSpec,
int32_t numToRequest,
std::queue<std::shared_ptr<ExchangeSource>>& sources) {
while (requestSpec.sources.size() < numToRequest && !sources.empty()) {
auto& source = sources.front();
if (source->shouldRequestLocked()) {
requestSpec.sources.push_back(source);
int64_t availableSpace =
maxQueuedBytes_ - queue_->totalBytes() - totalPendingBytes_;
while (availableSpace > 0 && !producingSources_.empty()) {
auto& source = producingSources_.front().source;
int64_t requestBytes = 0;
for (auto bytes : producingSources_.front().remainingBytes) {
availableSpace -= bytes;
if (availableSpace < 0) {
break;
}
requestBytes += bytes;
}
sources.pop();
}
}

ExchangeClient::RequestSpec ExchangeClient::pickSourcesToRequestLocked() {
if (closed_ || queue_->totalBytes() >= maxQueuedBytes_) {
return {};
if (requestBytes == 0) {
VELOX_CHECK_LT(availableSpace, 0);
break;
}
VELOX_CHECK(source->shouldRequestLocked());
requestSpecs.push_back({std::move(source), requestBytes});
producingSources_.pop();
totalPendingBytes_ += requestBytes;
}

const auto averagePageSize = getAveragePageSize();
const auto numToRequest = getNumSourcesToRequestLocked(averagePageSize);

if (numToRequest == 0) {
return {};
if (queue_->totalBytes() == 0 && totalPendingBytes_ == 0 &&
!producingSources_.empty()) {
// We have full capacity but still cannot initiate one single data transfer.
// Let the transfer happen in this case to avoid stuck.
auto& source = producingSources_.front().source;
auto requestBytes = producingSources_.front().remainingBytes.at(0);
LOG(INFO) << "Requesting large single page " << requestBytes
<< " bytes, exceeding capacity " << maxQueuedBytes_;
VELOX_CHECK(source->shouldRequestLocked());
requestSpecs.push_back({std::move(source), requestBytes});
producingSources_.pop();
totalPendingBytes_ += requestBytes;
}

RequestSpec requestSpec;
requestSpec.maxBytes = averagePageSize;

// Pick up to 'numToRequest' next sources to request data from. Prioritize
// sources that return data.
pickSourcesToRequestLocked(requestSpec, numToRequest, producingSources_);
pickSourcesToRequestLocked(requestSpec, numToRequest, emptySources_);

return requestSpec;
return requestSpecs;
}

ExchangeClient::~ExchangeClient() {
Expand Down
30 changes: 14 additions & 16 deletions velox/exec/ExchangeClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,27 +97,22 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
folly::dynamic toJson() const;

private:
// A list of sources to request data from and how much to request from each
// (in bytes).
struct RequestSpec {
std::vector<std::shared_ptr<ExchangeSource>> sources;
std::shared_ptr<ExchangeSource> source;

// How much bytes to request from this source. 0 bytes means request data
// sizes only.
int64_t maxBytes;
};

int64_t getAveragePageSize();

int32_t getNumSourcesToRequestLocked(int64_t averagePageSize);

RequestSpec pickSourcesToRequestLocked();

void pickSourcesToRequestLocked(
RequestSpec& requestSpec,
int32_t numToRequest,
std::queue<std::shared_ptr<ExchangeSource>>& sources);
struct ProducingSource {
std::shared_ptr<ExchangeSource> source;
std::vector<int64_t> remainingBytes;
};

int32_t countPendingSourcesLocked();
std::vector<RequestSpec> pickSourcesToRequestLocked();

void request(const RequestSpec& requestSpec);
void request(const std::vector<RequestSpec>& requestSpecs);

// Handy for ad-hoc logging.
const std::string taskId_;
Expand All @@ -131,9 +126,12 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
std::vector<std::shared_ptr<ExchangeSource>> sources_;
bool closed_{false};

// Total number of bytes in flight.
int64_t totalPendingBytes_{0};

// A queue of sources that have returned non-empty response from the latest
// request.
std::queue<std::shared_ptr<ExchangeSource>> producingSources_;
std::queue<ProducingSource> producingSources_;
// A queue of sources that returned empty response from the latest request.
std::queue<std::shared_ptr<ExchangeSource>> emptySources_;
};
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/ExchangeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class ExchangeQueue {
dequeueLocked(uint32_t maxBytes, bool* atEnd, ContinueFuture* future);

/// Returns the total bytes held by SerializedPages in 'this'.
uint64_t totalBytes() const {
int64_t totalBytes() const {
return totalBytes_;
}

Expand Down Expand Up @@ -197,7 +197,7 @@ class ExchangeQueue {
// throw an exception with this message.
std::string error_;
// Total size of SerializedPages in queue.
uint64_t totalBytes_{0};
int64_t totalBytes_{0};
// Number of SerializedPages received.
int64_t receivedPages_{0};
// Total size of SerializedPages received. Used to calculate an average
Expand Down
4 changes: 1 addition & 3 deletions velox/exec/ExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ class ExchangeSource : public std::enable_shared_from_this<ExchangeSource> {
/// backward compatibility (e.g. communicating with coordinator), we allow
/// small data (1MB) to be returned.
virtual folly::SemiFuture<Response> requestDataSizes(
uint32_t /*maxWaitSeconds*/) {
VELOX_NYI();
}
uint32_t maxWaitSeconds) = 0;

/// Close the exchange source. May be called before all data
/// has been received and processed. This can happen in case
Expand Down
Loading

0 comments on commit e554327

Please sign in to comment.