Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Optimize subscript and array/map filter in favor of memory #11608

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions velox/functions/lib/SubscriptUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,16 @@ VectorPtr applyMapTyped(
baseMap->mapValues()->type(), rows.end(), context.pool());
}

// Subscript can pass along very large elements vectors that can hold onto
// memory and copy operations on them can further put memory pressure. We
// try to flatten them if the dictionary layer is much smaller than the
// elements vector.
return BaseVector::wrapInDictionary(
nullsBuilder.build(), indices, rows.end(), baseMap->mapValues());
nullsBuilder.build(),
indices,
rows.end(),
baseMap->mapValues(),
true /*flattenIfRedundant*/);
}

VectorPtr applyMapComplexType(
Expand Down Expand Up @@ -294,8 +302,16 @@ VectorPtr applyMapComplexType(
baseMap->mapValues()->type(), rows.end(), context.pool());
}

// Subscript can pass along very large elements vectors that can hold onto
// memory and copy operations on them can further put memory pressure. We
// try to flatten them if the dictionary layer is much smaller than the
// elements vector.
return BaseVector::wrapInDictionary(
nullsBuilder.build(), indices, rows.end(), baseMap->mapValues());
nullsBuilder.build(),
indices,
rows.end(),
baseMap->mapValues(),
true /*flattenIfRedundant*/);
}

} // namespace
Expand Down
10 changes: 9 additions & 1 deletion velox/functions/lib/SubscriptUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,16 @@ class SubscriptImpl : public exec::Subscript {
baseArray->elements()->type(), rows.end(), context.pool());
}

// Subscript can pass along very large elements vectors that can hold onto
// memory and copy operations on them can further put memory pressure. We
// try to flatten them if the dictionary layer is much smaller than the
// elements vector.
return BaseVector::wrapInDictionary(
nullsBuilder.build(), indices, rows.end(), baseArray->elements());
nullsBuilder.build(),
indices,
rows.end(),
baseArray->elements(),
true /*flattenIfRedundant*/);
}

// Normalize indices from 1 or 0-based into always 0-based (according to
Expand Down
25 changes: 19 additions & 6 deletions velox/functions/prestosql/FilterFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,16 @@ class ArrayFilterFunction : public FilterFunctionBase {
resultSizes,
selectedIndices);

// Filter can pass along very large elements vectors that can hold onto
// memory and copy operations on them can further put memory pressure. We
// try to flatten them if the dictionary layer is much smaller than the
// elements vector.
auto wrappedElements = numSelected ? BaseVector::wrapInDictionary(
BufferPtr(nullptr),
std::move(selectedIndices),
numSelected,
std::move(elements))
std::move(elements),
true /*flattenIfRedundant*/)
: nullptr;
// Set nulls for rows not present in 'rows'.
BufferPtr newNulls = addNullsForUnselectedRows(flatArray, rows);
Expand Down Expand Up @@ -196,15 +201,23 @@ class MapFilterFunction : public FilterFunctionBase {
resultSizes,
selectedIndices);

auto wrappedKeys = numSelected
? BaseVector::wrapInDictionary(
BufferPtr(nullptr), selectedIndices, numSelected, std::move(keys))
: nullptr;
// Filter can pass along very large elements vectors that can hold onto
// memory and copy operations on them can further put memory pressure. We
// try to flatten them if the dictionary layer is much smaller than the
// elements vector.
auto wrappedKeys = numSelected ? BaseVector::wrapInDictionary(
BufferPtr(nullptr),
selectedIndices,
numSelected,
std::move(keys),
true /*flattenIfRedundant*/)
: nullptr;
auto wrappedValues = numSelected ? BaseVector::wrapInDictionary(
BufferPtr(nullptr),
selectedIndices,
numSelected,
std::move(values))
std::move(values),
true /*flattenIfRedundant*/)
: nullptr;
// Set nulls for rows not present in 'rows'.
BufferPtr newNulls = addNullsForUnselectedRows(flatMap, rows);
Expand Down
20 changes: 20 additions & 0 deletions velox/functions/prestosql/tests/ArrayFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,23 @@ TEST_F(ArrayFilterTest, try) {
{{{1, 2}}, std::nullopt, {{6, 7, 8, 9}}, {{10, 11, 12, 13}}});
assertEqualVectors(expected, result);
}

TEST_F(ArrayFilterTest, selectiveFilter) {
// Verify that a selective filter will ensure the underlying elements
// vector is flattened before generating the result which is otherwise wrapped
// in a dictionary with the filter results. This ensures large element
// vectors are not passed along.
auto data = makeRowVector({
makeArrayVector<int64_t>({
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
}),
});
auto result = evaluate<BaseVector>("filter(c0, x -> (x = 1))", data);
auto base = result->as<ArrayVector>()->elements();
EXPECT_EQ(base->encoding(), VectorEncoding::Simple::FLAT);

result = evaluate<BaseVector>("filter(c0, x -> (x < 4))", data);
base = result->as<ArrayVector>()->elements();
EXPECT_EQ(base->encoding(), VectorEncoding::Simple::DICTIONARY);
}
48 changes: 48 additions & 0 deletions velox/functions/prestosql/tests/ElementAtTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1427,3 +1427,51 @@ TEST_F(ElementAtTest, timestampWithTimeZoneWithCaching) {
std::vector<int64_t>{pack(5, 10)}, TIMESTAMP_WITH_TIME_ZONE())});
testCaching({mapOfRowKeys, lookup}, makeConstant<int32_t>(5, 1));
}

TEST_F(ElementAtTest, highlySelective) {
// Verify that selecting a single element from a large array/map will ensure
// the underlying elements vector is flattened before generating the result
// which is otherwise wrapped in a dictionary with indices pointing to the
// selected subscript. This ensures large element vectors are not passed
// along.
vector_size_t vectorSize = 100;
auto sizeAtLarge = [](vector_size_t /* row */) { return 10; };
auto sizeAtSmall = [](vector_size_t /* row */) { return 5; };
auto keyAt = [](vector_size_t idx) { return idx; };
auto valueAt = [](vector_size_t /* idx */) { return 10; };
{
auto mapVector = makeMapVector<int64_t, int64_t>(
vectorSize, sizeAtLarge, keyAt, valueAt);
auto result = evaluate<SimpleVector<int64_t>>(
"element_at(C0, 3)", makeRowVector({mapVector}));
EXPECT_EQ(result->encoding(), VectorEncoding::Simple::FLAT);
}

{
auto mapVector = makeMapVector<int64_t, int64_t>(
vectorSize, sizeAtSmall, keyAt, valueAt);
auto result = evaluate<SimpleVector<int64_t>>(
"element_at(C0, 3)", makeRowVector({mapVector}));
EXPECT_EQ(result->encoding(), VectorEncoding::Simple::DICTIONARY);
}

auto valueAtArray = [](vector_size_t /* row */, vector_size_t /* idx */) {
return 10;
};

{
auto arrayVector =
makeArrayVector<int64_t>(vectorSize, sizeAtLarge, valueAtArray);
auto result = evaluate<SimpleVector<int64_t>>(
"element_at(C0, 3)", makeRowVector({arrayVector}));
EXPECT_EQ(result->encoding(), VectorEncoding::Simple::FLAT);
}

{
auto arrayVector =
makeArrayVector<int64_t>(vectorSize, sizeAtSmall, valueAtArray);
auto result = evaluate<SimpleVector<int64_t>>(
"element_at(C0, 3)", makeRowVector({arrayVector}));
EXPECT_EQ(result->encoding(), VectorEncoding::Simple::DICTIONARY);
}
}
38 changes: 38 additions & 0 deletions velox/functions/prestosql/tests/MapFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,41 @@ TEST_F(MapFilterTest, unknown) {
auto result = evaluate("map_filter(c0, (k, v) -> (v > 5))", data);
assertEqualVectors(data->childAt(0), result);
}

TEST_F(MapFilterTest, selectiveFilter) {
// Verify that a selective filter will ensure the underlying elements
// vector is flattened before generating the result which is otherwise wrapped
// in a dictionary with the filter results. This ensures large element
// vectors are not passed along.
auto data = makeRowVector({
makeMapVector<int64_t, int64_t>(
{{{1, 3},
{2, 3},
{3, 3},
{4, 3},
{5, 3},
{6, 3},
{7, 3},
{8, 3},
{9, 3},
{10, 3},
{11, 3},
{12, 3},
{13, 3},
{14, 3},
{15, 3},
{16, 3}}}),
});

auto result = evaluate("map_filter(c0, (k, v) -> (k = 1))", data);
auto base = result->as<MapVector>()->mapKeys();
EXPECT_EQ(base->encoding(), VectorEncoding::Simple::FLAT);
base = result->as<MapVector>()->mapValues();
EXPECT_EQ(base->encoding(), VectorEncoding::Simple::FLAT);

result = evaluate("map_filter(c0, (k, v) -> (k < 6))", data);
base = result->as<MapVector>()->mapKeys();
EXPECT_EQ(base->encoding(), VectorEncoding::Simple::DICTIONARY);
base = result->as<MapVector>()->mapValues();
EXPECT_EQ(base->encoding(), VectorEncoding::Simple::DICTIONARY);
}
20 changes: 18 additions & 2 deletions velox/vector/BaseVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ VectorPtr BaseVector::wrapInDictionary(
BufferPtr nulls,
BufferPtr indices,
vector_size_t size,
VectorPtr vector) {
VectorPtr vector,
bool flattenIfRedundant) {
// Dictionary that doesn't add nulls over constant is same as constant. Just
// make sure to adjust the size.
if (vector->encoding() == VectorEncoding::Simple::CONSTANT && !nulls) {
Expand All @@ -144,14 +145,29 @@ VectorPtr BaseVector::wrapInDictionary(
return BaseVector::wrapInConstant(size, 0, std::move(vector));
}

bool shouldFlatten = false;
if (flattenIfRedundant) {
auto base = vector;
while (base->encoding() == VectorEncoding::Simple::DICTIONARY) {
base = base->valueVector();
}
shouldFlatten = !isLazyNotLoaded(*base) && (base->size() / 8) > size;
}

auto kind = vector->typeKind();
return VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
auto result = VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
addDictionary,
kind,
std::move(nulls),
std::move(indices),
size,
std::move(vector));

if (shouldFlatten) {
BaseVector::flattenVector(result);
}

return result;
}

template <TypeKind kind>
Expand Down
9 changes: 8 additions & 1 deletion velox/vector/BaseVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -559,11 +559,18 @@ class BaseVector {
vector_size_t size,
velox::memory::MemoryPool* pool);

/// Wraps the 'vector' in the provided dictionary encoding. If the input
/// vector is constant and the nulls buffer is empty, this method may return a
/// ConstantVector. Additionally, if 'flattenIfRedundant' is true, this method
/// may return a flattened version of the expected dictionary vector if
/// applying the dictionary encoding would result in a suboptimally encoded
/// vector.
static VectorPtr wrapInDictionary(
BufferPtr nulls,
BufferPtr indices,
vector_size_t size,
VectorPtr vector);
VectorPtr vector,
bool flattenIfRedundant = false);

static VectorPtr
wrapInSequence(BufferPtr lengths, vector_size_t size, VectorPtr vector);
Expand Down
32 changes: 32 additions & 0 deletions velox/vector/tests/VectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3901,6 +3901,38 @@ TEST_F(VectorTest, testOverSizedArray) {
EXPECT_THROW(BaseVector::flattenVector(constArray), VeloxUserError);
}

TEST_F(VectorTest, testFlatteningOfRedundantDictionary) {
// Verify that BaseVector::wrapInDictionary() API flattens the output if
// 'flattenIfRedundant' is set to true and the wrapped vector has a size less
// than 1/8 the size of the base.
auto vectorSize = 100;
auto flat =
makeFlatVector<int32_t>(vectorSize, [](auto /*row*/) { return 1; });
{
auto dictionarySize = vectorSize / 10;
auto indices = makeIndices(dictionarySize, [](auto i) { return i; });
auto wrapped =
BaseVector::wrapInDictionary(nullptr, indices, dictionarySize, flat);
EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::DICTIONARY);

wrapped = BaseVector::wrapInDictionary(
nullptr, indices, dictionarySize, flat, true /*flattenIfRedundant*/);
EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::FLAT);
}

{
auto dictionarySize = vectorSize / 3;
auto indices = makeIndices(dictionarySize, [](auto i) { return i; });
auto wrapped =
BaseVector::wrapInDictionary(nullptr, indices, dictionarySize, flat);
EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::DICTIONARY);

wrapped = BaseVector::wrapInDictionary(
nullptr, indices, dictionarySize, flat, true /*flattenIfRedundant*/);
EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::DICTIONARY);
}
}

TEST_F(VectorTest, hasOverlappingRanges) {
auto test = [this](
vector_size_t length,
Expand Down
Loading