Skip to content

Commit

Permalink
Enable prefixsort in OrderBy and TableWrite (#9541)
Browse files Browse the repository at this point in the history
Summary:
Apply PrefixSort optimization to SortBuffer.

Add 2 configuration properties:
- prefixsort_normalized_key_max_bytes to limit the size of normalized keys. The default value is 128 bytes (2 cache lines).
- prefixsort_min_rows to limit the minimum number of rows to apply PrefixSort. Default is 130 based on a benchmark.

Pull Request resolved: #9541

Reviewed By: Yuhta

Differential Revision: D59122372

Pulled By: bikramSingh91

fbshipit-source-id: 77c76e2352c663465f575abfc83571c3634c8439
  • Loading branch information
skadilover authored and facebook-github-bot committed Jul 5, 2024
1 parent 0201d9a commit b9268a6
Show file tree
Hide file tree
Showing 19 changed files with 150 additions and 45 deletions.
37 changes: 37 additions & 0 deletions velox/common/base/PrefixSortConfig.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <stdint.h>

namespace facebook::velox::common {

/// Configuration knobs for prefix-sort.
struct PrefixSortConfig {
  /// Builds a config from the per-entry normalized-key byte budget and the
  /// row-count threshold. The threshold is 130 when the caller omits it.
  explicit PrefixSortConfig(int64_t maxKeySize, int32_t minRows = 130)
      : maxNormalizedKeySize{maxKeySize}, threshold{minRows} {}

  /// Upper bound, in bytes, on the normalized keys stored per entry in the
  /// prefix-sort buffer.
  const int64_t maxNormalizedKeySize;

  /// Row-count threshold. PrefixSort has a performance regression when the
  /// dataset is too small.
  const int32_t threshold;
};
} // namespace facebook::velox::common
8 changes: 8 additions & 0 deletions velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "folly/CancellationToken.h"
#include "velox/common/base/AsyncSource.h"
#include "velox/common/base/PrefixSortConfig.h"
#include "velox/common/base/RuntimeMetrics.h"
#include "velox/common/base/SpillConfig.h"
#include "velox/common/base/SpillStats.h"
Expand Down Expand Up @@ -255,6 +256,7 @@ class ConnectorQueryCtx {
memory::MemoryPool* connectorPool,
const Config* sessionProperties,
const common::SpillConfig* spillConfig,
common::PrefixSortConfig prefixSortConfig,
std::unique_ptr<core::ExpressionEvaluator> expressionEvaluator,
cache::AsyncDataCache* cache,
const std::string& queryId,
Expand All @@ -267,6 +269,7 @@ class ConnectorQueryCtx {
connectorPool_(connectorPool),
sessionProperties_(sessionProperties),
spillConfig_(spillConfig),
prefixSortConfig_(prefixSortConfig),
expressionEvaluator_(std::move(expressionEvaluator)),
cache_(cache),
scanId_(fmt::format("{}.{}", taskId, planNodeId)),
Expand Down Expand Up @@ -300,6 +303,10 @@ class ConnectorQueryCtx {
return spillConfig_;
}

/// Returns the prefix-sort configuration this context was constructed with.
const common::PrefixSortConfig& prefixSortConfig() const {
return prefixSortConfig_;
}

/// Returns a raw pointer to the expression evaluator held by this context.
core::ExpressionEvaluator* expressionEvaluator() const {
return expressionEvaluator_.get();
}
Expand Down Expand Up @@ -349,6 +356,7 @@ class ConnectorQueryCtx {
memory::MemoryPool* const connectorPool_;
const Config* const sessionProperties_;
const common::SpillConfig* const spillConfig_;
const common::PrefixSortConfig prefixSortConfig_;
std::unique_ptr<core::ExpressionEvaluator> expressionEvaluator_;
cache::AsyncDataCache* cache_;
const std::string scanId_;
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,7 @@ HiveDataSink::maybeCreateBucketSortWriter(
sortCompareFlags_,
sortPool,
writerInfo_.back()->nonReclaimableSectionHolder.get(),
connectorQueryCtx_->prefixSortConfig(),
spillConfig_,
writerInfo_.back()->spillStats.get());
return std::make_unique<dwio::common::SortingWriter>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/

#include "velox/connectors/hive/iceberg/tests/IcebergSplitReaderBenchmark.h"

#include <filesystem>
#include "velox/exec/tests/utils/PrefixSortUtils.h"

using namespace facebook::velox;
using namespace facebook::velox::dwio;
Expand Down Expand Up @@ -295,6 +295,7 @@ void IcebergSplitReaderBenchmark::readSingleColumn(
connectorPool.get(),
connectorSessionProperties_.get(),
nullptr,
exec::test::defaultPrefixSortConfig(),
nullptr,
nullptr,
"query.IcebergSplitReader",
Expand Down
7 changes: 4 additions & 3 deletions velox/connectors/hive/tests/HiveConnectorUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
* limitations under the License.
*/

#include "velox/connectors/hive/HiveConnectorUtil.h"
#include <gtest/gtest.h>
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"

#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
#include "velox/connectors/hive/HiveConnectorUtil.h"
#include "velox/connectors/hive/TableHandle.h"
#include "velox/core/Config.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/PrefixSortUtils.h"

namespace facebook::velox::connector {

Expand All @@ -48,6 +48,7 @@ TEST_F(HiveConnectorUtilTest, configureReaderOptions) {
pool_.get(),
&sessionProperties,
nullptr,
exec::test::defaultPrefixSortConfig(),
nullptr,
nullptr,
"query.HiveConnectorUtilTest",
Expand Down
7 changes: 7 additions & 0 deletions velox/connectors/hive/tests/HiveDataSinkTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "velox/dwio/common/Options.h"
#include "velox/dwio/dwrf/writer/Writer.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/PrefixSortUtils.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"

Expand Down Expand Up @@ -117,6 +118,7 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
connectorPool_.get(),
connectorSessionProperties_.get(),
nullptr,
exec::test::defaultPrefixSortConfig(),
nullptr,
nullptr,
"query.HiveDataSinkTest",
Expand Down Expand Up @@ -702,6 +704,7 @@ DEBUG_ONLY_TEST_F(HiveDataSinkTest, memoryReclaim) {
connectorPool_.get(),
connectorSessionProperties_.get(),
spillConfig.get(),
exec::test::defaultPrefixSortConfig(),
nullptr,
nullptr,
"query.HiveDataSinkTest",
Expand All @@ -716,6 +719,7 @@ DEBUG_ONLY_TEST_F(HiveDataSinkTest, memoryReclaim) {
connectorPool_.get(),
connectorSessionProperties_.get(),
nullptr,
exec::test::defaultPrefixSortConfig(),
nullptr,
nullptr,
"query.HiveDataSinkTest",
Expand Down Expand Up @@ -841,6 +845,7 @@ TEST_F(HiveDataSinkTest, memoryReclaimAfterClose) {
connectorPool_.get(),
connectorSessionProperties_.get(),
spillConfig.get(),
exec::test::defaultPrefixSortConfig(),
nullptr,
nullptr,
"query.HiveDataSinkTest",
Expand All @@ -855,6 +860,7 @@ TEST_F(HiveDataSinkTest, memoryReclaimAfterClose) {
connectorPool_.get(),
connectorSessionProperties_.get(),
nullptr,
exec::test::defaultPrefixSortConfig(),
nullptr,
nullptr,
"query.HiveDataSinkTest",
Expand Down Expand Up @@ -935,6 +941,7 @@ DEBUG_ONLY_TEST_F(HiveDataSinkTest, sortWriterFailureTest) {
connectorPool_.get(),
connectorSessionProperties_.get(),
spillConfig.get(),
exec::test::defaultPrefixSortConfig(),
nullptr,
nullptr,
"query.HiveDataSinkTest",
Expand Down
17 changes: 17 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,15 @@ class QueryConfig {
static constexpr const char* kDriverCpuTimeSliceLimitMs =
"driver_cpu_time_slice_limit_ms";

/// Maximum number of bytes to use for the normalized key in prefix-sort. Use
/// 0 to disable prefix-sort.
static constexpr const char* kPrefixSortNormalizedKeyMaxBytes =
"prefixsort_normalized_key_max_bytes";

/// Minimum number of rows to use prefix-sort. The default value has been
/// derived using micro-benchmarking.
static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows";

uint64_t queryMaxMemoryPerNode() const {
return toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"), CapacityUnit::BYTE);
Expand Down Expand Up @@ -711,6 +720,14 @@ class QueryConfig {
return get<uint32_t>(kDriverCpuTimeSliceLimitMs, 0);
}

/// Looks up kPrefixSortNormalizedKeyMaxBytes in the query config, passing 128
/// as the default value.
int64_t prefixSortNormalizedKeyMaxBytes() const {
  constexpr int64_t kDefaultMaxBytes = 128;
  return get<int64_t>(kPrefixSortNormalizedKeyMaxBytes, kDefaultMaxBytes);
}

/// Looks up kPrefixSortMinRows in the query config, passing 130 as the default
/// value.
int32_t prefixSortMinRows() const {
  constexpr int32_t kDefaultMinRows = 130;
  return get<int32_t>(kPrefixSortMinRows, kDefaultMinRows);
}

template <typename T>
T get(const std::string& key, const T& defaultValue) const {
return config_->get<T>(key, defaultValue);
Expand Down
8 changes: 8 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ Generic Configuration
- 0
- If it is not zero, specifies the time limit that a driver can continuously
run on a thread before yield. If it is zero, then there is no limit.
* - prefixsort_normalized_key_max_bytes
- integer
- 128
- Maximum number of bytes to use for the normalized key in prefix-sort. Use 0 to disable prefix-sort.
* - prefixsort_min_rows
- integer
- 130
- Minimum number of rows to use prefix-sort. The default value has been derived using micro-benchmarking.

.. _expression-evaluation-conf:

Expand Down
6 changes: 6 additions & 0 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,12 @@ struct DriverCtx {

/// Builds the spill config for the operator with specified 'operatorId'.
std::optional<common::SpillConfig> makeSpillConfig(int32_t operatorId) const;

/// Builds the prefix-sort config from the query config: the normalized key
/// byte limit and the minimum row count.
common::PrefixSortConfig prefixSortConfig() const {
  const auto& config = queryConfig();
  return common::PrefixSortConfig(
      config.prefixSortNormalizedKeyMaxBytes(), config.prefixSortMinRows());
}
};

constexpr const char* kOpMethodNone = "";
Expand Down
1 change: 1 addition & 0 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ OperatorCtx::createConnectorQueryCtx(
connectorPool,
task->queryCtx()->connectorSessionProperties(connectorId),
spillConfig,
driverCtx_->prefixSortConfig(),
std::make_unique<SimpleExpressionEvaluator>(
execCtx()->queryCtx(), execCtx()->pool()),
task->queryCtx()->cache(),
Expand Down
1 change: 1 addition & 0 deletions velox/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ OrderBy::OrderBy(
sortCompareFlags,
pool(),
&nonReclaimableSection_,
driverCtx->prefixSortConfig(),
spillConfig_.has_value() ? &(spillConfig_.value()) : nullptr,
&spillStats_);
}
Expand Down
2 changes: 0 additions & 2 deletions velox/exec/PrefixSort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,6 @@ int PrefixSort::comparePartNormalizedKeys(char* left, char* right) {
PrefixSort::PrefixSort(
memory::MemoryPool* pool,
RowContainer* rowContainer,
const std::vector<CompareFlags>& keyCompareFlags,
const PrefixSortConfig& config,
const PrefixSortLayout& sortLayout)
: pool_(pool), sortLayout_(sortLayout), rowContainer_(rowContainer) {}

Expand Down
21 changes: 3 additions & 18 deletions velox/exec/PrefixSort.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include "velox/common/base/PrefixSortConfig.h"
#include "velox/common/memory/MemoryAllocator.h"
#include "velox/exec/RowContainer.h"
#include "velox/exec/prefixsort/PrefixSortAlgorithm.h"
Expand All @@ -41,20 +42,6 @@ FOLLY_ALWAYS_INLINE void stdSort(
}
}; // namespace detail

struct PrefixSortConfig {
PrefixSortConfig(uint32_t maxNormalizedKeySize, uint32_t threshold = 130)
: maxNormalizedKeySize(maxNormalizedKeySize), threshold(threshold) {}

/// Max number of bytes can store normalized keys in prefix-sort buffer per
/// entry.
const uint32_t maxNormalizedKeySize;

/// PrefixSort will have performance regression when the dateset is too small.
/// The threshold is set to 100 according to the benchmark test results by
/// default.
const int64_t threshold;
};

/// The layout of prefix-sort buffer, a prefix entry includes:
/// 1. normalized keys
/// 2. non-normalized data ptr for semi-normalized types such as
Expand Down Expand Up @@ -107,8 +94,6 @@ class PrefixSort {
PrefixSort(
memory::MemoryPool* pool,
RowContainer* rowContainer,
const std::vector<CompareFlags>& keyCompareFlags,
const PrefixSortConfig& config,
const PrefixSortLayout& sortLayout);

/// Follow the steps below to sort the data in RowContainer:
Expand Down Expand Up @@ -139,7 +124,7 @@ class PrefixSort {
memory::MemoryPool* pool,
RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags,
const PrefixSortConfig& config) {
const velox::common::PrefixSortConfig& config) {
if (rowContainer->numRows() < config.threshold) {
detail::stdSort(rows, rowContainer, compareFlags);
return;
Expand All @@ -154,7 +139,7 @@ class PrefixSort {
return;
}

PrefixSort prefixSort(pool, rowContainer, compareFlags, config, sortLayout);
PrefixSort prefixSort(pool, rowContainer, sortLayout);
prefixSort.sortInternal(rows);
}

Expand Down
17 changes: 4 additions & 13 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ SortBuffer::SortBuffer(
const std::vector<CompareFlags>& sortCompareFlags,
velox::memory::MemoryPool* pool,
tsan_atomic<bool>* nonReclaimableSection,
common::PrefixSortConfig prefixSortConfig,
const common::SpillConfig* spillConfig,
folly::Synchronized<velox::common::SpillStats>* spillStats)
: input_(input),
sortCompareFlags_(sortCompareFlags),
pool_(pool),
nonReclaimableSection_(nonReclaimableSection),
prefixSortConfig_(prefixSortConfig),
spillConfig_(spillConfig),
spillStats_(spillStats) {
VELOX_CHECK_GE(input_->size(), sortCompareFlags_.size());
Expand Down Expand Up @@ -112,19 +114,8 @@ void SortBuffer::noMoreInput() {
sortedRows_.resize(numInputRows_);
RowContainerIterator iter;
data_->listRows(&iter, numInputRows_, sortedRows_.data());
std::sort(
sortedRows_.begin(),
sortedRows_.end(),
[this](const char* leftRow, const char* rightRow) {
for (vector_size_t index = 0; index < sortCompareFlags_.size();
++index) {
if (auto result = data_->compare(
leftRow, rightRow, index, sortCompareFlags_[index])) {
return result < 0;
}
}
return false;
});
PrefixSort::sort(
sortedRows_, pool_, data_.get(), sortCompareFlags_, prefixSortConfig_);
} else {
// Spill the remaining in-memory state to disk if spilling has been
// triggered on this sort buffer. This is to simplify query OOM prevention
Expand Down
4 changes: 4 additions & 0 deletions velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "velox/exec/ContainerRowSerde.h"
#include "velox/exec/Operator.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/PrefixSort.h"
#include "velox/exec/RowContainer.h"
#include "velox/exec/Spill.h"
#include "velox/vector/BaseVector.h"
Expand All @@ -36,6 +37,7 @@ class SortBuffer {
const std::vector<CompareFlags>& sortCompareFlags,
velox::memory::MemoryPool* pool,
tsan_atomic<bool>* nonReclaimableSection,
common::PrefixSortConfig prefixSortConfig,
const common::SpillConfig* spillConfig = nullptr,
folly::Synchronized<velox::common::SpillStats>* spillStats = nullptr);

Expand Down Expand Up @@ -87,6 +89,8 @@ class SortBuffer {
// TableWriter to indicate if this sort buffer object is under non-reclaimable
// execution section or not.
tsan_atomic<bool>* const nonReclaimableSection_;
// Configuration settings for prefix-sort.
const common::PrefixSortConfig prefixSortConfig_;
const common::SpillConfig* const spillConfig_;
folly::Synchronized<common::SpillStats>* const spillStats_;

Expand Down
Loading

0 comments on commit b9268a6

Please sign in to comment.