diff --git a/velox/functions/lib/SubscriptUtil.cpp b/velox/functions/lib/SubscriptUtil.cpp index c01dc64d3cab..55031e0c31ee 100644 --- a/velox/functions/lib/SubscriptUtil.cpp +++ b/velox/functions/lib/SubscriptUtil.cpp @@ -174,8 +174,16 @@ VectorPtr applyMapTyped( baseMap->mapValues()->type(), rows.end(), context.pool()); } + // Subscript can pass along very large elements vectors that can hold onto + // memory and copy operations on them can further put memory pressure. We + // try to flatten them if the dictionary layer is much smaller than the + // elements vector. return BaseVector::wrapInDictionary( - nullsBuilder.build(), indices, rows.end(), baseMap->mapValues()); + nullsBuilder.build(), + indices, + rows.end(), + baseMap->mapValues(), + true /*flattenIfRedundant*/); } VectorPtr applyMapComplexType( @@ -294,8 +302,16 @@ VectorPtr applyMapComplexType( baseMap->mapValues()->type(), rows.end(), context.pool()); } + // Subscript can pass along very large elements vectors that can hold onto + // memory and copy operations on them can further put memory pressure. We + // try to flatten them if the dictionary layer is much smaller than the + // elements vector. return BaseVector::wrapInDictionary( - nullsBuilder.build(), indices, rows.end(), baseMap->mapValues()); + nullsBuilder.build(), + indices, + rows.end(), + baseMap->mapValues(), + true /*flattenIfRedundant*/); } } // namespace diff --git a/velox/functions/lib/SubscriptUtil.h b/velox/functions/lib/SubscriptUtil.h index 2700d0c1bf07..c649bb99954f 100644 --- a/velox/functions/lib/SubscriptUtil.h +++ b/velox/functions/lib/SubscriptUtil.h @@ -373,8 +373,16 @@ class SubscriptImpl : public exec::Subscript { baseArray->elements()->type(), rows.end(), context.pool()); } + // Subscript can pass along very large elements vectors that can hold onto + // memory and copy operations on them can further put memory pressure. We + // try to flatten them if the dictionary layer is much smaller than the + // elements vector. return BaseVector::wrapInDictionary( - nullsBuilder.build(), indices, rows.end(), baseArray->elements()); + nullsBuilder.build(), + indices, + rows.end(), + baseArray->elements(), + true /*flattenIfRedundant*/); } // Normalize indices from 1 or 0-based into always 0-based (according to diff --git a/velox/functions/prestosql/FilterFunctions.cpp b/velox/functions/prestosql/FilterFunctions.cpp index f0257fa330af..63519caafe5a 100644 --- a/velox/functions/prestosql/FilterFunctions.cpp +++ b/velox/functions/prestosql/FilterFunctions.cpp @@ -133,11 +133,16 @@ class ArrayFilterFunction : public FilterFunctionBase { resultSizes, selectedIndices); + // Filter can pass along very large elements vectors that can hold onto + // memory and copy operations on them can further put memory pressure. We + // try to flatten them if the dictionary layer is much smaller than the + // elements vector. auto wrappedElements = numSelected ? BaseVector::wrapInDictionary( BufferPtr(nullptr), std::move(selectedIndices), numSelected, - std::move(elements)) + std::move(elements), + true /*flattenIfRedundant*/) : nullptr; // Set nulls for rows not present in 'rows'. BufferPtr newNulls = addNullsForUnselectedRows(flatArray, rows); @@ -196,15 +201,23 @@ class MapFilterFunction : public FilterFunctionBase { resultSizes, selectedIndices); - auto wrappedKeys = numSelected - ? BaseVector::wrapInDictionary( - BufferPtr(nullptr), selectedIndices, numSelected, std::move(keys)) - : nullptr; + // Filter can pass along very large elements vectors that can hold onto + // memory and copy operations on them can further put memory pressure. We + // try to flatten them if the dictionary layer is much smaller than the + // elements vector. + auto wrappedKeys = numSelected ? BaseVector::wrapInDictionary( + BufferPtr(nullptr), + selectedIndices, + numSelected, + std::move(keys), + true /*flattenIfRedundant*/) + : nullptr; auto wrappedValues = numSelected ? BaseVector::wrapInDictionary( BufferPtr(nullptr), selectedIndices, numSelected, - std::move(values)) + std::move(values), + true /*flattenIfRedundant*/) : nullptr; // Set nulls for rows not present in 'rows'. BufferPtr newNulls = addNullsForUnselectedRows(flatMap, rows); diff --git a/velox/functions/prestosql/tests/ArrayFilterTest.cpp b/velox/functions/prestosql/tests/ArrayFilterTest.cpp index 423a3e8aef89..bc6444114982 100644 --- a/velox/functions/prestosql/tests/ArrayFilterTest.cpp +++ b/velox/functions/prestosql/tests/ArrayFilterTest.cpp @@ -233,3 +233,23 @@ TEST_F(ArrayFilterTest, try) { {{{1, 2}}, std::nullopt, {{6, 7, 8, 9}}, {{10, 11, 12, 13}}}); assertEqualVectors(expected, result); } + +TEST_F(ArrayFilterTest, selectiveFilter) { + // Verify that a selective filter will ensure the underlying elements + // vector is flattened before generating the result which is otherwise wrapped + // in a dictionary with the filter results. This ensures large element + // vectors are not passed along. + auto data = makeRowVector({ + makeArrayVector({ + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + }), + }); + auto result = evaluate("filter(c0, x -> (x = 1))", data); + auto base = result->as()->elements(); + EXPECT_EQ(base->encoding(), VectorEncoding::Simple::FLAT); + + result = evaluate("filter(c0, x -> (x < 4))", data); + base = result->as()->elements(); + EXPECT_EQ(base->encoding(), VectorEncoding::Simple::DICTIONARY); +} diff --git a/velox/functions/prestosql/tests/ElementAtTest.cpp b/velox/functions/prestosql/tests/ElementAtTest.cpp index 64b7f7085543..6b8d3f976792 100644 --- a/velox/functions/prestosql/tests/ElementAtTest.cpp +++ b/velox/functions/prestosql/tests/ElementAtTest.cpp @@ -1427,3 +1427,51 @@ TEST_F(ElementAtTest, timestampWithTimeZoneWithCaching) { std::vector{pack(5, 10)}, TIMESTAMP_WITH_TIME_ZONE())}); testCaching({mapOfRowKeys, lookup}, makeConstant(5, 1)); } + +TEST_F(ElementAtTest, highlySelective) { + // Verify that selecting a single element from a large array/map will ensure + // the underlying elements vector is flattened before generating the result + // which is otherwise wrapped in a dictionary with indices pointing to the + // selected subscript. This ensures large element vectors are not passed + // along. + vector_size_t vectorSize = 100; + auto sizeAtLarge = [](vector_size_t /* row */) { return 10; }; + auto sizeAtSmall = [](vector_size_t /* row */) { return 5; }; + auto keyAt = [](vector_size_t idx) { return idx; }; + auto valueAt = [](vector_size_t /* idx */) { return 10; }; + { + auto mapVector = makeMapVector( + vectorSize, sizeAtLarge, keyAt, valueAt); + auto result = evaluate>( + "element_at(C0, 3)", makeRowVector({mapVector})); + EXPECT_EQ(result->encoding(), VectorEncoding::Simple::FLAT); + } + + { + auto mapVector = makeMapVector( + vectorSize, sizeAtSmall, keyAt, valueAt); + auto result = evaluate>( + "element_at(C0, 3)", makeRowVector({mapVector})); + EXPECT_EQ(result->encoding(), VectorEncoding::Simple::DICTIONARY); + } + + auto valueAtArray = [](vector_size_t /* row */, vector_size_t /* idx */) { + return 10; + }; + + { + auto arrayVector = + makeArrayVector(vectorSize, sizeAtLarge, valueAtArray); + auto result = evaluate>( + "element_at(C0, 3)", makeRowVector({arrayVector})); + EXPECT_EQ(result->encoding(), VectorEncoding::Simple::FLAT); + } + + { + auto arrayVector = + makeArrayVector(vectorSize, sizeAtSmall, valueAtArray); + auto result = evaluate>( + "element_at(C0, 3)", makeRowVector({arrayVector})); + EXPECT_EQ(result->encoding(), VectorEncoding::Simple::DICTIONARY); + } +} diff --git a/velox/functions/prestosql/tests/MapFilterTest.cpp b/velox/functions/prestosql/tests/MapFilterTest.cpp index ec8e89ebcec7..f9d1e33fbf9c 100644 --- a/velox/functions/prestosql/tests/MapFilterTest.cpp +++ b/velox/functions/prestosql/tests/MapFilterTest.cpp @@ -275,3 +275,41 @@ TEST_F(MapFilterTest, unknown) { auto result = evaluate("map_filter(c0, (k, v) -> (v > 5))", data); assertEqualVectors(data->childAt(0), result); } + +TEST_F(MapFilterTest, selectiveFilter) { + // Verify that a selective filter will ensure the underlying elements + // vector is flattened before generating the result which is otherwise wrapped + // in a dictionary with the filter results. This ensures large element + // vectors are not passed along. + auto data = makeRowVector({ + makeMapVector( + {{{1, 3}, + {2, 3}, + {3, 3}, + {4, 3}, + {5, 3}, + {6, 3}, + {7, 3}, + {8, 3}, + {9, 3}, + {10, 3}, + {11, 3}, + {12, 3}, + {13, 3}, + {14, 3}, + {15, 3}, + {16, 3}}}), + }); + + auto result = evaluate("map_filter(c0, (k, v) -> (k = 1))", data); + auto base = result->as()->mapKeys(); + EXPECT_EQ(base->encoding(), VectorEncoding::Simple::FLAT); + base = result->as()->mapValues(); + EXPECT_EQ(base->encoding(), VectorEncoding::Simple::FLAT); + + result = evaluate("map_filter(c0, (k, v) -> (k < 6))", data); + base = result->as()->mapKeys(); + EXPECT_EQ(base->encoding(), VectorEncoding::Simple::DICTIONARY); + base = result->as()->mapValues(); + EXPECT_EQ(base->encoding(), VectorEncoding::Simple::DICTIONARY); +} diff --git a/velox/vector/BaseVector.cpp b/velox/vector/BaseVector.cpp index cc98fc2af95b..180e4c33336f 100644 --- a/velox/vector/BaseVector.cpp +++ b/velox/vector/BaseVector.cpp @@ -134,7 +134,8 @@ VectorPtr BaseVector::wrapInDictionary( BufferPtr nulls, BufferPtr indices, vector_size_t size, - VectorPtr vector) { + VectorPtr vector, + bool flattenIfRedundant) { // Dictionary that doesn't add nulls over constant is same as constant. Just // make sure to adjust the size. if (vector->encoding() == VectorEncoding::Simple::CONSTANT && !nulls) { @@ -144,14 +145,29 @@ VectorPtr BaseVector::wrapInDictionary( return BaseVector::wrapInConstant(size, 0, std::move(vector)); } + bool shouldFlatten = false; + if (flattenIfRedundant) { + auto base = vector; + while (base->encoding() == VectorEncoding::Simple::DICTIONARY) { + base = base->valueVector(); + } + shouldFlatten = !isLazyNotLoaded(*base) && (base->size() / 8) > size; + } + auto kind = vector->typeKind(); - return VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + auto result = VELOX_DYNAMIC_TYPE_DISPATCH_ALL( addDictionary, kind, std::move(nulls), std::move(indices), size, std::move(vector)); + + if (shouldFlatten) { + BaseVector::flattenVector(result); + } + + return result; } template diff --git a/velox/vector/BaseVector.h b/velox/vector/BaseVector.h index 23b9dc7b8f01..883ee403cfb2 100644 --- a/velox/vector/BaseVector.h +++ b/velox/vector/BaseVector.h @@ -559,11 +559,18 @@ class BaseVector { vector_size_t size, velox::memory::MemoryPool* pool); + /// Wraps the 'vector' in the provided dictionary encoding. If the input + /// vector is constant and the nulls buffer is empty, this method may return a + /// ConstantVector. Additionally, if 'flattenIfRedundant' is true, this method + /// may return a flattened version of the expected dictionary vector if + /// applying the dictionary encoding would result in a suboptimally encoded + /// vector. static VectorPtr wrapInDictionary( BufferPtr nulls, BufferPtr indices, vector_size_t size, - VectorPtr vector); + VectorPtr vector, + bool flattenIfRedundant = false); static VectorPtr wrapInSequence(BufferPtr lengths, vector_size_t size, VectorPtr vector); diff --git a/velox/vector/tests/VectorTest.cpp b/velox/vector/tests/VectorTest.cpp index 44d35c1ca8b3..7bc5b2e4abc1 100644 --- a/velox/vector/tests/VectorTest.cpp +++ b/velox/vector/tests/VectorTest.cpp @@ -3901,6 +3901,38 @@ TEST_F(VectorTest, testOverSizedArray) { EXPECT_THROW(BaseVector::flattenVector(constArray), VeloxUserError); } +TEST_F(VectorTest, testFlatteningOfRedundantDictionary) { + // Verify that BaseVector::wrapInDictionary() API flattens the output if + // 'flattenIfRedundant' is set to true and the wrapped vector has a size less + // than 1/8 the size of the base. + auto vectorSize = 100; + auto flat = + makeFlatVector(vectorSize, [](auto /*row*/) { return 1; }); + { + auto dictionarySize = vectorSize / 10; + auto indices = makeIndices(dictionarySize, [](auto i) { return i; }); + auto wrapped = + BaseVector::wrapInDictionary(nullptr, indices, dictionarySize, flat); + EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::DICTIONARY); + + wrapped = BaseVector::wrapInDictionary( + nullptr, indices, dictionarySize, flat, true /*flattenIfRedundant*/); + EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::FLAT); + } + + { + auto dictionarySize = vectorSize / 3; + auto indices = makeIndices(dictionarySize, [](auto i) { return i; }); + auto wrapped = + BaseVector::wrapInDictionary(nullptr, indices, dictionarySize, flat); + EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::DICTIONARY); + + wrapped = BaseVector::wrapInDictionary( + nullptr, indices, dictionarySize, flat, true /*flattenIfRedundant*/); + EXPECT_EQ(wrapped->encoding(), VectorEncoding::Simple::DICTIONARY); + } +} + TEST_F(VectorTest, hasOverlappingRanges) { auto test = [this]( vector_size_t length,