From fd06bd90f989b764e060ea97f898dfbe57282ce1 Mon Sep 17 00:00:00 2001 From: rui-mo Date: Tue, 3 Sep 2024 14:10:52 -0700 Subject: [PATCH] Use aggregate pool in Spark query runner (#10798) Summary: In https://github.com/facebookincubator/velox/issues/10568, an 'aggregate' pool is passed to query runner's constructor. This PR uses it to create child pools in the Spark query runner. Pull Request resolved: https://github.com/facebookincubator/velox/pull/10798 Reviewed By: xiaoxmeng Differential Revision: D62142058 Pulled By: bikramSingh91 fbshipit-source-id: 750f589e04abdd540d09d33da02d840fc41f4347 --- .../sparksql/fuzzer/SparkQueryRunner.cpp | 2 +- .../sparksql/fuzzer/SparkQueryRunner.h | 21 +++++++------------ .../fuzzer/tests/SparkQueryRunnerTest.cpp | 9 +++++--- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/velox/functions/sparksql/fuzzer/SparkQueryRunner.cpp b/velox/functions/sparksql/fuzzer/SparkQueryRunner.cpp index 01f3dd783417..89d05aeb604d 100644 --- a/velox/functions/sparksql/fuzzer/SparkQueryRunner.cpp +++ b/velox/functions/sparksql/fuzzer/SparkQueryRunner.cpp @@ -123,7 +123,7 @@ std::vector SparkQueryRunner::executeVector( // Write the input to a Parquet file. auto tempFile = exec::test::TempFilePath::create(); const auto& filePath = tempFile->getPath(); - auto writerPool = rootPool()->addAggregateChild("writer"); + auto writerPool = aggregatePool()->addAggregateChild("writer"); writeToFile(filePath, input, writerPool.get()); // Create temporary view 'tmp' in Spark by reading the generated Parquet file. diff --git a/velox/functions/sparksql/fuzzer/SparkQueryRunner.h b/velox/functions/sparksql/fuzzer/SparkQueryRunner.h index b458bec68ab5..038f3a82b04f 100644 --- a/velox/functions/sparksql/fuzzer/SparkQueryRunner.h +++ b/velox/functions/sparksql/fuzzer/SparkQueryRunner.h @@ -33,17 +33,20 @@ class SparkQueryRunner : public velox::exec::test::ReferenceQueryRunner { public: /// @param coordinatorUri Spark connect server endpoint, e.g. localhost:15002. SparkQueryRunner( - memory::MemoryPool* aggregatePool, + memory::MemoryPool* pool, const std::string& coordinatorUri, const std::string& userId, const std::string& userName) - : ReferenceQueryRunner(aggregatePool), + : ReferenceQueryRunner(pool), userId_(userId), userName_(userName), sessionId_(generateUUID()), stub_(spark::connect::SparkConnectService::NewStub(grpc::CreateChannel( coordinatorUri, - grpc::InsecureChannelCredentials()))){}; + grpc::InsecureChannelCredentials()))) { + pool_ = aggregatePool()->addLeafChild("leaf"); + copyPool_ = aggregatePool()->addLeafChild("copy"); + }; /// Converts Velox query plan to Spark SQL. Supports Values -> Aggregation. /// Values node is converted into reading from 'tmp' table. @@ -90,10 +93,6 @@ class SparkQueryRunner : public velox::exec::test::ReferenceQueryRunner { return pool_.get(); } - velox::memory::MemoryPool* rootPool() { - return rootPool_.get(); - } - // Reads the arrow IPC-format string data with arrow IPC reader and convert // them into Velox RowVectors. std::vector readArrowData(const std::string& data); @@ -111,11 +110,7 @@ class SparkQueryRunner : public velox::exec::test::ReferenceQueryRunner { const std::string sessionId_; // Used to make gRPC calls to the SparkConnectService. std::unique_ptr stub_; - std::shared_ptr rootPool_{ - velox::memory::memoryManager()->addRootPool()}; - std::shared_ptr pool_{ - rootPool_->addLeafChild("leaf")}; - std::shared_ptr copyPool_{ - rootPool_->addLeafChild("copy")}; + std::shared_ptr pool_; + std::shared_ptr copyPool_; }; } // namespace facebook::velox::functions::sparksql::fuzzer diff --git a/velox/functions/sparksql/fuzzer/tests/SparkQueryRunnerTest.cpp b/velox/functions/sparksql/fuzzer/tests/SparkQueryRunnerTest.cpp index e0bb8a3c76e3..50a1ff60b370 100644 --- a/velox/functions/sparksql/fuzzer/tests/SparkQueryRunnerTest.cpp +++ b/velox/functions/sparksql/fuzzer/tests/SparkQueryRunnerTest.cpp @@ -49,8 +49,9 @@ class SparkQueryRunnerTest : public ::testing::Test, // This test requires a Spark coordinator running at localhost, so disable it // by default. TEST_F(SparkQueryRunnerTest, DISABLED_basic) { + auto aggregatePool = rootPool_->addAggregateChild("basic"); auto queryRunner = std::make_unique( - pool(), "localhost:15002", "test", "basic"); + aggregatePool.get(), "localhost:15002", "test", "basic"); auto input = makeRowVector({ makeConstant(1, 25), @@ -93,8 +94,9 @@ TEST_F(SparkQueryRunnerTest, DISABLED_fuzzer) { .project({"a0", "array_sort(a1)"}) .planNode(); + auto aggregatePool = rootPool_->addAggregateChild("fuzzer"); auto queryRunner = std::make_unique( - pool(), "localhost:15002", "test", "fuzzer"); + aggregatePool.get(), "localhost:15002", "test", "fuzzer"); auto sql = queryRunner->toSql(plan); ASSERT_TRUE(sql.has_value()); @@ -107,8 +109,9 @@ TEST_F(SparkQueryRunnerTest, DISABLED_fuzzer) { } TEST_F(SparkQueryRunnerTest, toSql) { + auto aggregatePool = rootPool_->addAggregateChild("toSql"); auto queryRunner = std::make_unique( - pool(), "unused", "unused", "unused"); + aggregatePool.get(), "unused", "unused", "unused"); auto dataType = ROW({"c0", "c1", "c2"}, {DOUBLE(), DOUBLE(), BOOLEAN()}); auto plan = exec::test::PlanBuilder()