Skip to content

Commit

Permalink
feat: Support prefix comparator in spill merge
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Dec 11, 2024
1 parent 1bd480e commit 0031e27
Show file tree
Hide file tree
Showing 28 changed files with 606 additions and 72 deletions.
2 changes: 2 additions & 0 deletions velox/common/base/SpillConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ SpillConfig::SpillConfig(
uint64_t _maxFileSize,
uint64_t _writeBufferSize,
uint64_t _readBufferSize,
bool _mergePrefixComparatorEnabled,
folly::Executor* _executor,
int32_t _minSpillableReservationPct,
int32_t _spillableReservationGrowthPct,
Expand All @@ -44,6 +45,7 @@ SpillConfig::SpillConfig(
: _maxFileSize),
writeBufferSize(_writeBufferSize),
readBufferSize(_readBufferSize),
mergePrefixComparatorEnabled(_mergePrefixComparatorEnabled),
executor(_executor),
minSpillableReservationPct(_minSpillableReservationPct),
spillableReservationGrowthPct(_spillableReservationGrowthPct),
Expand Down
7 changes: 7 additions & 0 deletions velox/common/base/SpillConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct SpillConfig {
uint64_t _maxFileSize,
uint64_t _writeBufferSize,
uint64_t _readBufferSize,
bool _mergePrefixComparatorEnabled,
folly::Executor* _executor,
int32_t _minSpillableReservationPct,
int32_t _spillableReservationGrowthPct,
Expand Down Expand Up @@ -105,6 +106,12 @@ struct SpillConfig {
/// which doubles the buffer used to read from each spill file.
uint64_t readBufferSize;

/// Enable the prefix comparator for the spill merge ordered reader. The more
/// the number of sort keys, the faster the prefix comparator. But it requires
/// the memory to build normalized prefix keys, which might have potential
/// risk of running out of server memory.
bool mergePrefixComparatorEnabled;

/// Executor for spilling. If nullptr spilling writes on the Driver's thread.
folly::Executor* executor; // Not owned.

Expand Down
3 changes: 3 additions & 0 deletions velox/common/base/tests/SpillConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ TEST_P(SpillConfigTest, spillLevel) {
0,
0,
0,
true,
nullptr,
0,
0,
Expand Down Expand Up @@ -125,6 +126,7 @@ TEST_P(SpillConfigTest, spillLevelLimit) {
0,
0,
0,
true,
nullptr,
0,
0,
Expand Down Expand Up @@ -172,6 +174,7 @@ TEST_P(SpillConfigTest, spillableReservationPercentages) {
0,
0,
0,
true,
nullptr,
testData.minPct,
testData.growthPct,
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/tests/HiveDataSinkTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
0,
0,
/*readBufferSize=*/1 << 20,
true,
spillExecutor_.get(),
10,
20,
Expand Down
11 changes: 11 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ class QueryConfig {
static constexpr const char* kSpillPrefixSortEnabled =
"spill_prefixsort_enabled";

/// Enable the prefix comparator for the spill merge ordered reader. The more
/// the number of sort keys, the faster the prefix comparator. But it requires
/// the memory to build normalized prefix keys, which might have potential
/// risk of running out of server memory.
static constexpr const char* kSpillMergePrefixComparatorEnabled =
"spill_merge_prefix_comparator_enabled";

/// Specifies spill write buffer size in bytes. The spiller tries to buffer
/// serialized spill data up to the specified size before write to storage
/// underneath for io efficiency. If it is set to zero, then spill write
Expand Down Expand Up @@ -689,6 +696,10 @@ class QueryConfig {
return get<bool>(kSpillPrefixSortEnabled, false);
}

bool spillMergePrefixComparatorEnabled() const {
return get<bool>(kSpillMergePrefixComparatorEnabled, true);
}

uint64_t spillWriteBufferSize() const {
// The default write buffer size set to 1MB.
return get<uint64_t>(kSpillWriteBufferSize, 1L << 20);
Expand Down
5 changes: 5 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,11 @@ Spilling
- false
- Enable the prefix sort or fallback to timsort in spill. The prefix sort is faster than std::sort but requires the
memory to build normalized prefix keys, which might have potential risk of running out of server memory.
* - spill_merge_prefix_comparator_enabled
- bool
- true
- Enable the prefix comparator for the spill merge ordered reader. The more the number of sort keys, the faster the prefix comparator.
But it requires the memory to build normalized prefix keys, which might have potential risk of running out of server memory.
* - spiller_start_partition_bit
- integer
- 29
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/dwrf/test/E2EWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ class E2EWriterTest : public testing::Test {
0,
0,
0,
true,
nullptr,
minSpillableReservationPct,
spillableReservationGrowthPct,
Expand Down
1 change: 1 addition & 0 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
queryConfig.maxSpillFileSize(),
queryConfig.spillWriteBufferSize(),
queryConfig.spillReadBufferSize(),
queryConfig.spillMergePrefixComparatorEnabled(),
task->queryCtx()->spillExecutor(),
queryConfig.minSpillableReservationPct(),
queryConfig.spillableReservationGrowthPct(),
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,10 @@ bool GroupingSet::prepareNextSpillPartitionOutput() {
VELOX_CHECK_NE(outputSpillPartition_, it->first.partitionNumber());
outputSpillPartition_ = it->first.partitionNumber();
merge_ = it->second->createOrderedReader(
spillConfig_->readBufferSize, &pool_, spillStats_);
spillConfig_->readBufferSize,
spillConfig_->mergePrefixComparatorEnabled,
&pool_,
spillStats_);
spillPartitionSet_.erase(it);
return true;
}
Expand Down
33 changes: 33 additions & 0 deletions velox/exec/IdentityProjection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "velox/vector/ComplexVector.h"

namespace facebook::velox::exec {

/// Represents a column that is copied from input to output, possibly
/// with cardinality change, i.e. values removed or duplicated.
struct IdentityProjection {
IdentityProjection(
column_index_t _inputChannel,
column_index_t _outputChannel)
: inputChannel(_inputChannel), outputChannel(_outputChannel) {}

column_index_t inputChannel;
column_index_t outputChannel;
};
} // namespace facebook::velox::exec
13 changes: 1 addition & 12 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,14 @@
#include "velox/common/time/CpuWallTimer.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/Driver.h"
#include "velox/exec/IdentityProjection.h"
#include "velox/exec/JoinBridge.h"
#include "velox/exec/OperatorTraceWriter.h"
#include "velox/exec/Spiller.h"
#include "velox/type/Filter.h"

namespace facebook::velox::exec {

/// Represents a column that is copied from input to output, possibly
/// with cardinality change, i.e. values removed or duplicated.
struct IdentityProjection {
IdentityProjection(
column_index_t _inputChannel,
column_index_t _outputChannel)
: inputChannel(_inputChannel), outputChannel(_outputChannel) {}

column_index_t inputChannel;
column_index_t outputChannel;
};

struct MemoryStats {
uint64_t userMemoryReservation{0};
uint64_t revocableMemoryReservation{0};
Expand Down
87 changes: 87 additions & 0 deletions velox/exec/PrefixSort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,63 @@ namespace {
// to bitswap32.
static constexpr int32_t kAlignment = 8;

template <typename T>
FOLLY_ALWAYS_INLINE void encodeColumn(
const PrefixSortLayout& prefixSortLayout,
const DecodedVector& decoded,
vector_size_t row,
column_index_t col,
char* prefixBuffer) {
std::optional<T> value;
if (!decoded.mayHaveNulls()) {
value = decoded.valueAt<T>(row);
} else {
if (decoded.isNullAt(row)) {
value = std::nullopt;
} else {
value = decoded.valueAt<T>(row);
}
}
prefixSortLayout.encoders[col].encode(
value, prefixBuffer + prefixSortLayout.prefixOffsets[col]);
}

FOLLY_ALWAYS_INLINE void extractColumnToPrefix(
TypeKind typeKind,
const PrefixSortLayout& prefixSortLayout,
const DecodedVector& decoded,
vector_size_t row,
column_index_t col,
char* prefixBuffer) {
switch (typeKind) {
case TypeKind::SMALLINT:
return encodeColumn<int16_t>(
prefixSortLayout, decoded, row, col, prefixBuffer);
case TypeKind::INTEGER:
return encodeColumn<int32_t>(
prefixSortLayout, decoded, row, col, prefixBuffer);
case TypeKind::BIGINT:
return encodeColumn<int64_t>(
prefixSortLayout, decoded, row, col, prefixBuffer);
case TypeKind::REAL:
return encodeColumn<float>(
prefixSortLayout, decoded, row, col, prefixBuffer);
case TypeKind::DOUBLE:
return encodeColumn<double>(
prefixSortLayout, decoded, row, col, prefixBuffer);
case TypeKind::TIMESTAMP:
return encodeColumn<Timestamp>(
prefixSortLayout, decoded, row, col, prefixBuffer);
case TypeKind::HUGEINT:
return encodeColumn<int128_t>(
prefixSortLayout, decoded, row, col, prefixBuffer);
default:
VELOX_UNSUPPORTED(
"prefix-sort does not support type kind: {}",
mapTypeKindToName(typeKind));
}
}

template <typename T>
FOLLY_ALWAYS_INLINE void encodeRowColumn(
const PrefixSortLayout& prefixSortLayout,
Expand Down Expand Up @@ -206,6 +263,36 @@ void PrefixSortLayout::optimizeSortKeysOrder(
});
}

void VectorPrefixEncoder::encode(
const PrefixSortLayout& sortLayout,
const std::vector<TypePtr>& keyTypes,
const std::vector<DecodedVector>& decoded,
vector_size_t numRows,
char* prefixBuffer) {
VELOX_CHECK_EQ(decoded.size(), keyTypes.size());
VELOX_CHECK_EQ(decoded.size(), sortLayout.numNormalizedKeys);
for (auto i = 0; i < numRows; ++i) {
char* bufferForRow = prefixBuffer + i * sortLayout.normalizedBufferSize;
for (auto j = 0; j < keyTypes.size(); ++j) {
extractColumnToPrefix(
keyTypes[j]->kind(), sortLayout, decoded[j], i, j, bufferForRow);
}
simd::memset(
bufferForRow + sortLayout.normalizedBufferSize -
sortLayout.numPaddingBytes,
0,
sortLayout.numPaddingBytes);

// When comparing in std::memcmp, each byte is compared. If it is changed to
// compare every 8 bytes, the number of comparisons will be reduced and the
// performance will be improved.
// Use uint64_t compare to implement the above-mentioned comparison of every
// 8 bytes, assuming the system is little-endian, need to reverse bytes for
// every 8 bytes.
bitsSwapByWord((uint64_t*)bufferForRow, sortLayout.normalizedBufferSize);
}
}

FOLLY_ALWAYS_INLINE int PrefixSort::compareAllNormalizedKeys(
char* left,
char* right) {
Expand Down
32 changes: 21 additions & 11 deletions velox/exec/PrefixSort.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#pragma once

#include "velox/common/base/PrefixSortConfig.h"
#include "velox/exec/Operator.h"
#include "velox/exec/IdentityProjection.h"
#include "velox/exec/RowContainer.h"
#include "velox/exec/prefixsort/PrefixSortAlgorithm.h"
#include "velox/exec/prefixsort/PrefixSortEncoder.h"
Expand All @@ -32,37 +32,37 @@ namespace facebook::velox::exec {
struct PrefixSortLayout {
/// Number of bytes to store a prefix, it equals to:
/// normalizedKeySize_ + 8 (non-normalized-ptr) + 8(row address).
const uint64_t entrySize;
uint64_t entrySize;

/// If a sort key supports normalization and can be added to the prefix
/// sort buffer, it is called a normalized key.
const uint32_t normalizedBufferSize;
uint32_t normalizedBufferSize;

const uint32_t numNormalizedKeys;
uint32_t numNormalizedKeys;

/// The num of sort keys include normalized and non-normalized.
const uint32_t numKeys;
uint32_t numKeys;

/// CompareFlags of all sort keys.
const std::vector<CompareFlags> compareFlags;
std::vector<CompareFlags> compareFlags;

/// Whether the sort keys contains normalized key.
/// It equals to 'numNormalizedKeys != 0', a little faster.
const bool hasNormalizedKeys;
bool hasNormalizedKeys;

/// Whether the sort keys contains non-normalized key.
const bool hasNonNormalizedKey;
bool hasNonNormalizedKey;

/// Offsets of normalized keys, used to find write locations when
/// extracting columns
const std::vector<uint32_t> prefixOffsets;
std::vector<uint32_t> prefixOffsets;

/// The encoders for normalized keys.
const std::vector<prefixsort::PrefixSortEncoder> encoders;
std::vector<prefixsort::PrefixSortEncoder> encoders;

/// The number of padding bytes to align each prefix encoded row size to 8
/// for fast long compare.
const int32_t numPaddingBytes;
int32_t numPaddingBytes;

static PrefixSortLayout makeSortLayout(
const std::vector<TypePtr>& types,
Expand All @@ -81,6 +81,16 @@ struct PrefixSortLayout {
std::vector<IdentityProjection>& keyColumnProjections);
};

class VectorPrefixEncoder {
public:
static void encode(
const PrefixSortLayout& sortLayout,
const std::vector<TypePtr>& keyTypes,
const std::vector<DecodedVector>& decoded,
vector_size_t numRows,
char* prefixBuffer);
};

class PrefixSort {
public:
PrefixSort(
Expand Down
1 change: 0 additions & 1 deletion velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "velox/common/memory/MemoryAllocator.h"
#include "velox/core/PlanNode.h"
#include "velox/exec/ContainerRowSerde.h"
#include "velox/exec/Spill.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/VectorTypeUtils.h"

Expand Down
5 changes: 4 additions & 1 deletion velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,10 @@ void SortBuffer::prepareOutputWithSpill() {

VELOX_CHECK_EQ(spillPartitionSet_.size(), 1);
spillMerger_ = spillPartitionSet_.begin()->second->createOrderedReader(
spillConfig_->readBufferSize, pool(), spillStats_);
spillConfig_->readBufferSize,
spillConfig_->mergePrefixComparatorEnabled,
pool(),
spillStats_);
spillPartitionSet_.clear();
}

Expand Down
Loading

0 comments on commit 0031e27

Please sign in to comment.