Skip to content

Commit

Permalink
Rename taskId to remoteTaskId for readability in Exchange
Browse files Browse the repository at this point in the history
Currently 'taskId' in an Exchange related class has two meanings, for
example `ExchangeClient::taskId_` means local task (consumer),
`ExchangeSource::taskId_` indicates a remote task (producer). This can
cause a bit of confusion.

And, in some places current codebase already use 'remoteTaskId, such as
`ExchangeClient::remoteTaskIds_`. This patch renames the remaining
'task ID's representing the remote to 'remoteTaskId' to make it more
intuitive.

No functional changes.
  • Loading branch information
lingbin committed Dec 22, 2024
1 parent e9bb6c1 commit f89262a
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 44 deletions.
21 changes: 10 additions & 11 deletions velox/exec/Exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ Exchange::Exchange(
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
exchangeClient_{std::move(exchangeClient)} {}

void Exchange::addTaskIds(std::vector<std::string>& taskIds) {
std::shuffle(std::begin(taskIds), std::end(taskIds), rng_);
for (const std::string& taskId : taskIds) {
void Exchange::addRemoteTaskIds(std::vector<std::string>& remoteTaskIds) {
std::shuffle(std::begin(remoteTaskIds), std::end(remoteTaskIds), rng_);
for (const std::string& taskId : remoteTaskIds) {
exchangeClient_->addRemoteTaskId(taskId);
}
stats_.wlock()->numSplits += taskIds.size();
stats_.wlock()->numSplits += remoteTaskIds.size();
}

bool Exchange::getSplits(ContinueFuture* future) {
Expand All @@ -69,7 +69,7 @@ bool Exchange::getSplits(ContinueFuture* future) {
if (noMoreSplits_) {
return false;
}
std::vector<std::string> taskIds;
std::vector<std::string> remoteTaskIds;
for (;;) {
exec::Split split;
auto reason = operatorCtx_->task()->getSplitOrFuture(
Expand All @@ -78,10 +78,10 @@ bool Exchange::getSplits(ContinueFuture* future) {
if (split.hasConnectorSplit()) {
auto remoteSplit = std::dynamic_pointer_cast<RemoteConnectorSplit>(
split.connectorSplit);
VELOX_CHECK(remoteSplit, "Wrong type of split");
taskIds.push_back(remoteSplit->taskId);
VELOX_CHECK_NOT_NULL(remoteSplit, "Wrong type of split");
remoteTaskIds.push_back(remoteSplit->taskId);
} else {
addTaskIds(taskIds);
addRemoteTaskIds(remoteTaskIds);
exchangeClient_->noMoreRemoteTasks();
noMoreSplits_ = true;
if (atEnd_) {
Expand All @@ -92,7 +92,7 @@ bool Exchange::getSplits(ContinueFuture* future) {
return false;
}
} else {
addTaskIds(taskIds);
addRemoteTaskIds(remoteTaskIds);
return true;
}
}
Expand All @@ -103,8 +103,7 @@ BlockingReason Exchange::isBlocked(ContinueFuture* future) {
return BlockingReason::kNotBlocked;
}

// Start fetching data right away. Do not wait for all
// splits to be available.
// Start fetching data right away. Do not wait for all splits to be available.

if (!splitFuture_.valid()) {
getSplits(&splitFuture_);
Expand Down
10 changes: 5 additions & 5 deletions velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ namespace facebook::velox::exec {
struct RemoteConnectorSplit : public connector::ConnectorSplit {
const std::string taskId;

explicit RemoteConnectorSplit(const std::string& _taskId)
: ConnectorSplit(""), taskId(_taskId) {}
explicit RemoteConnectorSplit(const std::string& remoteTaskId)
: ConnectorSplit(""), taskId(remoteTaskId) {}

std::string toString() const override {
return fmt::format("Remote: {}", taskId);
Expand Down Expand Up @@ -62,9 +62,9 @@ class Exchange : public SourceOperator {
private:
// Invoked to create exchange client for remote tasks.
// The function shuffles the source task ids first to randomize the source
// tasks we fetch data from. This helps to avoid different tasks fetching
// from the same source task in a distributed system.
void addTaskIds(std::vector<std::string>& taskIds);
// tasks we fetch data from. This helps to avoid different tasks fetching from
// the same source task in a distributed system.
void addRemoteTaskIds(std::vector<std::string>& remoteTaskIds);

/// Fetches splits from the task until there are no more splits or task
/// returns a future that will be complete when more splits arrive. Adds
Expand Down
13 changes: 7 additions & 6 deletions velox/exec/ExchangeClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

namespace facebook::velox::exec {

void ExchangeClient::addRemoteTaskId(const std::string& taskId) {
void ExchangeClient::addRemoteTaskId(const std::string& remoteTaskId) {
std::vector<RequestSpec> requestSpecs;
std::shared_ptr<ExchangeSource> toClose;
{
std::lock_guard<std::mutex> l(queue_->mutex());

bool duplicate = !remoteTaskIds_.insert(taskId).second;
bool duplicate = !remoteTaskIds_.insert(remoteTaskId).second;
if (duplicate) {
// Do not add sources twice. Presto protocol may add duplicate sources
// and the task updates have no guarantees of arriving in order.
Expand All @@ -35,15 +35,16 @@ void ExchangeClient::addRemoteTaskId(const std::string& taskId) {

std::shared_ptr<ExchangeSource> source;
try {
source = ExchangeSource::create(taskId, destination_, queue_, pool_);
source =
ExchangeSource::create(remoteTaskId, destination_, queue_, pool_);
} catch (const VeloxException&) {
throw;
} catch (const std::exception& e) {
// Task ID can be very long. Truncate to 128 characters.
// 'remoteTaskId' can be very long. Truncate to 128 characters.
VELOX_FAIL(
"Failed to create ExchangeSource: {}. Task ID: {}.",
e.what(),
taskId.substr(0, 128));
remoteTaskId.substr(0, 128));
}

if (closed_) {
Expand Down Expand Up @@ -91,9 +92,9 @@ void ExchangeClient::close() {
}

folly::F14FastMap<std::string, RuntimeMetric> ExchangeClient::stats() const {
folly::F14FastMap<std::string, RuntimeMetric> stats;
std::lock_guard<std::mutex> l(queue_->mutex());

folly::F14FastMap<std::string, RuntimeMetric> stats;
for (const auto& source : sources_) {
if (source->supportsMetrics()) {
for (const auto& [name, value] : source->metrics()) {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/ExchangeClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
// upstream task. If 'close' has been called already, creates an exchange
// source and immediately closes it to notify the upstream task that data is
// no longer needed. Repeated calls with the same 'taskId' are ignored.
void addRemoteTaskId(const std::string& taskId);
void addRemoteTaskId(const std::string& remoteTaskId);

void noMoreRemoteTasks();

Expand Down
6 changes: 3 additions & 3 deletions velox/exec/ExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
namespace facebook::velox::exec {

std::shared_ptr<ExchangeSource> ExchangeSource::create(
const std::string& taskId,
const std::string& remoteTaskId,
int destination,
std::shared_ptr<ExchangeQueue> queue,
memory::MemoryPool* pool) {
for (auto& factory : factories()) {
auto result = factory(taskId, destination, queue, pool);
auto result = factory(remoteTaskId, destination, queue, pool);
if (result) {
return result;
}
}
VELOX_FAIL("No ExchangeSource factory matches {}", taskId);
VELOX_FAIL("No ExchangeSource factory matches {}", remoteTaskId);
}

// static
Expand Down
21 changes: 10 additions & 11 deletions velox/exec/ExchangeSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,24 @@ namespace facebook::velox::exec {
class ExchangeSource : public std::enable_shared_from_this<ExchangeSource> {
public:
ExchangeSource(
const std::string& taskId,
const std::string& remoteTaskId,
int destination,
std::shared_ptr<ExchangeQueue> queue,
memory::MemoryPool* pool)
: taskId_(taskId),
: remoteTaskId_(remoteTaskId),
destination_(destination),
queue_(std::move(queue)),
pool_(pool->shared_from_this()) {}

virtual ~ExchangeSource() = default;

static std::shared_ptr<ExchangeSource> create(
const std::string& taskId,
const std::string& remoteTaskId,
int destination,
std::shared_ptr<ExchangeQueue> queue,
memory::MemoryPool* pool);

/// Temporary API to indicate whether 'metrics()' API
/// is supported.
/// Temporary API to indicate whether 'metrics()' API is supported.
virtual bool supportsMetrics() const {
return false;
}
Expand Down Expand Up @@ -113,14 +112,14 @@ class ExchangeSource : public std::enable_shared_from_this<ExchangeSource> {

virtual std::string toString() {
std::stringstream out;
out << "[ExchangeSource " << taskId_ << ":" << destination_
out << "[ExchangeSource " << remoteTaskId_ << ":" << destination_
<< (requestPending_ ? " pending " : "") << (atEnd_ ? " at end" : "");
return out.str();
}

virtual folly::dynamic toJson() {
folly::dynamic obj = folly::dynamic::object;
obj["taskId"] = taskId_;
obj["remoteTaskId"] = remoteTaskId_;
obj["destination"] = destination_;
obj["sequence"] = sequence_;
obj["requestPending"] = requestPending_.load();
Expand All @@ -129,7 +128,7 @@ class ExchangeSource : public std::enable_shared_from_this<ExchangeSource> {
}

using Factory = std::function<std::shared_ptr<ExchangeSource>(
const std::string& taskId,
const std::string& remoteTaskId,
int destination,
std::shared_ptr<ExchangeQueue> queue,
memory::MemoryPool* pool)>;
Expand All @@ -146,9 +145,9 @@ class ExchangeSource : public std::enable_shared_from_this<ExchangeSource> {
}

protected:
// ID of the task producing data
const std::string taskId_;
// Destination number of 'this' on producer
// ID of the remote task producing data.
const std::string remoteTaskId_;
// Destination number of 'this' on producer.
const int destination_;
const std::shared_ptr<ExchangeQueue> queue_{nullptr};
// Holds a shared reference on the memory pool as it might be still possible
Expand Down
15 changes: 8 additions & 7 deletions velox/exec/tests/utils/LocalExchangeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ class LocalExchangeSource : public exec::ExchangeSource {
}

if (requestedSequence > sequence && !data.empty()) {
VLOG(2) << "Receives earlier sequence than requested: task " << taskId_
<< ", destination " << destination_ << ", requested "
<< sequence << ", received " << requestedSequence;
VLOG(2) << "Receives earlier sequence than requested: task "
<< remoteTaskId_ << ", destination " << destination_
<< ", requested " << sequence << ", received "
<< requestedSequence;
int64_t nExtra = requestedSequence - sequence;
VELOX_CHECK(nExtra < data.size());
data.erase(data.begin(), data.begin() + nExtra);
Expand Down Expand Up @@ -140,7 +141,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
}
// Outside of queue mutex.
if (atEnd_) {
buffers->deleteResults(taskId_, destination_);
buffers->deleteResults(remoteTaskId_, destination_);
}

if (!requestPromise.isFulfilled()) {
Expand All @@ -151,7 +152,7 @@ class LocalExchangeSource : public exec::ExchangeSource {
registerTimeout(self, resultCallback, maxWait);

buffers->getData(
taskId_, destination_, maxBytes, sequence_, resultCallback);
remoteTaskId_, destination_, maxBytes, sequence_, resultCallback);

return future;
}
Expand All @@ -171,14 +172,14 @@ class LocalExchangeSource : public exec::ExchangeSource {
std::lock_guard<std::mutex> l(queue_->mutex());
ackSequence = sequence_;
}
buffers->acknowledge(taskId_, destination_, ackSequence);
buffers->acknowledge(remoteTaskId_, destination_, ackSequence);
}

void close() override {
checkSetRequestPromise();

auto buffers = OutputBufferManager::getInstance().lock();
buffers->deleteResults(taskId_, destination_);
buffers->deleteResults(remoteTaskId_, destination_);
}

folly::F14FastMap<std::string, RuntimeMetric> metrics() const override {
Expand Down

0 comments on commit f89262a

Please sign in to comment.