From c1cee9e8ff0f4dd7e82dc5176261a1160aa50281 Mon Sep 17 00:00:00 2001 From: Orri Erling Date: Tue, 19 Nov 2024 18:43:02 -0800 Subject: [PATCH] feat(runner): Add runner for local distributed execution (#11578) Summary: Reintroducing Orri's change based on PR: #11475 This diff attempts to fix the CMAKE errors of the first tow PRs and also enabling pyVelox. Original commit changeset: 97be48600342 Original Phabricator Diff: D65900074 Differential Revision: D66137319 --- .github/workflows/scheduled.yml | 1 + CMakeLists.txt | 2 + velox/CMakeLists.txt | 4 + velox/connectors/hive/HiveConnectorSplit.h | 108 ++++++ velox/connectors/hive/tests/CMakeLists.txt | 1 + velox/connectors/hive/tests/HiveSplitTest.cpp | 64 ++++ velox/exec/tests/TableScanTest.cpp | 30 +- velox/exec/tests/utils/CMakeLists.txt | 3 + velox/exec/tests/utils/Cursor.cpp | 20 ++ velox/exec/tests/utils/Cursor.h | 2 + .../tests/utils/DistributedPlanBuilder.cpp | 120 +++++++ .../exec/tests/utils/DistributedPlanBuilder.h | 82 +++++ .../exec/tests/utils/HiveConnectorTestBase.h | 106 +----- .../exec/tests/utils/LocalRunnerTestBase.cpp | 116 +++++++ velox/exec/tests/utils/LocalRunnerTestBase.h | 99 ++++++ velox/exec/tests/utils/PlanBuilder.h | 36 +- velox/runner/CMakeLists.txt | 27 ++ velox/runner/LocalRunner.cpp | 291 ++++++++++++++++ velox/runner/LocalRunner.h | 109 ++++++ velox/runner/LocalSchema.cpp | 321 ++++++++++++++++++ velox/runner/LocalSchema.h | 115 +++++++ velox/runner/MultiFragmentPlan.h | 96 ++++++ velox/runner/Runner.cpp | 59 ++++ velox/runner/Runner.h | 101 ++++++ velox/runner/Schema.h | 168 +++++++++ velox/runner/tests/CMakeLists.txt | 25 ++ velox/runner/tests/LocalRunnerTest.cpp | 175 ++++++++++ 27 files changed, 2166 insertions(+), 115 deletions(-) create mode 100644 velox/connectors/hive/tests/HiveSplitTest.cpp create mode 100644 velox/exec/tests/utils/DistributedPlanBuilder.cpp create mode 100644 velox/exec/tests/utils/DistributedPlanBuilder.h create mode 100644 velox/exec/tests/utils/LocalRunnerTestBase.cpp create mode 100644 velox/exec/tests/utils/LocalRunnerTestBase.h create mode 100644 velox/runner/CMakeLists.txt create mode 100644 velox/runner/LocalRunner.cpp create mode 100644 velox/runner/LocalRunner.h create mode 100644 velox/runner/LocalSchema.cpp create mode 100644 velox/runner/LocalSchema.h create mode 100644 velox/runner/MultiFragmentPlan.h create mode 100644 velox/runner/Runner.cpp create mode 100644 velox/runner/Runner.h create mode 100644 velox/runner/Schema.h create mode 100644 velox/runner/tests/CMakeLists.txt create mode 100644 velox/runner/tests/LocalRunnerTest.cpp diff --git a/.github/workflows/scheduled.yml b/.github/workflows/scheduled.yml index e6cf425881e68..cb72bc6a0b396 100644 --- a/.github/workflows/scheduled.yml +++ b/.github/workflows/scheduled.yml @@ -222,6 +222,7 @@ jobs: run: | python3 -m venv .venv source .venv/bin/activate + python3 -m pip install -e . - name: Create and test new function signatures diff --git a/CMakeLists.txt b/CMakeLists.txt index 99a791e56b21f..3a8dc7167ec16 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -121,6 +121,7 @@ option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON) option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF) option(VELOX_BUILD_VECTOR_TEST_UTILS "Builds Velox vector test utilities" OFF) option(VELOX_BUILD_PYTHON_PACKAGE "Builds Velox Python bindings" OFF) +option(VELOX_BUILD_RUNNER "Builds velox runner" ON) option( VELOX_ENABLE_INT64_BUILD_PARTITION_BOUND "make buildPartitionBounds_ a vector int64 instead of int32 to avoid integer overflow when the hashtable has billions of records" @@ -189,6 +190,7 @@ if(${VELOX_BUILD_PYTHON_PACKAGE}) set(VELOX_ENABLE_EXEC ON) set(VELOX_ENABLE_AGGREGATES ON) set(VELOX_ENABLE_SPARK_FUNCTIONS ON) + set(VELOX_BUILD_RUNNER OFF) endif() if(${VELOX_ENABLE_DUCKDB}) diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index fe2d23371aa86..f904135355fac 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -58,6 +58,10 @@ add_subdirectory(connectors) if(${VELOX_ENABLE_EXEC}) add_subdirectory(exec) + # Disable runner from pyvelox builds + if(${VELOX_BUILD_RUNNER}) + add_subdirectory(runner) + endif() endif() if(${VELOX_ENABLE_DUCKDB}) diff --git a/velox/connectors/hive/HiveConnectorSplit.h b/velox/connectors/hive/HiveConnectorSplit.h index a06ffa668e11a..99bca9b309cf4 100644 --- a/velox/connectors/hive/HiveConnectorSplit.h +++ b/velox/connectors/hive/HiveConnectorSplit.h @@ -111,4 +111,112 @@ struct HiveConnectorSplit : public connector::ConnectorSplit { static void registerSerDe(); }; +class HiveConnectorSplitBuilder { + public: + explicit HiveConnectorSplitBuilder(std::string filePath) + : filePath_{std::move(filePath)} { + infoColumns_["$path"] = filePath_; + } + + HiveConnectorSplitBuilder& start(uint64_t start) { + start_ = start; + return *this; + } + + HiveConnectorSplitBuilder& length(uint64_t length) { + length_ = length; + return *this; + } + + HiveConnectorSplitBuilder& splitWeight(int64_t splitWeight) { + splitWeight_ = splitWeight; + return *this; + } + + HiveConnectorSplitBuilder& fileFormat(dwio::common::FileFormat format) { + fileFormat_ = format; + return *this; + } + + HiveConnectorSplitBuilder& infoColumn( + const std::string& name, + const std::string& value) { + infoColumns_.emplace(std::move(name), std::move(value)); + return *this; + } + + HiveConnectorSplitBuilder& partitionKey( + std::string name, + std::optional value) { + partitionKeys_.emplace(std::move(name), std::move(value)); + return *this; + } + + HiveConnectorSplitBuilder& tableBucketNumber(int32_t bucket) { + tableBucketNumber_ = bucket; + infoColumns_["$bucket"] = std::to_string(bucket); + return *this; + } + + HiveConnectorSplitBuilder& customSplitInfo( + const std::unordered_map& customSplitInfo) { + customSplitInfo_ = customSplitInfo; + return *this; + } + + HiveConnectorSplitBuilder& extraFileInfo( + const std::shared_ptr& extraFileInfo) { + extraFileInfo_ = extraFileInfo; + return *this; + } + + HiveConnectorSplitBuilder& serdeParameters( + const std::unordered_map& serdeParameters) { + serdeParameters_ = serdeParameters; + return *this; + } + + HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) { + connectorId_ = connectorId; + return *this; + } + + HiveConnectorSplitBuilder& fileProperties(FileProperties fileProperties) { + fileProperties_ = fileProperties; + return *this; + } + + std::shared_ptr build() const { + return std::make_shared( + connectorId_, + filePath_, + fileFormat_, + start_, + length_, + partitionKeys_, + tableBucketNumber_, + customSplitInfo_, + extraFileInfo_, + serdeParameters_, + splitWeight_, + infoColumns_, + fileProperties_); + } + + private: + const std::string filePath_; + dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF}; + uint64_t start_{0}; + uint64_t length_{std::numeric_limits::max()}; + std::unordered_map> partitionKeys_; + std::optional tableBucketNumber_; + std::unordered_map customSplitInfo_ = {}; + std::shared_ptr extraFileInfo_ = {}; + std::unordered_map serdeParameters_ = {}; + std::unordered_map infoColumns_ = {}; + std::string connectorId_; + int64_t splitWeight_{0}; + std::optional fileProperties_; +}; + } // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/tests/CMakeLists.txt b/velox/connectors/hive/tests/CMakeLists.txt index f4235cfa13c05..ad3a373dab119 100644 --- a/velox/connectors/hive/tests/CMakeLists.txt +++ b/velox/connectors/hive/tests/CMakeLists.txt @@ -21,6 +21,7 @@ add_executable( HiveConnectorSerDeTest.cpp HivePartitionFunctionTest.cpp HivePartitionUtilTest.cpp + HiveSplitTest.cpp PartitionIdGeneratorTest.cpp TableHandleTest.cpp) add_test(velox_hive_connector_test velox_hive_connector_test) diff --git a/velox/connectors/hive/tests/HiveSplitTest.cpp b/velox/connectors/hive/tests/HiveSplitTest.cpp new file mode 100644 index 0000000000000..91412caaee2d6 --- /dev/null +++ b/velox/connectors/hive/tests/HiveSplitTest.cpp @@ -0,0 +1,64 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "gtest/gtest.h" +#include "velox/common/config/Config.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" + +using namespace facebook::velox; +using namespace facebook::velox::connector::hive; + +TEST(HiveSplitTest, builder) { + FileProperties properties = {11, 1111}; + auto extra = std::make_shared("extra file info"); + std::unordered_map custom; + custom["custom1"] = "customValue1"; + std::unordered_map serde; + serde["serde1"] = "serdeValue1"; + auto split = HiveConnectorSplitBuilder("filepath") + .start(100) + .length(100000) + .splitWeight(1) + .fileFormat(dwio::common::FileFormat::DWRF) + .infoColumn("info1", "infoValue1") + .partitionKey("DS", "2024-11-01") + .tableBucketNumber(11) + .customSplitInfo(custom) + .extraFileInfo(extra) + .serdeParameters(serde) + .connectorId("connectorId") + .fileProperties(properties) + .build(); + + EXPECT_EQ(100, split->start); + EXPECT_EQ(100000, split->length); + EXPECT_EQ(1, split->splitWeight); + EXPECT_TRUE(dwio::common::FileFormat::DWRF == split->fileFormat); + EXPECT_EQ("infoValue1", split->infoColumns["info1"]); + auto it = split->partitionKeys.find("DS"); + EXPECT_TRUE(it != split->partitionKeys.end()); + EXPECT_EQ("2024-11-01", it->second.value()); + EXPECT_EQ(11, split->tableBucketNumber.value()); + EXPECT_EQ("customValue1", split->customSplitInfo["custom1"]); + EXPECT_EQ(std::string("extra file info"), *split->extraFileInfo); + EXPECT_EQ("serdeValue1", split->serdeParameters["serde1"]); + EXPECT_EQ("connectorId", split->connectorId); + EXPECT_EQ( + properties.fileSize.value(), split->properties.value().fileSize.value()); + EXPECT_EQ( + properties.modificationTime.value(), + split->properties.value().modificationTime.value()); +} diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index dc74ef0fb3b64..42cee37afb365 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -180,7 +180,7 @@ class TableScanTest : public virtual HiveConnectorTestBase { const std::string& filePath, const TypePtr& partitionType, const std::optional& partitionValue) { - auto split = HiveConnectorSplitBuilder(filePath) + auto split = exec::test::HiveConnectorSplitBuilder(filePath) .partitionKey("pkey", partitionValue) .build(); auto outputType = @@ -411,7 +411,7 @@ TEST_F(TableScanTest, partitionKeyAlias) { {"a", regularColumn("c0", BIGINT())}, {"ds_alias", partitionKey("ds", VARCHAR())}}; - auto split = HiveConnectorSplitBuilder(filePath->getPath()) + auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("ds", "2021-12-02") .build(); @@ -1806,7 +1806,8 @@ TEST_F(TableScanTest, splitOffsetAndLength) { } TEST_F(TableScanTest, fileNotFound) { - auto split = HiveConnectorSplitBuilder("/path/to/nowhere.orc").build(); + auto split = + exec::test::HiveConnectorSplitBuilder("/path/to/nowhere.orc").build(); auto assertMissingFile = [&](bool ignoreMissingFiles) { AssertQueryBuilder(tableScanNode()) .connectorSessionProperty( @@ -1829,7 +1830,7 @@ TEST_F(TableScanTest, validFileNoData) { auto filePath = facebook::velox::test::getDataFilePath( "velox/exec/tests", "data/emptyPresto.dwrf"); - auto split = HiveConnectorSplitBuilder(filePath) + auto split = exec::test::HiveConnectorSplitBuilder(filePath) .start(0) .length(fs::file_size(filePath) / 2) .build(); @@ -1949,7 +1950,7 @@ TEST_F(TableScanTest, partitionedTableDateKey) { // Test partition filter on date column. { - auto split = HiveConnectorSplitBuilder(filePath->getPath()) + auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("pkey", partitionValue) .build(); auto outputType = ROW({"pkey", "c0", "c1"}, {DATE(), BIGINT(), DOUBLE()}); @@ -2851,9 +2852,10 @@ TEST_F(TableScanTest, bucket) { writeToFile(filePaths[i]->getPath(), rowVector); rowVectors.emplace_back(rowVector); - splits.emplace_back(HiveConnectorSplitBuilder(filePaths[i]->getPath()) - .tableBucketNumber(bucket) - .build()); + splits.emplace_back( + exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath()) + .tableBucketNumber(bucket) + .build()); } createDuckDbTable(rowVectors); @@ -2877,7 +2879,7 @@ TEST_F(TableScanTest, bucket) { for (int i = 0; i < buckets.size(); ++i) { int bucketValue = buckets[i]; - auto hsplit = HiveConnectorSplitBuilder(filePaths[i]->getPath()) + auto hsplit = exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath()) .tableBucketNumber(bucketValue) .build(); @@ -2897,7 +2899,7 @@ TEST_F(TableScanTest, bucket) { // Filter on bucket column, but don't project it out auto rowTypes = ROW({"c0", "c1"}, {INTEGER(), BIGINT()}); - hsplit = HiveConnectorSplitBuilder(filePaths[i]->getPath()) + hsplit = exec::test::HiveConnectorSplitBuilder(filePaths[i]->getPath()) .tableBucketNumber(bucketValue) .build(); op = PlanBuilder() @@ -4168,7 +4170,7 @@ TEST_F(TableScanTest, reuseRowVector) { .tableScan(rowType, {}, "c0 < 5") .project({"c1.c0"}) .planNode(); - auto split = HiveConnectorSplitBuilder(file->getPath()).build(); + auto split = exec::test::HiveConnectorSplitBuilder(file->getPath()).build(); auto expected = makeRowVector( {makeFlatVector(10, [](auto i) { return i % 5; })}); AssertQueryBuilder(plan).splits({split, split}).assertResults(expected); @@ -4749,7 +4751,7 @@ TEST_F(TableScanTest, varbinaryPartitionKey) { {"a", regularColumn("c0", BIGINT())}, {"ds_alias", partitionKey("ds", VARBINARY())}}; - auto split = HiveConnectorSplitBuilder(filePath->getPath()) + auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("ds", "2021-12-02") .build(); @@ -4788,7 +4790,7 @@ TEST_F(TableScanTest, timestampPartitionKey) { ColumnHandleMap assignments = {{"t", partitionKey("t", TIMESTAMP())}}; std::vector> splits; for (auto& t : inputs) { - splits.push_back(HiveConnectorSplitBuilder(filePath->getPath()) + splits.push_back(exec::test::HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("t", t) .build()); } @@ -4807,7 +4809,7 @@ TEST_F(TableScanTest, partitionKeyNotMatchPartitionKeysHandle) { writeToFile(filePath->getPath(), vectors); createDuckDbTable(vectors); - auto split = HiveConnectorSplitBuilder(filePath->getPath()) + auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath()) .partitionKey("ds", "2021-12-02") .build(); diff --git a/velox/exec/tests/utils/CMakeLists.txt b/velox/exec/tests/utils/CMakeLists.txt index c2d227410c1e7..8e78d65f380d6 100644 --- a/velox/exec/tests/utils/CMakeLists.txt +++ b/velox/exec/tests/utils/CMakeLists.txt @@ -22,8 +22,10 @@ add_library( AssertQueryBuilder.cpp ArbitratorTestUtil.cpp Cursor.cpp + DistributedPlanBuilder.cpp HiveConnectorTestBase.cpp LocalExchangeSource.cpp + LocalRunnerTestBase.cpp OperatorTestBase.cpp PlanBuilder.cpp QueryAssertions.cpp @@ -35,6 +37,7 @@ add_library( target_link_libraries( velox_exec_test_lib + velox_local_runner velox_vector_test_lib velox_temp_path velox_core diff --git a/velox/exec/tests/utils/Cursor.cpp b/velox/exec/tests/utils/Cursor.cpp index 1aed8908206b9..ad7b4133c6c73 100644 --- a/velox/exec/tests/utils/Cursor.cpp +++ b/velox/exec/tests/utils/Cursor.cpp @@ -275,6 +275,10 @@ class MultiThreadedTaskCursor : public TaskCursorBase { /// Starts the task if not started yet. bool moveNext() override { start(); + if (error_) { + std::rethrow_exception(error_); + } + current_ = queue_->dequeue(); if (task_->error()) { // Wait for the task to finish (there's' a small period of time between @@ -302,6 +306,13 @@ class MultiThreadedTaskCursor : public TaskCursorBase { return current_; } + void setError(std::exception_ptr error) override { + error_ = error; + if (task_) { + task_->setError(error); + } + } + const std::shared_ptr& task() override { return task_; } @@ -316,6 +327,7 @@ class MultiThreadedTaskCursor : public TaskCursorBase { std::shared_ptr task_; RowVectorPtr current_; bool atEnd_{false}; + std::exception_ptr error_; }; class SingleThreadedTaskCursor : public TaskCursorBase { @@ -391,6 +403,13 @@ class SingleThreadedTaskCursor : public TaskCursorBase { return current_; } + void setError(std::exception_ptr error) override { + error_ = error; + if (task_) { + task_->setError(error); + } + } + const std::shared_ptr& task() override { return task_; } @@ -399,6 +418,7 @@ class SingleThreadedTaskCursor : public TaskCursorBase { std::shared_ptr task_; RowVectorPtr current_; RowVectorPtr next_; + std::exception_ptr error_; }; std::unique_ptr TaskCursor::create(const CursorParameters& params) { diff --git a/velox/exec/tests/utils/Cursor.h b/velox/exec/tests/utils/Cursor.h index 314eb89587609..9c8cfaf7d7fa0 100644 --- a/velox/exec/tests/utils/Cursor.h +++ b/velox/exec/tests/utils/Cursor.h @@ -139,6 +139,8 @@ class TaskCursor { virtual RowVectorPtr& current() = 0; + virtual void setError(std::exception_ptr error) = 0; + virtual const std::shared_ptr& task() = 0; }; diff --git a/velox/exec/tests/utils/DistributedPlanBuilder.cpp b/velox/exec/tests/utils/DistributedPlanBuilder.cpp new file mode 100644 index 0000000000000..fddb79a6f3caf --- /dev/null +++ b/velox/exec/tests/utils/DistributedPlanBuilder.cpp @@ -0,0 +1,120 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/tests/utils/DistributedPlanBuilder.h" + +namespace facebook::velox::exec::test { + +DistributedPlanBuilder::DistributedPlanBuilder( + const runner::MultiFragmentPlan::Options& options, + std::shared_ptr planNodeIdGenerator, + memory::MemoryPool* pool) + : PlanBuilder(planNodeIdGenerator, pool), options_(options), root_(this) { + root_->stack_.push_back(this); + newFragment(); + current_->width = options_.numWorkers; +} + +DistributedPlanBuilder::DistributedPlanBuilder(DistributedPlanBuilder& root) + : PlanBuilder(root.planNodeIdGenerator(), root.pool()), + options_(root.options_), + root_(&root) { + root_->stack_.push_back(this); + newFragment(); + current_->width = options_.numWorkers; +} + +std::vector DistributedPlanBuilder::fragments() { + newFragment(); + return std::move(fragments_); +} + +void DistributedPlanBuilder::newFragment() { + if (current_) { + gatherScans(planNode_); + current_->fragment = core::PlanFragment(std::move(planNode_)); + fragments_.push_back(std::move(*current_)); + } + current_ = std::make_unique( + fmt::format("{}.{}", options_.queryId, root_->fragmentCounter_++)); + planNode_ = nullptr; +} + +PlanBuilder& DistributedPlanBuilder::shuffle( + const std::vector& partitionKeys, + int numPartitions, + bool replicateNullsAndAny, + const std::vector& outputLayout) { + partitionedOutput( + partitionKeys, numPartitions, replicateNullsAndAny, outputLayout); + auto* output = + dynamic_cast(planNode_.get()); + VELOX_CHECK_NOT_NULL(output); + auto producerPrefix = current_->taskPrefix; + newFragment(); + current_->width = numPartitions; + exchange(output->outputType(), VectorSerde::Kind::kPresto); + auto* exchange = dynamic_cast(planNode_.get()); + VELOX_CHECK_NOT_NULL(exchange); + current_->inputStages.push_back( + runner::InputStage{exchange->id(), producerPrefix}); + return *this; +} + +core::PlanNodePtr DistributedPlanBuilder::shuffleResult( + const std::vector& partitionKeys, + int numPartitions, + bool replicateNullsAndAny, + const std::vector& outputLayout) { + partitionedOutput( + partitionKeys, numPartitions, replicateNullsAndAny, outputLayout); + auto* output = + dynamic_cast(planNode_.get()); + VELOX_CHECK_NOT_NULL(output); + const auto producerPrefix = current_->taskPrefix; + auto result = planNode_; + newFragment(); + root_->stack_.pop_back(); + auto* consumer = root_->stack_.back(); + if (consumer->current_->width != 0) { + VELOX_CHECK_EQ( + numPartitions, + consumer->current_->width, + "The consumer width should match the producer fanout"); + } else { + consumer->current_->width = numPartitions; + } + + for (auto& fragment : fragments_) { + root_->fragments_.push_back(std::move(fragment)); + } + exchange(output->outputType(), VectorSerde::Kind::kPresto); + auto* exchange = dynamic_cast(planNode_.get()); + consumer->current_->inputStages.push_back( + runner::InputStage{exchange->id(), producerPrefix}); + return std::move(planNode_); +} + +void DistributedPlanBuilder::gatherScans(const core::PlanNodePtr& plan) { + if (auto scan = std::dynamic_pointer_cast(plan)) { + current_->scans.push_back(scan); + return; + } + for (auto& source : plan->sources()) { + gatherScans(source); + } +} +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/DistributedPlanBuilder.h b/velox/exec/tests/utils/DistributedPlanBuilder.h new file mode 100644 index 0000000000000..ba091ef8c66a6 --- /dev/null +++ b/velox/exec/tests/utils/DistributedPlanBuilder.h @@ -0,0 +1,82 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/runner/MultiFragmentPlan.h" + +namespace facebook::velox::exec::test { + +/// Builder for distributed plans. Adds a shuffle() and related +/// methods for building PartitionedOutput-Exchange pairs between +/// fragments. Not thread safe. +class DistributedPlanBuilder : public PlanBuilder { + public: + /// Constructs a top level DistributedPlanBuilder. + DistributedPlanBuilder( + const runner::MultiFragmentPlan::Options& options, + std::shared_ptr planNodeIdGenerator, + memory::MemoryPool* pool = nullptr); + + /// Constructs a child builder. Used for branching plans, e.g. the subplan for + /// a join build side. + DistributedPlanBuilder(DistributedPlanBuilder& root); + + /// Returns the planned fragments. The builder will be empty after this. This + /// is only called on the root builder. + std::vector fragments(); + + PlanBuilder& shuffle( + const std::vector& keys, + int numPartitions, + bool replicateNullsAndAny, + const std::vector& outputLayout = {}) override; + + core::PlanNodePtr shuffleResult( + const std::vector& keys, + int numPartitions, + bool replicateNullsAndAny, + const std::vector& outputLayout = {}) override; + + private: + void newFragment(); + + void gatherScans(const core::PlanNodePtr& plan); + + const runner::MultiFragmentPlan::Options& options_; + DistributedPlanBuilder* const root_; + + // Stack of outstanding builders. The last element is the immediately + // enclosing one. When returning an ExchangeNode from returnShuffle, the stack + // is used to establish the linkage between the fragment of the returning + // builder and the fragment current in the calling builder. Only filled in the + // root builder. + std::vector stack_; + + // Fragment counter. Only used in root builder. + int32_t fragmentCounter_{0}; + + // The fragment being built. Will be moved to the root builder's 'fragments_' + // when complete. + std::unique_ptr current_; + + // The fragments gathered under this builder. Moved to the root builder when + // returning the subplan. + std::vector fragments_; +}; + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index 74dd5223307b9..a53b2634ef4b5 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -223,109 +223,15 @@ class HiveConnectorTestBase : public OperatorTestBase { } }; -class HiveConnectorSplitBuilder { +/// Same as connector::hive::HiveConnectorBuilder, except that this defaults +/// connectorId to kHiveConnectorId. +class HiveConnectorSplitBuilder + : public connector::hive::HiveConnectorSplitBuilder { public: explicit HiveConnectorSplitBuilder(std::string filePath) - : filePath_{std::move(filePath)} { - infoColumns_["$path"] = filePath_; + : connector::hive::HiveConnectorSplitBuilder(filePath) { + connectorId(kHiveConnectorId); } - - HiveConnectorSplitBuilder& start(uint64_t start) { - start_ = start; - return *this; - } - - HiveConnectorSplitBuilder& length(uint64_t length) { - length_ = length; - return *this; - } - - HiveConnectorSplitBuilder& splitWeight(int64_t splitWeight) { - splitWeight_ = splitWeight; - return *this; - } - - HiveConnectorSplitBuilder& fileFormat(dwio::common::FileFormat format) { - fileFormat_ = format; - return *this; - } - - HiveConnectorSplitBuilder& infoColumn( - const std::string& name, - const std::string& value) { - infoColumns_.emplace(std::move(name), std::move(value)); - return *this; - } - - HiveConnectorSplitBuilder& partitionKey( - std::string name, - std::optional value) { - partitionKeys_.emplace(std::move(name), std::move(value)); - return *this; - } - - HiveConnectorSplitBuilder& tableBucketNumber(int32_t bucket) { - tableBucketNumber_ = bucket; - infoColumns_["$bucket"] = std::to_string(bucket); - return *this; - } - - HiveConnectorSplitBuilder& customSplitInfo( - const std::unordered_map& customSplitInfo) { - customSplitInfo_ = customSplitInfo; - return *this; - } - - HiveConnectorSplitBuilder& extraFileInfo( - const std::shared_ptr& extraFileInfo) { - extraFileInfo_ = extraFileInfo; - return *this; - } - - HiveConnectorSplitBuilder& serdeParameters( - const std::unordered_map& serdeParameters) { - serdeParameters_ = serdeParameters; - return *this; - } - - HiveConnectorSplitBuilder& connectorId(const std::string& connectorId) { - connectorId_ = connectorId; - return *this; - } - - std::shared_ptr build() const { - static const std::unordered_map customSplitInfo; - static const std::shared_ptr extraFileInfo; - static const std::unordered_map serdeParameters; - return std::make_shared( - connectorId_, - filePath_, - fileFormat_, - start_, - length_, - partitionKeys_, - tableBucketNumber_, - customSplitInfo, - extraFileInfo, - serdeParameters, - splitWeight_, - infoColumns_, - std::nullopt); - } - - private: - const std::string filePath_; - dwio::common::FileFormat fileFormat_{dwio::common::FileFormat::DWRF}; - uint64_t start_{0}; - uint64_t length_{std::numeric_limits::max()}; - std::unordered_map> partitionKeys_; - std::optional tableBucketNumber_; - std::unordered_map customSplitInfo_ = {}; - std::shared_ptr extraFileInfo_ = {}; - std::unordered_map serdeParameters_ = {}; - std::unordered_map infoColumns_ = {}; - std::string connectorId_ = kHiveConnectorId; - int64_t splitWeight_{0}; }; } // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/LocalRunnerTestBase.cpp b/velox/exec/tests/utils/LocalRunnerTestBase.cpp new file mode 100644 index 0000000000000..c0572217644b9 --- /dev/null +++ b/velox/exec/tests/utils/LocalRunnerTestBase.cpp @@ -0,0 +1,116 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/tests/utils/LocalRunnerTestBase.h" +namespace facebook::velox::exec::test { + +void LocalRunnerTestBase::SetUp() { + HiveConnectorTestBase::SetUp(); + exec::ExchangeSource::factories().clear(); + exec::ExchangeSource::registerFactory(createLocalExchangeSource); + ensureTestData(); +} + +std::shared_ptr LocalRunnerTestBase::makeQueryCtx( + const std::string& queryId, + memory::MemoryPool* rootPool) { + auto config = config_; + auto hiveConfig = hiveConfig_; + std::unordered_map> + connectorConfigs; + auto copy = hiveConfig_; + connectorConfigs[kHiveConnectorId] = + std::make_shared(std::move(copy)); + + return core::QueryCtx::create( + schemaExecutor_.get(), + core::QueryConfig(config), + std::move(connectorConfigs), + cache::AsyncDataCache::getInstance(), + rootPool->shared_from_this(), + nullptr, + queryId); +} + +void LocalRunnerTestBase::ensureTestData() { + if (!files_) { + makeTables(testTables_, files_); + } + makeSchema(); + splitSourceFactory_ = + std::make_shared(schema_, 2); +} + +void LocalRunnerTestBase::makeSchema() { + auto schemaQueryCtx = makeQueryCtx("schema", rootPool_.get()); + common::SpillConfig spillConfig; + common::PrefixSortConfig prefixSortConfig(100, 130); + auto leafPool = schemaQueryCtx->pool()->addLeafChild("schemaReader"); + auto connectorQueryCtx = std::make_shared( + leafPool.get(), + schemaQueryCtx->pool(), + schemaQueryCtx->connectorSessionProperties(kHiveConnectorId), + &spillConfig, + prefixSortConfig, + std::make_unique( + schemaQueryCtx.get(), schemaPool_.get()), + schemaQueryCtx->cache(), + "scan_for_schema", + "schema", + "N/a", + 0, + schemaQueryCtx->queryConfig().sessionTimezone()); + auto connector = connector::getConnector(kHiveConnectorId); + schema_ = std::make_shared( + files_->getPath(), + dwio::common::FileFormat::DWRF, + reinterpret_cast(connector.get()), + connectorQueryCtx); +} + +void LocalRunnerTestBase::makeTables( + std::vector specs, + std::shared_ptr& directory) { + directory = exec::test::TempDirectoryPath::create(); + for (auto& spec : specs) { + auto tablePath = fmt::format("{}/{}", directory->getPath(), spec.name); + auto fs = filesystems::getFileSystem(tablePath, {}); + fs->mkdir(tablePath); + for (auto i = 0; i < spec.numFiles; ++i) { + auto vectors = HiveConnectorTestBase::makeVectors( + spec.columns, spec.numVectorsPerFile, spec.rowsPerVector); + if (spec.customizeData) { + for (auto& vector : vectors) { + spec.customizeData(vector); + } + } + writeToFile(fmt::format("{}/f{}", tablePath, i), vectors); + } + } +} + +std::vector readCursor( + std::shared_ptr runner) { + // 'result' borrows memory from cursor so the life cycle must be shorter. + std::vector result; + + while (auto rows = runner->next()) { + result.push_back(rows); + } + return result; +} + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/LocalRunnerTestBase.h b/velox/exec/tests/utils/LocalRunnerTestBase.h new file mode 100644 index 0000000000000..4558a4bdd6ffa --- /dev/null +++ b/velox/exec/tests/utils/LocalRunnerTestBase.h @@ -0,0 +1,99 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/exec/ExchangeSource.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/LocalExchangeSource.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/runner/LocalRunner.h" + +namespace facebook::velox::exec::test { + +struct TableSpec { + std::string name; + RowTypePtr columns; + int32_t rowsPerVector{10000}; + int32_t numVectorsPerFile{5}; + int32_t numFiles{5}; + + /// Function Applied to generated RowVectors for the table before writing. + /// May be used to insert non-random data on top of the random datafrom + /// HiveConnectorTestBase::makeVectors. + std::function customizeData; +}; + +/// Test helper class that manages a TestCase with a set of generated tables and +/// a LocalSchema and LocalSplitSource covering the test data. The lifetime of +/// the test data is the test case consisting of multiple TEST_F's. +class LocalRunnerTestBase : public HiveConnectorTestBase { + protected: + static void SetUpTestCase() { + HiveConnectorTestBase::SetUpTestCase(); + schemaExecutor_ = std::make_unique(4); + } + + static void TearDownTestCase() { + files_.reset(); + HiveConnectorTestBase::TearDownTestCase(); + } + + void SetUp() override; + + void ensureTestData(); + void makeSchema(); + + void makeTables( + std::vector specs, + std::shared_ptr& directory); + + std::shared_ptr splitSourceFactory( + const runner::LocalSchema& schema); + + // Creates a QueryCtx with 'pool'. 'pool' must be a root pool. + static std::shared_ptr makeQueryCtx( + const std::string& queryId, + memory::MemoryPool* pool); + + // Configs for creating QueryCtx. + inline static std::unordered_map config_; + inline static std::unordered_map hiveConfig_; + + // The specification of the test data. The data is created in ensureTestData() + // called from each SetUp()(. + inline static std::vector testTables_; + + // The top level directory with the test data. + inline static std::shared_ptr files_; + inline static std::unique_ptr schemaExecutor_; + + // The schema built from the data in 'files_'. + std::shared_ptr schema_; + + // Split source factory for making SplitSources that range over tables inside + // 'files_'. + std::shared_ptr splitSourceFactory_; + + // Leaf pool for schema. + std::shared_ptr schemaPool_; +}; + +/// Reads all results from 'runner'. +std::vector readCursor( + std::shared_ptr runner); + +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index f380ef4f03c7d..5ccf3a2167756 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -105,6 +105,8 @@ class PlanBuilder { planNodeIdGenerator_{std::move(planNodeIdGenerator)}, pool_{pool} {} + virtual ~PlanBuilder() = default; + static constexpr const std::string_view kHiveDefaultConnectorId{"test-hive"}; static constexpr const std::string_view kTpchDefaultConnectorId{"test-tpch"}; @@ -1029,7 +1031,9 @@ class PlanBuilder { return *this; } - /// Return the latest plan node, e.g. the root node of the plan tree. + /// Return the latest plan node, e.g. the root node of the plan + /// tree. The DistributedPlanBuilder override additionally moves stage + /// information to a parent PlanBuilder. const core::PlanNodePtr& planNode() const { return planNode_; } @@ -1054,6 +1058,28 @@ class PlanBuilder { return *this; } + /// In a DistributedPlanBuilder, introduces a shuffle boundary. The plan so + /// far is shuffled and subsequent nodes consume the shuffle. Arguments are as + /// in partitionedOutput(). + virtual PlanBuilder& shuffle( + const std::vector& keys, + int numPartitions, + bool replicateNullsAndAny, + const std::vector& outputLayout = {}) { + VELOX_UNSUPPORTED("Needs DistributedPlanBuilder"); + } + + /// In a DistributedPlanBuilder, returns an Exchange on top of the plan built + /// so far and couples it to the current stage in the enclosing builder. + /// Arguments are as in shuffle(). + virtual core::PlanNodePtr shuffleResult( + const std::vector& keys, + int numPartitions, + bool replicateNullsAndAny, + const std::vector& outputLayout = {}) { + VELOX_UNSUPPORTED("Needs DistributedPlanBuilder"); + } + protected: // Users who create custom operators might want to extend the PlanBuilder to // customize extended plan builders. Those functions are needed in such @@ -1063,6 +1089,14 @@ class PlanBuilder { std::shared_ptr inferTypes( const core::ExprPtr& untypedExpr); + std::shared_ptr planNodeIdGenerator() const { + return planNodeIdGenerator_; + } + + memory::MemoryPool* pool() const { + return pool_; + } + private: std::shared_ptr field(column_index_t index); diff --git a/velox/runner/CMakeLists.txt b/velox/runner/CMakeLists.txt new file mode 100644 index 0000000000000..4bac074d7af9a --- /dev/null +++ b/velox/runner/CMakeLists.txt @@ -0,0 +1,27 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_subdirectory(tests) + +velox_add_library(velox_local_runner LocalRunner.cpp LocalSchema.cpp Runner.cpp) + +velox_link_libraries( + velox_local_runner + velox_common_base + velox_memory + velox_hive_connector + velox_dwio_common + velox_dwio_dwrf_writer + velox_exec_test_lib + velox_exec) diff --git a/velox/runner/LocalRunner.cpp b/velox/runner/LocalRunner.cpp new file mode 100644 index 0000000000000..d18a325284167 --- /dev/null +++ b/velox/runner/LocalRunner.cpp @@ -0,0 +1,291 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/runner/LocalRunner.h" +#include "velox/common/time/Timer.h" + +#include "velox/connectors/hive/HiveConnectorSplit.h" + +namespace facebook::velox::runner { +namespace { +std::shared_ptr remoteSplit( + const std::string& taskId) { + return std::make_shared(taskId); +} +} // namespace + +RowVectorPtr LocalRunner::next() { + if (!cursor_) { + start(); + } + bool hasNext = cursor_->moveNext(); + if (!hasNext) { + state_ = State::kFinished; + return nullptr; + } + return cursor_->current(); +} + +void LocalRunner::start() { + VELOX_CHECK_EQ(state_, State::kInitialized); + auto lastStage = makeStages(); + params_.planNode = plan_->fragments().back().fragment.planNode; + auto cursor = exec::test::TaskCursor::create(params_); + stages_.push_back({cursor->task()}); + // Add table scan splits to the final gathere stage. + for (auto& scan : fragments_.back().scans) { + auto source = splitSourceFactory_->splitSourceForScan(*scan); + for (;;) { + auto split = source->next(0); + if (!split.hasConnectorSplit()) { + break; + } + cursor->task()->addSplit(scan->id(), std::move(split)); + } + cursor->task()->noMoreSplits(scan->id()); + } + // If the plan only has the final gather stage, there are no shuffles between + // the last + // and previous stages to set up. + if (!lastStage.empty()) { + const auto finalStageConsumer = + fragments_.back().inputStages[0].consumerNodeId; + for (auto& remote : lastStage) { + cursor->task()->addSplit(finalStageConsumer, exec::Split(remote)); + } + cursor->task()->noMoreSplits(finalStageConsumer); + } + { + std::lock_guard l(mutex_); + if (!error_) { + cursor_ = std::move(cursor); + state_ = State::kRunning; + } + } + if (!cursor_) { + // The cursor was not set because previous fragments had an error. + abort(); + std::rethrow_exception(error_); + } +} + +void LocalRunner::abort() { + // If called without previous error, we set the error to be cancellation. + if (!error_) { + try { + state_ = State::kCancelled; + VELOX_FAIL("Query cancelled"); + } catch (const std::exception& e) { + error_ = std::current_exception(); + } + } + VELOX_CHECK(state_ != State::kInitialized); + // Setting errors is thred safe. The stages do not change after + // initialization. + for (auto& stage : stages_) { + for (auto& task : stage) { + task->setError(error_); + } + } + if (cursor_) { + cursor_->setError(error_); + } +} + +void LocalRunner::waitForCompletion(int32_t maxWaitUs) { + VELOX_CHECK_NE(state_, State::kInitialized); + std::vector futures; + { + std::lock_guard l(mutex_); + for (auto& stage : stages_) { + for (auto& task : stage) { + futures.push_back(task->taskDeletionFuture()); + } + stage.clear(); + } + } + auto startTime = getCurrentTimeMicro(); + for (auto& future : futures) { + auto& executor = folly::QueuedImmediateExecutor::instance(); + if (getCurrentTimeMicro() - startTime > maxWaitUs) { + VELOX_FAIL("LocalRunner did not finish within {} us", maxWaitUs); + } + std::move(future) + .within(std::chrono::microseconds(maxWaitUs)) + .via(&executor) + .wait(); + } +} + +std::vector> +LocalRunner::makeStages() { + std::unordered_map stageMap; + auto sharedRunner = shared_from_this(); + auto onError = [self = sharedRunner, this](std::exception_ptr error) { + { + std::lock_guard l(mutex_); + if (error_) { + return; + } + state_ = State::kError; + error_ = error; + } + if (cursor_) { + abort(); + } + }; + + for (auto fragmentIndex = 0; fragmentIndex < fragments_.size() - 1; + ++fragmentIndex) { + auto& fragment = fragments_[fragmentIndex]; + stageMap[fragment.taskPrefix] = stages_.size(); + stages_.emplace_back(); + for (auto i = 0; i < fragment.width; ++i) { + exec::Consumer consumer = nullptr; + auto task = exec::Task::create( + fmt::format( + "local://{}/{}.{}", + params_.queryCtx->queryId(), + fragment.taskPrefix, + i), + fragment.fragment, + i, + params_.queryCtx, + exec::Task::ExecutionMode::kParallel, + consumer, + onError); + stages_.back().push_back(task); + if (fragment.numBroadcastDestinations) { + // TODO: Add support for Arbitrary partition type. + task->updateOutputBuffers(fragment.numBroadcastDestinations, true); + } + task->start(options_.numDrivers); + } + } + + for (auto fragmentIndex = 0; fragmentIndex < fragments_.size() - 1; + ++fragmentIndex) { + auto& fragment = fragments_[fragmentIndex]; + for (auto& scan : fragment.scans) { + auto source = splitSourceFactory_->splitSourceForScan(*scan); + bool allDone = false; + do { + for (auto i = 0; i < stages_[fragmentIndex].size(); ++i) { + auto split = source->next(i); + if (!split.hasConnectorSplit()) { + allDone = true; + break; + } + stages_[fragmentIndex][i]->addSplit(scan->id(), std::move(split)); + } + } while (!allDone); + } + for (auto& scan : fragment.scans) { + for (auto i = 0; i < stages_[fragmentIndex].size(); ++i) { + stages_[fragmentIndex][i]->noMoreSplits(scan->id()); + } + } + + for (auto& input : fragment.inputStages) { + const auto sourceStage = stageMap[input.producerTaskPrefix]; + std::vector> sourceSplits; + for (auto i = 0; i < stages_[sourceStage].size(); ++i) { + sourceSplits.push_back(remoteSplit(stages_[sourceStage][i]->taskId())); + } + for (auto& task : stages_[fragmentIndex]) { + for (auto& remote : sourceSplits) { + task->addSplit(input.consumerNodeId, exec::Split(remote)); + } + task->noMoreSplits(input.consumerNodeId); + } + } + } + if (stages_.empty()) { + return {}; + } + std::vector> lastStage; + for (auto& task : stages_.back()) { + lastStage.push_back(remoteSplit(task->taskId())); + } + return lastStage; +} + +exec::Split LocalSplitSource::next(int32_t /*worker*/) { + if (currentFile_ >= static_cast(table_->files().size())) { + return exec::Split(); + } + + if (currentSplit_ >= fileSplits_.size()) { + fileSplits_.clear(); + ++currentFile_; + if (currentFile_ >= table_->files().size()) { + return exec::Split(); + } + + currentSplit_ = 0; + auto filePath = table_->files()[currentFile_]; + const auto fileSize = fs::file_size(filePath); + // Take the upper bound. + const int splitSize = std::ceil((fileSize) / splitsPerFile_); + for (int i = 0; i < splitsPerFile_; ++i) { + fileSplits_.push_back( + connector::hive::HiveConnectorSplitBuilder(filePath) + .connectorId(table_->schema()->connector()->connectorId()) + .fileFormat(table_->format()) + .start(i * splitSize) + .length(splitSize) + .build()); + } + } + return exec::Split(std::move(fileSplits_[currentSplit_++])); +} + +std::unique_ptr LocalSplitSourceFactory::splitSourceForScan( + const core::TableScanNode& tableScan) { + auto* tableHandle = dynamic_cast( + tableScan.tableHandle().get()); + VELOX_CHECK_NOT_NULL(tableHandle); + auto* table = reinterpret_cast( + schema_->findTable(tableHandle->tableName())); + + return std::make_unique(table, splitsPerFile_); +} + +std::vector LocalRunner::stats() const { + std::vector result; + std::lock_guard l(mutex_); + for (auto i = 0; i < stages_.size(); ++i) { + auto& tasks = stages_[i]; + VELOX_CHECK(!tasks.empty()); + auto stats = tasks[0]->taskStats(); + for (auto j = 1; j < tasks.size(); ++j) { + auto moreStats = tasks[j]->taskStats(); + for (auto pipeline = 0; pipeline < stats.pipelineStats.size(); + ++pipeline) { + for (auto op = 0; + op < stats.pipelineStats[pipeline].operatorStats.size(); + ++op) { + stats.pipelineStats[pipeline].operatorStats[op].add( + moreStats.pipelineStats[pipeline].operatorStats[op]); + } + } + } + result.push_back(std::move(stats)); + } + return result; +} + +} // namespace facebook::velox::runner diff --git a/velox/runner/LocalRunner.h b/velox/runner/LocalRunner.h new file mode 100644 index 0000000000000..f5d3bb5c211fc --- /dev/null +++ b/velox/runner/LocalRunner.h @@ -0,0 +1,109 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/connectors/Connector.h" +#include "velox/exec/Exchange.h" +#include "velox/exec/tests/utils/Cursor.h" +#include "velox/runner/LocalSchema.h" +#include "velox/runner/MultiFragmentPlan.h" +#include "velox/runner/Runner.h" + +namespace facebook::velox::runner { + +/// Runner for in-process execution of a distributed plan. +class LocalRunner : public Runner, + public std::enable_shared_from_this { + public: + LocalRunner( + MultiFragmentPlanPtr plan, + std::shared_ptr queryCtx, + std::shared_ptr splitSourceFactory) + : plan_(std::move(plan)), + fragments_(plan_->fragments()), + options_(plan_->options()), + splitSourceFactory_(std::move(splitSourceFactory)) { + params_.queryCtx = std::move(queryCtx); + } + + RowVectorPtr next() override; + + std::vector stats() const override; + + void abort() override; + + void waitForCompletion(int32_t maxWaitMicros) override; + + State state() const override { + return state_; + } + + private: + void start(); + + // Creates all stages except for the single worker final consumer stage. + std::vector> makeStages(); + + // Serializes 'cursor_' and 'error_'. + mutable std::mutex mutex_; + + const MultiFragmentPlanPtr plan_; + const std::vector fragments_; + const MultiFragmentPlan::Options& options_; + const std::shared_ptr splitSourceFactory_; + + exec::test::CursorParameters params_; + + tsan_atomic state_{State::kInitialized}; + + std::unique_ptr cursor_; + std::vector>> stages_; + std::exception_ptr error_; +}; + +/// Split source that produces splits from a LocalSchema. +class LocalSplitSource : public SplitSource { + public: + LocalSplitSource(const LocalTable* table, int32_t splitsPerFile) + : table_(table), splitsPerFile_(splitsPerFile) {} + + exec::Split next(int32_t worker) override; + + private: + const LocalTable* const table_; + const int32_t splitsPerFile_; + + std::vector> fileSplits_; + int32_t currentFile_{-1}; + int32_t currentSplit_{0}; +}; + +class LocalSplitSourceFactory : public SplitSourceFactory { + public: + LocalSplitSourceFactory( + std::shared_ptr schema, + int32_t splitsPerFile) + : schema_(std::move(schema)), splitsPerFile_(splitsPerFile) {} + + std::unique_ptr splitSourceForScan( + const core::TableScanNode& scan) override; + + private: + const std::shared_ptr schema_; + const int32_t splitsPerFile_; +}; + +} // namespace facebook::velox::runner diff --git a/velox/runner/LocalSchema.cpp b/velox/runner/LocalSchema.cpp new file mode 100644 index 0000000000000..b35b50b416df8 --- /dev/null +++ b/velox/runner/LocalSchema.cpp @@ -0,0 +1,321 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/runner/LocalSchema.h" +#include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/dwio/common/BufferedInput.h" +#include "velox/dwio/common/Reader.h" +#include "velox/dwio/common/ReaderFactory.h" + +namespace facebook::velox::runner { + +LocalSchema::LocalSchema( + const std::string& path, + dwio::common::FileFormat format, + connector::hive::HiveConnector* hiveConnector, + std::shared_ptr ctx) + : Schema(path, ctx->memoryPool()), + hiveConnector_(hiveConnector), + connectorId_(hiveConnector_->connectorId()), + connectorQueryCtx_(ctx), + format_(format) { + initialize(path); +} + +void LocalSchema::initialize(const std::string& path) { + for (auto const& dirEntry : fs::directory_iterator{path}) { + if (!dirEntry.is_directory() || + dirEntry.path().filename().c_str()[0] == '.') { + continue; + } + loadTable(dirEntry.path().filename(), dirEntry.path()); + } +} +// Feeds the values in 'vector' into 'builder'. +template +void addStats( + velox::dwrf::StatisticsBuilder* builder, + const BaseVector& vector) { + auto* typedVector = vector.asUnchecked>(); + for (auto i = 0; i < typedVector->size(); ++i) { + if (!typedVector->isNullAt(i)) { + reinterpret_cast(builder)->addValues(typedVector->valueAt(i)); + } + } +} + +std::pair LocalTable::sample( + float pct, + const std::vector& fields, + velox::connector::hive::SubfieldFilters filters, + const velox::core::TypedExprPtr& remainingFilter, + HashStringAllocator* /*allocator*/, + std::vector>* + statsBuilders) { + dwrf::StatisticsBuilderOptions options( + /*stringLengthLimit=*/100, /*initialSize=*/0); + std::vector> builders; + auto tableHandle = std::make_shared( + schema_->connector()->connectorId(), + name_, + true, + std::move(filters), + remainingFilter); + + std::unordered_map< + std::string, + std::shared_ptr> + columnHandles; + std::vector names; + std::vector types; + for (auto& field : fields) { + auto& path = field.path(); + auto column = + dynamic_cast(path[0].get()) + ->name(); + const auto idx = type_->getChildIdx(column); + names.push_back(type_->nameOf(idx)); + types.push_back(type_->childAt(idx)); + columnHandles[names.back()] = + std::make_shared( + names.back(), + connector::hive::HiveColumnHandle::ColumnType::kRegular, + types.back(), + types.back()); + switch (types.back()->kind()) { + case TypeKind::BIGINT: + case TypeKind::INTEGER: + case TypeKind::SMALLINT: + builders.push_back( + std::make_unique(options)); + break; + case TypeKind::REAL: + case TypeKind::DOUBLE: + builders.push_back( + std::make_unique(options)); + break; + case TypeKind::VARCHAR: + builders.push_back( + std::make_unique(options)); + break; + + default: + builders.push_back(nullptr); + } + } + + const auto outputType = ROW(std::move(names), std::move(types)); + int64_t passingRows = 0; + int64_t scannedRows = 0; + for (auto& file : files_) { + auto dataSource = schema_->connector()->createDataSource( + outputType, + tableHandle, + columnHandles, + schema_->connectorQueryCtx().get()); + + auto split = connector::hive::HiveConnectorSplitBuilder(file) + .fileFormat(format_) + .connectorId(schema_->connector()->connectorId()) + .build(); + dataSource->addSplit(split); + constexpr int32_t kBatchSize = 1000; + for (;;) { + ContinueFuture ignore{ContinueFuture::makeEmpty()}; + + auto data = dataSource->next(kBatchSize, ignore).value(); + if (data == nullptr) { + break; + } + passingRows += data->size(); + for (auto column = 0; column < builders.size(); ++column) { + if (!builders[column]) { + continue; + } + auto* builder = builders[column].get(); + auto loadChild = [](RowVectorPtr data, int32_t column) { + data->childAt(column) = + BaseVector::loadedVectorShared(data->childAt(column)); + }; + switch (type_->childAt(column)->kind()) { + case TypeKind::SMALLINT: + loadChild(data, column); + addStats( + builder, *data->childAt(column)); + break; + case TypeKind::INTEGER: + loadChild(data, column); + addStats( + builder, *data->childAt(column)); + break; + case TypeKind::BIGINT: + loadChild(data, column); + addStats( + builder, *data->childAt(column)); + break; + case TypeKind::REAL: + loadChild(data, column); + addStats( + builder, *data->childAt(column)); + break; + case TypeKind::DOUBLE: + loadChild(data, column); + addStats( + builder, *data->childAt(column)); + break; + case TypeKind::VARCHAR: + loadChild(data, column); + addStats( + builder, *data->childAt(column)); + break; + + default: + break; + } + } + break; + } + scannedRows += dataSource->getCompletedRows(); + if (scannedRows > numRows_ * (pct / 100)) { + break; + } + } + if (statsBuilders) { + *statsBuilders = std::move(builders); + } + return std::pair(scannedRows, passingRows); +} + +void LocalSchema::loadTable( + const std::string& tableName, + const fs::path& tablePath) { + // open each file in the directory and check their type and add up the row + // counts. + RowTypePtr tableType; + LocalTable* table = nullptr; + + for (auto const& dirEntry : fs::directory_iterator{tablePath}) { + if (!dirEntry.is_regular_file()) { + continue; + } + // Ignore hidden files. + if (dirEntry.path().filename().c_str()[0] == '.') { + continue; + } + auto it = tables_.find(tableName); + if (it != tables_.end()) { + table = reinterpret_cast(it->second.get()); + } else { + tables_[tableName] = + std::make_unique(tableName, format_, this); + table = reinterpret_cast(tables_[tableName].get()); + } + dwio::common::ReaderOptions readerOptions{pool_}; + readerOptions.setFileFormat(format_); + auto input = std::make_unique( + std::make_shared(dirEntry.path().string()), + readerOptions.memoryPool()); + std::unique_ptr reader = + dwio::common::getReaderFactory(readerOptions.fileFormat()) + ->createReader(std::move(input), readerOptions); + const auto fileType = reader->rowType(); + if (!tableType) { + tableType = fileType; + } else if (fileType->size() > tableType->size()) { + // The larger type is the later since there is only addition of columns. + // TODO: Check the column types are compatible where they overlap. + tableType = fileType; + } + const auto rows = reader->numberOfRows(); + + if (rows.has_value()) { + table->numRows_ += rows.value(); + } + for (auto i = 0; i < fileType->size(); ++i) { + auto name = fileType->nameOf(i); + LocalColumn* column; + auto columnIt = table->columns().find(name); + if (columnIt != table->columns().end()) { + column = columnIt->second.get(); + } else { + table->columns()[name] = + std::make_unique(name, fileType->childAt(i)); + column = table->columns()[name].get(); + } + // Initialize the stats from the first file. + if (column->stats() == nullptr) { + column->setStats(reader->columnStatistics(i)); + } + } + table->files_.push_back(dirEntry.path()); + } + VELOX_CHECK_NOT_NULL(table, "Table directory {} is empty", tablePath); + + table->setType(tableType); + table->sampleNumDistincts(2, pool_); +} + +void LocalTable::sampleNumDistincts(float samplePct, memory::MemoryPool* pool) { + std::vector fields; + for (auto i = 0; i < type_->size(); ++i) { + fields.push_back(common::Subfield(type_->nameOf(i))); + } + + // Sample the table. Adjust distinct values according to the samples. + auto allocator = std::make_unique(pool); + std::vector> statsBuilders; + auto [sampled, passed] = + sample(samplePct, fields, {}, nullptr, allocator.get(), &statsBuilders); + numSampledRows_ = sampled; + for (auto i = 0; i < statsBuilders.size(); ++i) { + if (statsBuilders[i]) { + // TODO: Use HLL estimate of distinct values here after this is added to + // the stats builder. Now assume that all rows have a different value. + // Later refine this by observed min-max range. + int64_t approxNumDistinct = numRows_; + // For tiny tables the sample is 100% and the approxNumDistinct is + // accurate. For partial samples, the distinct estimate is left to be the + // distinct estimate of the sample if there are few distincts. This is an + // enumeration where values in unsampled rows are likely the same. If + // there are many distincts, we multiply by 1/sample rate assuming that + // unsampled rows will mostly have new values. + + if (numSampledRows_ < numRows_) { + if (approxNumDistinct > sampled / 50) { + float numDups = + numSampledRows_ / static_cast(approxNumDistinct); + approxNumDistinct = std::min(numRows_, numRows_ / numDups); + + // If the type is an integer type, num distincts cannot be larger than + // max - min. + if (auto* ints = dynamic_cast( + statsBuilders[i].get())) { + auto min = ints->getMinimum(); + auto max = ints->getMaximum(); + if (min.has_value() && max.has_value()) { + auto range = max.value() - min.value(); + approxNumDistinct = std::min(approxNumDistinct, range); + } + } + } + + columns()[type_->nameOf(i)]->setNumDistinct(approxNumDistinct); + } + } + } +} + +} // namespace facebook::velox::runner diff --git a/velox/runner/LocalSchema.h b/velox/runner/LocalSchema.h new file mode 100644 index 0000000000000..628dd71c738fe --- /dev/null +++ b/velox/runner/LocalSchema.h @@ -0,0 +1,115 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/base/Fs.h" +#include "velox/common/memory/HashStringAllocator.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/TableHandle.h" +#include "velox/dwio/common/Options.h" +#include "velox/dwio/dwrf/writer/StatisticsBuilder.h" +#include "velox/runner/Schema.h" + +namespace facebook::velox::runner { + +class LocalColumn : public Column { + public: + LocalColumn(const std::string& name, TypePtr type) : Column(name, type) {} + + friend class LocalSchema; +}; + +class LocalSchema; + +class LocalTable : public Table { + public: + LocalTable( + const std::string& name, + dwio::common::FileFormat format, + Schema* schema) + : Table(name, format, schema) {} + + std::unordered_map>& columns() { + return columns_; + } + + LocalSchema* schema() const { + return reinterpret_cast(schema_); + } + + void setType(const RowTypePtr& type) { + type_ = type; + } + + std::pair sample( + float pct, + const std::vector& columns, + connector::hive::SubfieldFilters filters, + const core::TypedExprPtr& remainingFilter, + HashStringAllocator* allocator = nullptr, + std::vector>* statsBuilders = + nullptr) override; + + /// Samples 'samplePct' % rows of the table and sets the num distincts + /// estimate for the columns. uses 'pool' for temporary data. + void sampleNumDistincts(float samplePct, memory::MemoryPool* pool); + + const std::vector& files() const { + return files_; + } + + private: + // All columns. Filled by loadTable(). + std::unordered_map> columns_; + + std::vector files_; + int64_t numRows_{0}; + int64_t numSampledRows_{0}; + + friend class LocalSchema; +}; + +class LocalSchema : public Schema { + public: + /// 'path' is the directory containing a subdirectory per table. + LocalSchema( + const std::string& path, + dwio::common::FileFormat format, + connector::hive::HiveConnector* hiveConector, + std::shared_ptr ctx); + + connector::Connector* connector() const override { + return hiveConnector_; + } + + const std::shared_ptr& connectorQueryCtx() + const override { + return connectorQueryCtx_; + } + + private: + void initialize(const std::string& path); + + void loadTable(const std::string& tableName, const fs::path& tablePath); + + connector::hive::HiveConnector* const hiveConnector_; + const std::string connectorId_; + const std::shared_ptr connectorQueryCtx_; + const dwio::common::FileFormat format_; +}; + +} // namespace facebook::velox::runner diff --git a/velox/runner/MultiFragmentPlan.h b/velox/runner/MultiFragmentPlan.h new file mode 100644 index 0000000000000..3bd79907a9236 --- /dev/null +++ b/velox/runner/MultiFragmentPlan.h @@ -0,0 +1,96 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/exec/Task.h" + +namespace facebook::velox::runner { + +/// Describes an exchange source for an ExchangeNode a non-leaf stage. +struct InputStage { + // Id of ExchangeNode in the consumer fragment. + core::PlanNodeId consumerNodeId; + + /// Task prefix of producer stage. + std::string producerTaskPrefix; +}; + +/// Describes a fragment of a distributed plan. This allows a run +/// time to distribute fragments across workers and to set up +/// exchanges. A complete plan is a vector of these with the last +/// being the fragment that gathers results from the complete +/// plan. Different runtimes, e.g. local, streaming or +/// materialized shuffle can use this to describe exchange +/// parallel execution. Decisions on number of workers, location +/// of workers and mode of exchange are up to the runtime. +struct ExecutableFragment { + explicit ExecutableFragment(const std::string& taskPrefix) + : taskPrefix(taskPrefix) {} + std::string taskPrefix; + int32_t width{0}; + velox::core::PlanFragment fragment; + + /// Source fragments and Exchange node ids for remote shuffles producing input + /// for 'this'. + std::vector inputStages; + + /// Table scan nodes in 'this'. + std::vector> scans; + int32_t numBroadcastDestinations{0}; +}; + +/// Describes a distributed plan handed to a Runner for parallel/distributed +/// execution. The last element of 'fragments' is by convention the stage that +/// gathers the query result. Otherwise the order of 'fragments' is not +/// important since the producer-consumer relations are given by 'inputStages' +/// in each fragment. +class MultiFragmentPlan { + public: + /// Describes options for running a MultiFragmentPlan. + struct Options { + /// Query id used as a prefix for tasks ids. + std::string queryId; + + // Maximum Number of independent Tasks for one stage of execution. If 1, + // there are no exchanges. + int32_t numWorkers; + + // Number of threads in a fragment in a worker. If 1, there are no local + // exchanges. + int32_t numDrivers; + }; + + MultiFragmentPlan(std::vector fragments, Options options) + : fragments_(std::move(fragments)), options_(std::move(options)) {} + + const std::vector& fragments() const { + return fragments_; + } + + const Options& options() const { + return options_; + } + + std::string toString() const; + + private: + const std::vector fragments_; + const Options options_; +}; + +using MultiFragmentPlanPtr = std::shared_ptr; + +} // namespace facebook::velox::runner diff --git a/velox/runner/Runner.cpp b/velox/runner/Runner.cpp new file mode 100644 index 0000000000000..d4f18c29f404e --- /dev/null +++ b/velox/runner/Runner.cpp @@ -0,0 +1,59 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/runner/Runner.h" + +namespace facebook::velox::runner { + +// static +std::string Runner::stateString(Runner::State state) { + switch (state) { + case Runner::State::kInitialized: + return "initialized"; + case Runner::State::kRunning: + return "running"; + case Runner::State::kCancelled: + return "cancelled"; + case Runner::State::kError: + return "error"; + case Runner::State::kFinished: + return "finished"; + } + return fmt::format("invalid state {}", static_cast(state)); +} + +std::string MultiFragmentPlan::toString() const { + std::stringstream out; + for (auto i = 0; i < fragments_.size(); ++i) { + out << fmt::format( + "Fragment {}: {} numWorkers={}:\n", + i, + fragments_[i].taskPrefix, + fragments_[i].width); + out << fragments_[i].fragment.planNode->toString(true, true) << std::endl; + if (!fragments_[i].inputStages.empty()) { + out << "Inputs: "; + for (auto& input : fragments_[i].inputStages) { + out << fmt::format( + " {} <- {} ", input.consumerNodeId, input.producerTaskPrefix); + } + out << std::endl; + } + } + return out.str(); +} + +} // namespace facebook::velox::runner diff --git a/velox/runner/Runner.h b/velox/runner/Runner.h new file mode 100644 index 0000000000000..e570a0df1cf26 --- /dev/null +++ b/velox/runner/Runner.h @@ -0,0 +1,101 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/connectors/Connector.h" +#include "velox/exec/Exchange.h" +#include "velox/exec/tests/utils/Cursor.h" +#include "velox/runner/LocalSchema.h" +#include "velox/runner/MultiFragmentPlan.h" + +/// Base classes for multifragment Velox query execution. +namespace facebook::velox::runner { + +/// Iterator for obtaining splits for a scan. One is created for each table +/// scan. +class SplitSource { + public: + virtual ~SplitSource() = default; + /// Returns a split for 'worker'. This may implement soft affinity or strict + /// bucket to worker mapping. + virtual exec::Split next(int32_t worker) = 0; +}; + +/// A factory for getting a SplitSource for each TableScan. The splits produced +/// may depend on partition keys, buckets etc mentioned by each tableScan. +class SplitSourceFactory { + public: + virtual ~SplitSourceFactory() = default; + + /// Returns a splitSource for one TableScan across all Tasks of + /// the fragment. The source will be invoked to produce splits for + /// each individual worker running the scan. + virtual std::unique_ptr splitSourceForScan( + const core::TableScanNode& scan) = 0; +}; + +/// Base class for executing multifragment Velox queries. One instance +/// of a Runner coordinates the execution of one multifragment +/// query. Different derived classes can support different shuffles +/// and different scheduling either in process or in a cluster. Unless +/// otherwise stated, the member functions are thread safe as long as +/// the caller holds an owning reference to the runner. +class Runner { + public: + enum class State { kInitialized, kRunning, kFinished, kError, kCancelled }; + + static std::string stateString(Runner::State state); + + virtual ~Runner() = default; + + /// Returns the next batch of results. Returns nullptr when no more results. + /// Throws any execution time errors. The result is allocated in the pool of + /// QueryCtx given to the Runner implementation. The caller is responsible for + /// serializing calls from different threads. + virtual RowVectorPtr next() = 0; + + /// Returns Task stats for each fragment of the plan. The stats correspond 1:1 + /// to the stages in the MultiFragmentPlan. This may be called at any time. + /// before waitForCompletion() or abort(). + virtual std::vector stats() const = 0; + + /// Returns the state of execution. + virtual State state() const = 0; + + /// Cancels the possibly pending execution. Returns immediately, thus before + /// the execution is actually finished. Use waitForCompletion() to wait for + /// all execution resources to be freed. May be called from any thread without + /// serialization. + virtual void abort() = 0; + + /// Waits up to 'maxWaitMicros' for all activity of the execution to cease. + /// This is used in tests to ensure that all pools are empty and unreferenced + /// before teradown. + + virtual void waitForCompletion(int32_t maxWaitMicros) = 0; +}; + +} // namespace facebook::velox::runner + +template <> +struct fmt::formatter + : formatter { + auto format(facebook::velox::runner::Runner::State state, format_context& ctx) + const { + return formatter::format( + facebook::velox::runner::Runner::stateString(state), ctx); + } +}; diff --git a/velox/runner/Schema.h b/velox/runner/Schema.h new file mode 100644 index 0000000000000..a0d6f35dd9c49 --- /dev/null +++ b/velox/runner/Schema.h @@ -0,0 +1,168 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/common/base/Fs.h" +#include "velox/common/memory/HashStringAllocator.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/connectors/hive/TableHandle.h" +#include "velox/dwio/common/Options.h" +#include "velox/dwio/dwrf/writer/StatisticsBuilder.h" + +/// Base classes for schema elements used in execution. A Schema is a collection +/// of Tables. A Table is a collection of Columns. Tables and Columns have +/// statistics and Tables can be sampled. Derived classes connect to different +/// metadata stores and provide different metadata, e.g. order, partitioning, +/// bucketing etc. +namespace facebook::velox::runner { + +/// Base class for column. The column's name and type are immutable but the +/// stats may be set multiple times. +class Column { + public: + virtual ~Column() = default; + + Column(const std::string& name, TypePtr type) : name_(name), type_(type) {} + + dwio::common::ColumnStatistics* stats() const { + return latestStats_; + } + + /// Sets statistics. May be called multipl times if table contents change. + void setStats(std::unique_ptr stats) { + std::lock_guard l(mutex_); + allStats_.push_back(std::move(stats)); + latestStats_ = allStats_.back().get(); + } + + const std::string& name() const { + return name_; + } + + const TypePtr& type() const { + return type_; + } + + void setNumDistinct(int64_t numDistinct) { + approxNumDistinct_ = numDistinct; + } + + protected: + const std::string name_; + const TypePtr type_; + + // The latest element added to 'allStats_'. + tsan_atomic latestStats_{nullptr}; + + // All statistics recorded for this column. Old values can be purged when the + // containing Schema is not in use. + std::vector> allStats_; + + // Latest approximate count of distinct values. + std::optional approxNumDistinct_; + + private: + // Serializes changes to statistics. + std::mutex mutex_; +}; + +class Schema; + +/// Base class for table. This is used to identify a table for purposes of +/// Split generation, statistics, sampling etc. +class Table { + public: + virtual ~Table() = default; + + Table( + const std::string& name, + dwio::common::FileFormat format, + Schema* schema) + : schema_(schema), name_(name), format_(format) {} + + const std::string& name() const { + return name_; + } + + const RowTypePtr& rowType() const { + return type_; + } + + dwio::common::FileFormat format() const { + return format_; + } + + /// Samples 'pct' percent of rows for 'fields'. Applies 'filters' + /// before sampling. Returns {count of sampled, count matching filters}. + /// Returns statistics for the post-filtering values in 'stats' for each of + /// 'fields'. If 'fields' is empty, simply returns the number of + /// rows matching 'filter' in a sample of 'pct'% of the table. + /// + /// TODO: Introduce generic statistics builder in dwio/common. + virtual std::pair sample( + float pct, + const std::vector& columns, + connector::hive::SubfieldFilters filters, + const core::TypedExprPtr& remainingFilter, + HashStringAllocator* allocator = nullptr, + std::vector>* statsBuilders = + nullptr) { + VELOX_UNSUPPORTED("Table class does not support sampling."); + } + + protected: + Schema* const schema_; + const std::string name_; + + const dwio::common::FileFormat format_; + + // Discovered from data. In the event of different types, we take the + // latest (i.e. widest) table type. + RowTypePtr type_; +}; + +/// Base class for collection of tables. A query executes against a +/// Schema and its tables and columns are resolved against the +/// Schema. The schema is mutable and may acquire tables and the +/// tables may acquire stats during their lifetime. +class Schema { + public: + virtual ~Schema() = default; + + Schema(const std::string& name, memory::MemoryPool* pool) + : name_(name), pool_(std::move(pool)) {} + + Table* findTable(const std::string& name) { + auto it = tables_.find(name); + VELOX_CHECK(it != tables_.end(), "Table {} not found", name); + return it->second.get(); + } + + virtual connector::Connector* connector() const = 0; + + virtual const std::shared_ptr& + connectorQueryCtx() const = 0; + + protected: + const std::string name_; + + memory::MemoryPool* const pool_; + + std::unordered_map> tables_; +}; + +} // namespace facebook::velox::runner diff --git a/velox/runner/tests/CMakeLists.txt b/velox/runner/tests/CMakeLists.txt new file mode 100644 index 0000000000000..005161a505c15 --- /dev/null +++ b/velox/runner/tests/CMakeLists.txt @@ -0,0 +1,25 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(velox_local_runner_test LocalRunnerTest.cpp + ../../exec/tests/Main.cpp) + +add_test(velox_local_runner_test velox_local_runner_test) + +target_link_libraries( + velox_local_runner_test + velox_exec_test_lib + velox_dwio_common + velox_parse_parser + velox_parse_expression) diff --git a/velox/runner/tests/LocalRunnerTest.cpp b/velox/runner/tests/LocalRunnerTest.cpp new file mode 100644 index 0000000000000..ffd05cdc83f00 --- /dev/null +++ b/velox/runner/tests/LocalRunnerTest.cpp @@ -0,0 +1,175 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/tests/utils/DistributedPlanBuilder.h" +#include "velox/exec/tests/utils/LocalRunnerTestBase.h" +#include "velox/exec/tests/utils/QueryAssertions.h" + +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::runner; +using namespace facebook::velox::exec::test; + +class LocalRunnerTest : public LocalRunnerTestBase { + protected: + static constexpr int32_t kNumFiles = 5; + static constexpr int32_t kNumVectors = 5; + static constexpr int32_t kRowsPerVector = 10000; + static constexpr int32_t kNumRows = kNumFiles * kNumVectors * kRowsPerVector; + + static void SetUpTestCase() { + // The lambdas will be run after this scope returns, so make captures + // static. + static int32_t counter1; + // Clear 'counter1' so that --gtest_repeat runs get the same data. + counter1 = 0; + auto customize1 = [&](const RowVectorPtr& rows) { + makeAscending(rows, counter1); + }; + + static int32_t counter2; + counter2 = 0; + auto customize2 = [&](const RowVectorPtr& rows) { + makeAscending(rows, counter2); + }; + + rowType_ = ROW({"c0"}, {BIGINT()}); + testTables_ = { + TableSpec{ + .name = "T", + .columns = rowType_, + .rowsPerVector = kRowsPerVector, + .numVectorsPerFile = kNumVectors, + .numFiles = kNumFiles, + .customizeData = customize1}, + TableSpec{ + .name = "U", + .columns = rowType_, + .rowsPerVector = kRowsPerVector, + .numVectorsPerFile = kNumVectors, + .numFiles = kNumFiles, + .customizeData = customize2}}; + + // Creates the data and schema from 'testTables_'. These are created on the + // first test fixture initialization. + LocalRunnerTestBase::SetUpTestCase(); + } + + // Returns a plan with a table scan. This is a single stage if 'numWorkers' is + // 1, otherwise this is a scan stage plus shuffle to a stage that gathers the + // scan results. + MultiFragmentPlanPtr makeScanPlan(const std::string& id, int32_t numWorkers) { + MultiFragmentPlan::Options options = { + .queryId = id, .numWorkers = numWorkers, .numDrivers = 2}; + const int32_t width = 3; + + DistributedPlanBuilder rootBuilder(options, idGenerator_, pool_.get()); + rootBuilder.tableScan("T", rowType_); + if (numWorkers > 1) { + rootBuilder.shuffle({}, 1, false); + } + return std::make_shared( + rootBuilder.fragments(), std::move(options)); + } + + MultiFragmentPlanPtr makeJoinPlan(std::string project = "c0") { + MultiFragmentPlan::Options options = { + .queryId = "test.", .numWorkers = 4, .numDrivers = 2}; + const int32_t width = 3; + + DistributedPlanBuilder rootBuilder(options, idGenerator_, pool_.get()); + rootBuilder.tableScan("T", rowType_) + .project({project}) + .shuffle({"c0"}, 3, false) + .hashJoin( + {"c0"}, + {"b0"}, + DistributedPlanBuilder(rootBuilder) + .tableScan("U", rowType_) + .project({"c0 as b0"}) + .shuffleResult({"b0"}, width, false), + "", + {"c0", "b0"}) + .shuffle({}, 1, false) + .finalAggregation({}, {"count(1)"}, {{BIGINT()}}); + return std::make_shared( + rootBuilder.fragments(), std::move(options)); + } + + static void makeAscending(const RowVectorPtr& rows, int32_t& counter) { + auto ints = rows->childAt(0)->as>(); + for (auto i = 0; i < ints->size(); ++i) { + ints->set(i, counter + i); + } + counter += ints->size(); + } + + void checkScanCount(const std::string& id, int32_t numWorkers) { + auto scan = makeScanPlan(id, numWorkers); + auto localRunner = std::make_shared( + std::move(scan), + makeQueryCtx("q1", rootPool_.get()), + splitSourceFactory_); + auto results = readCursor(localRunner); + + int32_t count = 0; + for (auto& rows : results) { + count += rows->size(); + } + localRunner->waitForCompletion(5000); + EXPECT_EQ(250'000, count); + } + + std::shared_ptr idGenerator_{ + std::make_shared()}; + // The below are declared static to be scoped to TestCase so as to reuse the + // dataset between tests. + + inline static RowTypePtr rowType_; +}; + +TEST_F(LocalRunnerTest, count) { + auto join = makeJoinPlan(); + auto localRunner = std::make_shared( + std::move(join), + makeQueryCtx("q1", rootPool_.get()), + splitSourceFactory_); + auto results = readCursor(localRunner); + auto stats = localRunner->stats(); + EXPECT_EQ(1, results.size()); + EXPECT_EQ(1, results[0]->size()); + EXPECT_EQ( + kNumRows, results[0]->childAt(0)->as>()->valueAt(0)); + results.clear(); + EXPECT_EQ(Runner::State::kFinished, localRunner->state()); + localRunner->waitForCompletion(5000); +} + +TEST_F(LocalRunnerTest, error) { + auto join = makeJoinPlan("if (c0 = 111, c0 / 0, c0 + 1) as c0"); + auto localRunner = std::make_shared( + std::move(join), + makeQueryCtx("q1", rootPool_.get()), + splitSourceFactory_); + EXPECT_THROW(readCursor(localRunner), VeloxUserError); + EXPECT_EQ(Runner::State::kError, localRunner->state()); + localRunner->waitForCompletion(5000); +} + +TEST_F(LocalRunnerTest, scan) { + checkScanCount("s1", 1); + checkScanCount("s2", 3); +}