Skip to content

Commit

Permalink
refactor: Add arg cpuExecutor to ConnectorFactory::newConnector
Browse files Browse the repository at this point in the history
Add an additional argument, cpuExecutor, to the ConnectorFactory's
newConnector interface. Connectors could send async operators to this
cpuExecutor to prevent occupying the driver executor.
  • Loading branch information
gggrace14 committed Dec 14, 2024
1 parent abc54d9 commit f6e5f03
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 9 deletions.
3 changes: 2 additions & 1 deletion velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,8 @@ class ConnectorFactory {
virtual std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) = 0;
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) = 0;

private:
const std::string name_;
Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/fuzzer/FuzzerConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ class FuzzerConnectorFactory : public ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) override {
return std::make_shared<FuzzerConnector>(id, config, executor);
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<FuzzerConnector>(id, config, ioExecutor);
}
};

Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ class HiveConnectorFactory : public ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) override {
return std::make_shared<HiveConnector>(id, config, executor);
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<HiveConnector>(id, config, ioExecutor);
}
};

Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/tests/ConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class TestConnectorFactory : public connector::ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> /*config*/,
folly::Executor* /*executor*/ = nullptr) override {
folly::Executor* /*ioExecutor*/ = nullptr,
folly::Executor* /*cpuExecutor*/ = nullptr) override {
return std::make_shared<TestConnector>(id);
}
};
Expand Down
5 changes: 3 additions & 2 deletions velox/connectors/tpch/TpchConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ class TpchConnectorFactory : public ConnectorFactory {
std::shared_ptr<Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* executor = nullptr) override {
return std::make_shared<TpchConnector>(id, config, executor);
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<TpchConnector>(id, config, ioExecutor);
}
};

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/tests/AsyncConnectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ class TestConnectorFactory : public connector::ConnectorFactory {
std::shared_ptr<connector::Connector> newConnector(
const std::string& id,
std::shared_ptr<const config::ConfigBase> config,
folly::Executor* /* executor */) override {
folly::Executor* /* ioExecutor */,
folly::Executor* /* cpuExecutor */) override {
return std::make_shared<TestConnector>(id);
}
};
Expand Down

0 comments on commit f6e5f03

Please sign in to comment.