Skip to content

Commit

Permalink
Minor refactor of ExchangeClient (#8419)
Browse files Browse the repository at this point in the history
Summary:
Naming and comments refactoring of ExchangeClient and ExchangeQueue

Pull Request resolved: #8419

Reviewed By: amitkdutta

Differential Revision: D52839095

Pulled By: tanjialiang

fbshipit-source-id: 508ecda8b7adbc526a1c63c2579bcbdea0f1f30d
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Jan 17, 2024
1 parent 94a1b43 commit 1d778ef
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 8 deletions.
2 changes: 1 addition & 1 deletion velox/exec/ExchangeClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ void ExchangeClient::addRemoteTaskId(const std::string& taskId) {
{
std::lock_guard<std::mutex> l(queue_->mutex());

bool duplicate = !taskIds_.insert(taskId).second;
bool duplicate = !remoteTaskIds_.insert(taskId).second;
if (duplicate) {
// Do not add sources twice. Presto protocol may add duplicate sources
// and the task updates have no guarantees of arriving in order.
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/ExchangeClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
folly::Executor* const executor_;
const std::shared_ptr<ExchangeQueue> queue_;

std::unordered_set<std::string> taskIds_;
std::unordered_set<std::string> remoteTaskIds_;
std::vector<std::shared_ptr<ExchangeSource>> sources_;
bool closed_{false};

Expand Down
18 changes: 12 additions & 6 deletions velox/exec/ExchangeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ class SerializedPage {
std::function<void(folly::IOBuf&)> onDestructionCb_;
};

// Queue of results retrieved from source. Owned by shared_ptr by
// Exchange and client threads and registered callbacks waiting
// for input.
/// Queue of results retrieved from source. Owned by shared_ptr by
/// Exchange and client threads and registered callbacks waiting
/// for input.
class ExchangeQueue {
public:
~ExchangeQueue() {
Expand All @@ -93,13 +93,19 @@ class ExchangeQueue {
return queue_.empty();
}

/// Enqueues 'page' to the queue. One random promise(top of promise queue)
/// associated with the future that is waiting for the data from the queue is
/// returned in 'promises' if 'page' is not nullptr. When 'page' is nullptr
/// and the queue is completed serving data, all left over promises will be
/// returned in 'promises'. When 'page' is nullptr and the queue is not
/// completed serving data, no 'promises' will be added and returned.
void enqueueLocked(
std::unique_ptr<SerializedPage>&& page,
std::vector<ContinuePromise>& promises);

// If data is permanently not available, e.g. the source cannot be
// contacted, this registers an error message and causes the reading
// Exchanges to throw with the message.
/// If data is permanently not available, e.g. the source cannot be
/// contacted, this registers an error message and causes the reading
/// Exchanges to throw with the message.
void setError(const std::string& error);

/// Returns pages of data.
Expand Down

0 comments on commit 1d778ef

Please sign in to comment.