Skip to content

Commit

Permalink
fix: Optimize subscript and array/map filter in favor of memory
Browse files Browse the repository at this point in the history
Summary:
Currently, these functions wrap the underlying element vectors of the
map/array with a dictionary layer where the indices point to the
selected/remaining elements. If the element vector is large and only
a small subset of elements are selected, then this can create
counterproductive dictionaries where the base is much larger than the
dictionary. This can then result in large amounts of memory being
passed up the execution pipeline, holding onto memory, and
furthermore, expression eval can peel these vectors and operate on
the large bases that can end up creating intermediate vectors of
similar large size. This can put further memory pressure on queries,
causing them to hit their memory limits, resulting in failures or
spills.

This change ensures that the aforementioned functions that can
generate these kinds of results would instead flatten the elements
vector if the size of the dictionary is less than 1/8th the size of
the base (elements vector).

This helped reduce memory usage in the filterProject operator in a
particular query from 2.6GB to 380MB.

Differential Revision: D66253163
  • Loading branch information
Bikramjeet Vig authored and facebook-github-bot committed Nov 20, 2024
1 parent c286451 commit e7a06a0
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 12 deletions.
20 changes: 18 additions & 2 deletions velox/functions/lib/SubscriptUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,16 @@ VectorPtr applyMapTyped(
baseMap->mapValues()->type(), rows.end(), context.pool());
}

// Subscript can pass along very large elements vectors that can hold onto
// memory and copy operations on them can further put memory pressure. We
// try to flatten them if the dictionary layer is much smaller than the
// elements vector.
return BaseVector::wrapInDictionary(
nullsBuilder.build(), indices, rows.end(), baseMap->mapValues());
nullsBuilder.build(),
indices,
rows.end(),
baseMap->mapValues(),
true /*flattenIfRedundant*/);
}

VectorPtr applyMapComplexType(
Expand Down Expand Up @@ -294,8 +302,16 @@ VectorPtr applyMapComplexType(
baseMap->mapValues()->type(), rows.end(), context.pool());
}

// Subscript can pass along very large elements vectors that can hold onto
// memory and copy operations on them can further put memory pressure. We
// try to flatten them if the dictionary layer is much smaller than the
// elements vector.
return BaseVector::wrapInDictionary(
nullsBuilder.build(), indices, rows.end(), baseMap->mapValues());
nullsBuilder.build(),
indices,
rows.end(),
baseMap->mapValues(),
true /*flattenIfRedundant*/);
}

} // namespace
Expand Down
10 changes: 9 additions & 1 deletion velox/functions/lib/SubscriptUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,16 @@ class SubscriptImpl : public exec::Subscript {
baseArray->elements()->type(), rows.end(), context.pool());
}

// Subscript can pass along very large elements vectors that can hold onto
// memory and copy operations on them can further put memory pressure. We
// try to flatten them if the dictionary layer is much smaller than the
// elements vector.
return BaseVector::wrapInDictionary(
nullsBuilder.build(), indices, rows.end(), baseArray->elements());
nullsBuilder.build(),
indices,
rows.end(),
baseArray->elements(),
true /*flattenIfRedundant*/);
}

// Normalize indices from 1 or 0-based into always 0-based (according to
Expand Down
25 changes: 19 additions & 6 deletions velox/functions/prestosql/FilterFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,16 @@ class ArrayFilterFunction : public FilterFunctionBase {
resultSizes,
selectedIndices);

// Filter can pass along very large elements vectors that can hold onto
// memory and copy operations on them can further put memory pressure. We
// try to flatten them if the dictionary layer is much smaller than the
// elements vector.
auto wrappedElements = numSelected ? BaseVector::wrapInDictionary(
BufferPtr(nullptr),
std::move(selectedIndices),
numSelected,
std::move(elements))
std::move(elements),
true /*flattenIfRedundant*/)
: nullptr;
// Set nulls for rows not present in 'rows'.
BufferPtr newNulls = addNullsForUnselectedRows(flatArray, rows);
Expand Down Expand Up @@ -196,15 +201,23 @@ class MapFilterFunction : public FilterFunctionBase {
resultSizes,
selectedIndices);

auto wrappedKeys = numSelected
? BaseVector::wrapInDictionary(
BufferPtr(nullptr), selectedIndices, numSelected, std::move(keys))
: nullptr;
// Filter can pass along very large elements vectors that can hold onto
// memory and copy operations on them can further put memory pressure. We
// try to flatten them if the dictionary layer is much smaller than the
// elements vector.
auto wrappedKeys = numSelected ? BaseVector::wrapInDictionary(
BufferPtr(nullptr),
selectedIndices,
numSelected,
std::move(keys),
true /*flattenIfRedundant*/)
: nullptr;
auto wrappedValues = numSelected ? BaseVector::wrapInDictionary(
BufferPtr(nullptr),
selectedIndices,
numSelected,
std::move(values))
std::move(values),
true /*flattenIfRedundant*/)
: nullptr;
// Set nulls for rows not present in 'rows'.
BufferPtr newNulls = addNullsForUnselectedRows(flatMap, rows);
Expand Down
20 changes: 20 additions & 0 deletions velox/functions/prestosql/tests/ArrayFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,23 @@ TEST_F(ArrayFilterTest, try) {
{{{1, 2}}, std::nullopt, {{6, 7, 8, 9}}, {{10, 11, 12, 13}}});
assertEqualVectors(expected, result);
}

TEST_F(ArrayFilterTest, selectiveFilter) {
// Verify that a selective filter will ensure the underlying elements
// vector is flattened before generating the result which is otherwise wrapped
// in a dictionary with the filter results. This ensures large element
// vectors are not passed along.
auto data = makeRowVector({
makeArrayVector<int64_t>({
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
}),
});
auto result = evaluate<BaseVector>("filter(c0, x -> (x = 1))", data);
auto base = result->as<ArrayVector>()->elements();
EXPECT_EQ(base->encoding(), VectorEncoding::Simple::FLAT);

result = evaluate<BaseVector>("filter(c0, x -> (x < 4))", data);
base = result->as<ArrayVector>()->elements();
EXPECT_EQ(base->encoding(), VectorEncoding::Simple::DICTIONARY);
}
48 changes: 48 additions & 0 deletions velox/functions/prestosql/tests/ElementAtTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1427,3 +1427,51 @@ TEST_F(ElementAtTest, timestampWithTimeZoneWithCaching) {
std::vector<int64_t>{pack(5, 10)}, TIMESTAMP_WITH_TIME_ZONE())});
testCaching({mapOfRowKeys, lookup}, makeConstant<int32_t>(5, 1));
}

TEST_F(ElementAtTest, highlySelective) {
// Verify that selecting a single element from a large array/map will ensure
// the underlying elements vector is flattened before generating the result
// which is otherwise wrapped in a dictionary with indices pointing to the
// selected subscript. This ensures large element vectors are not passed
// along.
vector_size_t vectorSize = 100;
auto sizeAtLarge = [](vector_size_t /* row */) { return 10; };
auto sizeAtSmall = [](vector_size_t /* row */) { return 5; };
auto keyAt = [](vector_size_t idx) { return idx; };
auto valueAt = [](vector_size_t /* idx */) { return 10; };
{
auto mapVector = makeMapVector<int64_t, int64_t>(
vectorSize, sizeAtLarge, keyAt, valueAt);
auto result = evaluate<SimpleVector<int64_t>>(
"element_at(C0, 3)", makeRowVector({mapVector}));
EXPECT_EQ(result->encoding(), VectorEncoding::Simple::FLAT);
}

{
auto mapVector = makeMapVector<int64_t, int64_t>(
vectorSize, sizeAtSmall, keyAt, valueAt);
auto result = evaluate<SimpleVector<int64_t>>(
"element_at(C0, 3)", makeRowVector({mapVector}));
EXPECT_EQ(result->encoding(), VectorEncoding::Simple::DICTIONARY);
}

auto valueAtArray = [](vector_size_t /* row */, vector_size_t /* idx */) {
return 10;
};

{
auto arrayVector =
makeArrayVector<int64_t>(vectorSize, sizeAtLarge, valueAtArray);
auto result = evaluate<SimpleVector<int64_t>>(
"element_at(C0, 3)", makeRowVector({arrayVector}));
EXPECT_EQ(result->encoding(), VectorEncoding::Simple::FLAT);
}

{
auto arrayVector =
makeArrayVector<int64_t>(vectorSize, sizeAtSmall, valueAtArray);
auto result = evaluate<SimpleVector<int64_t>>(
"element_at(C0, 3)", makeRowVector({arrayVector}));
EXPECT_EQ(result->encoding(), VectorEncoding::Simple::DICTIONARY);
}
}
38 changes: 38 additions & 0 deletions velox/functions/prestosql/tests/MapFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,41 @@ TEST_F(MapFilterTest, unknown) {
auto result = evaluate("map_filter(c0, (k, v) -> (v > 5))", data);
assertEqualVectors(data->childAt(0), result);
}

TEST_F(MapFilterTest, selectiveFilter) {
// Verify that a selective filter will ensure the underlying elements
// vector is flattened before generating the result which is otherwise wrapped
// in a dictionary with the filter results. This ensures large element
// vectors are not passed along.
auto data = makeRowVector({
makeMapVector<int64_t, int64_t>(
{{{1, 3},
{2, 3},
{3, 3},
{4, 3},
{5, 3},
{6, 3},
{7, 3},
{8, 3},
{9, 3},
{10, 3},
{11, 3},
{12, 3},
{13, 3},
{14, 3},
{15, 3},
{16, 3}}}),
});

auto result = evaluate("map_filter(c0, (k, v) -> (k = 1))", data);
auto base = result->as<MapVector>()->mapKeys();
EXPECT_EQ(base->encoding(), VectorEncoding::Simple::FLAT);
base = result->as<MapVector>()->mapValues();
EXPECT_EQ(base->encoding(), VectorEncoding::Simple::FLAT);

result = evaluate("map_filter(c0, (k, v) -> (k < 6))", data);
base = result->as<MapVector>()->mapKeys();
EXPECT_EQ(base->encoding(), VectorEncoding::Simple::DICTIONARY);
base = result->as<MapVector>()->mapValues();
EXPECT_EQ(base->encoding(), VectorEncoding::Simple::DICTIONARY);
}
20 changes: 18 additions & 2 deletions velox/vector/BaseVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ VectorPtr BaseVector::wrapInDictionary(
BufferPtr nulls,
BufferPtr indices,
vector_size_t size,
VectorPtr vector) {
VectorPtr vector,
bool flattenIfRedundant) {
// Dictionary that doesn't add nulls over constant is same as constant. Just
// make sure to adjust the size.
if (vector->encoding() == VectorEncoding::Simple::CONSTANT && !nulls) {
Expand All @@ -144,14 +145,29 @@ VectorPtr BaseVector::wrapInDictionary(
return BaseVector::wrapInConstant(size, 0, std::move(vector));
}

bool shouldFlatten = false;
if (flattenIfRedundant) {
auto base = vector;
while (base->encoding() == VectorEncoding::Simple::DICTIONARY) {
base = base->valueVector();
}
shouldFlatten = (base->size() / 8) > size;
}

auto kind = vector->typeKind();
return VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
auto result = VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
addDictionary,
kind,
std::move(nulls),
std::move(indices),
size,
std::move(vector));

if (shouldFlatten) {
BaseVector::flattenVector(result);
}

return result;
}

template <TypeKind kind>
Expand Down
9 changes: 8 additions & 1 deletion velox/vector/BaseVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -559,11 +559,18 @@ class BaseVector {
vector_size_t size,
velox::memory::MemoryPool* pool);

/// Wraps the 'vector' in the provided dictionary encoding. If the input
/// vector is constant and the nulls buffer is empty, this method may return a
/// ConstantVector. Additionally, if 'flattenIfRedundant' is true, this method
/// may return a flattened version of the expected dictionary vector if
/// applying the dictionary encoding would result in a suboptimally encoded
/// vector.
static VectorPtr wrapInDictionary(
BufferPtr nulls,
BufferPtr indices,
vector_size_t size,
VectorPtr vector);
VectorPtr vector,
bool flattenIfRedundant = false);

static VectorPtr
wrapInSequence(BufferPtr lengths, vector_size_t size, VectorPtr vector);
Expand Down
32 changes: 32 additions & 0 deletions velox/vector/tests/VectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3901,6 +3901,38 @@ TEST_F(VectorTest, testOverSizedArray) {
EXPECT_THROW(BaseVector::flattenVector(constArray), VeloxUserError);
}

TEST_F(VectorTest, testFlatteningOfRedundantDictionary) {
// Verify that BaseVector::wrapInDictionary() API flattens the output if
// 'flattenIfRedundant' is set to true and the wrapped vector has a size less
// than 1/8 the size of the base.
auto vectorSize = 100;
auto flat =
makeFlatVector<int32_t>(vectorSize, [](auto /*row*/) { return 1; });
{
auto dictionarySize = vectorSize / 10;
auto indices = makeIndices(dictionarySize, [](auto i) { return i; });
auto wrapped =
BaseVector::wrapInDictionary(nullptr, indices, dictionarySize, flat);
EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::DICTIONARY);

wrapped = BaseVector::wrapInDictionary(
nullptr, indices, dictionarySize, flat, true /*flattenIfRedundant*/);
EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::FLAT);
}

{
auto dictionarySize = vectorSize / 3;
auto indices = makeIndices(dictionarySize, [](auto i) { return i; });
auto wrapped =
BaseVector::wrapInDictionary(nullptr, indices, dictionarySize, flat);
EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::DICTIONARY);

wrapped = BaseVector::wrapInDictionary(
nullptr, indices, dictionarySize, flat, true /*flattenIfRedundant*/);
EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::DICTIONARY);
}
}

TEST_F(VectorTest, hasOverlappingRanges) {
auto test = [this](
vector_size_t length,
Expand Down

0 comments on commit e7a06a0

Please sign in to comment.