From d7c9a5065feb0fca822f219841021fe16ed3ad7a Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Mon, 11 Nov 2024 18:26:20 -0800 Subject: [PATCH] Spill prefix sort related code cleanup plus test improvement (#11508) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11508 The followup is to add session property at Prestissimo to allow configure in Presto Reviewed By: tanjialiang Differential Revision: D65791663 fbshipit-source-id: 59c37c543aa763030968fe9dea94e6c6ad820175 --- velox/core/QueryConfig.h | 12 +++---- velox/docs/configs.rst | 6 ++-- velox/exec/Driver.cpp | 2 +- velox/exec/tests/SpillTest.cpp | 59 +++++++++++++++++++++++++++------- 4 files changed, 57 insertions(+), 22 deletions(-) diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 9e9846590867..7041126e2153 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -234,10 +234,10 @@ class QueryConfig { "spill_compression_codec"; /// Enable the prefix sort or fallback to timsort in spill. The prefix sort is - /// faster than timsort but requires the memory to build prefix data, which - /// may cause out of memory. - static constexpr const char* kSpillEnablePrefixSort = - "spill_enable_prefix_sort"; + /// faster than std::sort but requires the memory to build normalized prefix + /// keys, which might have potential risk of running out of server memory. + static constexpr const char* kSpillPrefixSortEnabled = + "spill_prefixsort_enabled"; /// Specifies spill write buffer size in bytes. The spiller tries to buffer /// serialized spill data up to the specified size before write to storage @@ -641,8 +641,8 @@ class QueryConfig { return get(kSpillCompressionKind, "none"); } - bool spillEnablePrefixSort() const { - return get(kSpillEnablePrefixSort, false); + bool spillPrefixSortEnabled() const { + return get(kSpillPrefixSortEnabled, false); } uint64_t spillWriteBufferSize() const { diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index e6fe264a7cac..d16dbb3ecef7 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -349,11 +349,11 @@ Spilling - Specifies the compression algorithm type to compress the spilled data before write to disk to trade CPU for IO efficiency. The supported compression codecs are: ZLIB, SNAPPY, LZO, ZSTD, LZ4 and GZIP. NONE means no compression. - * - spill_enable_prefix_sort + * - spill_prefixsort_enabled - bool - false - - Enable the prefix sort or fallback to timsort in spill. The prefix sort is faster than timsort but requires the - memory to build prefix data, which might have potential risk of running out of server memory. + - Enable the prefix sort or fallback to timsort in spill. The prefix sort is faster than std::sort but requires the + memory to build normalized prefix keys, which might have potential risk of running out of server memory. * - spiller_start_partition_bit - integer - 29 diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 9156672ea32e..9f2b8adfa7a9 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -141,7 +141,7 @@ std::optional DriverCtx::makeSpillConfig( queryConfig.maxSpillRunRows(), queryConfig.writerFlushThresholdBytes(), queryConfig.spillCompressionKind(), - queryConfig.spillEnablePrefixSort() + queryConfig.spillPrefixSortEnabled() ? std::optional(prefixSortConfig()) : std::nullopt, queryConfig.spillFileCreateConfig()); diff --git a/velox/exec/tests/SpillTest.cpp b/velox/exec/tests/SpillTest.cpp index af320c828b16..351dc462f481 100644 --- a/velox/exec/tests/SpillTest.cpp +++ b/velox/exec/tests/SpillTest.cpp @@ -54,15 +54,30 @@ class TestRuntimeStatWriter : public BaseRuntimeStatWriter { } // namespace struct TestParam { - common::CompressionKind compressionKind; - bool enablePrefixSort; + const common::CompressionKind compressionKind; + const bool enablePrefixSort; TestParam(common::CompressionKind _compressionKind, bool _enablePrefixSort) : compressionKind(_compressionKind), enablePrefixSort(_enablePrefixSort) {} + + TestParam(uint32_t value) + : compressionKind(static_cast(value >> 1)), + enablePrefixSort(!!(value & 1)) {} + + uint32_t value() const { + return static_cast(compressionKind) << 1 | enablePrefixSort; + } + + std::string toString() const { + return fmt::format( + "compressionKind: {}, enablePrefixSort: {}", + compressionKind, + enablePrefixSort); + } }; -class SpillTest : public ::testing::TestWithParam, +class SpillTest : public ::testing::TestWithParam, public facebook::velox::test::VectorTestBase { public: explicit SpillTest() @@ -75,13 +90,33 @@ class SpillTest : public ::testing::TestWithParam, setThreadLocalRunTimeStatWriter(nullptr); } - static std::vector getTestParams() { - std::vector testParams; - testParams.emplace_back(common::CompressionKind::CompressionKind_NONE); - testParams.emplace_back(common::CompressionKind::CompressionKind_ZLIB); - testParams.emplace_back(common::CompressionKind::CompressionKind_SNAPPY); - testParams.emplace_back(common::CompressionKind::CompressionKind_ZSTD); - testParams.emplace_back(common::CompressionKind::CompressionKind_LZ4); + static std::vector getTestParams() { + std::vector testParams; + testParams.emplace_back( + TestParam{common::CompressionKind::CompressionKind_NONE, false} + .value()); + testParams.emplace_back( + TestParam{common::CompressionKind::CompressionKind_ZLIB, false} + .value()); + testParams.emplace_back( + TestParam{common::CompressionKind::CompressionKind_SNAPPY, false} + .value()); + testParams.emplace_back( + TestParam{common::CompressionKind::CompressionKind_ZSTD, false} + .value()); + testParams.emplace_back( + TestParam{common::CompressionKind::CompressionKind_LZ4, false}.value()); + testParams.emplace_back( + TestParam{common::CompressionKind::CompressionKind_NONE, true}.value()); + testParams.emplace_back( + TestParam{common::CompressionKind::CompressionKind_ZLIB, true}.value()); + testParams.emplace_back( + TestParam{common::CompressionKind::CompressionKind_SNAPPY, true} + .value()); + testParams.emplace_back( + TestParam{common::CompressionKind::CompressionKind_ZSTD, true}.value()); + testParams.emplace_back( + TestParam{common::CompressionKind::CompressionKind_LZ4, true}.value()); return testParams; } @@ -103,8 +138,8 @@ class SpillTest : public ::testing::TestWithParam, tempDir_ = exec::test::TempDirectoryPath::create(); filesystems::registerLocalFileSystem(); rng_.seed(1); - compressionKind_ = GetParam(); - enablePrefixSort_ = true; + compressionKind_ = TestParam{GetParam()}.compressionKind; + enablePrefixSort_ = TestParam{GetParam()}.enablePrefixSort; } uint8_t randPartitionBitOffset() {