From f89262a1ad7276316e470740fec29a42a000583b Mon Sep 17 00:00:00 2001 From: lingbin Date: Tue, 5 Nov 2024 01:40:24 +0800 Subject: [PATCH] Rename taskId to remoteTaskId for readability in Exchange Currently 'taskId' in an Exchange related class has two meanings, for example `ExchangeClient::taskId_` means local task (consumer), `ExchangeSource::taskId_` indicates a remote task (producer). This can cause a bit of confusion. And, in some places current codebase already use 'remoteTaskId, such as `ExchangeClient::remoteTaskIds_`. This patch renames the remaining 'task ID's representing the remote to 'remoteTaskId' to make it more intuitive. No functional changes. --- velox/exec/Exchange.cpp | 21 +++++++++---------- velox/exec/Exchange.h | 10 ++++----- velox/exec/ExchangeClient.cpp | 13 ++++++------ velox/exec/ExchangeClient.h | 2 +- velox/exec/ExchangeSource.cpp | 6 +++--- velox/exec/ExchangeSource.h | 21 +++++++++---------- .../exec/tests/utils/LocalExchangeSource.cpp | 15 ++++++------- 7 files changed, 44 insertions(+), 44 deletions(-) diff --git a/velox/exec/Exchange.cpp b/velox/exec/Exchange.cpp index c45e8c8b2af4b..fb27c4f31ad90 100644 --- a/velox/exec/Exchange.cpp +++ b/velox/exec/Exchange.cpp @@ -54,12 +54,12 @@ Exchange::Exchange( processSplits_{operatorCtx_->driverCtx()->driverId == 0}, exchangeClient_{std::move(exchangeClient)} {} -void Exchange::addTaskIds(std::vector& taskIds) { - std::shuffle(std::begin(taskIds), std::end(taskIds), rng_); - for (const std::string& taskId : taskIds) { +void Exchange::addRemoteTaskIds(std::vector& remoteTaskIds) { + std::shuffle(std::begin(remoteTaskIds), std::end(remoteTaskIds), rng_); + for (const std::string& taskId : remoteTaskIds) { exchangeClient_->addRemoteTaskId(taskId); } - stats_.wlock()->numSplits += taskIds.size(); + stats_.wlock()->numSplits += remoteTaskIds.size(); } bool Exchange::getSplits(ContinueFuture* future) { @@ -69,7 +69,7 @@ bool Exchange::getSplits(ContinueFuture* future) { if (noMoreSplits_) { return false; } - std::vector taskIds; + std::vector remoteTaskIds; for (;;) { exec::Split split; auto reason = operatorCtx_->task()->getSplitOrFuture( @@ -78,10 +78,10 @@ bool Exchange::getSplits(ContinueFuture* future) { if (split.hasConnectorSplit()) { auto remoteSplit = std::dynamic_pointer_cast( split.connectorSplit); - VELOX_CHECK(remoteSplit, "Wrong type of split"); - taskIds.push_back(remoteSplit->taskId); + VELOX_CHECK_NOT_NULL(remoteSplit, "Wrong type of split"); + remoteTaskIds.push_back(remoteSplit->taskId); } else { - addTaskIds(taskIds); + addRemoteTaskIds(remoteTaskIds); exchangeClient_->noMoreRemoteTasks(); noMoreSplits_ = true; if (atEnd_) { @@ -92,7 +92,7 @@ bool Exchange::getSplits(ContinueFuture* future) { return false; } } else { - addTaskIds(taskIds); + addRemoteTaskIds(remoteTaskIds); return true; } } @@ -103,8 +103,7 @@ BlockingReason Exchange::isBlocked(ContinueFuture* future) { return BlockingReason::kNotBlocked; } - // Start fetching data right away. Do not wait for all - // splits to be available. + // Start fetching data right away. Do not wait for all splits to be available. if (!splitFuture_.valid()) { getSplits(&splitFuture_); diff --git a/velox/exec/Exchange.h b/velox/exec/Exchange.h index 3a1f32cac5339..9630ec591fa56 100644 --- a/velox/exec/Exchange.h +++ b/velox/exec/Exchange.h @@ -27,8 +27,8 @@ namespace facebook::velox::exec { struct RemoteConnectorSplit : public connector::ConnectorSplit { const std::string taskId; - explicit RemoteConnectorSplit(const std::string& _taskId) - : ConnectorSplit(""), taskId(_taskId) {} + explicit RemoteConnectorSplit(const std::string& remoteTaskId) + : ConnectorSplit(""), taskId(remoteTaskId) {} std::string toString() const override { return fmt::format("Remote: {}", taskId); @@ -62,9 +62,9 @@ class Exchange : public SourceOperator { private: // Invoked to create exchange client for remote tasks. // The function shuffles the source task ids first to randomize the source - // tasks we fetch data from. This helps to avoid different tasks fetching - // from the same source task in a distributed system. - void addTaskIds(std::vector& taskIds); + // tasks we fetch data from. This helps to avoid different tasks fetching from + // the same source task in a distributed system. + void addRemoteTaskIds(std::vector& remoteTaskIds); /// Fetches splits from the task until there are no more splits or task /// returns a future that will be complete when more splits arrive. Adds diff --git a/velox/exec/ExchangeClient.cpp b/velox/exec/ExchangeClient.cpp index b37b50cca3618..ed93c94469e37 100644 --- a/velox/exec/ExchangeClient.cpp +++ b/velox/exec/ExchangeClient.cpp @@ -20,13 +20,13 @@ namespace facebook::velox::exec { -void ExchangeClient::addRemoteTaskId(const std::string& taskId) { +void ExchangeClient::addRemoteTaskId(const std::string& remoteTaskId) { std::vector requestSpecs; std::shared_ptr toClose; { std::lock_guard l(queue_->mutex()); - bool duplicate = !remoteTaskIds_.insert(taskId).second; + bool duplicate = !remoteTaskIds_.insert(remoteTaskId).second; if (duplicate) { // Do not add sources twice. Presto protocol may add duplicate sources // and the task updates have no guarantees of arriving in order. @@ -35,15 +35,16 @@ void ExchangeClient::addRemoteTaskId(const std::string& taskId) { std::shared_ptr source; try { - source = ExchangeSource::create(taskId, destination_, queue_, pool_); + source = + ExchangeSource::create(remoteTaskId, destination_, queue_, pool_); } catch (const VeloxException&) { throw; } catch (const std::exception& e) { - // Task ID can be very long. Truncate to 128 characters. + // 'remoteTaskId' can be very long. Truncate to 128 characters. VELOX_FAIL( "Failed to create ExchangeSource: {}. Task ID: {}.", e.what(), - taskId.substr(0, 128)); + remoteTaskId.substr(0, 128)); } if (closed_) { @@ -91,9 +92,9 @@ void ExchangeClient::close() { } folly::F14FastMap ExchangeClient::stats() const { + folly::F14FastMap stats; std::lock_guard l(queue_->mutex()); - folly::F14FastMap stats; for (const auto& source : sources_) { if (source->supportsMetrics()) { for (const auto& [name, value] : source->metrics()) { diff --git a/velox/exec/ExchangeClient.h b/velox/exec/ExchangeClient.h index 29bc3b884fdd9..e6931d1f1fb7a 100644 --- a/velox/exec/ExchangeClient.h +++ b/velox/exec/ExchangeClient.h @@ -66,7 +66,7 @@ class ExchangeClient : public std::enable_shared_from_this { // upstream task. If 'close' has been called already, creates an exchange // source and immediately closes it to notify the upstream task that data is // no longer needed. Repeated calls with the same 'taskId' are ignored. - void addRemoteTaskId(const std::string& taskId); + void addRemoteTaskId(const std::string& remoteTaskId); void noMoreRemoteTasks(); diff --git a/velox/exec/ExchangeSource.cpp b/velox/exec/ExchangeSource.cpp index ff28439503dfe..d766391a9ba4b 100644 --- a/velox/exec/ExchangeSource.cpp +++ b/velox/exec/ExchangeSource.cpp @@ -18,17 +18,17 @@ namespace facebook::velox::exec { std::shared_ptr ExchangeSource::create( - const std::string& taskId, + const std::string& remoteTaskId, int destination, std::shared_ptr queue, memory::MemoryPool* pool) { for (auto& factory : factories()) { - auto result = factory(taskId, destination, queue, pool); + auto result = factory(remoteTaskId, destination, queue, pool); if (result) { return result; } } - VELOX_FAIL("No ExchangeSource factory matches {}", taskId); + VELOX_FAIL("No ExchangeSource factory matches {}", remoteTaskId); } // static diff --git a/velox/exec/ExchangeSource.h b/velox/exec/ExchangeSource.h index 5137423f7b2a3..eb05d7c4d93be 100644 --- a/velox/exec/ExchangeSource.h +++ b/velox/exec/ExchangeSource.h @@ -23,11 +23,11 @@ namespace facebook::velox::exec { class ExchangeSource : public std::enable_shared_from_this { public: ExchangeSource( - const std::string& taskId, + const std::string& remoteTaskId, int destination, std::shared_ptr queue, memory::MemoryPool* pool) - : taskId_(taskId), + : remoteTaskId_(remoteTaskId), destination_(destination), queue_(std::move(queue)), pool_(pool->shared_from_this()) {} @@ -35,13 +35,12 @@ class ExchangeSource : public std::enable_shared_from_this { virtual ~ExchangeSource() = default; static std::shared_ptr create( - const std::string& taskId, + const std::string& remoteTaskId, int destination, std::shared_ptr queue, memory::MemoryPool* pool); - /// Temporary API to indicate whether 'metrics()' API - /// is supported. + /// Temporary API to indicate whether 'metrics()' API is supported. virtual bool supportsMetrics() const { return false; } @@ -113,14 +112,14 @@ class ExchangeSource : public std::enable_shared_from_this { virtual std::string toString() { std::stringstream out; - out << "[ExchangeSource " << taskId_ << ":" << destination_ + out << "[ExchangeSource " << remoteTaskId_ << ":" << destination_ << (requestPending_ ? " pending " : "") << (atEnd_ ? " at end" : ""); return out.str(); } virtual folly::dynamic toJson() { folly::dynamic obj = folly::dynamic::object; - obj["taskId"] = taskId_; + obj["remoteTaskId"] = remoteTaskId_; obj["destination"] = destination_; obj["sequence"] = sequence_; obj["requestPending"] = requestPending_.load(); @@ -129,7 +128,7 @@ class ExchangeSource : public std::enable_shared_from_this { } using Factory = std::function( - const std::string& taskId, + const std::string& remoteTaskId, int destination, std::shared_ptr queue, memory::MemoryPool* pool)>; @@ -146,9 +145,9 @@ class ExchangeSource : public std::enable_shared_from_this { } protected: - // ID of the task producing data - const std::string taskId_; - // Destination number of 'this' on producer + // ID of the remote task producing data. + const std::string remoteTaskId_; + // Destination number of 'this' on producer. const int destination_; const std::shared_ptr queue_{nullptr}; // Holds a shared reference on the memory pool as it might be still possible diff --git a/velox/exec/tests/utils/LocalExchangeSource.cpp b/velox/exec/tests/utils/LocalExchangeSource.cpp index 9d012c908c95e..b6baae9583acf 100644 --- a/velox/exec/tests/utils/LocalExchangeSource.cpp +++ b/velox/exec/tests/utils/LocalExchangeSource.cpp @@ -75,9 +75,10 @@ class LocalExchangeSource : public exec::ExchangeSource { } if (requestedSequence > sequence && !data.empty()) { - VLOG(2) << "Receives earlier sequence than requested: task " << taskId_ - << ", destination " << destination_ << ", requested " - << sequence << ", received " << requestedSequence; + VLOG(2) << "Receives earlier sequence than requested: task " + << remoteTaskId_ << ", destination " << destination_ + << ", requested " << sequence << ", received " + << requestedSequence; int64_t nExtra = requestedSequence - sequence; VELOX_CHECK(nExtra < data.size()); data.erase(data.begin(), data.begin() + nExtra); @@ -140,7 +141,7 @@ class LocalExchangeSource : public exec::ExchangeSource { } // Outside of queue mutex. if (atEnd_) { - buffers->deleteResults(taskId_, destination_); + buffers->deleteResults(remoteTaskId_, destination_); } if (!requestPromise.isFulfilled()) { @@ -151,7 +152,7 @@ class LocalExchangeSource : public exec::ExchangeSource { registerTimeout(self, resultCallback, maxWait); buffers->getData( - taskId_, destination_, maxBytes, sequence_, resultCallback); + remoteTaskId_, destination_, maxBytes, sequence_, resultCallback); return future; } @@ -171,14 +172,14 @@ class LocalExchangeSource : public exec::ExchangeSource { std::lock_guard l(queue_->mutex()); ackSequence = sequence_; } - buffers->acknowledge(taskId_, destination_, ackSequence); + buffers->acknowledge(remoteTaskId_, destination_, ackSequence); } void close() override { checkSetRequestPromise(); auto buffers = OutputBufferManager::getInstance().lock(); - buffers->deleteResults(taskId_, destination_); + buffers->deleteResults(remoteTaskId_, destination_); } folly::F14FastMap metrics() const override {