Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix GroupId with aliased grouping key columns #6738

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,32 @@ RowTypePtr getGroupIdOutputType(

return ROW(std::move(names), std::move(types));
}

std::vector<std::vector<std::string>> getGroupingSets(
const std::vector<std::vector<FieldAccessTypedExprPtr>>& groupingSetFields,
const std::vector<GroupIdNode::GroupingKeyInfo>& groupingKeyInfos) {
std::unordered_map<std::string, std::string> inputToOutputGroupingKeyMap;
for (const auto& groupKeyInfo : groupingKeyInfos) {
inputToOutputGroupingKeyMap[groupKeyInfo.input->name()] =
groupKeyInfo.output;
}

// Prestissimo passes grouping keys with their input column name to Velox.
// But Velox expects the output column name for the grouping key.
std::vector<std::vector<std::string>> groupingSets;
groupingSets.reserve(groupingSetFields.size());
for (const auto& groupFields : groupingSetFields) {
std::vector<std::string> groupingKeys;
groupingKeys.reserve(groupFields.size());
for (const auto& groupingField : groupFields) {
groupingKeys.push_back(
inputToOutputGroupingKeyMap[groupingField->name()]);
}
groupingSets.push_back(groupingKeys);
}
return groupingSets;
}

} // namespace

GroupIdNode::GroupIdNode(
Expand All @@ -385,6 +411,29 @@ GroupIdNode::GroupIdNode(
std::vector<FieldAccessTypedExprPtr> aggregationInputs,
std::string groupIdName,
PlanNodePtr source)
: PlanNode(std::move(id)),
sources_{source},
outputType_(getGroupIdOutputType(
groupingKeyInfos,
aggregationInputs,
groupIdName)),
groupingSets_(getGroupingSets(groupingSets, groupingKeyInfos)),
groupingKeyInfos_(std::move(groupingKeyInfos)),
aggregationInputs_(std::move(aggregationInputs)),
groupIdName_(std::move(groupIdName)) {
VELOX_USER_CHECK_GE(
groupingSets_.size(),
2,
"GroupIdNode requires two or more grouping sets.");
}

GroupIdNode::GroupIdNode(
PlanNodeId id,
std::vector<std::vector<std::string>> groupingSets,
std::vector<GroupingKeyInfo> groupingKeyInfos,
std::vector<FieldAccessTypedExprPtr> aggregationInputs,
std::string groupIdName,
PlanNodePtr source)
: PlanNode(std::move(id)),
sources_{source},
outputType_(getGroupIdOutputType(
Expand All @@ -395,7 +444,7 @@ GroupIdNode::GroupIdNode(
groupingKeyInfos_(std::move(groupingKeyInfos)),
aggregationInputs_(std::move(aggregationInputs)),
groupIdName_(std::move(groupIdName)) {
VELOX_CHECK_GE(
VELOX_USER_CHECK_GE(
groupingSets_.size(),
2,
"GroupIdNode requires two or more grouping sets.");
Expand All @@ -407,7 +456,12 @@ void GroupIdNode::addDetails(std::stringstream& stream) const {
stream << ", ";
}
stream << "[";
addFields(stream, groupingSets_[i]);
for (auto j = 0; j < groupingSets_[i].size(); j++) {
if (j > 0) {
stream << ", ";
}
stream << groupingSets_[i][j];
}
stream << "]";
}
}
Expand All @@ -434,14 +488,16 @@ folly::dynamic GroupIdNode::serialize() const {
// static
PlanNodePtr GroupIdNode::create(const folly::dynamic& obj, void* context) {
auto source = deserializeSingleSource(obj, context);
auto groupingSets = ISerializable::deserialize<
std::vector<std::vector<FieldAccessTypedExpr>>>(obj["groupingSets"]);
std::vector<GroupingKeyInfo> groupingKeyInfos;
for (const auto& info : obj["groupingKeyInfos"]) {
groupingKeyInfos.push_back(
{info["output"].asString(),
ISerializable::deserialize<FieldAccessTypedExpr>(info["input"])});
}

auto groupingSets =
ISerializable::deserialize<std::vector<std::vector<std::string>>>(
obj["groupingSets"]);
return std::make_shared<GroupIdNode>(
deserializePlanNodeId(obj),
std::move(groupingSets),
Expand Down
30 changes: 27 additions & 3 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,7 @@ class GroupIdNode : public PlanNode {
/// @param groupIdName Name of the column that will contain the grouping set
/// ID (a zero based integer).
/// @param source Input plan node.
/// @deprecated Use the constructor that takes 'groupingSets' as lists of
/// output column names (std::vector<std::vector<std::string>>) instead.
GroupIdNode(
PlanNodeId id,
std::vector<std::vector<FieldAccessTypedExprPtr>> groupingSets,
Expand All @@ -803,6 +804,25 @@ class GroupIdNode : public PlanNode {
std::string groupIdName,
PlanNodePtr source);

/// @param id Plan node ID.
aditi-pandit marked this conversation as resolved.
Show resolved Hide resolved
/// @param groupingSets A list of grouping key sets. Grouping keys within the
/// set must be unique, but grouping keys across sets may repeat.
/// Note: groupingSets are specified using output column names.
/// @param groupingKeyInfos The names and order of the grouping keys in the
/// output.
/// @param aggregationInputs Columns that contain inputs to the aggregate
/// functions.
/// @param groupIdName Name of the column that will contain the grouping set
/// ID (a zero based integer).
/// @param source Input plan node.
GroupIdNode(
PlanNodeId id,
std::vector<std::vector<std::string>> groupingSets,
std::vector<GroupingKeyInfo> groupingKeyInfos,
std::vector<FieldAccessTypedExprPtr> aggregationInputs,
std::string groupIdName,
PlanNodePtr source);

const RowTypePtr& outputType() const override {
return outputType_;
}
Expand All @@ -811,8 +831,7 @@ class GroupIdNode : public PlanNode {
return sources_;
}

const std::vector<std::vector<FieldAccessTypedExprPtr>>& groupingSets()
const {
const std::vector<std::vector<std::string>>& groupingSets() const {
return groupingSets_;
}

Expand Down Expand Up @@ -845,7 +864,12 @@ class GroupIdNode : public PlanNode {

const std::vector<PlanNodePtr> sources_;
const RowTypePtr outputType_;
const std::vector<std::vector<FieldAccessTypedExprPtr>> groupingSets_;

// Specifies groupingSets with output column names.
// This allows for the case when a single input column could map
// to multiple output columns which are used in separate grouping sets.
const std::vector<std::vector<std::string>> groupingSets_;

const std::vector<GroupingKeyInfo> groupingKeyInfos_;
const std::vector<FieldAccessTypedExprPtr> aggregationInputs_;
const std::string groupIdName_;
Expand Down
70 changes: 70 additions & 0 deletions velox/docs/develop/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,83 @@ followed by the group ID column. The type of group ID column is BIGINT.
- Description
* - groupingSets
- List of grouping key sets. Keys within each set must be unique, but keys can repeat across the sets. Grouping keys are specified with their output names.
* - groupingKeyInfos
- The names and order of the grouping key columns in the output.
* - aggregationInputs
- Input columns to duplicate.
* - groupIdName
- The name for the group-id column that identifies the grouping set. Zero-based integer corresponding to the position of the grouping set in the 'groupingSets' list.

GroupIdNode is typically used to compute GROUPING SETS, CUBE and ROLLUP.

Grouping sets usually do not use the same input column for more than one grouping key, but there are
some use cases where they do. To illustrate why, let's examine the following SQL query:

.. code-block:: sql

SELECT count(orderkey), count(DISTINCT orderkey) FROM orders;

In this query the user wants to compute global aggregates using the same column, though with
and without the DISTINCT clause. With a particular optimization strategy
`optimize.mixed-distinct-aggregations <https://www.qubole.com/blog/presto-optimizes-aggregations-over-distinct-values>`_, Presto uses GroupIdNode to compute these.

First, the optimizer creates a GroupIdNode to duplicate every row, assigning one copy
to group 0 and another to group 1. This is achieved using a GroupIdNode with 2 grouping sets,
each using orderkey as a grouping key. To disambiguate the
groups, the orderkey column is aliased as a grouping key for one of the
grouping sets.

Let's say the orders table has 5 rows:

.. code-block::

orderkey
1
2
2
3
4

The GroupIdNode would transform this into:

.. code-block::

orderkey orderkey1 group_id
1 null 0
2 null 0
2 null 0
3 null 0
4 null 0
null 1 1
null 2 1
null 2 1
null 3 1
null 4 1

Then Presto plans an aggregation using (orderkey, group_id) and count(orderkey1).

This results in the following 5 rows:

.. code-block::

orderkey group_id count(orderkey1) as c
1 0 null
2 0 null
3 0 null
4 0 null
null 1 5

Then Presto plans a second aggregation with no keys and count(orderkey), arbitrary(c).
Since both aggregations ignore nulls, this correctly computes the number of
distinct orderkeys and the count of all orderkeys.

.. code-block::

count(orderkey) arbitrary(c)
4 5


HashJoinNode and MergeJoinNode
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
23 changes: 13 additions & 10 deletions velox/exec/GroupId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,27 @@ GroupId::GroupId(
"GroupId") {
const auto& inputType = groupIdNode->sources()[0]->outputType();

std::unordered_map<std::string, column_index_t>
inputToOutputGroupingKeyMapping;
std::unordered_map<column_index_t, column_index_t>
outputToInputGroupingKeyMapping;
for (const auto& groupingKeyInfo : groupIdNode->groupingKeyInfos()) {
inputToOutputGroupingKeyMapping[groupingKeyInfo.input->name()] =
outputType_->getChildIdx(groupingKeyInfo.output);
outputToInputGroupingKeyMapping[outputType_->getChildIdx(
groupingKeyInfo.output)] =
inputType->getChildIdx(groupingKeyInfo.input->name());
}

auto numGroupingSets = groupIdNode->groupingSets().size();
groupingKeyMappings_.reserve(numGroupingSets);

auto numGroupingKeys = groupIdNode->numGroupingKeys();

groupingKeyMappings_.reserve(groupIdNode->groupingSets().size());
for (const auto& groupingSet : groupIdNode->groupingSets()) {
std::vector<column_index_t> mappings(numGroupingKeys, kMissingGroupingKey);
for (const auto& groupingKey : groupingSet) {
auto outputChannel =
inputToOutputGroupingKeyMapping.at(groupingKey->name());
auto inputChannel = inputType->getChildIdx(groupingKey->name());
auto outputChannel = outputType_->getChildIdx(groupingKey);
VELOX_USER_CHECK_NE(
outputToInputGroupingKeyMapping.count(outputChannel),
0,
"GroupIdNode didn't map grouping key {} to input channel",
groupingKey);
auto inputChannel = outputToInputGroupingKeyMapping.at(outputChannel);
mappings[outputChannel] = inputChannel;
}

Expand Down
53 changes: 40 additions & 13 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,7 @@ TEST_F(AggregationTest, groupingSets) {
auto plan =
PlanBuilder()
.values({data})
.groupId({{"k1"}, {"k2"}}, {"a", "b"})
.groupId({"k1", "k2"}, {{"k1"}, {"k2"}}, {"a", "b"})
.singleAggregation(
{"k1", "k2", "group_id"},
{"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"})
Expand All @@ -1474,7 +1474,7 @@ TEST_F(AggregationTest, groupingSets) {
// group_id column.
plan = PlanBuilder()
.values({data})
.groupId({{"k1"}, {"k2"}}, {"a", "b"})
.groupId({"k1", "k2"}, {{"k1"}, {"k2"}}, {"a", "b"})
.project(
{"k1",
"k2",
Expand All @@ -1497,14 +1497,15 @@ TEST_F(AggregationTest, groupingSets) {
"SELECT null, k2, count(1), null, max(b) FROM tmp GROUP BY k2");

// Cube.
plan = PlanBuilder()
.values({data})
.groupId({{"k1", "k2"}, {"k1"}, {"k2"}, {}}, {"a", "b"})
.singleAggregation(
{"k1", "k2", "group_id"},
{"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"})
.project({"k1", "k2", "count_1", "sum_a", "max_b"})
.planNode();
plan =
PlanBuilder()
.values({data})
.groupId({"k1", "k2"}, {{"k1", "k2"}, {"k1"}, {"k2"}, {}}, {"a", "b"})
aditi-pandit marked this conversation as resolved.
Show resolved Hide resolved
.singleAggregation(
{"k1", "k2", "group_id"},
{"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"})
.project({"k1", "k2", "count_1", "sum_a", "max_b"})
.planNode();

assertQuery(
plan,
Expand All @@ -1513,7 +1514,7 @@ TEST_F(AggregationTest, groupingSets) {
// Rollup.
plan = PlanBuilder()
.values({data})
.groupId({{"k1", "k2"}, {"k1"}, {}}, {"a", "b"})
.groupId({"k1", "k2"}, {{"k1", "k2"}, {"k1"}, {}}, {"a", "b"})
.singleAggregation(
{"k1", "k2", "group_id"},
{"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"})
Expand Down Expand Up @@ -1544,7 +1545,7 @@ TEST_F(AggregationTest, groupingSetsOutput) {
auto reversedOrderPlan =
PlanBuilder()
.values({data})
.groupId({{"k2", "k1"}, {}}, {"a", "b"})
.groupId({"k2", "k1"}, {{"k2", "k1"}, {}}, {"a", "b"})
.capturePlanNode(reversedOrderGroupIdNode)
.singleAggregation(
{"k2", "k1", "group_id"},
Expand All @@ -1555,7 +1556,7 @@ TEST_F(AggregationTest, groupingSetsOutput) {
auto orderPlan =
PlanBuilder()
.values({data})
.groupId({{"k1", "k2"}, {}}, {"a", "b"})
.groupId({"k1", "k2"}, {{"k1", "k2"}, {}}, {"a", "b"})
.capturePlanNode(orderGroupIdNode)
.singleAggregation(
{"k1", "k2", "group_id"},
Expand Down Expand Up @@ -1584,6 +1585,32 @@ TEST_F(AggregationTest, groupingSetsOutput) {
assertEqualResults(orderResult.second, reversedOrderResult.second);
}

TEST_F(AggregationTest, groupingSetsSameKey) {
auto data = makeRowVector(
{"o_key", "o_status"},
{makeFlatVector<int64_t>({0, 1, 2, 3, 4}),
makeFlatVector<std::string>({"", "x", "xx", "xxx", "xxxx"})});

createDuckDbTable({data});

auto plan = PlanBuilder()
.values({data})
.groupId(
{"o_key", "o_key as o_key_1"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still find this use case confusing. It is just not clear why would anything use a plan like this. I wish we could identify a compelling use case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I felt the query documented with its Presto plan was a better example. But I wasn't able to translate the IF expressions to equivalent CASE expressions. case group_id when 1 then 0 else null end could not compile since null is UNKNOWN type and CASE required all when/then expressions to evaluate to the same type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try "null::bigint"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will send out a follow up PR.

{{"o_key", "o_key_1"}, {"o_key"}, {"o_key_1"}, {}},
{"o_status"})
.singleAggregation(
{"o_key", "o_key_1", "group_id"},
{"max(o_status) as max_o_status"})
.project({"o_key", "o_key_1", "max_o_status"})
.planNode();

assertQuery(
plan,
"SELECT o_key, o_key_1, max(o_status) as max_o_status FROM ("
"select o_key, o_key as o_key_1, o_status FROM tmp) GROUP BY GROUPING SETS ((o_key, o_key_1), (o_key), (o_key_1), ())");
}

TEST_F(AggregationTest, outputBatchSizeCheckWithSpill) {
const int numVectors = 5;
const int vectorSize = 20;
Expand Down
Loading