Skip to content

Commit

Permalink
Limit Dictionary to Single Level
Browse files Browse the repository at this point in the history
- Modifies wrapInDictionary to flatten indices of a wrapped dictionary iwth the wrapping indices.
- Makes lazy loading of a dictionary encoded column to combine the indices with a dictionary wrapper if loading with lazy wrapped in a dictionary.
- Adds functions to transpose dictionaries with and without nulls.
- Changes  NestedLoopJoin and MergeJoin so  that they wrap their input only after the wrapping indices are known. Previously these would wrap first and only then fill in the indices.
- Checks that we do not come across multiple nested dictionaries.
  • Loading branch information
Orri Erling committed Nov 6, 2024
1 parent 789ce65 commit baae829
Show file tree
Hide file tree
Showing 27 changed files with 574 additions and 153 deletions.
3 changes: 1 addition & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,7 @@ if(${VELOX_BUILD_TESTING})
set_source(c-ares)
resolve_dependency(c-ares)

set_source(gRPC)
resolve_dependency(gRPC)
# set_source(gRPC) resolve_dependency(gRPC)
endif()

if(VELOX_ENABLE_REMOTE_FUNCTIONS)
Expand Down
2 changes: 1 addition & 1 deletion velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ class QueryConfig {
}

bool validateOutputFromOperators() const {
return get<bool>(kValidateOutputFromOperators, false);
return get<bool>(kValidateOutputFromOperators, true);
}

bool isExpressionEvaluationCacheEnabled() const {
Expand Down
12 changes: 9 additions & 3 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ void HashProbe::fillLeftSemiProjectMatchColumn(vector_size_t size) {

void HashProbe::fillOutput(vector_size_t size) {
prepareOutput(size);
WrapState state;

for (auto projection : identityProjections_) {
// Load input vector if it is being split into multiple batches. It is not
Expand All @@ -778,7 +779,7 @@ void HashProbe::fillOutput(vector_size_t size) {
auto inputChild = input_->childAt(projection.inputChannel);

output_->childAt(projection.outputChannel) =
wrapChild(size, outputRowMapping_, inputChild);
wrapOne(size, outputRowMapping_, inputChild, nullptr, state);
}

if (isLeftSemiProjectJoin(joinType_)) {
Expand Down Expand Up @@ -1115,6 +1116,7 @@ bool HashProbe::maybeReadSpillOutput() {

RowVectorPtr HashProbe::createFilterInput(vector_size_t size) {
std::vector<VectorPtr> filterColumns(filterInputType_->size());
WrapState state;
for (auto projection : filterInputProjections_) {
if (projectedInputColumns_.find(projection.inputChannel) !=
projectedInputColumns_.end()) {
Expand All @@ -1129,8 +1131,12 @@ RowVectorPtr HashProbe::createFilterInput(vector_size_t size) {
ensureLoadedIfNotAtEnd(projection.inputChannel);
}

filterColumns[projection.outputChannel] = wrapChild(
size, outputRowMapping_, input_->childAt(projection.inputChannel));
filterColumns[projection.outputChannel] = wrapOne(
size,
outputRowMapping_,
input_->childAt(projection.inputChannel),
nullptr,
state);
}

extractColumns(
Expand Down
1 change: 1 addition & 0 deletions velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "velox/exec/HashPartitionFunction.h"
#include "velox/exec/HashTable.h"
#include "velox/exec/Operator.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/ProbeOperatorState.h"
#include "velox/exec/VectorHasher.h"

Expand Down
6 changes: 4 additions & 2 deletions velox/exec/LocalPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/exec/LocalPartition.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/Task.h"

namespace facebook::velox::exec {
Expand Down Expand Up @@ -284,9 +285,10 @@ RowVectorPtr
wrapChildren(const RowVectorPtr& input, vector_size_t size, BufferPtr indices) {
std::vector<VectorPtr> wrappedChildren;
wrappedChildren.reserve(input->type()->size());
WrapState state;
for (auto i = 0; i < input->type()->size(); i++) {
wrappedChildren.emplace_back(BaseVector::wrapInDictionary(
BufferPtr(nullptr), indices, size, input->childAt(i)));
wrappedChildren.emplace_back(
wrapOne(size, indices, input->childAt(i), nullptr, state));
}

return std::make_shared<RowVector>(
Expand Down
8 changes: 6 additions & 2 deletions velox/exec/LocalPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
#include "velox/exec/Values.h"
#include "velox/exec/Window.h"

DEFINE_bool(merge_project, true, "Merge consecutive filter and project nodes");

namespace facebook::velox::exec {

namespace detail {
Expand Down Expand Up @@ -431,8 +433,10 @@ std::shared_ptr<Driver> DriverFactory::createDriver(
std::dynamic_pointer_cast<const core::FilterNode>(planNode)) {
if (i < planNodes.size() - 1) {
auto next = planNodes[i + 1];
if (auto projectNode =
std::dynamic_pointer_cast<const core::ProjectNode>(next)) {
std::shared_ptr<const core::ProjectNode> projectNode;
if (FLAGS_merge_project &&
(projectNode =
std::dynamic_pointer_cast<const core::ProjectNode>(next))) {
operators.push_back(std::make_unique<FilterProject>(
id, ctx.get(), filterNode, projectNode));
i++;
Expand Down
Loading

0 comments on commit baae829

Please sign in to comment.