Skip to content

Commit

Permalink
fix: Support dictionary with nulls in RowVector::pushDictionaryToRowV…
Browse files Browse the repository at this point in the history
…ectorLeaves (facebookincubator#12220)

Summary:

`RowVector::pushDictionaryToRowVectorLeaves` used to stop combining dictionaries if the dictionary introducing nulls over underlying `RowVector`.  We add support for this case as well in this change.

Differential Revision: D68919058
  • Loading branch information
Yuhta authored and facebook-github-bot committed Jan 31, 2025
1 parent b127d51 commit a83b96a
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 34 deletions.
82 changes: 58 additions & 24 deletions velox/vector/ComplexVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,24 +687,17 @@ struct Wrapper {
BufferPtr indices;
};

void combineWrappers(
std::vector<Wrapper>& wrappers,
template <typename F>
void forEachCombinedIndex(
const std::vector<Wrapper>& wrappers,
vector_size_t size,
memory::MemoryPool* pool) {
std::vector<BufferPtr> wrapInfos(wrappers.size());
F&& f) {
std::vector<const vector_size_t*> sourceIndices(wrappers.size());
uint64_t* rawNulls = nullptr;
for (int i = 0; i < wrappers.size(); ++i) {
wrapInfos[i] = wrappers[i].dictionary->wrapInfo();
VELOX_CHECK_NOT_NULL(wrapInfos[i]);
sourceIndices[i] = wrapInfos[i]->as<vector_size_t>();
if (!rawNulls && wrappers[i].dictionary->nulls()) {
wrappers.back().nulls = allocateNulls(size, pool);
rawNulls = wrappers.back().nulls->asMutable<uint64_t>();
}
auto& wrapInfo = wrappers[i].dictionary->wrapInfo();
VELOX_CHECK_NOT_NULL(wrapInfo);
sourceIndices[i] = wrapInfo->as<vector_size_t>();
}
wrappers.back().indices = allocateIndices(size, pool);
auto* rawIndices = wrappers.back().indices->asMutable<vector_size_t>();
for (vector_size_t j = 0; j < size; ++j) {
auto index = j;
bool isNull = false;
Expand All @@ -715,12 +708,55 @@ void combineWrappers(
}
index = sourceIndices[i][index];
}
if (isNull) {
bits::setNull(rawNulls, j);
} else {
rawIndices[j] = index;
f(j, index, isNull);
}
}

void combineWrappers(
std::vector<Wrapper>& wrappers,
vector_size_t size,
memory::MemoryPool* pool) {
uint64_t* rawNulls = nullptr;
for (int i = 0; i < wrappers.size(); ++i) {
if (!rawNulls && wrappers[i].dictionary->nulls()) {
wrappers.back().nulls = allocateNulls(size, pool);
rawNulls = wrappers.back().nulls->asMutable<uint64_t>();
break;
}
}
wrappers.back().indices = allocateIndices(size, pool);
auto* rawIndices = wrappers.back().indices->asMutable<vector_size_t>();
forEachCombinedIndex(
wrappers,
size,
[&](vector_size_t outer, vector_size_t inner, bool isNull) {
if (isNull) {
bits::setNull(rawNulls, outer);
} else {
rawIndices[outer] = inner;
}
});
}

BufferPtr combineNulls(
const std::vector<Wrapper>& wrappers,
vector_size_t size,
const uint64_t* valueNulls,
memory::MemoryPool* pool) {
if (wrappers.size() == 1 && !valueNulls) {
return wrappers[0].dictionary->nulls();
}
auto nulls = allocateNulls(size, pool);
auto* rawNulls = nulls->asMutable<uint64_t>();
forEachCombinedIndex(
wrappers,
size,
[&](vector_size_t outer, vector_size_t inner, bool isNull) {
if (isNull || (valueNulls && bits::isBitNull(valueNulls, inner))) {
bits::setNull(rawNulls, outer);
}
});
return nulls;
}

VectorPtr wrapInDictionary(
Expand Down Expand Up @@ -765,9 +801,11 @@ VectorPtr pushDictionaryToRowVectorLeavesImpl(
}
case VectorEncoding::Simple::ROW: {
VELOX_CHECK_EQ(values->typeKind(), TypeKind::ROW);
auto nulls = values->nulls();
for (auto& wrapper : wrappers) {
if (wrapper.dictionary->nulls()) {
return wrapInDictionary(wrappers, size, values, pool);
nulls = combineNulls(wrappers, size, values->rawNulls(), pool);
break;
}
}
auto children = values->asUnchecked<RowVector>()->children();
Expand All @@ -778,11 +816,7 @@ VectorPtr pushDictionaryToRowVectorLeavesImpl(
}
}
return std::make_shared<RowVector>(
pool,
values->type(),
values->nulls(),
values->size(),
std::move(children));
pool, values->type(), std::move(nulls), size, std::move(children));
}
case VectorEncoding::Simple::DICTIONARY: {
Wrapper wrapper{values, nullptr, nullptr};
Expand Down
4 changes: 2 additions & 2 deletions velox/vector/ComplexVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ class RowVector : public BaseVector {
/// Push all dictionary encoding to the leave vectors of a RowVector tree
/// (i.e. we traverse the tree consists of RowVectors, possibly wrapped in
/// DictionaryVector, and no traverse into other complex types like array or
/// map children). If the wrapper introduce nulls on RowVector, we don't push
/// the dictionary into that RowVector. The input vector should not contain
/// map children). When new nulls are introduced on RowVector, we combine it
/// with the existing nulls on RowVector. The input vector should not contain
/// any unloaded lazy.
///
/// This is used for example in writing Nimble ArrayWithOffsets and
Expand Down
13 changes: 5 additions & 8 deletions velox/vector/tests/VectorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3867,14 +3867,11 @@ TEST_F(VectorTest, pushDictionaryToRowVectorLeaves) {
auto& c4c2 = c4Row->childAt(2);
ASSERT_EQ(c4c2->encoding(), VectorEncoding::Simple::DICTIONARY);
ASSERT_EQ(c4c0->wrapInfo().get(), c4c2->wrapInfo().get());
auto& c5 = outputRow->childAt(5);
ASSERT_EQ(c5->encoding(), VectorEncoding::Simple::DICTIONARY);
ASSERT_EQ(c5->valueVector()->encoding(), VectorEncoding::Simple::ROW);
auto* c5Row = c5->valueVector()->asUnchecked<RowVector>();
auto& c5c0 = c5Row->childAt(0);
ASSERT_EQ(c5c0->encoding(), VectorEncoding::Simple::FLAT);
auto& c5c1 = c5Row->childAt(1);
ASSERT_EQ(c5c1->encoding(), VectorEncoding::Simple::FLAT);
auto* c5 = outputRow->childAt(5)->asChecked<RowVector>();
auto& c5c0 = c5->childAt(0);
ASSERT_EQ(c5c0->encoding(), VectorEncoding::Simple::DICTIONARY);
auto& c5c1 = c5->childAt(1);
ASSERT_EQ(c5c1->encoding(), VectorEncoding::Simple::DICTIONARY);
}
}

Expand Down

0 comments on commit a83b96a

Please sign in to comment.