Skip to content

Commit

Permalink
Allow copying vector to a new memory pool (#10647)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #10647

Until we have a proper solution for transferring vector memory ownership from velox/koski to downstream components, we extend the vector API so that, for now, a vector can be copied into a downstream memory pool.

Reviewed By: Yuhta

Differential Revision: D60616369

fbshipit-source-id: 13515ae32ef9beadc2500e2c050f40244d46dc8e
  • Loading branch information
Huameng (Michael) Jiang authored and facebook-github-bot committed Aug 2, 2024
1 parent 5cff4e5 commit 84428f9
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 42 deletions.
15 changes: 9 additions & 6 deletions velox/vector/BaseVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,11 @@ class BaseVector {
const vector_size_t* toSourceRow);

/// Utility for making a deep copy of a whole vector.
static VectorPtr copy(const BaseVector& vector) {
auto result =
BaseVector::create(vector.type(), vector.size(), vector.pool());
/// Deep-copies every row of 'vector' into a freshly created vector of the same
/// type and size. The new vector is created in 'pool' when one is supplied and
/// in the pool of 'vector' otherwise.
static VectorPtr copy(
    const BaseVector& vector,
    velox::memory::MemoryPool* pool = nullptr) {
  // Fall back to the source vector's pool when the caller did not pick one.
  auto* targetPool = (pool != nullptr) ? pool : vector.pool();
  const auto numRows = vector.size();

  auto copied = BaseVector::create(vector.type(), numRows, targetPool);
  copied->copy(&vector, 0, 0, numRows);
  return copied;
}
Expand Down Expand Up @@ -497,10 +499,11 @@ class BaseVector {
}

/// This makes a deep copy of the Vector allocating new child Vectors and
// Buffers recursively. Unlike copy, this preserves encodings recursively.
virtual VectorPtr copyPreserveEncodings() const = 0;
/// Buffers recursively. Unlike copy, this preserves encodings recursively.
/// 'pool' optionally selects the memory pool for the copy; overrides that
/// support copying fall back to the vector's own pool when it is null.
/// NOTE(review): every override repeats the '= nullptr' default. Keep them in
/// sync, because C++ picks a default argument from the static type of the
/// object the call is made through, not from the dynamic override.
virtual VectorPtr copyPreserveEncodings(
velox::memory::MemoryPool* pool = nullptr) const = 0;

// Construct a zero-copy slice of the vector with the indicated offset and
/// Construct a zero-copy slice of the vector with the indicated offset and
/// length.
virtual VectorPtr slice(vector_size_t offset, vector_size_t length) const = 0;

Expand Down
10 changes: 6 additions & 4 deletions velox/vector/BiasVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,15 @@ class BiasVector : public SimpleVector<T> {
VELOX_NYI();
}

VectorPtr copyPreserveEncodings() const override {
VectorPtr copyPreserveEncodings(
velox::memory::MemoryPool* pool = nullptr) const override {
auto selfPool = pool ? pool : BaseVector::pool_;
return std::make_shared<BiasVector<T>>(
BaseVector::pool_,
AlignedBuffer::copy(BaseVector::pool_, BaseVector::nulls_),
selfPool,
AlignedBuffer::copy(selfPool, BaseVector::nulls_),
BaseVector::length_,
valueType_,
AlignedBuffer::copy(BaseVector::pool_, values_),
AlignedBuffer::copy(selfPool, values_),
bias_,
SimpleVector<T>::stats_,
BaseVector::distinctValueCount_,
Expand Down
40 changes: 23 additions & 17 deletions velox/vector/ComplexVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,19 @@ class RowVector : public BaseVector {
const BaseVector* source,
const folly::Range<const CopyRange*>& ranges) override;

VectorPtr copyPreserveEncodings() const override {
VectorPtr copyPreserveEncodings(
velox::memory::MemoryPool* pool = nullptr) const override {
std::vector<VectorPtr> copiedChildren(children_.size());

for (auto i = 0; i < children_.size(); ++i) {
copiedChildren[i] = children_[i]->copyPreserveEncodings();
copiedChildren[i] = children_[i]->copyPreserveEncodings(pool);
}

auto selfPool = pool ? pool : pool_;
return std::make_shared<RowVector>(
pool_,
selfPool,
type_,
AlignedBuffer::copy(pool_, nulls_),
AlignedBuffer::copy(selfPool, nulls_),
length_,
copiedChildren,
nullCount_);
Expand Down Expand Up @@ -466,15 +468,17 @@ class ArrayVector : public ArrayVectorBase {
const BaseVector* source,
const folly::Range<const CopyRange*>& ranges) override;

/// Deep-copies this ArrayVector: the nulls, offsets and sizes buffers are
/// copied with AlignedBuffer::copy and the elements vector is copied through
/// its own copyPreserveEncodings.
/// 'pool' optionally selects the memory pool for the copy; when it is null the
/// vector's own pool_ is used.
/// NOTE(review): this span is a diff rendering. The zero-argument signature
/// and the lines that use pool_ directly are the removed variants of the
/// lines that take 'pool' / use selfPool.
VectorPtr copyPreserveEncodings() const override {
VectorPtr copyPreserveEncodings(
velox::memory::MemoryPool* pool = nullptr) const override {
// Pool for this vector's own buffers: the caller's pool if given, else pool_.
auto selfPool = pool ? pool : pool_;
return std::make_shared<ArrayVector>(
pool_,
selfPool,
type_,
AlignedBuffer::copy(pool_, nulls_),
AlignedBuffer::copy(selfPool, nulls_),
length_,
AlignedBuffer::copy(pool_, offsets_),
AlignedBuffer::copy(pool_, sizes_),
elements_->copyPreserveEncodings(),
AlignedBuffer::copy(selfPool, offsets_),
AlignedBuffer::copy(selfPool, sizes_),
// Forward the caller's (possibly null) pool so the elements vector applies
// the same "given pool, else own pool" choice.
elements_->copyPreserveEncodings(pool),
nullCount_);
}

Expand Down Expand Up @@ -607,16 +611,18 @@ class MapVector : public ArrayVectorBase {
const BaseVector* source,
const folly::Range<const CopyRange*>& ranges) override;

/// Deep-copies this MapVector: the nulls, offsets and sizes buffers are copied
/// with AlignedBuffer::copy, and the keys and values vectors are copied
/// through their own copyPreserveEncodings. nullCount_ and sortedKeys_ are
/// passed through unchanged.
/// 'pool' optionally selects the memory pool for the copy; when it is null the
/// vector's own pool_ is used.
/// NOTE(review): this span is a diff rendering. The zero-argument signature
/// and the lines that use pool_ directly are the removed variants of the
/// lines that take 'pool' / use selfPool.
VectorPtr copyPreserveEncodings() const override {
VectorPtr copyPreserveEncodings(
velox::memory::MemoryPool* pool = nullptr) const override {
// Pool for this vector's own buffers: the caller's pool if given, else pool_.
auto selfPool = pool ? pool : pool_;
return std::make_shared<MapVector>(
pool_,
selfPool,
type_,
AlignedBuffer::copy(pool_, nulls_),
AlignedBuffer::copy(selfPool, nulls_),
length_,
AlignedBuffer::copy(pool_, offsets_),
AlignedBuffer::copy(pool_, sizes_),
keys_->copyPreserveEncodings(),
values_->copyPreserveEncodings(),
AlignedBuffer::copy(selfPool, offsets_),
AlignedBuffer::copy(selfPool, sizes_),
// Forward the caller's (possibly null) pool so keys and values apply the same
// "given pool, else own pool" choice.
keys_->copyPreserveEncodings(pool),
values_->copyPreserveEncodings(pool),
nullCount_,
sortedKeys_);
}
Expand Down
10 changes: 6 additions & 4 deletions velox/vector/ConstantVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -356,18 +356,20 @@ class ConstantVector final : public SimpleVector<T> {
}
}

VectorPtr copyPreserveEncodings() const override {
VectorPtr copyPreserveEncodings(
velox::memory::MemoryPool* pool = nullptr) const override {
auto selfPool = pool ? pool : BaseVector::pool_;
if (valueVector_) {
return std::make_shared<ConstantVector<T>>(
BaseVector::pool_,
selfPool,
BaseVector::length_,
index_,
valueVector_->copyPreserveEncodings(),
valueVector_->copyPreserveEncodings(pool),
SimpleVector<T>::stats_);
}

return std::make_shared<ConstantVector<T>>(
BaseVector::pool_,
selfPool,
BaseVector::length_,
isNull_,
BaseVector::type_,
Expand Down
10 changes: 6 additions & 4 deletions velox/vector/DictionaryVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,15 @@ class DictionaryVector : public SimpleVector<T> {

void validate(const VectorValidateOptions& options) const override;

VectorPtr copyPreserveEncodings() const override {
VectorPtr copyPreserveEncodings(
velox::memory::MemoryPool* pool = nullptr) const override {
auto selfPool = pool ? pool : BaseVector::pool_;
return std::make_shared<DictionaryVector<T>>(
BaseVector::pool_,
AlignedBuffer::copy(BaseVector::pool_, BaseVector::nulls_),
selfPool,
AlignedBuffer::copy(selfPool, BaseVector::nulls_),
BaseVector::length_,
dictionaryValues_->copyPreserveEncodings(),
AlignedBuffer::copy(BaseVector::pool_, indices_),
AlignedBuffer::copy(selfPool, indices_),
SimpleVector<T>::stats_,
BaseVector::distinctValueCount_,
BaseVector::nullCount_,
Expand Down
5 changes: 3 additions & 2 deletions velox/vector/FlatVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,10 @@ class FlatVector final : public SimpleVector<T> {
const BaseVector* source,
const folly::Range<const BaseVector::CopyRange*>& ranges) override;

VectorPtr copyPreserveEncodings() const override {
VectorPtr copyPreserveEncodings(
velox::memory::MemoryPool* pool = nullptr) const override {
return std::make_shared<FlatVector<T>>(
BaseVector::pool_,
pool ? pool : BaseVector::pool_,
BaseVector::type_,
AlignedBuffer::copy(BaseVector::pool_, BaseVector::nulls_),
BaseVector::length_,
Expand Down
3 changes: 2 additions & 1 deletion velox/vector/FunctionVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ class FunctionVector : public BaseVector {
VELOX_NYI();
}

/// copyPreserveEncodings is not defined for FunctionVector; the body only
/// invokes VELOX_UNSUPPORTED with an explanatory message. The pool parameter
/// is left unnamed because it is never used.
/// NOTE(review): diff rendering — the zero-argument signature is the removed
/// variant of the signature that takes a pool.
VectorPtr copyPreserveEncodings() const override {
VectorPtr copyPreserveEncodings(
velox::memory::MemoryPool* /* pool */ = nullptr) const override {
VELOX_UNSUPPORTED("copyPreserveEncodings not defined for FunctionVector");
}

Expand Down
3 changes: 2 additions & 1 deletion velox/vector/LazyVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ class LazyVector : public BaseVector {

void validate(const VectorValidateOptions& options) const override;

/// copyPreserveEncodings is not defined for LazyVector; the body only invokes
/// VELOX_UNSUPPORTED with an explanatory message. The pool parameter is left
/// unnamed because it is never used.
/// NOTE(review): diff rendering — the zero-argument signature is the removed
/// variant of the signature that takes a pool.
VectorPtr copyPreserveEncodings() const override {
VectorPtr copyPreserveEncodings(
velox::memory::MemoryPool* /* pool */ = nullptr) const override {
VELOX_UNSUPPORTED("copyPreserveEncodings not defined for LazyVector");
}

Expand Down
8 changes: 5 additions & 3 deletions velox/vector/SequenceVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,14 @@ class SequenceVector : public SimpleVector<T> {
return false;
}

VectorPtr copyPreserveEncodings() const override {
VectorPtr copyPreserveEncodings(
velox::memory::MemoryPool* pool = nullptr) const override {
auto selfPool = pool ? pool : BaseVector::pool_;
return std::make_shared<SequenceVector<T>>(
BaseVector::pool_,
selfPool,
BaseVector::length_,
sequenceValues_->copyPreserveEncodings(),
AlignedBuffer::copy(BaseVector::pool_, sequenceLengths_),
AlignedBuffer::copy(selfPool, sequenceLengths_),
SimpleVector<T>::stats_,
BaseVector::distinctValueCount_,
BaseVector::nullCount_,
Expand Down
16 changes: 16 additions & 0 deletions velox/vector/tests/CopyPreserveEncodingsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,4 +349,20 @@ TEST_F(CopyPreserveEncodingsTest, sequenceNoNulls) {
assertEqualVectors(sequenceVector, copy);
validateCopyPreserveEncodings(sequenceVector, copy);
}

// Copies a dictionary vector into a separate leaf pool and checks that the
// copy is equal, keeps its encodings, is bound to the requested pool, and
// leaves the source pool's usage untouched.
TEST_F(CopyPreserveEncodingsTest, newMemoryPool) {
  auto dictionaryVector = vectorMaker_.dictionaryVector(generateIntInput());

  auto sourcePool = dictionaryVector->pool();
  auto targetPool = memory::memoryManager()->addLeafPool();
  const auto preCopySrcMemory = sourcePool->usedBytes();
  // A freshly added leaf pool must start empty for the byte checks below to
  // mean anything.
  ASSERT_EQ(0, targetPool->usedBytes());

  auto copy = dictionaryVector->copyPreserveEncodings(targetPool.get());

  // The copy itself must report the pool it was asked to allocate from; the
  // byte counts alone do not establish this.
  EXPECT_EQ(targetPool.get(), copy->pool());

  assertEqualVectors(dictionaryVector, copy);
  validateCopyPreserveEncodings(dictionaryVector, copy);

  // Copying must not allocate from (or release to) the source pool.
  EXPECT_EQ(preCopySrcMemory, sourcePool->usedBytes());
  // NOTE(review): assumes the source pool holds only this vector's
  // allocations, so a full copy uses the same number of bytes — TODO confirm.
  EXPECT_EQ(preCopySrcMemory, targetPool->usedBytes());
}
} // namespace facebook::velox::test

0 comments on commit 84428f9

Please sign in to comment.