From eee9a8e9da136f9d05dab02fd18fe39d6887e0c2 Mon Sep 17 00:00:00 2001 From: Ge Gao Date: Fri, 13 Dec 2024 18:53:23 -0800 Subject: [PATCH] refactor: Add arg cpuExecutor to newConnector interface of ConnectorFactory Add an additional argument, cpuExecutor, to the ConnectorFactory's newConnector interface. Connectors could send async operators to this cpuExecutor to prevent occupying the driver executor. --- velox/connectors/Connector.h | 3 ++- velox/connectors/fuzzer/FuzzerConnector.h | 5 +++-- velox/connectors/hive/HiveConnector.h | 5 +++-- velox/connectors/tests/ConnectorTest.cpp | 3 ++- velox/connectors/tpch/TpchConnector.h | 5 +++-- velox/exec/tests/AsyncConnectorTest.cpp | 3 ++- 6 files changed, 15 insertions(+), 9 deletions(-) diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index 565e64fdb3631..b0a0f108aa216 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -482,7 +482,8 @@ class ConnectorFactory { virtual std::shared_ptr newConnector( const std::string& id, std::shared_ptr config, - folly::Executor* executor = nullptr) = 0; + folly::Executor* ioExecutor = nullptr, + folly::Executor* cpuExecutor = nullptr) = 0; private: const std::string name_; diff --git a/velox/connectors/fuzzer/FuzzerConnector.h b/velox/connectors/fuzzer/FuzzerConnector.h index 4e8f665608b40..64477b73ea36f 100644 --- a/velox/connectors/fuzzer/FuzzerConnector.h +++ b/velox/connectors/fuzzer/FuzzerConnector.h @@ -141,8 +141,9 @@ class FuzzerConnectorFactory : public ConnectorFactory { std::shared_ptr newConnector( const std::string& id, std::shared_ptr config, - folly::Executor* executor = nullptr) override { - return std::make_shared(id, config, executor); + folly::Executor* ioExecutor = nullptr, + folly::Executor* cpuExecutor = nullptr) override { + return std::make_shared(id, config, ioExecutor); } }; diff --git a/velox/connectors/hive/HiveConnector.h b/velox/connectors/hive/HiveConnector.h index a98a7f7532893..673c83792a88d 100644 --- a/velox/connectors/hive/HiveConnector.h +++ b/velox/connectors/hive/HiveConnector.h @@ -93,8 +93,9 @@ class HiveConnectorFactory : public ConnectorFactory { std::shared_ptr newConnector( const std::string& id, std::shared_ptr config, - folly::Executor* executor = nullptr) override { - return std::make_shared(id, config, executor); + folly::Executor* ioExecutor = nullptr, + folly::Executor* cpuExecutor = nullptr) override { + return std::make_shared(id, config, ioExecutor); } }; diff --git a/velox/connectors/tests/ConnectorTest.cpp b/velox/connectors/tests/ConnectorTest.cpp index 5681bc80cb04e..96d46e607be9a 100644 --- a/velox/connectors/tests/ConnectorTest.cpp +++ b/velox/connectors/tests/ConnectorTest.cpp @@ -59,7 +59,8 @@ class TestConnectorFactory : public connector::ConnectorFactory { std::shared_ptr newConnector( const std::string& id, std::shared_ptr /*config*/, - folly::Executor* /*executor*/ = nullptr) override { + folly::Executor* /*ioExecutor*/ = nullptr, + folly::Executor* /*cpuExecutor*/ = nullptr) override { return std::make_shared(id); } }; diff --git a/velox/connectors/tpch/TpchConnector.h b/velox/connectors/tpch/TpchConnector.h index babcda8e5cb01..02f6579828de0 100644 --- a/velox/connectors/tpch/TpchConnector.h +++ b/velox/connectors/tpch/TpchConnector.h @@ -171,8 +171,9 @@ class TpchConnectorFactory : public ConnectorFactory { std::shared_ptr newConnector( const std::string& id, std::shared_ptr config, - folly::Executor* executor = nullptr) override { - return std::make_shared(id, config, executor); + folly::Executor* ioExecutor = nullptr, + folly::Executor* cpuExecutor = nullptr) override { + return std::make_shared(id, config, ioExecutor); } }; diff --git a/velox/exec/tests/AsyncConnectorTest.cpp b/velox/exec/tests/AsyncConnectorTest.cpp index b4c532bdfa6f6..7f2ad55c00d6c 100644 --- a/velox/exec/tests/AsyncConnectorTest.cpp +++ b/velox/exec/tests/AsyncConnectorTest.cpp @@ -164,7 +164,8 @@ class TestConnectorFactory : public connector::ConnectorFactory { std::shared_ptr newConnector( const std::string& id, std::shared_ptr config, - folly::Executor* /* executor */) override { + folly::Executor* /* ioExecutor */, + folly::Executor* /* cpuExecutor */) override { return std::make_shared(id); } };