Skip to content

Commit

Permalink
Change max wait for actual data fetching to 100ms (facebookincubator#…
Browse files Browse the repository at this point in the history
…8965)

Summary:

As a follow up to the exchange protocol upgrade, we can reduce the max
wait for actual data fetching to a very small amount, to avoid long waiting in
particular happening when multiple destination buffers are sharing the same
arbitrary buffer.

Reviewed By: mbasmanova

Differential Revision: D54495685
  • Loading branch information
Yuhta authored and facebook-github-bot committed Mar 5, 2024
1 parent dec9bb9 commit 4fb08a9
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 23 deletions.
5 changes: 2 additions & 3 deletions velox/exec/ExchangeClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,9 @@ void ExchangeClient::request(std::vector<RequestSpec>&& requestSpecs) {
for (auto& spec : requestSpecs) {
auto future = folly::SemiFuture<ExchangeSource::Response>::makeEmpty();
if (spec.maxBytes == 0) {
future = spec.source->requestDataSizes(kDefaultMaxWaitSeconds);
future = spec.source->requestDataSizes(kRequestDataSizesMaxWait);
} else {
// TODO: Change maxWait to 100ms once we fix the unit of wait time.
future = spec.source->request(spec.maxBytes, 1);
future = spec.source->request(spec.maxBytes, kRequestDataMaxWait);
}
VELOX_CHECK(future.valid());
std::move(future)
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/ExchangeClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ namespace facebook::velox::exec {
class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
public:
static constexpr int32_t kDefaultMaxQueuedBytes = 32 << 20; // 32 MB.
static constexpr int32_t kDefaultMaxWaitSeconds = 2;
static constexpr std::chrono::seconds kRequestDataSizesMaxWait{2};
static constexpr std::chrono::milliseconds kRequestDataMaxWait{100};
static inline const std::string kBackgroundCpuTimeMs = "backgroundCpuTimeMs";

ExchangeClient(
Expand Down
29 changes: 24 additions & 5 deletions velox/exec/ExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,38 @@ class ExchangeSource : public std::enable_shared_from_this<ExchangeSource> {
};

/// Requests the producer to generate up to 'maxBytes' more data and reply
/// within 'maxWaitSeconds'. Returns a future that completes when producer
/// responds either with 'data' or with a message indicating that all data has
/// been already produced or data will take more time to produce.
/// within 'maxWait'. Returns a future that completes when producer responds
/// either with 'data' or with a message indicating that all data has been
/// already produced or data will take more time to produce.
virtual folly::SemiFuture<Response> request(
uint32_t maxBytes,
uint32_t maxWaitSeconds) = 0;
std::chrono::nanoseconds maxWait) {
auto maxWaitSeconds = std::max(1ll, llround(maxWait.count() / 1e9));
return request(maxBytes, maxWaitSeconds);
}

/// Ask for available data sizes that can be fetched. Normally should not
/// fetching any actual data (i.e. Response::bytes should be 0). However for
/// backward compatibility (e.g. communicating with coordinator), we allow
/// small data (1MB) to be returned.
virtual folly::SemiFuture<Response> requestDataSizes(
uint32_t maxWaitSeconds) = 0;
std::chrono::nanoseconds maxWait) {
auto maxWaitSeconds = std::max(1ll, llround(maxWait.count() / 1e9));
return requestDataSizes(maxWaitSeconds);
}

/// Used in presto_cpp for backward compatibility only, will be removed soon.
virtual folly::SemiFuture<Response> request(
uint32_t maxBytes,
uint32_t maxWaitSeconds) {
VELOX_UNREACHABLE();
}

/// Used in presto_cpp for backward compatibility only, will be removed soon.
virtual folly::SemiFuture<Response> requestDataSizes(
uint32_t maxWaitSeconds) {
VELOX_UNREACHABLE();
}

/// Close the exchange source. May be called before all data
/// has been received and processed. This can happen in case
Expand Down
10 changes: 5 additions & 5 deletions velox/exec/tests/ExchangeClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,10 @@ TEST_F(ExchangeClientTest, sourceTimeout) {

#ifndef NDEBUG
// Wait until all sources have timed out at least once.
constexpr int32_t kMaxIters =
3 * kNumSources * ExchangeClient::kDefaultMaxWaitSeconds;
int32_t counter = 0;
for (; counter < kMaxIters; ++counter) {
auto deadline = std::chrono::system_clock::now() +
3 * kNumSources *
std::chrono::seconds(ExchangeClient::kRequestDataSizesMaxWait);
while (std::chrono::system_clock::now() < deadline) {
{
std::lock_guard<std::mutex> l(mutex);
if (sourcesWithTimeout.size() >= kNumSources) {
Expand All @@ -383,7 +383,7 @@ TEST_F(ExchangeClientTest, sourceTimeout) {
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
EXPECT_LT(counter, kMaxIters);
EXPECT_LT(std::chrono::system_clock::now(), deadline);
#endif

const auto& queue = client->queue();
Expand Down
21 changes: 12 additions & 9 deletions velox/exec/tests/utils/LocalExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class LocalExchangeSource : public exec::ExchangeSource {

folly::SemiFuture<Response> request(
uint32_t maxBytes,
uint32_t maxWaitSeconds) override {
std::chrono::nanoseconds maxWait) override {
++numRequests_;

auto promise = VeloxPromise<Response>("LocalExchangeSource::request");
Expand Down Expand Up @@ -152,7 +152,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
}
};

registerTimeout(self, resultCallback, maxWaitSeconds);
registerTimeout(self, resultCallback, maxWait);

buffers->getData(
taskId_, destination_, maxBytes, sequence_, resultCallback);
Expand All @@ -161,8 +161,8 @@ class LocalExchangeSource : public exec::ExchangeSource {
}

folly::SemiFuture<Response> requestDataSizes(
uint32_t maxWaitSeconds) override {
return request(0, maxWaitSeconds);
std::chrono::nanoseconds maxWait) override {
return request(0, maxWait);
}

void close() override {
Expand Down Expand Up @@ -201,7 +201,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
static void registerTimeout(
const std::shared_ptr<ExchangeSource>& self,
ResultCallback callback,
int32_t seconds) {
std::chrono::nanoseconds maxWait) {
std::lock_guard<std::mutex> l(timeoutMutex_);
if (!executor_) {
executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(1);
Expand All @@ -212,7 +212,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
stop_ = false;
executor_->add([]() {
while (!stop_) {
auto now = getCurrentTimeSec();
auto now = std::chrono::system_clock::now();
ResultCallback callback = nullptr;
{
std::lock_guard<std::mutex> t(timeoutMutex_);
Expand All @@ -232,7 +232,8 @@ class LocalExchangeSource : public exec::ExchangeSource {
}
});
}
timeouts_[self] = std::make_pair(callback, getCurrentTimeSec() + seconds);
timeouts_[self] =
std::make_pair(callback, std::chrono::system_clock::now() + maxWait);
}

bool checkSetRequestPromise() {
Expand All @@ -258,7 +259,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
static std::mutex timeoutMutex_;
static folly::F14FastMap<
std::shared_ptr<ExchangeSource>,
std::pair<ResultCallback, size_t>>
std::pair<ResultCallback, std::chrono::system_clock::time_point>>
timeouts_;
static std::unique_ptr<folly::CPUThreadPoolExecutor> executor_;
static std::atomic_bool stop_;
Expand All @@ -268,7 +269,9 @@ class LocalExchangeSource : public exec::ExchangeSource {
std::mutex LocalExchangeSource::timeoutMutex_;
folly::F14FastMap<
std::shared_ptr<ExchangeSource>,
std::pair<LocalExchangeSource::ResultCallback, size_t>>
std::pair<
LocalExchangeSource::ResultCallback,
std::chrono::system_clock::time_point>>
LocalExchangeSource::timeouts_;
std::unique_ptr<folly::CPUThreadPoolExecutor> LocalExchangeSource::executor_;
std::atomic_bool LocalExchangeSource::stop_ = false;
Expand Down

0 comments on commit 4fb08a9

Please sign in to comment.