diff --git a/velox/common/base/PrefixSortConfig.h b/velox/common/base/PrefixSortConfig.h new file mode 100644 index 000000000000..34adee1b7914 --- /dev/null +++ b/velox/common/base/PrefixSortConfig.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace facebook::velox::common { + +/// Specifies the config for prefix-sort. +struct PrefixSortConfig { + explicit PrefixSortConfig( + int64_t maxNormalizedKeySize, + int32_t threshold = 130) + : maxNormalizedKeySize(maxNormalizedKeySize), threshold(threshold) {} + + /// Max number of bytes can store normalized keys in prefix-sort buffer per + /// entry. + const int64_t maxNormalizedKeySize; + + /// PrefixSort will have performance regression when the dateset is too small. + const int32_t threshold; +}; +} // namespace facebook::velox::common diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index 47a13fe295c9..ae46a1017aaa 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -17,6 +17,7 @@ #include "folly/CancellationToken.h" #include "velox/common/base/AsyncSource.h" +#include "velox/common/base/PrefixSortConfig.h" #include "velox/common/base/RuntimeMetrics.h" #include "velox/common/base/SpillConfig.h" #include "velox/common/base/SpillStats.h" @@ -255,6 +256,7 @@ class ConnectorQueryCtx { memory::MemoryPool* connectorPool, const Config* sessionProperties, const common::SpillConfig* spillConfig, + common::PrefixSortConfig prefixSortConfig, std::unique_ptr expressionEvaluator, cache::AsyncDataCache* cache, const std::string& queryId, @@ -267,6 +269,7 @@ class ConnectorQueryCtx { connectorPool_(connectorPool), sessionProperties_(sessionProperties), spillConfig_(spillConfig), + prefixSortConfig_(prefixSortConfig), expressionEvaluator_(std::move(expressionEvaluator)), cache_(cache), scanId_(fmt::format("{}.{}", taskId, planNodeId)), @@ -300,6 +303,10 @@ class ConnectorQueryCtx { return spillConfig_; } + const common::PrefixSortConfig& prefixSortConfig() const { + return prefixSortConfig_; + } + core::ExpressionEvaluator* expressionEvaluator() const { return expressionEvaluator_.get(); } @@ -349,6 +356,7 @@ class ConnectorQueryCtx { memory::MemoryPool* const connectorPool_; const Config* const sessionProperties_; const common::SpillConfig* const spillConfig_; + const common::PrefixSortConfig prefixSortConfig_; std::unique_ptr expressionEvaluator_; cache::AsyncDataCache* cache_; const std::string scanId_; diff --git a/velox/connectors/hive/HiveDataSink.cpp b/velox/connectors/hive/HiveDataSink.cpp index 0029a6d48cf9..25cadf4d25cc 100644 --- a/velox/connectors/hive/HiveDataSink.cpp +++ b/velox/connectors/hive/HiveDataSink.cpp @@ -748,6 +748,7 @@ HiveDataSink::maybeCreateBucketSortWriter( sortCompareFlags_, sortPool, writerInfo_.back()->nonReclaimableSectionHolder.get(), + connectorQueryCtx_->prefixSortConfig(), spillConfig_, writerInfo_.back()->spillStats.get()); return std::make_unique( diff --git a/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp b/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp index 4bd3a882179f..869751325e02 100644 --- a/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp +++ b/velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.cpp @@ -15,8 +15,8 @@ */ #include "velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.h" - #include +#include "velox/exec/tests/utils/PrefixSortUtils.h" using namespace facebook::velox; using namespace facebook::velox::dwio; @@ -295,6 +295,7 @@ void IcebergSplitReaderBenchmark::readSingleColumn( connectorPool.get(), connectorSessionProperties_.get(), nullptr, + exec::test::defaultPrefixSortConfig(), nullptr, nullptr, "query.IcebergSplitReader", diff --git a/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp b/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp index c3fe6eaf57ef..c452c9e0d499 100644 --- a/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp +++ b/velox/connectors/hive/tests/HiveConnectorUtilTest.cpp @@ -14,14 +14,14 @@ * limitations under the License. */ +#include "velox/connectors/hive/HiveConnectorUtil.h" #include -#include "velox/exec/tests/utils/HiveConnectorTestBase.h" - #include "velox/connectors/hive/HiveConfig.h" #include "velox/connectors/hive/HiveConnectorSplit.h" -#include "velox/connectors/hive/HiveConnectorUtil.h" #include "velox/connectors/hive/TableHandle.h" #include "velox/core/Config.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/exec/tests/utils/PrefixSortUtils.h" namespace facebook::velox::connector { @@ -48,6 +48,7 @@ TEST_F(HiveConnectorUtilTest, configureReaderOptions) { pool_.get(), &sessionProperties, nullptr, + exec::test::defaultPrefixSortConfig(), nullptr, nullptr, "query.HiveConnectorUtilTest", diff --git a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index 7257c53b6b98..975230e2068c 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -26,6 +26,7 @@ #include "velox/dwio/common/Options.h" #include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/PrefixSortUtils.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/vector/fuzzer/VectorFuzzer.h" @@ -117,6 +118,7 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { connectorPool_.get(), connectorSessionProperties_.get(), nullptr, + exec::test::defaultPrefixSortConfig(), nullptr, nullptr, "query.HiveDataSinkTest", @@ -702,6 +704,7 @@ DEBUG_ONLY_TEST_F(HiveDataSinkTest, memoryReclaim) { connectorPool_.get(), connectorSessionProperties_.get(), spillConfig.get(), + exec::test::defaultPrefixSortConfig(), nullptr, nullptr, "query.HiveDataSinkTest", @@ -716,6 +719,7 @@ DEBUG_ONLY_TEST_F(HiveDataSinkTest, memoryReclaim) { connectorPool_.get(), connectorSessionProperties_.get(), nullptr, + exec::test::defaultPrefixSortConfig(), nullptr, nullptr, "query.HiveDataSinkTest", @@ -841,6 +845,7 @@ TEST_F(HiveDataSinkTest, memoryReclaimAfterClose) { connectorPool_.get(), connectorSessionProperties_.get(), spillConfig.get(), + exec::test::defaultPrefixSortConfig(), nullptr, nullptr, "query.HiveDataSinkTest", @@ -855,6 +860,7 @@ TEST_F(HiveDataSinkTest, memoryReclaimAfterClose) { connectorPool_.get(), connectorSessionProperties_.get(), nullptr, + exec::test::defaultPrefixSortConfig(), nullptr, nullptr, "query.HiveDataSinkTest", @@ -935,6 +941,7 @@ DEBUG_ONLY_TEST_F(HiveDataSinkTest, sortWriterFailureTest) { connectorPool_.get(), connectorSessionProperties_.get(), spillConfig.get(), + exec::test::defaultPrefixSortConfig(), nullptr, nullptr, "query.HiveDataSinkTest", diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 2c8763f1e822..f784b8a69d0c 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -355,6 +355,15 @@ class QueryConfig { static constexpr const char* kDriverCpuTimeSliceLimitMs = "driver_cpu_time_slice_limit_ms"; + /// Maximum number of bytes to use for the normalized key in prefix-sort. Use + /// 0 to disable prefix-sort. + static constexpr const char* kPrefixSortNormalizedKeyMaxBytes = + "prefixsort_normalized_key_max_bytes"; + + /// Minimum number of rows to use prefix-sort. The default value has been + /// derived using micro-benchmarking. + static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows"; + uint64_t queryMaxMemoryPerNode() const { return toCapacity( get(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE); @@ -711,6 +720,14 @@ class QueryConfig { return get(kDriverCpuTimeSliceLimitMs, 0); } + int64_t prefixSortNormalizedKeyMaxBytes() const { + return get(kPrefixSortNormalizedKeyMaxBytes, 128); + } + + int32_t prefixSortMinRows() const { + return get(kPrefixSortMinRows, 130); + } + template T get(const std::string& key, const T& defaultValue) const { return config_->get(key, defaultValue); diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 9bd5296bd330..08ef569e390d 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -129,6 +129,14 @@ Generic Configuration - 0 - If it is not zero, specifies the time limit that a driver can continuously run on a thread before yield. If it is zero, then it no limit. + * - prefixsort_normalized_key_max_bytes + - integer + - 128 + - Maximum number of bytes to use for the normalized key in prefix-sort. Use 0 to disable prefix-sort. + * - prefixsort_min_rows + - integer + - 130 + - Minimum number of rows to use prefix-sort. The default value has been derived using micro-benchmarking. .. _expression-evaluation-conf: diff --git a/velox/exec/Driver.h b/velox/exec/Driver.h index bb88f9604fcd..46fe88dbb99b 100644 --- a/velox/exec/Driver.h +++ b/velox/exec/Driver.h @@ -294,6 +294,12 @@ struct DriverCtx { /// Builds the spill config for the operator with specified 'operatorId'. std::optional makeSpillConfig(int32_t operatorId) const; + + common::PrefixSortConfig prefixSortConfig() const { + return common::PrefixSortConfig{ + queryConfig().prefixSortNormalizedKeyMaxBytes(), + queryConfig().prefixSortMinRows()}; + } }; constexpr const char* kOpMethodNone = ""; diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 21e6ff2673c2..8bf3bad0fca3 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -59,6 +59,7 @@ OperatorCtx::createConnectorQueryCtx( connectorPool, task->queryCtx()->connectorSessionProperties(connectorId), spillConfig, + driverCtx_->prefixSortConfig(), std::make_unique( execCtx()->queryCtx(), execCtx()->pool()), task->queryCtx()->cache(), diff --git a/velox/exec/OrderBy.cpp b/velox/exec/OrderBy.cpp index 59b1bc43a0bc..b5939deef12e 100644 --- a/velox/exec/OrderBy.cpp +++ b/velox/exec/OrderBy.cpp @@ -65,6 +65,7 @@ OrderBy::OrderBy( sortCompareFlags, pool(), &nonReclaimableSection_, + driverCtx->prefixSortConfig(), spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr, &spillStats_); } diff --git a/velox/exec/PrefixSort.cpp b/velox/exec/PrefixSort.cpp index c5ea46a9bf9f..7fc0dff392c4 100644 --- a/velox/exec/PrefixSort.cpp +++ b/velox/exec/PrefixSort.cpp @@ -183,8 +183,6 @@ int PrefixSort::comparePartNormalizedKeys(char* left, char* right) { PrefixSort::PrefixSort( memory::MemoryPool* pool, RowContainer* rowContainer, - const std::vector& keyCompareFlags, - const PrefixSortConfig& config, const PrefixSortLayout& sortLayout) : pool_(pool), sortLayout_(sortLayout), rowContainer_(rowContainer) {} diff --git a/velox/exec/PrefixSort.h b/velox/exec/PrefixSort.h index 37bdaca36951..0f860c777ded 100644 --- a/velox/exec/PrefixSort.h +++ b/velox/exec/PrefixSort.h @@ -15,6 +15,7 @@ */ #pragma once +#include "velox/common/base/PrefixSortConfig.h" #include "velox/common/memory/MemoryAllocator.h" #include "velox/exec/RowContainer.h" #include "velox/exec/prefixsort/PrefixSortAlgorithm.h" @@ -41,20 +42,6 @@ FOLLY_ALWAYS_INLINE void stdSort( } }; // namespace detail -struct PrefixSortConfig { - PrefixSortConfig(uint32_t maxNormalizedKeySize, uint32_t threshold = 130) - : maxNormalizedKeySize(maxNormalizedKeySize), threshold(threshold) {} - - /// Max number of bytes can store normalized keys in prefix-sort buffer per - /// entry. - const uint32_t maxNormalizedKeySize; - - /// PrefixSort will have performance regression when the dateset is too small. - /// The threshold is set to 100 according to the benchmark test results by - /// default. - const int64_t threshold; -}; - /// The layout of prefix-sort buffer, a prefix entry includes: /// 1. normalized keys /// 2. non-normalized data ptr for semi-normalized types such as @@ -107,8 +94,6 @@ class PrefixSort { PrefixSort( memory::MemoryPool* pool, RowContainer* rowContainer, - const std::vector& keyCompareFlags, - const PrefixSortConfig& config, const PrefixSortLayout& sortLayout); /// Follow the steps below to sort the data in RowContainer: @@ -139,7 +124,7 @@ class PrefixSort { memory::MemoryPool* pool, RowContainer* rowContainer, const std::vector& compareFlags, - const PrefixSortConfig& config) { + const velox::common::PrefixSortConfig& config) { if (rowContainer->numRows() < config.threshold) { detail::stdSort(rows, rowContainer, compareFlags); return; @@ -154,7 +139,7 @@ class PrefixSort { return; } - PrefixSort prefixSort(pool, rowContainer, compareFlags, config, sortLayout); + PrefixSort prefixSort(pool, rowContainer, sortLayout); prefixSort.sortInternal(rows); } diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index d2df5567a4d5..cae316d19086 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -25,12 +25,14 @@ SortBuffer::SortBuffer( const std::vector& sortCompareFlags, velox::memory::MemoryPool* pool, tsan_atomic* nonReclaimableSection, + common::PrefixSortConfig prefixSortConfig, const common::SpillConfig* spillConfig, folly::Synchronized* spillStats) : input_(input), sortCompareFlags_(sortCompareFlags), pool_(pool), nonReclaimableSection_(nonReclaimableSection), + prefixSortConfig_(prefixSortConfig), spillConfig_(spillConfig), spillStats_(spillStats) { VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size()); @@ -112,19 +114,8 @@ void SortBuffer::noMoreInput() { sortedRows_.resize(numInputRows_); RowContainerIterator iter; data_->listRows(&iter, numInputRows_, sortedRows_.data()); - std::sort( - sortedRows_.begin(), - sortedRows_.end(), - [this](const char* leftRow, const char* rightRow) { - for (vector_size_t index = 0; index < sortCompareFlags_.size(); - ++index) { - if (auto result = data_->compare( - leftRow, rightRow, index, sortCompareFlags_[index])) { - return result < 0; - } - } - return false; - }); + PrefixSort::sort( + sortedRows_, pool_, data_.get(), sortCompareFlags_, prefixSortConfig_); } else { // Spill the remaining in-memory state to disk if spilling has been // triggered on this sort buffer. This is to simplify query OOM prevention diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index 473fe0e7e9b0..7b03dc7ea186 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -19,6 +19,7 @@ #include "velox/exec/ContainerRowSerde.h" #include "velox/exec/Operator.h" #include "velox/exec/OperatorUtils.h" +#include "velox/exec/PrefixSort.h" #include "velox/exec/RowContainer.h" #include "velox/exec/Spill.h" #include "velox/vector/BaseVector.h" @@ -36,6 +37,7 @@ class SortBuffer { const std::vector& sortCompareFlags, velox::memory::MemoryPool* pool, tsan_atomic* nonReclaimableSection, + common::PrefixSortConfig prefixSortConfig, const common::SpillConfig* spillConfig = nullptr, folly::Synchronized* spillStats = nullptr); @@ -87,6 +89,8 @@ class SortBuffer { // TableWriter to indicate if this sort buffer object is under non-reclaimable // execution section or not. tsan_atomic* const nonReclaimableSection_; + // Configuration settings for prefix-sort. + const common::PrefixSortConfig prefixSortConfig_; const common::SpillConfig* const spillConfig_; folly::Synchronized* const spillStats_; diff --git a/velox/exec/benchmarks/PrefixSortBenchmark.cpp b/velox/exec/benchmarks/PrefixSortBenchmark.cpp index 4edde03fa30b..9575fa201c4c 100644 --- a/velox/exec/benchmarks/PrefixSortBenchmark.cpp +++ b/velox/exec/benchmarks/PrefixSortBenchmark.cpp @@ -127,13 +127,13 @@ class TestCase { // You could config threshold, e.i. 0, to test prefix-sort for small // dateset. -static const PrefixSortConfig kDefaultSortConfig(1024, 100); +static const common::PrefixSortConfig kDefaultSortConfig(1024, 100); // For small dataset, in some test environments, if std-sort is defined in the // benchmark file, the test results may be strangely regressed. When the // threshold is particularly large, PrefixSort is actually std-sort, hence, we // can use this as std-sort benchmark base. -static const PrefixSortConfig kStdSortConfig( +static const common::PrefixSortConfig kStdSortConfig( 1024, std::numeric_limits::max()); diff --git a/velox/exec/tests/PrefixSortTest.cpp b/velox/exec/tests/PrefixSortTest.cpp index 6c42219d1181..13f3ccf96fbf 100644 --- a/velox/exec/tests/PrefixSortTest.cpp +++ b/velox/exec/tests/PrefixSortTest.cpp @@ -65,9 +65,10 @@ class PrefixSortTest : public exec::test::OperatorTestBase { pool_.get(), &rowContainer, compareFlags, - {1024, - // Set threshold to 0 to enable prefix-sort in small dataset. - 0}); + common::PrefixSortConfig{ + 1024, + // Set threshold to 0 to enable prefix-sort in small dataset. + 0}); // Extract data from the RowContainer in order. const RowVectorPtr actual = diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index 742fbe9764a5..2f837b91e3f8 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -65,6 +65,9 @@ class SortBufferTest : public OperatorTestBase { "none"); } + const velox::common::PrefixSortConfig prefixSortConfig_ = + velox::common::PrefixSortConfig{std::numeric_limits::max(), 130}; + const RowTypePtr inputType_ = ROW( {{"c0", BIGINT()}, {"c1", INTEGER()}, @@ -123,7 +126,8 @@ TEST_F(SortBufferTest, singleKey) { sortColumnIndices_, testData.sortCompareFlags, pool_.get(), - &nonReclaimableSection_); + &nonReclaimableSection_, + prefixSortConfig_); RowVectorPtr data = makeRowVector( {makeFlatVector({1, 2, 3, 4, 5}), @@ -153,7 +157,8 @@ TEST_F(SortBufferTest, multipleKeys) { sortColumnIndices_, sortCompareFlags_, pool_.get(), - &nonReclaimableSection_); + &nonReclaimableSection_, + prefixSortConfig_); RowVectorPtr data = makeRowVector( {makeFlatVector({1, 2, 3, 4, 5}), @@ -233,7 +238,8 @@ TEST_F(SortBufferTest, DISABLED_randomData) { testData.sortColumnIndices, testData.sortCompareFlags, pool_.get(), - &nonReclaimableSection_); + &nonReclaimableSection_, + prefixSortConfig_); const std::shared_ptr fuzzerPool = memory::memoryManager()->addLeafPool("VectorFuzzer"); @@ -307,6 +313,7 @@ TEST_F(SortBufferTest, batchOutput) { sortCompareFlags_, pool_.get(), &nonReclaimableSection_, + prefixSortConfig_, testData.triggerSpill ? &spillConfig : nullptr, &spillStats); ASSERT_EQ(sortBuffer->canSpill(), testData.triggerSpill); @@ -402,6 +409,7 @@ TEST_F(SortBufferTest, spill) { sortCompareFlags_, pool_.get(), &nonReclaimableSection_, + prefixSortConfig_, testData.spillEnabled ? &spillConfig : nullptr, &spillStats); @@ -463,6 +471,7 @@ TEST_F(SortBufferTest, emptySpill) { sortCompareFlags_, pool_.get(), &nonReclaimableSection_, + prefixSortConfig_, &spillConfig, &spillStats); diff --git a/velox/exec/tests/utils/PrefixSortUtils.h b/velox/exec/tests/utils/PrefixSortUtils.h new file mode 100644 index 000000000000..871eea3b226a --- /dev/null +++ b/velox/exec/tests/utils/PrefixSortUtils.h @@ -0,0 +1,29 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include "velox/common/base/PrefixSortConfig.h" + +namespace facebook::velox::exec::test { +/// PrefixSortConfig is obtained from QueryConfig in production code. In many +/// UTs etc. HiveConnector`s UT, there is no QueryConfig. This method returns a +/// default PrefixSortConfig for these cases. +inline velox::common::PrefixSortConfig defaultPrefixSortConfig() { + return velox::common::PrefixSortConfig{128, 130}; +} + +} // namespace facebook::velox::exec::test