Skip to content

Commit

Permalink
Add MapVector::update (facebookincubator#8801)
Browse files Browse the repository at this point in the history
Summary:

Add utility function to implement map update functionality for table
mutability use case.

Differential Revision: D53928298
  • Loading branch information
Yuhta authored and facebook-github-bot committed Feb 20, 2024
1 parent 3020196 commit b80b06e
Show file tree
Hide file tree
Showing 5 changed files with 414 additions and 0 deletions.
210 changes: 210 additions & 0 deletions velox/vector/ComplexVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,216 @@ void MapVector::copyRanges(
copyRangesImpl(source, ranges, &values_, &keys_);
}

namespace {

struct UpdateSource {
vector_size_t entryIndex;
int8_t sourceIndex;
};

template <typename T>
class UpdateMapRow {
public:
void insert(
const DecodedVector* decoded,
vector_size_t entryIndex,
int8_t sourceIndex) {
values_[decoded->valueAt<T>(entryIndex)] = {entryIndex, sourceIndex};
}

template <typename F>
void forEachEntry(F&& func) {
for (auto& [_, source] : values_) {
func(source);
}
}

void clear() {
values_.clear();
}

private:
folly::F14FastMap<T, UpdateSource> values_;
};

template <>
class UpdateMapRow<void> {
public:
void insert(
const DecodedVector* decoded,
vector_size_t entryIndex,
int8_t sourceIndex) {
references_[{decoded, entryIndex}] = {entryIndex, sourceIndex};
}

template <typename F>
void forEachEntry(F&& func) {
for (auto& [_, source] : references_) {
func(source);
}
}

void clear() {
references_.clear();
}

private:
struct Reference {
const DecodedVector* decoded;
vector_size_t index;

bool operator==(const Reference& other) const {
return decoded->base()->equalValueAt(
other.decoded->base(),
decoded->index(index),
other.decoded->index(other.index));
}
};

struct ReferenceHasher {
uint64_t operator()(const Reference& key) const {
return key.decoded->base()->hashValueAt(key.decoded->index(key.index));
}
};

folly::F14FastMap<Reference, UpdateSource, ReferenceHasher> references_;
};

} // namespace

template <TypeKind kKeyTypeKind>
MapVectorPtr MapVector::updateImpl(
const std::vector<MapVectorPtr>& others,
bool deleteNullMapValue) const {
auto newNulls = nulls();
bool allocatedNewNulls = false;
for (auto& other : others) {
if (!other->nulls()) {
continue;
}
if (!newNulls) {
newNulls = other->nulls();
continue;
}
if (!allocatedNewNulls) {
auto* prevNewNulls = newNulls->as<uint64_t>();
newNulls = allocateNulls(size(), pool());
allocatedNewNulls = true;
bits::andBits(
newNulls->asMutable<uint64_t>(),
prevNewNulls,
other->rawNulls(),
0,
size());
}
}

auto newOffsets = allocateIndices(size(), pool());
auto* rawNewOffsets = newOffsets->asMutable<vector_size_t>();
auto newSizes = allocateIndices(size(), pool());
auto* rawNewSizes = newSizes->asMutable<vector_size_t>();

std::vector<DecodedVector> keys;
keys.reserve(1 + others.size());
for (auto& other : others) {
VELOX_CHECK(*keys_->type() == *other->keys_->type());
keys.emplace_back(*other->keys_);
}
keys.emplace_back(*keys_);

std::vector<std::vector<BaseVector::CopyRange>> ranges(1 + others.size());
UpdateMapRow<typename TypeTraits<kKeyTypeKind>::NativeType> mapRow;
vector_size_t numEntries = 0;
for (vector_size_t i = 0; i < size(); ++i) {
rawNewOffsets[i] = numEntries;
if (newNulls && bits::isBitNull(newNulls->as<uint64_t>(), i)) {
rawNewSizes[i] = 0;
continue;
}
bool needUpdate = false;
for (auto& other : others) {
if (other->sizeAt(i) > 0) {
needUpdate = true;
break;
}
}
if (!needUpdate) {
// Fast path for no update on current row. Common in case of table
// mutability delta update.
rawNewSizes[i] = sizeAt(i);
if (sizeAt(i) > 0) {
ranges.back().push_back({offsetAt(i), numEntries, sizeAt(i)});
numEntries += sizeAt(i);
}
continue;
}
auto offset = offsetAt(i);
auto size = sizeAt(i);
for (vector_size_t j = 0; j < size; ++j) {
auto jj = offset + j;
VELOX_DCHECK(!keys.back().isNullAt(jj));
VELOX_DCHECK(!deleteNullMapValue || !values_->isNullAt(jj));
mapRow.insert(&keys.back(), jj, others.size());
}
for (int k = 0; k < others.size(); ++k) {
offset = others[k]->offsetAt(i);
size = others[k]->sizeAt(i);
for (vector_size_t j = 0; j < size; ++j) {
auto jj = offset + j;
VELOX_DCHECK(!keys[k].isNullAt(jj));
mapRow.insert(&keys[k], jj, k);
}
}
vector_size_t newSize = 0;
mapRow.forEachEntry([&](UpdateSource source) {
if (deleteNullMapValue) {
auto* vector = source.sourceIndex == others.size()
? this
: others[source.sourceIndex].get();
if (vector->values_->isNullAt(source.entryIndex)) {
return;
}
}
ranges[source.sourceIndex].push_back(
{source.entryIndex, numEntries + newSize, 1});
++newSize;
});
mapRow.clear();
rawNewSizes[i] = newSize;
numEntries += newSize;
}

auto newKeys = BaseVector::create(mapKeys()->type(), numEntries, pool());
auto newValues = BaseVector::create(mapValues()->type(), numEntries, pool());
for (int k = 0; k < ranges.size(); ++k) {
auto* vector = k == others.size() ? this : others[k].get();
newKeys->copyRanges(vector->mapKeys().get(), ranges[k]);
newValues->copyRanges(vector->mapValues().get(), ranges[k]);
}

return std::make_shared<MapVector>(
pool(),
type(),
std::move(newNulls),
size(),
std::move(newOffsets),
std::move(newSizes),
std::move(newKeys),
std::move(newValues));
}

MapVectorPtr MapVector::update(
const std::vector<MapVectorPtr>& others,
bool deleteNullMapValue) const {
VELOX_CHECK(!others.empty());
VELOX_CHECK(others.size() < std::numeric_limits<int8_t>::max());
for (auto& other : others) {
VELOX_CHECK_EQ(size(), other->size());
}
return VELOX_DYNAMIC_TYPE_DISPATCH(
updateImpl, keys_->typeKind(), others, deleteNullMapValue);
}

void RowVector::appendNulls(vector_size_t numberOfRows) {
VELOX_CHECK_GE(numberOfRows, 0);
if (numberOfRows == 0) {
Expand Down
20 changes: 20 additions & 0 deletions velox/vector/ComplexVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,20 @@ class MapVector : public ArrayVectorBase {

void validate(const VectorValidateOptions& options) const override;

/// Update this map vector (base) with a list of map vectors (updates) of same
/// size. Maps are updated row-wise, i.e. for a certain key in each row, we
/// keep the entry from the last update map containing the key. If no update
/// map contains the key, we use the entry from base. Any null map in either
/// base or updates creates a null row in the result.
///
/// When deleteNullMapValue is true, if a map value is null in update vector
/// (argument), we drop the entry from base, instead of overwriting it with an
/// entry with null value. No null map value in base vector is allowed in
/// this case.
std::shared_ptr<MapVector> update(
const std::vector<std::shared_ptr<MapVector>>& others,
bool deleteNullMapValue) const;

protected:
virtual void resetDataDependentFlags(const SelectivityVector* rows) override {
BaseVector::resetDataDependentFlags(rows);
Expand All @@ -629,6 +643,11 @@ class MapVector : public ArrayVectorBase {
// get elements in key order in each map.
BufferPtr elementIndices() const;

template <TypeKind kKeyTypeKind>
std::shared_ptr<MapVector> updateImpl(
const std::vector<std::shared_ptr<MapVector>>& others,
bool deleteNullMapValue) const;

VectorPtr keys_;
VectorPtr values_;
bool sortedKeys_;
Expand All @@ -649,4 +668,5 @@ inline BufferPtr allocateOffsets(vector_size_t size, memory::MemoryPool* pool) {
inline BufferPtr allocateSizes(vector_size_t size, memory::MemoryPool* pool) {
return AlignedBuffer::allocate<vector_size_t>(size, pool, 0);
}

} // namespace facebook::velox
4 changes: 4 additions & 0 deletions velox/vector/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ add_executable(copy_benchmark CopyBenchmark.cpp)

target_link_libraries(copy_benchmark velox_vector_test_lib Folly::folly
${FOLLY_BENCHMARK})

add_executable(velox_vector_map_update_benchmark MapUpdateBenchmark.cpp)
target_link_libraries(velox_vector_map_update_benchmark velox_vector_test_lib
Folly::folly ${FOLLY_BENCHMARK})
101 changes: 101 additions & 0 deletions velox/vector/benchmarks/MapUpdateBenchmark.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/vector/ComplexVector.h"
#include "velox/vector/tests/utils/VectorMaker.h"

#include <folly/Benchmark.h>
#include <folly/init/Init.h>

#include <random>

namespace facebook::velox {
namespace {

MapVectorPtr generateBase(
memory::MemoryPool* pool,
std::default_random_engine& gen) {
constexpr int kSize = 1000;
constexpr int kMapSize = 10;
test::VectorMaker maker(pool);
std::uniform_real_distribution valueDist;
return maker.mapVector<int64_t, double>(
kSize,
[&](auto /*i*/) { return kMapSize; },
[](auto /*i*/, auto j) { return j; },
[&](auto /*i*/, auto /*j*/) { return valueDist(gen); });
}

MapVectorPtr generateUpdate(
memory::MemoryPool* pool,
std::default_random_engine& gen,
const MapVector& base) {
constexpr int kMapSize = 5;
auto offsets = allocateIndices(base.size(), pool);
auto* rawOffsets = offsets->asMutable<vector_size_t>();
rawOffsets[0] = 0;
for (int i = 1; i < base.size(); ++i) {
rawOffsets[i] = rawOffsets[i - 1] + kMapSize;
}
auto sizes = allocateIndices(base.size(), pool);
auto* rawSizes = sizes->asMutable<vector_size_t>();
std::fill(rawSizes, rawSizes + base.size(), kMapSize);
int64_t keyCandidates[10];
std::iota(std::begin(keyCandidates), std::end(keyCandidates), 0);
auto keys = BaseVector::create<FlatVector<int64_t>>(
BIGINT(), kMapSize * base.size(), pool);
for (int i = 0; i < base.size(); ++i) {
std::shuffle(std::begin(keyCandidates), std::end(keyCandidates), gen);
for (int j = 0; j < kMapSize; ++j) {
keys->set(j + i * kMapSize, keyCandidates[j]);
}
}
test::VectorMaker maker(pool);
std::uniform_real_distribution valueDist;
auto values = maker.flatVector<double>(
kMapSize * base.size(), [&](auto /*i*/) { return valueDist(gen); });
return std::make_shared<MapVector>(
pool,
base.type(),
nullptr,
base.size(),
std::move(offsets),
std::move(sizes),
std::move(keys),
std::move(values));
}

} // namespace
} // namespace facebook::velox

int main(int argc, char* argv[]) {
using namespace facebook::velox;
folly::Init follyInit(&argc, &argv);
memory::MemoryManager::initialize({});
auto pool = memory::memoryManager()->addLeafPool();
std::default_random_engine gen(std::random_device{}());
auto base = generateBase(pool.get(), gen);
auto update = generateUpdate(pool.get(), gen, *base);

folly::addBenchmark(__FILE__, "int64 keys", [&] {
auto result = base->update({update}, true);
folly::doNotOptimizeAway(result);
return 1000;
});
folly::runBenchmarks();

return 0;
}
Loading

0 comments on commit b80b06e

Please sign in to comment.