From 15a05dcd1e25ef405a5a480d789732afe61b5ca9 Mon Sep 17 00:00:00 2001 From: Bikramjeet Vig Date: Thu, 21 Nov 2024 10:41:04 -0800 Subject: [PATCH] fix: Optimize subscript and array/map filter in favor of memory (#11608) Summary: Currently, these functions wrap the underlying element vectors of the map/array with a dictionary layer where the indices point to the selected/remaining elements. If the element vector is large and only a small subset of elements are selected, then this can create counterproductive dictionaries where the base is much larger than the dictionary. This can then result in large amounts of memory being passed up the execution pipeline, holding onto memory, and furthermore, expression eval can peel these vectors and operate on the large bases that can end up creating intermediate vectors of similar large size. This can put further memory pressure on queries, causing them to hit their memory limits, resulting in failures or spills. This change ensures that the aforementioned functions that can generate these kinds of results would instead flatten the elements vector if the size of the dictionary is less than 1/8th the size of the base (elements vector). This helped reduce memory usage in the filterProject operator in a particular query from 2.6GB to 380MB. Differential Revision: D66253163 --- velox/functions/lib/SubscriptUtil.cpp | 20 +++++++- velox/functions/lib/SubscriptUtil.h | 10 +++- velox/functions/prestosql/FilterFunctions.cpp | 25 +++++++--- .../prestosql/tests/ArrayFilterTest.cpp | 20 ++++++++ .../prestosql/tests/ElementAtTest.cpp | 48 +++++++++++++++++++ .../prestosql/tests/MapFilterTest.cpp | 38 +++++++++++++++ velox/vector/BaseVector.cpp | 20 +++++++- velox/vector/BaseVector.h | 9 +++- velox/vector/tests/VectorTest.cpp | 32 +++++++++++++ 9 files changed, 210 insertions(+), 12 deletions(-) diff --git a/velox/functions/lib/SubscriptUtil.cpp b/velox/functions/lib/SubscriptUtil.cpp index c01dc64d3cab..55031e0c31ee 100644 --- a/velox/functions/lib/SubscriptUtil.cpp +++ b/velox/functions/lib/SubscriptUtil.cpp @@ -174,8 +174,16 @@ VectorPtr applyMapTyped( baseMap->mapValues()->type(), rows.end(), context.pool()); } + // Subscript can pass along very large elements vectors that can hold onto + // memory and copy operations on them can further put memory pressure. We + // try to flatten them if the dictionary layer is much smaller than the + // elements vector. return BaseVector::wrapInDictionary( - nullsBuilder.build(), indices, rows.end(), baseMap->mapValues()); + nullsBuilder.build(), + indices, + rows.end(), + baseMap->mapValues(), + true /*flattenIfRedundant*/); } VectorPtr applyMapComplexType( @@ -294,8 +302,16 @@ VectorPtr applyMapComplexType( baseMap->mapValues()->type(), rows.end(), context.pool()); } + // Subscript can pass along very large elements vectors that can hold onto + // memory and copy operations on them can further put memory pressure. We + // try to flatten them if the dictionary layer is much smaller than the + // elements vector. return BaseVector::wrapInDictionary( - nullsBuilder.build(), indices, rows.end(), baseMap->mapValues()); + nullsBuilder.build(), + indices, + rows.end(), + baseMap->mapValues(), + true /*flattenIfRedundant*/); } } // namespace diff --git a/velox/functions/lib/SubscriptUtil.h b/velox/functions/lib/SubscriptUtil.h index 2700d0c1bf07..c649bb99954f 100644 --- a/velox/functions/lib/SubscriptUtil.h +++ b/velox/functions/lib/SubscriptUtil.h @@ -373,8 +373,16 @@ class SubscriptImpl : public exec::Subscript { baseArray->elements()->type(), rows.end(), context.pool()); } + // Subscript can pass along very large elements vectors that can hold onto + // memory and copy operations on them can further put memory pressure. We + // try to flatten them if the dictionary layer is much smaller than the + // elements vector. return BaseVector::wrapInDictionary( - nullsBuilder.build(), indices, rows.end(), baseArray->elements()); + nullsBuilder.build(), + indices, + rows.end(), + baseArray->elements(), + true /*flattenIfRedundant*/); } // Normalize indices from 1 or 0-based into always 0-based (according to diff --git a/velox/functions/prestosql/FilterFunctions.cpp b/velox/functions/prestosql/FilterFunctions.cpp index f0257fa330af..63519caafe5a 100644 --- a/velox/functions/prestosql/FilterFunctions.cpp +++ b/velox/functions/prestosql/FilterFunctions.cpp @@ -133,11 +133,16 @@ class ArrayFilterFunction : public FilterFunctionBase { resultSizes, selectedIndices); + // Filter can pass along very large elements vectors that can hold onto + // memory and copy operations on them can further put memory pressure. We + // try to flatten them if the dictionary layer is much smaller than the + // elements vector. auto wrappedElements = numSelected ? BaseVector::wrapInDictionary( BufferPtr(nullptr), std::move(selectedIndices), numSelected, - std::move(elements)) + std::move(elements), + true /*flattenIfRedundant*/) : nullptr; // Set nulls for rows not present in 'rows'. BufferPtr newNulls = addNullsForUnselectedRows(flatArray, rows); @@ -196,15 +201,23 @@ class MapFilterFunction : public FilterFunctionBase { resultSizes, selectedIndices); - auto wrappedKeys = numSelected - ? BaseVector::wrapInDictionary( - BufferPtr(nullptr), selectedIndices, numSelected, std::move(keys)) - : nullptr; + // Filter can pass along very large elements vectors that can hold onto + // memory and copy operations on them can further put memory pressure. We + // try to flatten them if the dictionary layer is much smaller than the + // elements vector. + auto wrappedKeys = numSelected ? BaseVector::wrapInDictionary( + BufferPtr(nullptr), + selectedIndices, + numSelected, + std::move(keys), + true /*flattenIfRedundant*/) + : nullptr; auto wrappedValues = numSelected ? BaseVector::wrapInDictionary( BufferPtr(nullptr), selectedIndices, numSelected, - std::move(values)) + std::move(values), + true /*flattenIfRedundant*/) : nullptr; // Set nulls for rows not present in 'rows'. BufferPtr newNulls = addNullsForUnselectedRows(flatMap, rows); diff --git a/velox/functions/prestosql/tests/ArrayFilterTest.cpp b/velox/functions/prestosql/tests/ArrayFilterTest.cpp index 423a3e8aef89..bc6444114982 100644 --- a/velox/functions/prestosql/tests/ArrayFilterTest.cpp +++ b/velox/functions/prestosql/tests/ArrayFilterTest.cpp @@ -233,3 +233,23 @@ TEST_F(ArrayFilterTest, try) { {{{1, 2}}, std::nullopt, {{6, 7, 8, 9}}, {{10, 11, 12, 13}}}); assertEqualVectors(expected, result); } + +TEST_F(ArrayFilterTest, selectiveFilter) { + // Verify that a selective filter will ensure the underlying elements + // vector is flattened before generating the result which is otherwise wrapped + // in a dictionary with the filter results. This ensures large element + // vectors are not passed along. + auto data = makeRowVector({ + makeArrayVector({ + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + }), + }); + auto result = evaluate("filter(c0, x -> (x = 1))", data); + auto base = result->as()->elements(); + EXPECT_EQ(base->encoding(), VectorEncoding::Simple::FLAT); + + result = evaluate("filter(c0, x -> (x < 4))", data); + base = result->as()->elements(); + EXPECT_EQ(base->encoding(), VectorEncoding::Simple::DICTIONARY); +} diff --git a/velox/functions/prestosql/tests/ElementAtTest.cpp b/velox/functions/prestosql/tests/ElementAtTest.cpp index 64b7f7085543..6b8d3f976792 100644 --- a/velox/functions/prestosql/tests/ElementAtTest.cpp +++ b/velox/functions/prestosql/tests/ElementAtTest.cpp @@ -1427,3 +1427,51 @@ TEST_F(ElementAtTest, timestampWithTimeZoneWithCaching) { std::vector{pack(5, 10)}, TIMESTAMP_WITH_TIME_ZONE())}); testCaching({mapOfRowKeys, lookup}, makeConstant(5, 1)); } + +TEST_F(ElementAtTest, highlySelective) { + // Verify that selecting a single element from a large array/map will ensure + // the underlying elements vector is flattened before generating the result + // which is otherwise wrapped in a dictionary with indices pointing to the + // selected subscript. This ensures large element vectors are not passed + // along. + vector_size_t vectorSize = 100; + auto sizeAtLarge = [](vector_size_t /* row */) { return 10; }; + auto sizeAtSmall = [](vector_size_t /* row */) { return 5; }; + auto keyAt = [](vector_size_t idx) { return idx; }; + auto valueAt = [](vector_size_t /* idx */) { return 10; }; + { + auto mapVector = makeMapVector( + vectorSize, sizeAtLarge, keyAt, valueAt); + auto result = evaluate>( + "element_at(C0, 3)", makeRowVector({mapVector})); + EXPECT_EQ(result->encoding(), VectorEncoding::Simple::FLAT); + } + + { + auto mapVector = makeMapVector( + vectorSize, sizeAtSmall, keyAt, valueAt); + auto result = evaluate>( + "element_at(C0, 3)", makeRowVector({mapVector})); + EXPECT_EQ(result->encoding(), VectorEncoding::Simple::DICTIONARY); + } + + auto valueAtArray = [](vector_size_t /* row */, vector_size_t /* idx */) { + return 10; + }; + + { + auto arrayVector = + makeArrayVector(vectorSize, sizeAtLarge, valueAtArray); + auto result = evaluate>( + "element_at(C0, 3)", makeRowVector({arrayVector})); + EXPECT_EQ(result->encoding(), VectorEncoding::Simple::FLAT); + } + + { + auto arrayVector = + makeArrayVector(vectorSize, sizeAtSmall, valueAtArray); + auto result = evaluate>( + "element_at(C0, 3)", makeRowVector({arrayVector})); + EXPECT_EQ(result->encoding(), VectorEncoding::Simple::DICTIONARY); + } +} diff --git a/velox/functions/prestosql/tests/MapFilterTest.cpp b/velox/functions/prestosql/tests/MapFilterTest.cpp index ec8e89ebcec7..f9d1e33fbf9c 100644 --- a/velox/functions/prestosql/tests/MapFilterTest.cpp +++ b/velox/functions/prestosql/tests/MapFilterTest.cpp @@ -275,3 +275,41 @@ TEST_F(MapFilterTest, unknown) { auto result = evaluate("map_filter(c0, (k, v) -> (v > 5))", data); assertEqualVectors(data->childAt(0), result); } + +TEST_F(MapFilterTest, selectiveFilter) { + // Verify that a selective filter will ensure the underlying elements + // vector is flattened before generating the result which is otherwise wrapped + // in a dictionary with the filter results. This ensures large element + // vectors are not passed along. + auto data = makeRowVector({ + makeMapVector( + {{{1, 3}, + {2, 3}, + {3, 3}, + {4, 3}, + {5, 3}, + {6, 3}, + {7, 3}, + {8, 3}, + {9, 3}, + {10, 3}, + {11, 3}, + {12, 3}, + {13, 3}, + {14, 3}, + {15, 3}, + {16, 3}}}), + }); + + auto result = evaluate("map_filter(c0, (k, v) -> (k = 1))", data); + auto base = result->as()->mapKeys(); + EXPECT_EQ(base->encoding(), VectorEncoding::Simple::FLAT); + base = result->as()->mapValues(); + EXPECT_EQ(base->encoding(), VectorEncoding::Simple::FLAT); + + result = evaluate("map_filter(c0, (k, v) -> (k < 6))", data); + base = result->as()->mapKeys(); + EXPECT_EQ(base->encoding(), VectorEncoding::Simple::DICTIONARY); + base = result->as()->mapValues(); + EXPECT_EQ(base->encoding(), VectorEncoding::Simple::DICTIONARY); +} diff --git a/velox/vector/BaseVector.cpp b/velox/vector/BaseVector.cpp index cc98fc2af95b..180e4c33336f 100644 --- a/velox/vector/BaseVector.cpp +++ b/velox/vector/BaseVector.cpp @@ -134,7 +134,8 @@ VectorPtr BaseVector::wrapInDictionary( BufferPtr nulls, BufferPtr indices, vector_size_t size, - VectorPtr vector) { + VectorPtr vector, + bool flattenIfRedundant) { // Dictionary that doesn't add nulls over constant is same as constant. Just // make sure to adjust the size. if (vector->encoding() == VectorEncoding::Simple::CONSTANT && !nulls) { @@ -144,14 +145,29 @@ VectorPtr BaseVector::wrapInDictionary( return BaseVector::wrapInConstant(size, 0, std::move(vector)); } + bool shouldFlatten = false; + if (flattenIfRedundant) { + auto base = vector; + while (base->encoding() == VectorEncoding::Simple::DICTIONARY) { + base = base->valueVector(); + } + shouldFlatten = !isLazyNotLoaded(*base) && (base->size() / 8) > size; + } + auto kind = vector->typeKind(); - return VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + auto result = VELOX_DYNAMIC_TYPE_DISPATCH_ALL( addDictionary, kind, std::move(nulls), std::move(indices), size, std::move(vector)); + + if (shouldFlatten) { + BaseVector::flattenVector(result); + } + + return result; } template diff --git a/velox/vector/BaseVector.h b/velox/vector/BaseVector.h index 23b9dc7b8f01..883ee403cfb2 100644 --- a/velox/vector/BaseVector.h +++ b/velox/vector/BaseVector.h @@ -559,11 +559,18 @@ class BaseVector { vector_size_t size, velox::memory::MemoryPool* pool); + /// Wraps the 'vector' in the provided dictionary encoding. If the input + /// vector is constant and the nulls buffer is empty, this method may return a + /// ConstantVector. Additionally, if 'flattenIfRedundant' is true, this method + /// may return a flattened version of the expected dictionary vector if + /// applying the dictionary encoding would result in a suboptimally encoded + /// vector. static VectorPtr wrapInDictionary( BufferPtr nulls, BufferPtr indices, vector_size_t size, - VectorPtr vector); + VectorPtr vector, + bool flattenIfRedundant = false); static VectorPtr wrapInSequence(BufferPtr lengths, vector_size_t size, VectorPtr vector); diff --git a/velox/vector/tests/VectorTest.cpp b/velox/vector/tests/VectorTest.cpp index 44d35c1ca8b3..7bc5b2e4abc1 100644 --- a/velox/vector/tests/VectorTest.cpp +++ b/velox/vector/tests/VectorTest.cpp @@ -3901,6 +3901,38 @@ TEST_F(VectorTest, testOverSizedArray) { EXPECT_THROW(BaseVector::flattenVector(constArray), VeloxUserError); } +TEST_F(VectorTest, testFlatteningOfRedundantDictionary) { + // Verify that BaseVector::wrapInDictionary() API flattens the output if + // 'flattenIfRedundant' is set to true and the wrapped vector has a size less + // than 1/8 the size of the base. + auto vectorSize = 100; + auto flat = + makeFlatVector(vectorSize, [](auto /*row*/) { return 1; }); + { + auto dictionarySize = vectorSize / 10; + auto indices = makeIndices(dictionarySize, [](auto i) { return i; }); + auto wrapped = + BaseVector::wrapInDictionary(nullptr, indices, dictionarySize, flat); + EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::DICTIONARY); + + wrapped = BaseVector::wrapInDictionary( + nullptr, indices, dictionarySize, flat, true /*flattenIfRedundant*/); + EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::FLAT); + } + + { + auto dictionarySize = vectorSize / 3; + auto indices = makeIndices(dictionarySize, [](auto i) { return i; }); + auto wrapped = + BaseVector::wrapInDictionary(nullptr, indices, dictionarySize, flat); + EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::DICTIONARY); + + wrapped = BaseVector::wrapInDictionary( + nullptr, indices, dictionarySize, flat, true /*flattenIfRedundant*/); + EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::DICTIONARY); + } +} + TEST_F(VectorTest, hasOverlappingRanges) { auto test = [this]( vector_size_t length,