Skip to content

Commit

Permalink
refactor: Add arg cpuExecutor to newConnector interface of ConnectorF…
Browse files Browse the repository at this point in the history
…actory

Add an additional argument, cpuExecutor, to the ConnectorFactory's
newConnector interface. Connectors could send async operators to this
cpuExecutor to prevent occupying the driver executor.
  • Loading branch information
gggrace14 committed Dec 14, 2024
1 parent 19c5771 commit eee9a8e
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 9 deletions.
3 changes: 2 additions & 1 deletion velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,8 @@ class ConnectorFactory {
virtual std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) = 0;
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) = 0;

private:
const std::string name_;
Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/fuzzer/FuzzerConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ class FuzzerConnectorFactory : public ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) override {
return std::make_shared<FuzzerConnector>(id, config, executor);
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<FuzzerConnector>(id, config, ioExecutor);
}
};

Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ class HiveConnectorFactory : public ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) override {
return std::make_shared<HiveConnector>(id, config, executor);
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<HiveConnector>(id, config, ioExecutor);
}
};

Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/tests/ConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class TestConnectorFactory : public connector::ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> /*config*/,
folly::Executor* /*executor*/ = nullptr) override {
folly::Executor* /*ioExecutor*/ = nullptr,
folly::Executor* /*cpuExecutor*/ = nullptr) override {
return std::make_shared<TestConnector>(id);
}
};
Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/tpch/TpchConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ class TpchConnectorFactory : public ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) override {
return std::make_shared<TpchConnector>(id, config, executor);
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<TpchConnector>(id, config, ioExecutor);
}
};

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/AsyncConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ class TestConnectorFactory : public connector::ConnectorFactory {
std::shared_ptr<connector::Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* /* executor */) override {
folly::Executor* /* ioExecutor */,
folly::Executor* /* cpuExecutor */) override {
return std::make_shared<TestConnector>(id);
}
};
Expand Down

0 comments on commit eee9a8e

Please sign in to comment.