Skip to content

Commit

Permalink
Limit Dictionary to Single Level
Browse files Browse the repository at this point in the history
- Modifies wrapInDictionary to flatten indices of a wrapped dictionary iwth the wrapping indices.
- Makes lazy loading of a dictionary encoded column to combine the indices with a dictionary wrapper if loading with lazy wrapped in a dictionary.
- Adds functions to transpose dictionaries with and without nulls.
- Changes  NestedLoopJoin and MergeJoin so  that they wrap their input only after the wrapping indices are known. Previously these would wrap first and only then fill in the indices.
- Checks that we do not come across multiple nested dictionaries.
  • Loading branch information
Orri Erling committed Sep 6, 2024
1 parent 1a42a5b commit 1298d05
Show file tree
Hide file tree
Showing 25 changed files with 563 additions and 145 deletions.
12 changes: 9 additions & 3 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,7 @@ void HashProbe::fillLeftSemiProjectMatchColumn(vector_size_t size) {

void HashProbe::fillOutput(vector_size_t size) {
prepareOutput(size);
WrapState state;

for (auto projection : identityProjections_) {
// Load input vector if it is being split into multiple batches. It is not
Expand All @@ -775,7 +776,7 @@ void HashProbe::fillOutput(vector_size_t size) {
auto inputChild = input_->childAt(projection.inputChannel);

output_->childAt(projection.outputChannel) =
wrapChild(size, outputRowMapping_, inputChild);
wrapOne(size, outputRowMapping_, inputChild, nullptr, state);
}

if (isLeftSemiProjectJoin(joinType_)) {
Expand Down Expand Up @@ -1106,6 +1107,7 @@ bool HashProbe::maybeReadSpillOutput() {

RowVectorPtr HashProbe::createFilterInput(vector_size_t size) {
std::vector<VectorPtr> filterColumns(filterInputType_->size());
WrapState state;
for (auto projection : filterInputProjections_) {
if (projectedInputColumns_.find(projection.inputChannel) !=
projectedInputColumns_.end()) {
Expand All @@ -1120,8 +1122,12 @@ RowVectorPtr HashProbe::createFilterInput(vector_size_t size) {
ensureLoadedIfNotAtEnd(projection.inputChannel);
}

filterColumns[projection.outputChannel] = wrapChild(
size, outputRowMapping_, input_->childAt(projection.inputChannel));
filterColumns[projection.outputChannel] = wrapOne(
size,
outputRowMapping_,
input_->childAt(projection.inputChannel),
nullptr,
state);
}

extractColumns(
Expand Down
1 change: 1 addition & 0 deletions velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "velox/exec/HashPartitionFunction.h"
#include "velox/exec/HashTable.h"
#include "velox/exec/Operator.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/ProbeOperatorState.h"
#include "velox/exec/VectorHasher.h"

Expand Down
6 changes: 4 additions & 2 deletions velox/exec/LocalPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/exec/LocalPartition.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
Expand Down Expand Up @@ -283,9 +284,10 @@ RowVectorPtr
wrapChildren(const RowVectorPtr& input, vector_size_t size, BufferPtr indices) {
std::vector<VectorPtr> wrappedChildren;
wrappedChildren.reserve(input->type()->size());
WrapState state;
for (auto i = 0; i < input->type()->size(); i++) {
wrappedChildren.emplace_back(BaseVector::wrapInDictionary(
BufferPtr(nullptr), indices, size, input->childAt(i)));
wrappedChildren.emplace_back(
wrapOne(size, indices, input->childAt(i), nullptr, state));
}

return std::make_shared<RowVector>(
Expand Down
8 changes: 6 additions & 2 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
#include "velox/exec/Values.h"
#include "velox/exec/Window.h"

DEFINE_bool(merge_project, true, "Merge consecutive filter and project nodes");

namespace facebook::velox::exec {

namespace detail {
Expand Down Expand Up @@ -424,8 +426,10 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
std::dynamic_pointer_cast<const core::FilterNode>(planNode)) {
if (i < planNodes.size() - 1) {
auto next = planNodes[i + 1];
if (auto projectNode =
std::dynamic_pointer_cast<const core::ProjectNode>(next)) {
std::shared_ptr<const core::ProjectNode> projectNode;
if (FLAGS_merge_project &&
(projectNode =
std::dynamic_pointer_cast<const core::ProjectNode>(next))) {
operators.push_back(std::make_unique<FilterProject>(
id, ctx.get(), filterNode, projectNode));
i++;
Expand Down
167 changes: 111 additions & 56 deletions velox/exec/MergeJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,7 @@ void MergeJoin::addOutputRowForLeftJoin(
VELOX_USER_CHECK(isLeftJoin(joinType_) || isAntiJoin(joinType_));
rawLeftIndices_[outputSize_] = leftIndex;

for (const auto& projection : rightProjections_) {
const auto& target = output_->childAt(projection.outputChannel);
target->setNull(outputSize_, true);
}
addRightNulls();

if (joinTracker_) {
// Record left-side row with no match on the right side.
Expand All @@ -294,11 +291,7 @@ void MergeJoin::addOutputRowForRightJoin(
VELOX_USER_CHECK(isRightJoin(joinType_));
rawRightIndices_[outputSize_] = rightIndex;

for (const auto& projection : leftProjections_) {
const auto& target = output_->childAt(projection.outputChannel);
target->setNull(outputSize_, true);
}

addNull(leftNulls_);
if (joinTracker_) {
// Record right-side row with no match on the left side.
joinTracker_->addMiss(outputSize_);
Expand All @@ -307,11 +300,42 @@ void MergeJoin::addOutputRowForRightJoin(
++outputSize_;
}

void MergeJoin::addNull(BufferPtr& nulls) {
if (!nulls) {
nulls = AlignedBuffer::allocate<bool>(
outputBatchSize_, operatorCtx_->pool(), bits::kNotNull);
}
bits::setNull(nulls->asMutable<uint64_t>(), outputSize_);
}

void MergeJoin::addRightNulls() {
if (isRightFlattened_) {
for (const auto& projection : rightProjections_) {
const auto& target = output_->childAt(projection.outputChannel);
target->setNull(outputSize_, true);
}
} else {
addNull(rightNulls_);
}
}

void MergeJoin::flattenRightProjections() {
auto& children = output_->children();

for (const auto& projection : rightProjections_) {
auto& currentVector = children[projection.outputChannel];
VectorPtr currentVector;
if (currentRight_) {
currentVector = BaseVector::wrapInDictionary(
rightNulls_,
rightIndices_,
outputSize_,
currentRight_->childAt(projection.inputChannel));
} else {
currentVector = BaseVector::createNullConstant(
outputType_->childAt(projection.outputChannel),
outputSize_,
operatorCtx_->pool());
}
auto newFlat = BaseVector::create(
currentVector->type(), outputBatchSize_, operatorCtx_->pool());
newFlat->copy(currentVector.get(), 0, 0, outputSize_);
Expand Down Expand Up @@ -397,43 +421,9 @@ bool MergeJoin::prepareOutput(

// Create left side projection outputs.
std::vector<VectorPtr> localColumns(outputType_->size());
if (newLeft == nullptr) {
for (const auto& projection : leftProjections_) {
localColumns[projection.outputChannel] = BaseVector::create(
outputType_->childAt(projection.outputChannel),
outputBatchSize_,
operatorCtx_->pool());
}
} else {
for (const auto& projection : leftProjections_) {
localColumns[projection.outputChannel] = BaseVector::wrapInDictionary(
{},
leftIndices_,
outputBatchSize_,
newLeft->childAt(projection.inputChannel));
}
}
currentLeft_ = newLeft;

// Create right side projection outputs.
if (right == nullptr) {
for (const auto& projection : rightProjections_) {
localColumns[projection.outputChannel] = BaseVector::create(
outputType_->childAt(projection.outputChannel),
outputBatchSize_,
operatorCtx_->pool());
}
isRightFlattened_ = true;
} else {
for (const auto& projection : rightProjections_) {
localColumns[projection.outputChannel] = BaseVector::wrapInDictionary(
{},
rightIndices_,
outputBatchSize_,
right->childAt(projection.inputChannel));
}
isRightFlattened_ = false;
}
currentRight_ = right;

output_ = std::make_shared<RowVector>(
Expand Down Expand Up @@ -469,10 +459,6 @@ bool MergeJoin::prepareOutput(

if (filter_ != nullptr && filterInput_ == nullptr) {
std::vector<VectorPtr> inputs(filterInputType_->size());
for (const auto [filterInputChannel, outputChannel] :
filterInputToOutputChannel_) {
inputs[filterInputChannel] = output_->childAt(outputChannel);
}
for (auto i = 0; i < filterInputType_->size(); ++i) {
if (filterInputToOutputChannel_.find(i) !=
filterInputToOutputChannel_.end()) {
Expand All @@ -492,6 +478,69 @@ bool MergeJoin::prepareOutput(
return false;
}

void MergeJoin::wrapOutput() {
auto& outputColumns = output_->children();
if (currentLeft_ == nullptr) {
for (const auto& projection : leftProjections_) {
outputColumns[projection.outputChannel] = BaseVector::createNullConstant(
outputType_->childAt(projection.outputChannel),
outputSize_,
operatorCtx_->pool());
}
} else {
WrapState leftState;
for (const auto& projection : leftProjections_) {
outputColumns[projection.outputChannel] = wrapOne(
outputSize_,
leftIndices_,
currentLeft_->childAt(projection.inputChannel),
leftNulls_,
leftState);
}
}

// If right is flattened, the output columns are ready. If there is no right
// side, we have constant nulls on the right. Else we have the right side
// wrapped in 'rightIndices_' with 'rightNulls_' added if present.
if (!isRightFlattened_) {
if (currentRight_ == nullptr) {
for (const auto& projection : rightProjections_) {
outputColumns[projection.outputChannel] =
BaseVector::createNullConstant(
outputType_->childAt(projection.outputChannel),
outputSize_,
operatorCtx_->pool());
}
isRightFlattened_ = true;
} else {
WrapState rightState;
for (const auto& projection : rightProjections_) {
outputColumns[projection.outputChannel] = wrapOne(
outputSize_,
rightIndices_,
currentRight_->childAt(projection.inputChannel),
rightNulls_,
rightState);
}
}
}

// 'output_' will be moved to return and be clear. 'rightFlattened_' is
// never true after this.
isRightFlattened_ = false;
leftNulls_ = nullptr;
rightNulls_ = nullptr;
output_->resize(outputSize_);

// Patch the filter inputs that are also projected out so that the
// filter input references the child vector from 'output_'
auto& inputs = filterInput_->children();
for (const auto [filterInputChannel, outputChannel] :
filterInputToOutputChannel_) {
inputs[filterInputChannel] = output_->childAt(outputChannel);
}
}

bool MergeJoin::addToOutput() {
size_t firstLeftBatch;
vector_size_t leftStartIndex;
Expand Down Expand Up @@ -528,7 +577,6 @@ bool MergeJoin::addToOutput() {
r == numRights - 1 ? rightMatch_->endIndex : right->size();

if (prepareOutput(left, right)) {
output_->resize(outputSize_);
leftMatch_->setCursor(l, i);
rightMatch_->setCursor(r, rightStart);
return true;
Expand Down Expand Up @@ -697,6 +745,7 @@ RowVectorPtr MergeJoin::doGetOutput() {
// Not all rows from the last match fit in the output. Continue producing
// results from the current match.
if (addToOutput()) {
wrapOutput();
return std::move(output_);
}
}
Expand Down Expand Up @@ -752,6 +801,7 @@ RowVectorPtr MergeJoin::doGetOutput() {
VELOX_CHECK(rightMatch_ && rightMatch_->complete);

if (addToOutput()) {
wrapOutput();
return std::move(output_);
}
}
Expand All @@ -762,11 +812,12 @@ RowVectorPtr MergeJoin::doGetOutput() {
// If output_ is currently wrapping a different buffer, return it
// first.
if (prepareOutput(input_, nullptr)) {
output_->resize(outputSize_);
wrapOutput();
return std::move(output_);
}
while (true) {
if (outputSize_ == outputBatchSize_) {
wrapOutput();
return std::move(output_);
}
addOutputRowForLeftJoin(input_, index_);
Expand All @@ -781,20 +832,21 @@ RowVectorPtr MergeJoin::doGetOutput() {
}

if (noMoreInput_ && output_) {
output_->resize(outputSize_);
wrapOutput();
return std::move(output_);
}
} else if (isRightJoin(joinType_)) {
if (rightInput_ && noMoreInput_) {
// If output_ is currently wrapping a different buffer, return it
// first.
if (prepareOutput(nullptr, rightInput_)) {
output_->resize(outputSize_);
wrapOutput();
return std::move(output_);
}

while (true) {
if (outputSize_ == outputBatchSize_) {
wrapOutput();
return std::move(output_);
}

Expand All @@ -810,13 +862,13 @@ RowVectorPtr MergeJoin::doGetOutput() {
}

if (noMoreRightInput_ && output_) {
output_->resize(outputSize_);
wrapOutput();
return std::move(output_);
}
} else {
if (noMoreInput_ || noMoreRightInput_) {
if (output_) {
output_->resize(outputSize_);
wrapOutput();
return std::move(output_);
}
input_ = nullptr;
Expand All @@ -837,11 +889,12 @@ RowVectorPtr MergeJoin::doGetOutput() {
// If output_ is currently wrapping a different buffer, return it
// first.
if (prepareOutput(input_, nullptr)) {
output_->resize(outputSize_);
wrapOutput();
return std::move(output_);
}

if (outputSize_ == outputBatchSize_) {
wrapOutput();
return std::move(output_);
}
addOutputRowForLeftJoin(input_, index_);
Expand All @@ -864,11 +917,12 @@ RowVectorPtr MergeJoin::doGetOutput() {
// If output_ is currently wrapping a different buffer, return it
// first.
if (prepareOutput(nullptr, rightInput_)) {
output_->resize(outputSize_);
wrapOutput();
return std::move(output_);
}

if (outputSize_ == outputBatchSize_) {
wrapOutput();
return std::move(output_);
}

Expand Down Expand Up @@ -935,6 +989,7 @@ RowVectorPtr MergeJoin::doGetOutput() {
}

if (addToOutput()) {
wrapOutput();
return std::move(output_);
}

Expand Down
Loading

0 comments on commit 1298d05

Please sign in to comment.