Skip to content

Commit

Permalink
Unnest array of rows based on unnestArrayOfRows
Browse files Browse the repository at this point in the history
Velox unnests array of row type into a single column of the
element row type. Presto also supports unnesting such arrays
into multiple columns one for each child type of element row
type. Adds unnestArrayOfRows to add such unnesting support
in Unnest PlanNode.
  • Loading branch information
aditi-pandit committed Dec 16, 2023
1 parent 01756db commit 435ff31
Show file tree
Hide file tree
Showing 14 changed files with 374 additions and 36 deletions.
51 changes: 44 additions & 7 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -822,11 +822,13 @@ UnnestNode::UnnestNode(
std::vector<FieldAccessTypedExprPtr> unnestVariables,
const std::vector<std::string>& unnestNames,
const std::optional<std::string>& ordinalityName,
bool unnestArrayOfRows,
const PlanNodePtr& source)
: PlanNode(id),
replicateVariables_{std::move(replicateVariables)},
unnestVariables_{std::move(unnestVariables)},
withOrdinality_{ordinalityName.has_value()},
unnestArrayOfRows_(unnestArrayOfRows),
sources_{source} {
// Calculate output type. First come "replicate" columns, followed by
// "unnest" columns, followed by an optional ordinality column.
Expand All @@ -838,25 +840,53 @@ UnnestNode::UnnestNode(
types.emplace_back(variable->type());
}

auto indexCheck = [](vector_size_t index, vector_size_t size) {
VELOX_USER_CHECK_LE(
index, size, "unnestNames do not include all UnnestNode columns");
};

int unnestIndex = 0;
auto unnestNamesLength = unnestNames.size();
for (const auto& variable : unnestVariables_) {
if (variable->type()->isArray()) {
names.emplace_back(unnestNames[unnestIndex++]);
types.emplace_back(variable->type()->asArray().elementType());
} else if (variable->type()->isMap()) {
const auto& mapType = variable->type()->asMap();
const auto& type = variable->type();
if (type->isArray()) {
const auto& elementType = type->asArray().elementType();
if (elementType->isRow() && unnestArrayOfRows_) {
// When unnestArrayOfRows_ is set, an array of rows is flattened into
// multiple columns, one for each child type of the row.
// Otherwise an array of rows, like any other array, is flattened into a
// single column of the array's element type.
auto numChildren = elementType->size();
for (auto i = 0; i < numChildren; i++) {
indexCheck(unnestIndex, unnestNamesLength);
names.emplace_back(unnestNames[unnestIndex++]);
types.emplace_back(elementType->childAt(i));
}
} else {
indexCheck(unnestIndex, unnestNamesLength);
names.emplace_back(unnestNames[unnestIndex++]);
types.emplace_back(elementType);
}
} else if (type->isMap()) {
const auto& mapType = type->asMap();

indexCheck(unnestIndex, unnestNamesLength);
names.emplace_back(unnestNames[unnestIndex++]);
types.emplace_back(mapType.keyType());

indexCheck(unnestIndex, unnestNamesLength);
names.emplace_back(unnestNames[unnestIndex++]);
types.emplace_back(mapType.valueType());
} else {
VELOX_FAIL(
VELOX_USER_FAIL(
"Unexpected type of unnest variable. Expected ARRAY or MAP, but got {}.",
variable->type()->toString());
type->toString());
}
}
VELOX_USER_CHECK_EQ(
unnestNames.size(),
unnestIndex,
"Size of unnestNames should match the number of output columns");

if (ordinalityName.has_value()) {
names.emplace_back(ordinalityName.value());
Expand All @@ -867,6 +897,10 @@ UnnestNode::UnnestNode(

// Writes the node details to 'stream': the unnest variables, followed by a
// marker when arrays of rows are expanded into multiple columns.
void UnnestNode::addDetails(std::stringstream& stream) const {
  addFields(stream, unnestVariables_);

  if (!unnestArrayOfRows_) {
    return;
  }

  // Make the row-flattening mode visible in the detailed plan string.
  stream << ", unnest arrays of rows";
}

folly::dynamic UnnestNode::serialize() const {
Expand All @@ -888,6 +922,8 @@ folly::dynamic UnnestNode::serialize() const {
if (withOrdinality_) {
obj["ordinalityName"] = outputType()->names().back();
}

obj["unnestArrayOfRows"] = unnestArrayOfRows_;
return obj;
}

Expand All @@ -909,6 +945,7 @@ PlanNodePtr UnnestNode::create(const folly::dynamic& obj, void* context) {
std::move(unnestVariables),
std::move(unnestNames),
ordinalityName,
obj["unnestArrayOfRows"].asBool(),
std::move(source));
}

Expand Down
50 changes: 43 additions & 7 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -1897,29 +1897,60 @@ class LimitNode : public PlanNode {
};

/// Expands arrays and maps into separate columns. Arrays are expanded into a
/// single column, and maps are expanded into two columns (key, value). Can be
/// used to expand multiple columns. In this case will produce as many rows as
/// the highest cardinality array or map (the other columns are padded with
/// nulls). Optionally can produce an ordinality column that specifies the row
/// number starting with 1.
/// single column with the exception of arrays of rows.
/// If unnestArrayOfRows is set, arrays of ROW type are un-nested one level
/// further, i.e. each field of the ROW type is expanded into a separate
/// output column. Otherwise, arrays of ROW type are expanded into a single
/// column of the ROW type, like arrays of any other element type.
/// Maps are expanded into two columns (key, value).
/// Can be used to expand multiple columns. In this case will produce as many
/// rows as the highest cardinality array or map (the other columns are padded
/// with nulls).
/// Optionally can produce an ordinality column that specifies the row number
/// starting with 1.
class UnnestNode : public PlanNode {
public:
/// @param replicateVariables Inputs that are projected as is
/// @param unnestVariables Inputs that are unnested. Must be of type ARRAY or
/// MAP.
/// @param unnestNames Names to use for unnested outputs: one name for each
/// array (element); two names for each map (key and value). The output names
/// must appear in the same order as unnestVariables.
/// array (element), except for an array of rows when unnestArrayOfRows is
/// set, which takes one name per child of the row type; two names for each
/// map (key and value).
/// The output names must appear in the same order as unnestVariables.
/// @param ordinalityName Optional name for the ordinality columns. If not
/// present, ordinality column is not produced.
/// @param unnestArrayOfRows If true, an array of rows is expanded into
/// multiple columns, one for each child type of the row. If false, an array
/// of rows is expanded into a single column of the row type, like an array
/// of any other element type.
UnnestNode(
const PlanNodeId& id,
std::vector<FieldAccessTypedExprPtr> replicateVariables,
std::vector<FieldAccessTypedExprPtr> unnestVariables,
const std::vector<std::string>& unnestNames,
const std::optional<std::string>& ordinalityName,
bool unnestArrayOfRows,
const PlanNodePtr& source);

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
/// Legacy constructor that takes no unnestArrayOfRows argument. Delegates to
/// the full constructor, forwarding all arguments and passing 'true' for
/// unnestArrayOfRows.
/// NOTE(review): with 'true', arrays of rows are expanded into one column per
/// row child, so callers must supply one unnest name per child. Confirm this
/// is the intended default for legacy callers.
UnnestNode(
const PlanNodeId& id,
std::vector<FieldAccessTypedExprPtr> replicateVariables,
std::vector<FieldAccessTypedExprPtr> unnestVariables,
const std::vector<std::string>& unnestNames,
const std::optional<std::string>& ordinalityName,
const PlanNodePtr& source)
: UnnestNode(
id,
replicateVariables,
unnestVariables,
unnestNames,
ordinalityName,
true,
source) {}
#endif

/// The order of columns in the output is: replicated columns (in the order
/// specified), unnested columns (in the order specified, for maps: key comes
/// before value), optional ordinality column.
Expand All @@ -1943,6 +1974,10 @@ class UnnestNode : public PlanNode {
return withOrdinality_;
}

/// Returns the unnestArrayOfRows flag this node was constructed with. When
/// true, arrays of rows are expanded into one output column per child of the
/// row type instead of a single column of the row type.
bool unnestArrayOfRows() const {
return unnestArrayOfRows_;
}

/// Returns the name of this plan node type: "Unnest".
std::string_view name() const override {
return "Unnest";
}
Expand All @@ -1957,6 +1992,7 @@ class UnnestNode : public PlanNode {
const std::vector<FieldAccessTypedExprPtr> replicateVariables_;
const std::vector<FieldAccessTypedExprPtr> unnestVariables_;
const bool withOrdinality_;
const bool unnestArrayOfRows_;
const std::vector<PlanNodePtr> sources_;
RowTypePtr outputType_;
};
Expand Down
16 changes: 10 additions & 6 deletions velox/docs/develop/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -647,11 +647,13 @@ UnnestNode
~~~~~~~~~~

The unnest operation expands arrays and maps into separate columns. Arrays are
expanded into a single column, and maps are expanded into two columns
(key, value). Can be used to expand multiple columns. In this case produces as
many rows as the highest cardinality array or map (the other columns are padded
with nulls). Optionally can produce an ordinality column that specifies the row
number starting with 1.
expanded into a single column. If unnestArrayOfRows is set, then arrays of
ROW type are un-nested one level further, i.e. each field of the ROW type is
expanded into a separate output column. Maps are expanded into two columns (key, value).
Can be used to expand multiple columns. In this case produces as many rows as
the highest cardinality array or map (the other columns are padded with nulls).
Optionally can produce an ordinality column that specifies the row number
starting with 1.

.. list-table::
:widths: 10 30
Expand All @@ -665,9 +667,11 @@ number starting with 1.
* - unnestVariables
- Input columns of type array or map to expand.
* - unnestNames
- Names to use for expanded columns. One name per array column. Two names per map column.
- Names to use for expanded columns. One name per array column. Two names per map column. If unnestArrayOfRows is set, an array of rows takes one name per child type of the row.
* - ordinalityName
- Optional name for the ordinality column.
* - unnestArrayOfRows
- If set, arrays of rows are expanded into multiple columns, one for each child type of the row. Otherwise, arrays of rows (like other array types) are expanded into a single column of the row type.

.. _TableWriteNode:

Expand Down
42 changes: 32 additions & 10 deletions velox/exec/Unnest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ Unnest::Unnest(
operatorId,
unnestNode->id(),
"Unnest"),
withOrdinality_(unnestNode->withOrdinality()) {
withOrdinality_(unnestNode->withOrdinality()),
unnestArrayOfRows_(unnestNode->unnestArrayOfRows()) {
const auto& inputType = unnestNode->sources()[0]->outputType();
const auto& unnestVariables = unnestNode->unnestVariables();
for (const auto& variable : unnestVariables) {
Expand Down Expand Up @@ -182,8 +183,7 @@ RowVectorPtr Unnest::generateOutput(
BufferPtr elementIndices = allocateIndices(numElements, pool());
auto* rawElementIndices = elementIndices->asMutable<vector_size_t>();

auto nulls =
AlignedBuffer::allocate<bool>(numElements, pool(), bits::kNotNull);
auto nulls = allocateNulls(numElements, pool());
auto rawNulls = nulls->asMutable<uint64_t>();

// Make dictionary index for elements column since they may be out of order.
Expand Down Expand Up @@ -220,13 +220,35 @@ RowVectorPtr Unnest::generateOutput(
// Construct unnest column using Array elements wrapped using above
// created dictionary.
auto unnestBaseArray = currentDecoded.base()->as<ArrayVector>();
outputs[outputsIndex++] = identityMapping
? unnestBaseArray->elements()
: wrapChild(
numElements,
elementIndices,
unnestBaseArray->elements(),
nulls);
auto elements = unnestBaseArray->elements();

if (elements->typeKind() == TypeKind::ROW && unnestArrayOfRows_) {
// In this case the array is un-nested into multiple columns, one for
// each child type of the row.
// The array row elements could be encoded, however. In that case, each
// child array accessed needs to be encoded with the array row elements
// encoding and then wrapped with the un-nest encoding for final output.
DecodedVector rowDecoded(*elements);
auto rowVector = rowDecoded.base()->as<RowVector>();

auto rowIsIdentity = rowDecoded.isIdentityMapping();
auto numRowElements = rowDecoded.size();

for (auto j = 0; j < rowVector->childrenSize(); j++) {
auto childVector = rowIsIdentity
? rowVector->childAt(j)
: rowDecoded.wrap(
rowVector->childAt(j), *elements, numRowElements);

outputs[outputsIndex++] = identityMapping
? childVector
: wrapChild(numElements, elementIndices, childVector, nulls);
}
} else {
outputs[outputsIndex++] = identityMapping
? elements
: wrapChild(numElements, elementIndices, elements, nulls);
}
} else {
// Construct two unnest columns for Map keys and values vectors wrapped
// using above created dictionary.
Expand Down
1 change: 1 addition & 0 deletions velox/exec/Unnest.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Unnest : public Operator {
vector_size_t outputSize);

const bool withOrdinality_;
const bool unnestArrayOfRows_;
std::vector<column_index_t> unnestChannels_;

SelectivityVector inputRows_;
Expand Down
13 changes: 13 additions & 0 deletions velox/exec/tests/PlanNodeSerdeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,19 @@ TEST_F(PlanNodeSerdeTest, unnest) {
plan =
PlanBuilder().values({data}).unnest({"c0"}, {"c1"}, "ordinal").planNode();
testSerde(plan);

// Plan with unnestArrayOfRows.
plan = PlanBuilder()
.values({data})
.unnest({"c0"}, {"c1"}, "ordinal", true)
.planNode();
testSerde(plan);

plan = PlanBuilder()
.values({data})
.unnest({"c0"}, {"c1"}, std::nullopt, true)
.planNode();
testSerde(plan);
}

TEST_F(PlanNodeSerdeTest, values) {
Expand Down
11 changes: 11 additions & 0 deletions velox/exec/tests/PlanNodeToStringTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,17 @@ TEST_F(PlanNodeToStringTest, unnest) {
ASSERT_EQ(
"-- Unnest[a0] -> c1:INTEGER, a0_e:SMALLINT\n",
plan->toString(true, false));

plan = PlanBuilder()
.values({data_})
.project({"array_constructor(c0) AS a0", "c1"})
.unnest({"c1"}, {"a0"}, std::nullopt, true)
.planNode();

ASSERT_EQ("-- Unnest\n", plan->toString());
ASSERT_EQ(
"-- Unnest[a0, unnest arrays of rows] -> c1:INTEGER, a0_e:SMALLINT\n",
plan->toString(true, false));
}

TEST_F(PlanNodeToStringTest, localPartition) {
Expand Down
20 changes: 20 additions & 0 deletions velox/exec/tests/QueryAssertionsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
#include "velox/type/Variant.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

using namespace facebook::velox::exec::test;
Expand Down Expand Up @@ -493,4 +494,23 @@ TEST_F(QueryAssertionsTest, intervalDayTime) {
assertQuery(plan, "SELECT * FROM tmp");
}

// Compares a plan that unnests fuzzer-built arrays over 'flatValues' with a
// plan that reads 'flatValues' directly. Both plans sort on c0 and are
// expected to produce equal results.
TEST_F(QueryAssertionsTest, equalPlans) {
  VectorFuzzer::Options fuzzerOptions;
  fuzzerOptions.containerVariableLength = false;
  VectorFuzzer fuzzer(fuzzerOptions, pool());

  // Generate the flat values first, then the arrays built on top of them, in
  // the same order as the fuzzer is consumed by the plans below.
  auto flatValues = fuzzer.fuzzFlat(INTEGER(), 100);
  auto arrays = fuzzer.fuzzArray(flatValues, 20);

  auto planWithUnnest = PlanBuilder()
                            .values({makeRowVector({arrays})})
                            .unnest({}, {"c0"})
                            .orderBy({"c0"}, false)
                            .planNode();

  auto planWithoutUnnest = PlanBuilder()
                               .values({makeRowVector({flatValues})})
                               .orderBy({"c0"}, false)
                               .planNode();

  assertEqualResults(planWithUnnest, planWithoutUnnest);
}

} // namespace facebook::velox::test
Loading

0 comments on commit 435ff31

Please sign in to comment.