Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(prefix sort): Support string type key in prefix sort #11527

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions velox/common/base/PrefixSortConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ namespace facebook::velox::common {
struct PrefixSortConfig {
PrefixSortConfig() = default;

PrefixSortConfig(uint32_t _maxNormalizedKeyBytes, uint32_t _minNumRows)
PrefixSortConfig(
uint32_t _maxNormalizedKeyBytes,
uint32_t _minNumRows,
uint32_t _maxStringPrefixLength)
: maxNormalizedKeyBytes(_maxNormalizedKeyBytes),
minNumRows(_minNumRows) {}
minNumRows(_minNumRows),
maxStringPrefixLength(_maxStringPrefixLength) {}

/// Maximum bytes that can be used to store normalized keys in prefix-sort
/// buffer per entry. Same with QueryConfig kPrefixSortNormalizedKeyMaxBytes.
Expand All @@ -35,5 +39,9 @@ struct PrefixSortConfig {
/// Minimum number of rows to apply prefix sort. Prefix sort does not perform
/// with small datasets.
uint32_t minNumRows{128};

/// Maximum number of bytes to be stored in prefix-sort buffer for a string
/// column.
uint32_t maxStringPrefixLength{16};
};
} // namespace facebook::velox::common
9 changes: 9 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,11 @@ class QueryConfig {
/// derived using micro-benchmarking.
static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows";

/// Maximum number of bytes to be stored in prefix-sort buffer for a string
/// key.
static constexpr const char* kPrefixSortMaxStringPrefixLength =
"prefixsort_max_string_prefix_length";
zhli1142015 marked this conversation as resolved.
Show resolved Hide resolved

/// Enable query tracing flag.
static constexpr const char* kQueryTraceEnabled = "query_trace_enabled";

Expand Down Expand Up @@ -844,6 +849,10 @@ class QueryConfig {
return get<uint32_t>(kPrefixSortMinRows, 128);
}

uint32_t prefixSortMaxStringPrefixLength() const {
return get<uint32_t>(kPrefixSortMaxStringPrefixLength, 16);
}

double scaleWriterRebalanceMaxMemoryUsageRatio() const {
return get<double>(kScaleWriterRebalanceMaxMemoryUsageRatio, 0.7);
}
Expand Down
4 changes: 4 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ Generic Configuration
- integer
- 128
- Minimum number of rows to use prefix-sort. The default value has been derived using micro-benchmarking.
* - prefixsort_max_string_prefix_length
zhli1142015 marked this conversation as resolved.
Show resolved Hide resolved
- integer
- 16
- Byte length of the string prefix stored in the prefix-sort buffer. This doesn't include the null byte.

.. _expression-evaluation-conf:

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ struct DriverCtx {
common::PrefixSortConfig prefixSortConfig() const {
return common::PrefixSortConfig{
queryConfig().prefixSortNormalizedKeyMaxBytes(),
queryConfig().prefixSortMinRows()};
queryConfig().prefixSortMinRows(),
queryConfig().prefixSortMaxStringPrefixLength()};
}
};

Expand Down
57 changes: 45 additions & 12 deletions velox/exec/PrefixSort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ FOLLY_ALWAYS_INLINE void encodeRowColumn(
value = *(reinterpret_cast<T*>(row + rowColumn.offset()));
}
prefixSortLayout.encoders[index].encode(
value, prefixBuffer + prefixSortLayout.prefixOffsets[index]);
value,
prefixBuffer + prefixSortLayout.prefixOffsets[index],
prefixSortLayout.encodeSizes[index]);
}

FOLLY_ALWAYS_INLINE void extractRowColumnToPrefix(
Expand Down Expand Up @@ -86,6 +88,13 @@ FOLLY_ALWAYS_INLINE void extractRowColumnToPrefix(
prefixSortLayout, index, rowColumn, row, prefixBuffer);
return;
}
case TypeKind::VARCHAR:
[[fallthrough]];
case TypeKind::VARBINARY: {
encodeRowColumn<StringView>(
prefixSortLayout, index, rowColumn, row, prefixBuffer);
return;
}
default:
VELOX_UNSUPPORTED(
"prefix-sort does not support type kind: {}",
Expand Down Expand Up @@ -127,31 +136,49 @@ compareByWord(uint64_t* left, uint64_t* right, int32_t bytes) {
} // namespace

// static.
PrefixSortLayout PrefixSortLayout::makeSortLayout(
PrefixSortLayout PrefixSortLayout::generate(
const std::vector<TypePtr>& types,
const std::vector<CompareFlags>& compareFlags,
uint32_t maxNormalizedKeySize) {
uint32_t maxNormalizedKeySize,
uint32_t maxStringPrefixLength,
const std::vector<std::optional<uint32_t>>& maxStringLengths) {
const uint32_t numKeys = types.size();
std::vector<uint32_t> prefixOffsets;
prefixOffsets.reserve(numKeys);
std::vector<uint32_t> encodeSizes;
encodeSizes.reserve(numKeys);
std::vector<PrefixSortEncoder> encoders;
encoders.reserve(numKeys);

// Calculate encoders and prefix-offsets, and stop the loop if a key that
// cannot be normalized is encountered.
// cannot be normalized is encountered or only partial data of a key is
// normalized.
uint32_t normalizedKeySize{0};
uint32_t numNormalizedKeys{0};

bool lastPrefixKeyPartial{false};
for (auto i = 0; i < numKeys; ++i) {
const std::optional<uint32_t> encodedSize =
PrefixSortEncoder::encodedSize(types[i]->kind());
const std::optional<uint32_t> encodedSize = PrefixSortEncoder::encodedSize(
types[i]->kind(),
maxStringLengths[i].has_value()
? std::min(maxStringLengths[i].value(), maxStringPrefixLength)
: maxStringPrefixLength);
if (!encodedSize.has_value() ||
normalizedKeySize + encodedSize.value() > maxNormalizedKeySize) {
break;
}
prefixOffsets.push_back(normalizedKeySize);
encoders.push_back({compareFlags[i].ascending, compareFlags[i].nullsFirst});
encodeSizes.push_back(encodedSize.value());
normalizedKeySize += encodedSize.value();
++numNormalizedKeys;
zhli1142015 marked this conversation as resolved.
Show resolved Hide resolved
if ((types[i]->kind() == TypeKind::VARCHAR ||
types[i]->kind() == TypeKind::VARBINARY) &&
(!maxStringLengths[i].has_value() ||
maxStringPrefixLength < maxStringLengths[i].value())) {
lastPrefixKeyPartial = true;
break;
}
}

const auto numPaddingBytes = alignmentPadding(normalizedKeySize, kAlignment);
Expand All @@ -165,7 +192,10 @@ PrefixSortLayout PrefixSortLayout::makeSortLayout(
compareFlags,
numNormalizedKeys != 0,
numNormalizedKeys < numKeys,
/*nonPrefixSortStartIndex=*/
lastPrefixKeyPartial ? numNormalizedKeys - 1 : numNormalizedKeys,
std::move(prefixOffsets),
std::move(encodeSizes),
std::move(encoders),
numPaddingBytes};
}
Expand All @@ -177,8 +207,10 @@ void PrefixSortLayout::optimizeSortKeysOrder(
std::vector<std::optional<uint32_t>> encodedKeySizes(
rowType->size(), std::nullopt);
for (const auto& projection : keyColumnProjections) {
// Set maxStringPrefixLength to UINT_MAX - 1 to ensure VARCHAR columns are
// placed after all other supported types and before un-supported types.
encodedKeySizes[projection.inputChannel] = PrefixSortEncoder::encodedSize(
rowType->childAt(projection.inputChannel)->kind());
rowType->childAt(projection.inputChannel)->kind(), UINT_MAX - 1);
zhli1142015 marked this conversation as resolved.
Show resolved Hide resolved
}

std::sort(
Expand Down Expand Up @@ -222,7 +254,8 @@ int PrefixSort::comparePartNormalizedKeys(char* left, char* right) {
// If prefixes are equal, compare the remaining sort keys with rowContainer.
char* leftRow = getRowAddrFromPrefixBuffer(left);
char* rightRow = getRowAddrFromPrefixBuffer(right);
for (auto i = sortLayout_.numNormalizedKeys; i < sortLayout_.numKeys; ++i) {
for (auto i = sortLayout_.nonPrefixSortStartIndex; i < sortLayout_.numKeys;
++i) {
result = rowContainer_->compare(
leftRow, rightRow, i, sortLayout_.compareFlags[i]);
if (result != 0) {
Expand Down Expand Up @@ -276,9 +309,8 @@ uint32_t PrefixSort::maxRequiredBytes(
if (rowContainer->numRows() < config.minNumRows) {
return 0;
}
VELOX_CHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size());
const auto sortLayout = PrefixSortLayout::makeSortLayout(
rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeyBytes);
const auto sortLayout =
generateSortLayout(rowContainer, compareFlags, config);
if (!sortLayout.hasNormalizedKeys) {
return 0;
}
Expand Down Expand Up @@ -346,7 +378,8 @@ void PrefixSort::sortInternal(
RuntimeCounter(
sortLayout_.numNormalizedKeys, RuntimeCounter::Unit::kNone));
}
if (sortLayout_.hasNonNormalizedKey) {
if (sortLayout_.hasNonNormalizedKey ||
sortLayout_.nonPrefixSortStartIndex < sortLayout_.numNormalizedKeys) {
sortRunner.quickSort(
prefixBufferStart, prefixBufferEnd, [&](char* lhs, char* rhs) {
return comparePartNormalizedKeys(lhs, rhs);
Expand Down
49 changes: 43 additions & 6 deletions velox/exec/PrefixSort.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
#pragma once

#include <optional>

#include "velox/common/base/PrefixSortConfig.h"
#include "velox/exec/Operator.h"
#include "velox/exec/RowContainer.h"
Expand Down Expand Up @@ -53,21 +55,31 @@ struct PrefixSortLayout {
/// Whether the sort keys contains non-normalized key.
const bool hasNonNormalizedKey;

/// Indicates the starting index for key comparison.
/// If the last key is only partially encoded in the prefix, start from
/// numNormalizedKeys - 1. Otherwise, start from numNormalizedKeys.
const uint32_t nonPrefixSortStartIndex;

/// Offsets of normalized keys, used to find write locations when
/// extracting columns
const std::vector<uint32_t> prefixOffsets;

/// Sizes of normalized keys.
const std::vector<uint32_t> encodeSizes;

/// The encoders for normalized keys.
const std::vector<prefixsort::PrefixSortEncoder> encoders;

/// The number of padding bytes to align each prefix encoded row size to 8
/// for fast long compare.
const int32_t numPaddingBytes;

static PrefixSortLayout makeSortLayout(
static PrefixSortLayout generate(
const std::vector<TypePtr>& types,
const std::vector<CompareFlags>& compareFlags,
uint32_t maxNormalizedKeySize);
uint32_t maxNormalizedKeySize,
uint32_t maxStringPrefixLength,
const std::vector<std::optional<uint32_t>>& maxStringLengths);

/// Optimizes the order of sort key columns to maximize the number of prefix
/// sort keys for acceleration. This only applies for use case which doesn't
Expand Down Expand Up @@ -121,10 +133,8 @@ class PrefixSort {
stdSort(rows, rowContainer, compareFlags);
return;
}

VELOX_CHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size());
const auto sortLayout = PrefixSortLayout::makeSortLayout(
rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeyBytes);
const auto sortLayout =
generateSortLayout(rowContainer, compareFlags, config);
// All keys can not normalize, skip the binary string compare opt.
// Putting this outside sort-internal helps with stdSort.
if (!sortLayout.hasNormalizedKeys) {
Expand Down Expand Up @@ -158,6 +168,33 @@ class PrefixSort {
const RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags);

FOLLY_ALWAYS_INLINE static PrefixSortLayout generateSortLayout(
const RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags,
const velox::common::PrefixSortConfig& config) {
const auto keyTypes = rowContainer->keyTypes();
VELOX_CHECK_EQ(keyTypes.size(), compareFlags.size());
std::vector<std::optional<uint32_t>> maxStringLengths;
maxStringLengths.reserve(keyTypes.size());
for (int i = 0; i < keyTypes.size(); ++i) {
std::optional<uint32_t> maxStringLength = std::nullopt;
if (keyTypes[i]->kind() == TypeKind::VARBINARY ||
keyTypes[i]->kind() == TypeKind::VARCHAR) {
const auto stats = rowContainer->columnStats(i);
if (stats.has_value()) {
maxStringLength = stats.value().maxBytes();
}
}
maxStringLengths.emplace_back(maxStringLength);
}
return PrefixSortLayout::generate(
keyTypes,
compareFlags,
config.maxNormalizedKeyBytes,
config.maxStringPrefixLength,
maxStringLengths);
}

// Estimates the memory required for prefix sort such as prefix buffer and
// swap buffer.
uint32_t maxRequiredBytes() const;
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/RowContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,8 @@ void RowContainer::store(
decoded,
rows,
isKey,
offsets_[column]);
offsets_[column],
column);
} else {
const auto rowColumn = rowColumns_[column];
VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,7 @@ class RowContainer {
for (int32_t i = 0; i < rows.size(); ++i) {
storeWithNulls<Kind>(
decoded, i, isKey, rows[i], offset, nullByte, nullMask, column);
updateColumnStats(decoded, i, rows[i], column);
zhli1142015 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -1064,9 +1065,11 @@ class RowContainer {
const DecodedVector& decoded,
folly::Range<char**> rows,
bool isKey,
int32_t offset) {
int32_t offset,
int32_t column) {
for (int32_t i = 0; i < rows.size(); ++i) {
storeNoNulls<Kind>(decoded, i, isKey, rows[i], offset);
updateColumnStats(decoded, i, rows[i], column);
}
}

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ Window::Window(
pool(),
common::PrefixSortConfig{
driverCtx->queryConfig().prefixSortNormalizedKeyMaxBytes(),
driverCtx->queryConfig().prefixSortMinRows()},
driverCtx->queryConfig().prefixSortMinRows(),
driverCtx->queryConfig().prefixSortMaxStringPrefixLength()},
spillConfig,
&nonReclaimableSection_,
&spillStats_);
Expand Down
7 changes: 3 additions & 4 deletions velox/exec/benchmarks/PrefixSortBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,14 @@ class TestCase {

// You could config threshold, e.i. 0, to test prefix-sort for small
// dateset.
static const common::PrefixSortConfig kDefaultSortConfig(1024, 100);
static const common::PrefixSortConfig kDefaultSortConfig(1024, 100, 50);

// For small dataset, in some test environments, if std-sort is defined in the
// benchmark file, the test results may be strangely regressed. When the
// threshold is particularly large, PrefixSort is actually std-sort, hence, we
// can use this as std-sort benchmark base.
static const common::PrefixSortConfig kStdSortConfig(
1024,
std::numeric_limits<int>::max());
static const common::PrefixSortConfig
kStdSortConfig(1024, std::numeric_limits<int>::max(), 50);

class PrefixSortBenchmark {
public:
Expand Down
Loading
Loading