diff --git a/velox/vector/ComplexVector.cpp b/velox/vector/ComplexVector.cpp index a40329a3ccd83..256714558ea98 100644 --- a/velox/vector/ComplexVector.cpp +++ b/velox/vector/ComplexVector.cpp @@ -1327,6 +1327,216 @@ void MapVector::copyRanges( copyRangesImpl(source, ranges, &values_, &keys_); } +namespace { + +struct UpdateSource { + vector_size_t entryIndex; + int8_t sourceIndex; +}; + +template +class UpdateMapRow { + public: + void insert( + const DecodedVector* decoded, + vector_size_t entryIndex, + int8_t sourceIndex) { + values_[decoded->valueAt(entryIndex)] = {entryIndex, sourceIndex}; + } + + template + void forEachEntry(F&& func) { + for (auto& [_, source] : values_) { + func(source); + } + } + + void clear() { + values_.clear(); + } + + private: + folly::F14FastMap values_; +}; + +template <> +class UpdateMapRow { + public: + void insert( + const DecodedVector* decoded, + vector_size_t entryIndex, + int8_t sourceIndex) { + references_[{decoded, entryIndex}] = {entryIndex, sourceIndex}; + } + + template + void forEachEntry(F&& func) { + for (auto& [_, source] : references_) { + func(source); + } + } + + void clear() { + references_.clear(); + } + + private: + struct Reference { + const DecodedVector* decoded; + vector_size_t index; + + bool operator==(const Reference& other) const { + return decoded->base()->equalValueAt( + other.decoded->base(), + decoded->index(index), + other.decoded->index(other.index)); + } + }; + + struct ReferenceHasher { + uint64_t operator()(const Reference& key) const { + return key.decoded->base()->hashValueAt(key.decoded->index(key.index)); + } + }; + + folly::F14FastMap references_; +}; + +} // namespace + +template +MapVectorPtr MapVector::updateImpl( + const std::vector& others, + bool deleteNullMapValue) const { + auto newNulls = nulls(); + bool allocatedNewNulls = false; + for (auto& other : others) { + if (!other->nulls()) { + continue; + } + if (!newNulls) { + newNulls = other->nulls(); + continue; + } + if (!allocatedNewNulls) { + auto* prevNewNulls = newNulls->as(); + newNulls = allocateNulls(size(), pool()); + allocatedNewNulls = true; + bits::andBits( + newNulls->asMutable(), + prevNewNulls, + other->rawNulls(), + 0, + size()); + } + } + + auto newOffsets = allocateIndices(size(), pool()); + auto* rawNewOffsets = newOffsets->asMutable(); + auto newSizes = allocateIndices(size(), pool()); + auto* rawNewSizes = newSizes->asMutable(); + + std::vector keys; + keys.reserve(1 + others.size()); + for (auto& other : others) { + VELOX_CHECK(*keys_->type() == *other->keys_->type()); + keys.emplace_back(*other->keys_); + } + keys.emplace_back(*keys_); + + std::vector> ranges(1 + others.size()); + UpdateMapRow::NativeType> mapRow; + vector_size_t numEntries = 0; + for (vector_size_t i = 0; i < size(); ++i) { + rawNewOffsets[i] = numEntries; + if (newNulls && bits::isBitNull(newNulls->as(), i)) { + rawNewSizes[i] = 0; + continue; + } + bool needUpdate = false; + for (auto& other : others) { + if (other->sizeAt(i) > 0) { + needUpdate = true; + break; + } + } + if (!needUpdate) { + // Fast path for no update on current row. Common in case of table + // mutability delta update. + rawNewSizes[i] = sizeAt(i); + if (sizeAt(i) > 0) { + ranges.back().push_back({offsetAt(i), numEntries, sizeAt(i)}); + numEntries += sizeAt(i); + } + continue; + } + auto offset = offsetAt(i); + auto size = sizeAt(i); + for (vector_size_t j = 0; j < size; ++j) { + auto jj = offset + j; + VELOX_DCHECK(!keys.back().isNullAt(jj)); + VELOX_DCHECK(!deleteNullMapValue || !values_->isNullAt(jj)); + mapRow.insert(&keys.back(), jj, others.size()); + } + for (int k = 0; k < others.size(); ++k) { + offset = others[k]->offsetAt(i); + size = others[k]->sizeAt(i); + for (vector_size_t j = 0; j < size; ++j) { + auto jj = offset + j; + VELOX_DCHECK(!keys[k].isNullAt(jj)); + mapRow.insert(&keys[k], jj, k); + } + } + vector_size_t newSize = 0; + mapRow.forEachEntry([&](UpdateSource source) { + if (deleteNullMapValue) { + auto* vector = source.sourceIndex == others.size() + ? this + : others[source.sourceIndex].get(); + if (vector->values_->isNullAt(source.entryIndex)) { + return; + } + } + ranges[source.sourceIndex].push_back( + {source.entryIndex, numEntries + newSize, 1}); + ++newSize; + }); + mapRow.clear(); + rawNewSizes[i] = newSize; + numEntries += newSize; + } + + auto newKeys = BaseVector::create(mapKeys()->type(), numEntries, pool()); + auto newValues = BaseVector::create(mapValues()->type(), numEntries, pool()); + for (int k = 0; k < ranges.size(); ++k) { + auto* vector = k == others.size() ? this : others[k].get(); + newKeys->copyRanges(vector->mapKeys().get(), ranges[k]); + newValues->copyRanges(vector->mapValues().get(), ranges[k]); + } + + return std::make_shared( + pool(), + type(), + std::move(newNulls), + size(), + std::move(newOffsets), + std::move(newSizes), + std::move(newKeys), + std::move(newValues)); +} + +MapVectorPtr MapVector::update( + const std::vector& others, + bool deleteNullMapValue) const { + VELOX_CHECK(!others.empty()); + VELOX_CHECK(others.size() < std::numeric_limits::max()); + for (auto& other : others) { + VELOX_CHECK_EQ(size(), other->size()); + } + return VELOX_DYNAMIC_TYPE_DISPATCH( + updateImpl, keys_->typeKind(), others, deleteNullMapValue); +} + void RowVector::appendNulls(vector_size_t numberOfRows) { VELOX_CHECK_GE(numberOfRows, 0); if (numberOfRows == 0) { diff --git a/velox/vector/ComplexVector.h b/velox/vector/ComplexVector.h index 5104a2f680807..9ed61b8f0f0ac 100644 --- a/velox/vector/ComplexVector.h +++ b/velox/vector/ComplexVector.h @@ -614,6 +614,20 @@ class MapVector : public ArrayVectorBase { void validate(const VectorValidateOptions& options) const override; + /// Update this map vector (base) with a list of map vectors (updates) of same + /// size. Maps are updated row-wise, i.e. for a certain key in each row, we + /// keep the entry from the last update map containing the key. If no update + /// map contains the key, we use the entry from base. Any null map in either + /// base or updates creates a null row in the result. + /// + /// When deleteNullMapValue is true, if a map value is null in update vector + /// (argument), we drop the entry from base, instead of overwriting it with an + /// entry with null value. No null map value in base vector is allowed in + /// this case. + std::shared_ptr update( + const std::vector>& others, + bool deleteNullMapValue) const; + protected: virtual void resetDataDependentFlags(const SelectivityVector* rows) override { BaseVector::resetDataDependentFlags(rows); @@ -629,6 +643,11 @@ class MapVector : public ArrayVectorBase { // get elements in key order in each map. BufferPtr elementIndices() const; + template + std::shared_ptr updateImpl( + const std::vector>& others, + bool deleteNullMapValue) const; + VectorPtr keys_; VectorPtr values_; bool sortedKeys_; @@ -649,4 +668,5 @@ inline BufferPtr allocateOffsets(vector_size_t size, memory::MemoryPool* pool) { inline BufferPtr allocateSizes(vector_size_t size, memory::MemoryPool* pool) { return AlignedBuffer::allocate(size, pool, 0); } + } // namespace facebook::velox diff --git a/velox/vector/benchmarks/CMakeLists.txt b/velox/vector/benchmarks/CMakeLists.txt index 0ebca6e668541..eb9c71e4d8782 100644 --- a/velox/vector/benchmarks/CMakeLists.txt +++ b/velox/vector/benchmarks/CMakeLists.txt @@ -28,3 +28,7 @@ add_executable(copy_benchmark CopyBenchmark.cpp) target_link_libraries(copy_benchmark velox_vector_test_lib Folly::folly ${FOLLY_BENCHMARK}) + +add_executable(velox_vector_map_update_benchmark MapUpdateBenchmark.cpp) +target_link_libraries(velox_vector_map_update_benchmark velox_vector_test_lib + Folly::folly ${FOLLY_BENCHMARK}) diff --git a/velox/vector/benchmarks/MapUpdateBenchmark.cpp b/velox/vector/benchmarks/MapUpdateBenchmark.cpp new file mode 100644 index 0000000000000..25e169ab21472 --- /dev/null +++ b/velox/vector/benchmarks/MapUpdateBenchmark.cpp @@ -0,0 +1,101 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/vector/ComplexVector.h" +#include "velox/vector/tests/utils/VectorMaker.h" + +#include +#include + +#include + +namespace facebook::velox { +namespace { + +MapVectorPtr generateBase( + memory::MemoryPool* pool, + std::default_random_engine& gen) { + constexpr int kSize = 1000; + constexpr int kMapSize = 10; + test::VectorMaker maker(pool); + std::uniform_real_distribution valueDist; + return maker.mapVector( + kSize, + [&](auto /*i*/) { return kMapSize; }, + [](auto /*i*/, auto j) { return j; }, + [&](auto /*i*/, auto /*j*/) { return valueDist(gen); }); +} + +MapVectorPtr generateUpdate( + memory::MemoryPool* pool, + std::default_random_engine& gen, + const MapVector& base) { + constexpr int kMapSize = 5; + auto offsets = allocateIndices(base.size(), pool); + auto* rawOffsets = offsets->asMutable(); + rawOffsets[0] = 0; + for (int i = 1; i < base.size(); ++i) { + rawOffsets[i] = rawOffsets[i - 1] + kMapSize; + } + auto sizes = allocateIndices(base.size(), pool); + auto* rawSizes = sizes->asMutable(); + std::fill(rawSizes, rawSizes + base.size(), kMapSize); + int64_t keyCandidates[10]; + std::iota(std::begin(keyCandidates), std::end(keyCandidates), 0); + auto keys = BaseVector::create>( + BIGINT(), kMapSize * base.size(), pool); + for (int i = 0; i < base.size(); ++i) { + std::shuffle(std::begin(keyCandidates), std::end(keyCandidates), gen); + for (int j = 0; j < kMapSize; ++j) { + keys->set(j + i * kMapSize, keyCandidates[j]); + } + } + test::VectorMaker maker(pool); + std::uniform_real_distribution valueDist; + auto values = maker.flatVector( + kMapSize * base.size(), [&](auto /*i*/) { return valueDist(gen); }); + return std::make_shared( + pool, + base.type(), + nullptr, + base.size(), + std::move(offsets), + std::move(sizes), + std::move(keys), + std::move(values)); +} + +} // namespace +} // namespace facebook::velox + +int main(int argc, char* argv[]) { + using namespace facebook::velox; + folly::Init follyInit(&argc, &argv); + memory::MemoryManager::initialize({}); + auto pool = memory::memoryManager()->addLeafPool(); + std::default_random_engine gen(std::random_device{}()); + auto base = generateBase(pool.get(), gen); + auto update = generateUpdate(pool.get(), gen, *base); + + folly::addBenchmark(__FILE__, "int64 keys", [&] { + auto result = base->update({update}, true); + folly::doNotOptimizeAway(result); + return 1000; + }); + folly::runBenchmarks(); + + return 0; +} diff --git a/velox/vector/tests/VectorTest.cpp b/velox/vector/tests/VectorTest.cpp index f0bdd951c3625..2c26dffe1b72d 100644 --- a/velox/vector/tests/VectorTest.cpp +++ b/velox/vector/tests/VectorTest.cpp @@ -3647,5 +3647,84 @@ TEST_F(VectorTest, getLargeStringBuffer) { EXPECT_GE(buffer->capacity(), size); } +TEST_F(VectorTest, mapUpdate) { + auto base = makeNullableMapVector({ + {{{1, 1}, {2, 1}}}, + {{}}, + {{{3, 1}}}, + std::nullopt, + {{{4, 1}}}, + }); + auto update = makeNullableMapVector({ + {{{2, 2}, {3, 2}}}, + {{{4, 2}}}, + {{}}, + {{{5, 2}}}, + std::nullopt, + }); + auto expected = makeNullableMapVector({ + {{{2, 2}, {3, 2}, {1, 1}}}, + {{{4, 2}}}, + {{{3, 1}}}, + std::nullopt, + std::nullopt, + }); + auto actual = base->update({update}, false); + ASSERT_EQ(actual->size(), expected->size()); + for (int i = 0; i < actual->size(); ++i) { + ASSERT_TRUE(actual->equalValueAt(expected.get(), i, i)); + } +} + +TEST_F(VectorTest, mapUpdateRowKeyType) { + auto base = makeMapVector( + {0, 2}, + makeRowVector({ + makeFlatVector({1, 2}), + makeFlatVector({1, 2}), + }), + makeFlatVector({1, 1})); + auto update = makeMapVector( + {0, 2}, + makeRowVector({ + makeFlatVector({2, 3}), + makeFlatVector({2, 3}), + }), + makeFlatVector({2, 2})); + auto expected = makeMapVector( + {0, 3}, + makeRowVector({ + makeFlatVector({1, 2, 3}), + makeFlatVector({1, 2, 3}), + }), + makeFlatVector({1, 2, 2})); + auto actual = base->update({update}, false); + ASSERT_EQ(actual->size(), expected->size()); + for (int i = 0; i < actual->size(); ++i) { + ASSERT_TRUE(actual->equalValueAt(expected.get(), i, i)); + } +} + +TEST_F(VectorTest, mapUpdateDeleteNullMapValue) { + auto base = makeNullableMapVector({ + {{{1, 1}, {2, 1}}}, + }); + auto update = makeNullableMapVector({ + {{{2, std::nullopt}, {3, 2}}}, + }); + auto expected = makeNullableMapVector({ + {{{1, 1}, {3, 2}}}, + }); + auto actual = base->update({update}, true); + ASSERT_EQ(actual->size(), expected->size()); + for (int i = 0; i < actual->size(); ++i) { + ASSERT_TRUE(actual->equalValueAt(expected.get(), i, i)); + } +#ifndef NDEBUG + // No null map value is allowed in base. + ASSERT_THROW(update->update({base}, true), VeloxException); +#endif +} + } // namespace } // namespace facebook::velox