Skip to content

Commit

Permalink
Support string type key in prefix sort
Browse files Browse the repository at this point in the history
address comments

address comments

address comments

address comments

address comments

fix build

address comments

get string column max length from row container

fix ut

collect stats for orderBy op
  • Loading branch information
zhli1142015 committed Dec 7, 2024
1 parent e8a84d9 commit 0a54657
Show file tree
Hide file tree
Showing 17 changed files with 411 additions and 104 deletions.
12 changes: 10 additions & 2 deletions velox/common/base/PrefixSortConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ namespace facebook::velox::common {
struct PrefixSortConfig {
PrefixSortConfig() = default;

PrefixSortConfig(uint32_t _maxNormalizedKeyBytes, uint32_t _minNumRows)
PrefixSortConfig(
uint32_t _maxNormalizedKeyBytes,
uint32_t _minNumRows,
uint32_t _maxStringPrefixLength)
: maxNormalizedKeyBytes(_maxNormalizedKeyBytes),
minNumRows(_minNumRows) {}
minNumRows(_minNumRows),
maxStringPrefixLength(_maxStringPrefixLength) {}

/// Maximum bytes that can be used to store normalized keys in prefix-sort
/// buffer per entry. Same with QueryConfig kPrefixSortNormalizedKeyMaxBytes.
Expand All @@ -35,5 +39,9 @@ struct PrefixSortConfig {
/// Minimum number of rows to apply prefix sort. Prefix sort does not perform
/// with small datasets.
uint32_t minNumRows{128};

/// Max number of bytes to be stored in prefix-sort buffer for a string
/// column.
uint32_t maxStringPrefixLength{12};
};
} // namespace facebook::velox::common
9 changes: 9 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,11 @@ class QueryConfig {
/// derived using micro-benchmarking.
static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows";

/// Maximum number of bytes to be stored in prefix-sort buffer for a string
/// key.
static constexpr const char* kPrefixSortMaxStringPrefixLength =
"prefixsort_max_string_prefix_length";

/// Enable query tracing flag.
static constexpr const char* kQueryTraceEnabled = "query_trace_enabled";

Expand Down Expand Up @@ -844,6 +849,10 @@ class QueryConfig {
return get<uint32_t>(kPrefixSortMinRows, 128);
}

uint32_t prefixSortMaxStringPrefixLength() const {
return get<uint32_t>(kPrefixSortMaxStringPrefixLength, 12);
}

double scaleWriterRebalanceMaxMemoryUsageRatio() const {
return get<double>(kScaleWriterRebalanceMaxMemoryUsageRatio, 0.7);
}
Expand Down
4 changes: 4 additions & 0 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ Generic Configuration
- integer
- 128
- Minimum number of rows to use prefix-sort. The default value has been derived using micro-benchmarking.
* - prefixsort_max_string_prefix_length
- integer
- 12
- Byte length of the string prefix stored in the prefix-sort buffer. This doesn't include the null byte.

.. _expression-evaluation-conf:

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ struct DriverCtx {
common::PrefixSortConfig prefixSortConfig() const {
return common::PrefixSortConfig{
queryConfig().prefixSortNormalizedKeyMaxBytes(),
queryConfig().prefixSortMinRows()};
queryConfig().prefixSortMinRows(),
queryConfig().prefixSortMaxStringPrefixLength()};
}
};

Expand Down
57 changes: 45 additions & 12 deletions velox/exec/PrefixSort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@ FOLLY_ALWAYS_INLINE void encodeRowColumn(
} else {
value = *(reinterpret_cast<T*>(row + rowColumn.offset()));
}
prefixSortLayout.encoders[index].encode(
value, prefixBuffer + prefixSortLayout.prefixOffsets[index]);
if constexpr (std::is_same_v<T, StringView>) {
prefixSortLayout.encoders[index].encode(
value,
prefixBuffer + prefixSortLayout.prefixOffsets[index],
prefixSortLayout.encodeSizes[index]);
} else {
prefixSortLayout.encoders[index].encode(
value, prefixBuffer + prefixSortLayout.prefixOffsets[index]);
}
}

FOLLY_ALWAYS_INLINE void extractRowColumnToPrefix(
Expand Down Expand Up @@ -86,6 +93,13 @@ FOLLY_ALWAYS_INLINE void extractRowColumnToPrefix(
prefixSortLayout, index, rowColumn, row, prefixBuffer);
return;
}
case TypeKind::VARCHAR:
[[fallthrough]];
case TypeKind::VARBINARY: {
encodeRowColumn<StringView>(
prefixSortLayout, index, rowColumn, row, prefixBuffer);
return;
}
default:
VELOX_UNSUPPORTED(
"prefix-sort does not support type kind: {}",
Expand Down Expand Up @@ -130,28 +144,42 @@ compareByWord(uint64_t* left, uint64_t* right, int32_t bytes) {
PrefixSortLayout PrefixSortLayout::makeSortLayout(
const std::vector<TypePtr>& types,
const std::vector<CompareFlags>& compareFlags,
uint32_t maxNormalizedKeySize) {
uint32_t maxNormalizedKeySize,
uint32_t maxStringPrefixLength,
const std::vector<uint32_t>& maxStringLengths) {
const uint32_t numKeys = types.size();
std::vector<uint32_t> prefixOffsets;
prefixOffsets.reserve(numKeys);
std::vector<uint32_t> encodeSizes;
encodeSizes.reserve(numKeys);
std::vector<PrefixSortEncoder> encoders;
encoders.reserve(numKeys);

// Calculate encoders and prefix-offsets, and stop the loop if a key that
// cannot be normalized is encountered.
// cannot be normalized is encountered or only partial data of a key is
// normalized.
uint32_t normalizedKeySize{0};
uint32_t numNormalizedKeys{0};

bool lastKeyInPrefixIsPartial{false};
for (auto i = 0; i < numKeys; ++i) {
const std::optional<uint32_t> encodedSize =
PrefixSortEncoder::encodedSize(types[i]->kind());
const std::optional<uint32_t> encodedSize = PrefixSortEncoder::encodedSize(
types[i]->kind(), std::min(maxStringLengths[i], maxStringPrefixLength));
if (!encodedSize.has_value() ||
normalizedKeySize + encodedSize.value() > maxNormalizedKeySize) {
break;
}
prefixOffsets.push_back(normalizedKeySize);
encoders.push_back({compareFlags[i].ascending, compareFlags[i].nullsFirst});
encodeSizes.push_back(encodedSize.value());
normalizedKeySize += encodedSize.value();
++numNormalizedKeys;
if ((types[i]->kind() == TypeKind::VARCHAR ||
types[i]->kind() == TypeKind::VARBINARY) &&
maxStringPrefixLength < maxStringLengths[i]) {
lastKeyInPrefixIsPartial = true;
break;
}
}

const auto numPaddingBytes = alignmentPadding(normalizedKeySize, kAlignment);
Expand All @@ -165,7 +193,9 @@ PrefixSortLayout PrefixSortLayout::makeSortLayout(
compareFlags,
numNormalizedKeys != 0,
numNormalizedKeys < numKeys,
lastKeyInPrefixIsPartial ? numNormalizedKeys - 1 : numNormalizedKeys,
std::move(prefixOffsets),
std::move(encodeSizes),
std::move(encoders),
numPaddingBytes};
}
Expand All @@ -177,8 +207,10 @@ void PrefixSortLayout::optimizeSortKeysOrder(
std::vector<std::optional<uint32_t>> encodedKeySizes(
rowType->size(), std::nullopt);
for (const auto& projection : keyColumnProjections) {
// Set stringPrefixLength to UINT_MAX - 1 to ensure VARCHAR columns are
// processed after all other types.
encodedKeySizes[projection.inputChannel] = PrefixSortEncoder::encodedSize(
rowType->childAt(projection.inputChannel)->kind());
rowType->childAt(projection.inputChannel)->kind(), UINT_MAX - 1);
}

std::sort(
Expand Down Expand Up @@ -222,7 +254,8 @@ int PrefixSort::comparePartNormalizedKeys(char* left, char* right) {
// If prefixes are equal, compare the remaining sort keys with rowContainer.
char* leftRow = getRowAddrFromPrefixBuffer(left);
char* rightRow = getRowAddrFromPrefixBuffer(right);
for (auto i = sortLayout_.numNormalizedKeys; i < sortLayout_.numKeys; ++i) {
for (auto i = sortLayout_.comparisonStartIndex; i < sortLayout_.numKeys;
++i) {
result = rowContainer_->compare(
leftRow, rightRow, i, sortLayout_.compareFlags[i]);
if (result != 0) {
Expand Down Expand Up @@ -276,9 +309,8 @@ uint32_t PrefixSort::maxRequiredBytes(
if (rowContainer->numRows() < config.minNumRows) {
return 0;
}
VELOX_CHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size());
const auto sortLayout = PrefixSortLayout::makeSortLayout(
rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeyBytes);
const auto sortLayout =
generateSortLayout(rowContainer, compareFlags, config);
if (!sortLayout.hasNormalizedKeys) {
return 0;
}
Expand Down Expand Up @@ -346,7 +378,8 @@ void PrefixSort::sortInternal(
RuntimeCounter(
sortLayout_.numNormalizedKeys, RuntimeCounter::Unit::kNone));
}
if (sortLayout_.hasNonNormalizedKey) {
if (sortLayout_.hasNonNormalizedKey ||
sortLayout_.comparisonStartIndex < sortLayout_.numNormalizedKeys) {
sortRunner.quickSort(
prefixBufferStart, prefixBufferEnd, [&](char* lhs, char* rhs) {
return comparePartNormalizedKeys(lhs, rhs);
Expand Down
45 changes: 40 additions & 5 deletions velox/exec/PrefixSort.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,18 @@ struct PrefixSortLayout {
/// Whether the sort keys contains non-normalized key.
const bool hasNonNormalizedKey;

/// Indicates the starting index for key comparison.
/// If the last key is only partially encoded in the prefix, start from
/// numNormalizedKeys - 1. Otherwise, start from numNormalizedKeys.
const uint32_t comparisonStartIndex;

/// Offsets of normalized keys, used to find write locations when
/// extracting columns
const std::vector<uint32_t> prefixOffsets;

/// Sizes of normalized keys.
const std::vector<uint32_t> encodeSizes;

/// The encoders for normalized keys.
const std::vector<prefixsort::PrefixSortEncoder> encoders;

Expand All @@ -67,7 +75,9 @@ struct PrefixSortLayout {
static PrefixSortLayout makeSortLayout(
const std::vector<TypePtr>& types,
const std::vector<CompareFlags>& compareFlags,
uint32_t maxNormalizedKeySize);
uint32_t maxNormalizedKeySize,
uint32_t maxStringPrefixLength,
const std::vector<uint32_t>& maxStringLengths);

/// Optimizes the order of sort key columns to maximize the number of prefix
/// sort keys for acceleration. This only applies for use case which doesn't
Expand Down Expand Up @@ -121,10 +131,8 @@ class PrefixSort {
stdSort(rows, rowContainer, compareFlags);
return;
}

VELOX_CHECK_EQ(rowContainer->keyTypes().size(), compareFlags.size());
const auto sortLayout = PrefixSortLayout::makeSortLayout(
rowContainer->keyTypes(), compareFlags, config.maxNormalizedKeyBytes);
const auto sortLayout =
generateSortLayout(rowContainer, compareFlags, config);
// All keys can not normalize, skip the binary string compare opt.
// Putting this outside sort-internal helps with stdSort.
if (!sortLayout.hasNormalizedKeys) {
Expand Down Expand Up @@ -158,6 +166,33 @@ class PrefixSort {
const RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags);

FOLLY_ALWAYS_INLINE static PrefixSortLayout generateSortLayout(
const RowContainer* rowContainer,
const std::vector<CompareFlags>& compareFlags,
const velox::common::PrefixSortConfig& config) {
const auto keyTypes = rowContainer->keyTypes();
VELOX_CHECK_EQ(keyTypes.size(), compareFlags.size());
std::vector<uint32_t> maxStringLengths;
maxStringLengths.reserve(keyTypes.size());
for (int i = 0; i < keyTypes.size(); ++i) {
auto maxPrefixLength = config.maxStringPrefixLength;
if (keyTypes[i]->kind() == TypeKind::VARBINARY ||
keyTypes[i]->kind() == TypeKind::VARCHAR) {
const auto stats = rowContainer->columnStats(i);
maxPrefixLength = stats.has_value() && stats.value().maxBytes() > 0
? stats.value().maxBytes()
: UINT_MAX;
}
maxStringLengths.emplace_back(maxPrefixLength);
}
return PrefixSortLayout::makeSortLayout(
keyTypes,
compareFlags,
config.maxNormalizedKeyBytes,
config.maxStringPrefixLength,
maxStringLengths);
}

// Estimates the memory required for prefix sort such as prefix buffer and
// swap buffer.
uint32_t maxRequiredBytes() const;
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/RowContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,8 @@ void RowContainer::store(
decoded,
rows,
isKey,
offsets_[column]);
offsets_[column],
column);
} else {
const auto rowColumn = rowColumns_[column];
VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/RowContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,7 @@ class RowContainer {
for (int32_t i = 0; i < rows.size(); ++i) {
storeWithNulls<Kind>(
decoded, i, isKey, rows[i], offset, nullByte, nullMask, column);
updateColumnStats(decoded, i, rows[i], column);
}
}

Expand All @@ -1064,9 +1065,11 @@ class RowContainer {
const DecodedVector& decoded,
folly::Range<char**> rows,
bool isKey,
int32_t offset) {
int32_t offset,
int32_t column) {
for (int32_t i = 0; i < rows.size(); ++i) {
storeNoNulls<Kind>(decoded, i, isKey, rows[i], offset);
updateColumnStats(decoded, i, rows[i], column);
}
}

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ Window::Window(
pool(),
common::PrefixSortConfig{
driverCtx->queryConfig().prefixSortNormalizedKeyMaxBytes(),
driverCtx->queryConfig().prefixSortMinRows()},
driverCtx->queryConfig().prefixSortMinRows(),
driverCtx->queryConfig().prefixSortMaxStringPrefixLength()},
spillConfig,
&nonReclaimableSection_,
&spillStats_);
Expand Down
7 changes: 3 additions & 4 deletions velox/exec/benchmarks/PrefixSortBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,14 @@ class TestCase {

// You could config threshold, e.i. 0, to test prefix-sort for small
// dateset.
static const common::PrefixSortConfig kDefaultSortConfig(1024, 100);
static const common::PrefixSortConfig kDefaultSortConfig(1024, 100, 50);

// For small dataset, in some test environments, if std-sort is defined in the
// benchmark file, the test results may be strangely regressed. When the
// threshold is particularly large, PrefixSort is actually std-sort, hence, we
// can use this as std-sort benchmark base.
static const common::PrefixSortConfig kStdSortConfig(
1024,
std::numeric_limits<int>::max());
static const common::PrefixSortConfig
kStdSortConfig(1024, std::numeric_limits<int>::max(), 50);

class PrefixSortBenchmark {
public:
Expand Down
Loading

0 comments on commit 0a54657

Please sign in to comment.