Skip to content

Commit

Permalink
Fix GROUPING SETS with multiple aliased grouping key columns
Browse files Browse the repository at this point in the history
GROUPING SETS can be specified with grouping keys that are
alias columns of the same input column. This is typically used to
compute multiple mixed aggregations on the same key.

The Velox operator always assumes that only input columns
are specified as grouping keys. Whereas Presto does send
plan fragments correctly specifying the output column name
for such cases.

Due to Velox's assumptions in each result grouping set,
the column or its alias column values all appear in the
same output column leading to incorrect results.
  • Loading branch information
aditi-pandit committed Oct 6, 2023
1 parent 4364ac5 commit 80a4f8f
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 55 deletions.
64 changes: 60 additions & 4 deletions velox/core/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,32 @@ RowTypePtr getGroupIdOutputType(

return ROW(std::move(names), std::move(types));
}

std::vector<std::vector<std::string>> getGroupingSets(
const std::vector<std::vector<FieldAccessTypedExprPtr>>& groupingSetFields,
const std::vector<GroupIdNode::GroupingKeyInfo>& groupingKeyInfos) {
std::unordered_map<std::string, std::string> inputToOutputGroupingKeyMap;
for (const auto& groupKeyInfo : groupingKeyInfos) {
inputToOutputGroupingKeyMap[groupKeyInfo.input->name()] =
groupKeyInfo.output;
}

// Prestissimo passes grouping keys with their input column name to Velox.
// But Velox expects the output column name for the grouping key.
std::vector<std::vector<std::string>> groupingSets;
groupingSets.reserve(groupingSetFields.size());
for (const auto& groupFields : groupingSetFields) {
std::vector<std::string> groupingKeys;
groupingKeys.reserve(groupFields.size());
for (const auto& groupingField : groupFields) {
groupingKeys.push_back(
inputToOutputGroupingKeyMap[groupingField->name()]);
}
groupingSets.push_back(groupingKeys);
}
return groupingSets;
}

} // namespace

GroupIdNode::GroupIdNode(
Expand All @@ -385,6 +411,29 @@ GroupIdNode::GroupIdNode(
std::vector<FieldAccessTypedExprPtr> aggregationInputs,
std::string groupIdName,
PlanNodePtr source)
: PlanNode(std::move(id)),
sources_{source},
outputType_(getGroupIdOutputType(
groupingKeyInfos,
aggregationInputs,
groupIdName)),
groupingSets_(getGroupingSets(groupingSets, groupingKeyInfos)),
groupingKeyInfos_(std::move(groupingKeyInfos)),
aggregationInputs_(std::move(aggregationInputs)),
groupIdName_(std::move(groupIdName)) {
VELOX_USER_CHECK_GE(
groupingSets_.size(),
2,
"GroupIdNode requires two or more grouping sets.");
}

GroupIdNode::GroupIdNode(
PlanNodeId id,
std::vector<std::vector<std::string>> groupingSets,
std::vector<GroupingKeyInfo> groupingKeyInfos,
std::vector<FieldAccessTypedExprPtr> aggregationInputs,
std::string groupIdName,
PlanNodePtr source)
: PlanNode(std::move(id)),
sources_{source},
outputType_(getGroupIdOutputType(
Expand All @@ -395,7 +444,7 @@ GroupIdNode::GroupIdNode(
groupingKeyInfos_(std::move(groupingKeyInfos)),
aggregationInputs_(std::move(aggregationInputs)),
groupIdName_(std::move(groupIdName)) {
VELOX_CHECK_GE(
VELOX_USER_CHECK_GE(
groupingSets_.size(),
2,
"GroupIdNode requires two or more grouping sets.");
Expand All @@ -407,7 +456,12 @@ void GroupIdNode::addDetails(std::stringstream& stream) const {
stream << ", ";
}
stream << "[";
addFields(stream, groupingSets_[i]);
for (auto j = 0; j < groupingSets_[i].size(); j++) {
if (j > 0) {
stream << ", ";
}
stream << groupingSets_[i][j];
}
stream << "]";
}
}
Expand All @@ -434,14 +488,16 @@ folly::dynamic GroupIdNode::serialize() const {
// static
PlanNodePtr GroupIdNode::create(const folly::dynamic& obj, void* context) {
auto source = deserializeSingleSource(obj, context);
auto groupingSets = ISerializable::deserialize<
std::vector<std::vector<FieldAccessTypedExpr>>>(obj["groupingSets"]);
std::vector<GroupingKeyInfo> groupingKeyInfos;
for (const auto& info : obj["groupingKeyInfos"]) {
groupingKeyInfos.push_back(
{info["output"].asString(),
ISerializable::deserialize<FieldAccessTypedExpr>(info["input"])});
}

auto groupingSets =
ISerializable::deserialize<std::vector<std::vector<std::string>>>(
obj["groupingSets"]);
return std::make_shared<GroupIdNode>(
deserializePlanNodeId(obj),
std::move(groupingSets),
Expand Down
30 changes: 27 additions & 3 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,7 @@ class GroupIdNode : public PlanNode {
/// @param groupIdName Name of the column that will contain the grouping set
/// ID (a zero based integer).
/// @param source Input plan node.
/// NOTE: THIS FUNCTION IS DEPRECATED. PLEASE DO NOT USE.
GroupIdNode(
PlanNodeId id,
std::vector<std::vector<FieldAccessTypedExprPtr>> groupingSets,
Expand All @@ -803,6 +804,25 @@ class GroupIdNode : public PlanNode {
std::string groupIdName,
PlanNodePtr source);

/// @param id Plan node ID.
/// @param groupingSets A list of grouping key sets. Grouping keys within the
/// set must be unique, but grouping keys across sets may repeat.
/// Note: groupingSets are specified using output column names.
/// @param groupingKeyInfos The names and order of the grouping keys in the
/// output.
/// @param aggregationInputs Columns that contain inputs to the aggregate
/// functions.
/// @param groupIdName Name of the column that will contain the grouping set
/// ID (a zero based integer).
/// @param source Input plan node.
GroupIdNode(
PlanNodeId id,
std::vector<std::vector<std::string>> groupingSets,
std::vector<GroupingKeyInfo> groupingKeyInfos,
std::vector<FieldAccessTypedExprPtr> aggregationInputs,
std::string groupIdName,
PlanNodePtr source);

const RowTypePtr& outputType() const override {
return outputType_;
}
Expand All @@ -811,8 +831,7 @@ class GroupIdNode : public PlanNode {
return sources_;
}

const std::vector<std::vector<FieldAccessTypedExprPtr>>& groupingSets()
const {
const std::vector<std::vector<std::string>>& groupingSets() const {
return groupingSets_;
}

Expand Down Expand Up @@ -845,7 +864,12 @@ class GroupIdNode : public PlanNode {

const std::vector<PlanNodePtr> sources_;
const RowTypePtr outputType_;
const std::vector<std::vector<FieldAccessTypedExprPtr>> groupingSets_;

// Specifies groupingSets with output column names.
// This allows for the case when a single input column could map
// to multiple output columns which are used in separate grouping sets.
const std::vector<std::vector<std::string>> groupingSets_;

const std::vector<GroupingKeyInfo> groupingKeyInfos_;
const std::vector<FieldAccessTypedExprPtr> aggregationInputs_;
const std::string groupIdName_;
Expand Down
70 changes: 70 additions & 0 deletions velox/docs/develop/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,83 @@ followed by the group ID column. The type of group ID column is BIGINT.
- Description
* - groupingSets
- List of grouping key sets. Keys within each set must be unique, but keys can repeat across the sets.
- Grouping keys are specified with their output names.
* - groupingKeyInfos
- The names and order of the grouping key columns in the output.
* - aggregationInputs
- Input columns to duplicate.
* - groupIdName
- The name for the group-id column that identifies the grouping set. Zero-based integer corresponding to the position of the grouping set in the 'groupingSets' list.

GroupIdNode is typically used to compute GROUPING SETS, CUBE and ROLLUP.

While usually GroupingSets do not repeat with the same grouping key column, there are some use-cases where
they might. To illustrate why GroupingSets might do so lets examine the following SQL query:

.. code-block:: sql
SELECT count(orderkey), count(DISTINCT orderkey) FROM orders;
In this query the user wants to compute global aggregates using the same column, though with
and without the DISTINCT clause. With a particular optimization strategy
`optimize.mixed-distinct-aggregations <https://www.qubole.com/blog/presto-optimizes-aggregations-over-distinct-values>`_, Presto uses GroupIdNode to compute these.

First, the optimizer creates a GroupIdNode to duplicate every row assigning one copy
to group 0 and another to group 1. This is achieved using the GroupIdNode with 2 grouping sets
each using orderkey as a grouping key. In order to disambiguate the
groups the orderkey column is aliased as a grouping key for one of the
grouping sets.

Lets say the orders table has 5 rows:

.. code-block::
orderkey
1
2
2
3
4
The GroupIdNode would transform this into:

.. code-block::
orderkey orderkey1 group_id
1 null 0
2 null 0
2 null 0
3 null 0
4 null 0
null 1 1
null 2 1
null 2 1
null 3 1
null 4 1
Then Presto plans an aggregation using (orderkey, group_id) and count(orderkey1).

This results in the following 5 rows:

.. code-block::
orderkey group_id count(orderkey1) as c
1 0 null
2 0 null
3 0 null
4 0 null
null 1 5
Then Presto plans a second aggregation with no keys and count(orderkey), arbitrary(c).
Since both aggregations ignore nulls this correctly computes the number of
distinct orderkeys and the count of all orderkeys.

.. code-block::
count(orderkey) arbitrary(c)
4 5
HashJoinNode and MergeJoinNode
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
23 changes: 13 additions & 10 deletions velox/exec/GroupId.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,27 @@ GroupId::GroupId(
"GroupId") {
const auto& inputType = groupIdNode->sources()[0]->outputType();

std::unordered_map<std::string, column_index_t>
inputToOutputGroupingKeyMapping;
std::unordered_map<column_index_t, column_index_t>
outputToInputGroupingKeyMapping;
for (const auto& groupingKeyInfo : groupIdNode->groupingKeyInfos()) {
inputToOutputGroupingKeyMapping[groupingKeyInfo.input->name()] =
outputType_->getChildIdx(groupingKeyInfo.output);
outputToInputGroupingKeyMapping[outputType_->getChildIdx(
groupingKeyInfo.output)] =
inputType->getChildIdx(groupingKeyInfo.input->name());
}

auto numGroupingSets = groupIdNode->groupingSets().size();
groupingKeyMappings_.reserve(numGroupingSets);

auto numGroupingKeys = groupIdNode->numGroupingKeys();

groupingKeyMappings_.reserve(groupIdNode->groupingSets().size());
for (const auto& groupingSet : groupIdNode->groupingSets()) {
std::vector<column_index_t> mappings(numGroupingKeys, kMissingGroupingKey);
for (const auto& groupingKey : groupingSet) {
auto outputChannel =
inputToOutputGroupingKeyMapping.at(groupingKey->name());
auto inputChannel = inputType->getChildIdx(groupingKey->name());
auto outputChannel = outputType_->getChildIdx(groupingKey);
VELOX_USER_CHECK_NE(
outputToInputGroupingKeyMapping.count(outputChannel),
0,
"GroupIdNode didn't map grouping key {} to input channel",
groupingKey);
auto inputChannel = outputToInputGroupingKeyMapping.at(outputChannel);
mappings[outputChannel] = inputChannel;
}

Expand Down
53 changes: 40 additions & 13 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1458,7 +1458,7 @@ TEST_F(AggregationTest, groupingSets) {
auto plan =
PlanBuilder()
.values({data})
.groupId({{"k1"}, {"k2"}}, {"a", "b"})
.groupId({"k1", "k2"}, {{"k1"}, {"k2"}}, {"a", "b"})
.singleAggregation(
{"k1", "k2", "group_id"},
{"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"})
Expand All @@ -1473,7 +1473,7 @@ TEST_F(AggregationTest, groupingSets) {
// group_id column.
plan = PlanBuilder()
.values({data})
.groupId({{"k1"}, {"k2"}}, {"a", "b"})
.groupId({"k1", "k2"}, {{"k1"}, {"k2"}}, {"a", "b"})
.project(
{"k1",
"k2",
Expand All @@ -1496,14 +1496,15 @@ TEST_F(AggregationTest, groupingSets) {
"SELECT null, k2, count(1), null, max(b) FROM tmp GROUP BY k2");

// Cube.
plan = PlanBuilder()
.values({data})
.groupId({{"k1", "k2"}, {"k1"}, {"k2"}, {}}, {"a", "b"})
.singleAggregation(
{"k1", "k2", "group_id"},
{"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"})
.project({"k1", "k2", "count_1", "sum_a", "max_b"})
.planNode();
plan =
PlanBuilder()
.values({data})
.groupId({"k1", "k2"}, {{"k1", "k2"}, {"k1"}, {"k2"}, {}}, {"a", "b"})
.singleAggregation(
{"k1", "k2", "group_id"},
{"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"})
.project({"k1", "k2", "count_1", "sum_a", "max_b"})
.planNode();

assertQuery(
plan,
Expand All @@ -1512,7 +1513,7 @@ TEST_F(AggregationTest, groupingSets) {
// Rollup.
plan = PlanBuilder()
.values({data})
.groupId({{"k1", "k2"}, {"k1"}, {}}, {"a", "b"})
.groupId({"k1", "k2"}, {{"k1", "k2"}, {"k1"}, {}}, {"a", "b"})
.singleAggregation(
{"k1", "k2", "group_id"},
{"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"})
Expand Down Expand Up @@ -1543,7 +1544,7 @@ TEST_F(AggregationTest, groupingSetsOutput) {
auto reversedOrderPlan =
PlanBuilder()
.values({data})
.groupId({{"k2", "k1"}, {}}, {"a", "b"})
.groupId({"k2", "k1"}, {{"k2", "k1"}, {}}, {"a", "b"})
.capturePlanNode(reversedOrderGroupIdNode)
.singleAggregation(
{"k2", "k1", "group_id"},
Expand All @@ -1554,7 +1555,7 @@ TEST_F(AggregationTest, groupingSetsOutput) {
auto orderPlan =
PlanBuilder()
.values({data})
.groupId({{"k1", "k2"}, {}}, {"a", "b"})
.groupId({"k1", "k2"}, {{"k1", "k2"}, {}}, {"a", "b"})
.capturePlanNode(orderGroupIdNode)
.singleAggregation(
{"k1", "k2", "group_id"},
Expand Down Expand Up @@ -1583,6 +1584,32 @@ TEST_F(AggregationTest, groupingSetsOutput) {
assertEqualResults(orderResult.second, reversedOrderResult.second);
}

TEST_F(AggregationTest, groupingSetsSameKey) {
auto data = makeRowVector(
{"o_key", "o_status"},
{makeFlatVector<int64_t>({0, 1, 2, 3, 4}),
makeFlatVector<std::string>({"", "x", "xx", "xxx", "xxxx"})});

createDuckDbTable({data});

auto plan = PlanBuilder()
.values({data})
.groupId(
{"o_key", "o_key as o_key_1"},
{{"o_key", "o_key_1"}, {"o_key"}, {"o_key_1"}, {}},
{"o_status"})
.singleAggregation(
{"o_key", "o_key_1", "group_id"},
{"max(o_status) as max_o_status"})
.project({"o_key", "o_key_1", "max_o_status"})
.planNode();

assertQuery(
plan,
"SELECT o_key, o_key_1, max(o_status) as max_o_status FROM ("
"select o_key, o_key as o_key_1, o_status FROM tmp) GROUP BY GROUPING SETS ((o_key, o_key_1), (o_key), (o_key_1), ())");
}

TEST_F(AggregationTest, outputBatchSizeCheckWithSpill) {
const int vectorSize = 100;
const std::string strValue(1L << 20, 'a');
Expand Down
Loading

0 comments on commit 80a4f8f

Please sign in to comment.