Fix GROUPING SETS with multiple aliased grouping key columns
GROUPING SETS can be specified with grouping keys that are
aliases of the same input column. This is typically used to
compute multiple mixed aggregations on the same key.

The Velox operator assumes that only input columns are
specified as grouping keys, whereas Presto sends plan
fragments that correctly specify the output column names
for such cases.

Due to this assumption, the values of a column and its
aliases all end up in the same output column in each result
grouping set, leading to incorrect results.
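
A trimmed sketch of the failing shape, taken from the groupingSetsSameKey test
added in this change (k2 is an alias of the input column k1, and each grouping
set references one of the two output keys):

    // GROUPING SETS ((k1), (k2)) where k2 aliases the input column k1.
    auto plan =
        PlanBuilder()
            .values({data})
            .groupId({"k1", "k1 as k2"}, {{"k1"}, {"k2"}}, {"a", "b"})
            .singleAggregation(
                {"k1", "k2", "group_id"},
                {"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"})
            .planNode();
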
aditi-pandit committed Oct 4, 2023
1 parent b55002a commit 4ef2976
Showing 9 changed files with 265 additions and 55 deletions.
65 changes: 60 additions & 5 deletions velox/core/PlanNode.cpp
@@ -401,13 +401,53 @@ GroupIdNode::GroupIdNode(
"GroupIdNode requires two or more grouping sets.");
}

GroupIdNode::GroupIdNode(
PlanNodeId id,
std::vector<std::vector<std::string>> groupingSets,
std::vector<GroupingKeyInfo> groupingKeyInfos,
std::vector<FieldAccessTypedExprPtr> aggregationInputs,
std::string groupIdName,
PlanNodePtr source)
: PlanNode(std::move(id)),
sources_{source},
outputType_(getGroupIdOutputType(
groupingKeyInfos,
aggregationInputs,
groupIdName)),
groupingSetsWithColumns_(std::move(groupingSets)),
groupingKeyInfos_(std::move(groupingKeyInfos)),
aggregationInputs_(std::move(aggregationInputs)),
groupIdName_(std::move(groupIdName)) {
VELOX_CHECK_GE(
groupingSetsWithColumns_.size(),
2,
"GroupIdNode requires two or more grouping sets.");
}

void GroupIdNode::addDetails(std::stringstream& stream) const {
if (!groupingSets_.empty()) {
for (auto i = 0; i < groupingSets_.size(); ++i) {
if (i > 0) {
stream << ", ";
}
stream << "[";
addFields(stream, groupingSets_[i]);
stream << "]";
}
return;
}

for (auto i = 0; i < groupingSetsWithColumns_.size(); ++i) {
if (i > 0) {
stream << ", ";
}
stream << "[";
addFields(stream, groupingSets_[i]);
for (auto j = 0; j < groupingSetsWithColumns_[i].size(); j++) {
if (j > 0) {
stream << ", ";
}
stream << groupingSetsWithColumns_[i][j];
}
stream << "]";
}
}
@@ -422,6 +462,8 @@ folly::dynamic GroupIdNode::GroupingKeyInfo::serialize() const {
folly::dynamic GroupIdNode::serialize() const {
auto obj = PlanNode::serialize();
obj["groupingSets"] = ISerializable::serialize(groupingSets_);
obj["groupingSetsWithColumns"] =
ISerializable::serialize(groupingSetsWithColumns_);
obj["aggregationInputs"] = ISerializable::serialize(aggregationInputs_);
obj["groupIdName"] = groupIdName_;
obj["groupingKeyInfos"] = folly::dynamic::array();
@@ -434,17 +476,30 @@ folly::dynamic GroupIdNode::serialize() const {
// static
PlanNodePtr GroupIdNode::create(const folly::dynamic& obj, void* context) {
auto source = deserializeSingleSource(obj, context);
std::vector<GroupingKeyInfo> groupingKeyInfos;
for (const auto& info : obj["groupingKeyInfos"]) {
groupingKeyInfos.push_back(
{info["output"].asString(),
ISerializable::deserialize<FieldAccessTypedExpr>(info["input"])});
}
auto groupingSets = ISerializable::deserialize<
std::vector<std::vector<FieldAccessTypedExpr>>>(obj["groupingSets"]);
if (!groupingSets.empty()) {
return std::make_shared<GroupIdNode>(
deserializePlanNodeId(obj),
std::move(groupingSets),
std::move(groupingKeyInfos),
deserializeFields(obj["aggregationInputs"], context),
obj["groupIdName"].asString(),
std::move(source));
}

auto groupingSetsWithColumns =
ISerializable::deserialize<std::vector<std::vector<std::string>>>(
obj["groupingSetsWithColumns"]);
return std::make_shared<GroupIdNode>(
deserializePlanNodeId(obj),
std::move(groupingSetsWithColumns),
std::move(groupingKeyInfos),
deserializeFields(obj["aggregationInputs"], context),
obj["groupIdName"].asString(),
40 changes: 40 additions & 0 deletions velox/core/PlanNode.h
@@ -785,6 +785,9 @@ class GroupIdNode : public PlanNode {
folly::dynamic serialize() const;
};

/// Note: This GroupIdNode constructor is deprecated. It uses the deprecated
/// field 'groupingSets' and will be removed once Presto is updated. Please
/// use the following GroupIdNode constructor instead.
/// @param id Plan node ID.
/// @param groupingSets A list of grouping key sets. Grouping keys within the
/// set must be unique, but grouping keys across sets may repeat.
@@ -803,6 +806,25 @@
std::string groupIdName,
PlanNodePtr source);

/// @param id Plan node ID.
/// @param groupingSets A list of grouping key sets. Grouping keys within the
/// set must be unique, but grouping keys across sets may repeat.
/// Note: groupingSets are specified using output column names.
/// @param groupingKeyInfos The names and order of the grouping keys in the
/// output.
/// @param aggregationInputs Columns that contain inputs to the aggregate
/// functions.
/// @param groupIdName Name of the column that will contain the grouping set
/// ID (a zero based integer).
/// @param source Input plan node.
GroupIdNode(
PlanNodeId id,
std::vector<std::vector<std::string>> groupingSets,
std::vector<GroupingKeyInfo> groupingKeyInfos,
std::vector<FieldAccessTypedExprPtr> aggregationInputs,
std::string groupIdName,
PlanNodePtr source);

const RowTypePtr& outputType() const override {
return outputType_;
}
@@ -816,6 +838,10 @@
return groupingSets_;
}

const std::vector<std::vector<std::string>>& groupingSetsWithColumns() const {
return groupingSetsWithColumns_;
}

const std::vector<GroupingKeyInfo>& groupingKeyInfos() const {
return groupingKeyInfos_;
}
@@ -845,7 +871,21 @@

const std::vector<PlanNodePtr> sources_;
const RowTypePtr outputType_;

// groupingSetsWithColumns_ specifies grouping sets using output column names.
// This handles the case where a single input column maps to multiple output
// columns that are used in separate grouping sets.
//
// This field deprecates groupingSets_, which specifies grouping sets using
// input fields. When a single input field is mapped to multiple output
// fields, groupingSets_ loses track of which output column to use for the
// grouping set. Note: a GroupIdNode should have only one of
// groupingSetsWithColumns_ or groupingSets_.
const std::vector<std::vector<std::string>> groupingSetsWithColumns_;
// Note: Deprecated by groupingSetsWithColumns_; this will be removed after
// Presto is updated to use groupingSetsWithColumns_.
const std::vector<std::vector<FieldAccessTypedExprPtr>> groupingSets_;

const std::vector<GroupingKeyInfo> groupingKeyInfos_;
const std::vector<FieldAccessTypedExprPtr> aggregationInputs_;
const std::string groupIdName_;
41 changes: 41 additions & 0 deletions velox/docs/develop/operators.rst
@@ -232,13 +232,54 @@ followed by the group ID column. The type of group ID column is BIGINT.
- Description
* - groupingSets
- List of grouping key sets. Keys within each set must be unique, but keys can repeat across the sets. Grouping keys are specified with their output names.
* - groupingKeyInfos
- The names and order of the grouping key columns in the output.
* - aggregationInputs
- Input columns to duplicate.
* - groupIdName
- The name for the group-id column that identifies the grouping set. Zero-based integer corresponding to the position of the grouping set in the 'groupingSets' list.

To illustrate why grouping sets should use output column names, let's examine the following SQL query:

.. code-block:: sql

    SELECT count(orderkey), count(DISTINCT orderkey) FROM orders AS t(orderkey);

In this query the user wants to compute both the count of orderkey values and the count of distinct orderkey values. The two
aggregations are related and can be computed in stages using multiple grouping sets that share the same input column as the grouping key.
Presto generates the following plan for the query:

- Output[PlanNodeId 11][_col0, _col1] => [count:bigint, count_3:bigint]
_col0 := count
_col1 := count_3
- Project[PlanNodeId 211][projectLocality = LOCAL] => [count_3:bigint, count:bigint]
count := COALESCE(expr_17, BIGINT'0')
- LocalExchange[PlanNodeId 287][ROUND_ROBIN] () => [count_3:bigint, expr_17:bigint]
- Aggregate(FINAL)[PlanNodeId 210] => [count_3:bigint, expr_17:bigint]
count_3 := "presto.default.count"((count_18))
expr_17 := "presto.default.arbitrary"((arbitrary))
- LocalExchange[PlanNodeId 303][SINGLE] () => [count_18:bigint, arbitrary:bigint]
- Aggregate(PARTIAL)[PlanNodeId 301] => [count_18:bigint, arbitrary:bigint]
count_18 := "presto.default.count"((expr_15))
arbitrary := "presto.default.arbitrary"((expr_16))
- Project[PlanNodeId 209][projectLocality = LOCAL] => [expr_15:integer, expr_16:bigint]
expr_15 := IF((group) = (BIGINT'1'), field, null)
expr_16 := IF((group) = (BIGINT'0'), count_14, null)
- Aggregate(FINAL)[group, field][PlanNodeId 208] => [group:bigint, field:integer, count_14:bigint]
count_14 := "presto.default.count"((count_19))
- LocalExchange[PlanNodeId 312][HASH] (group, field) => [group:bigint, field:integer, count_19:bigint]
- Aggregate(PARTIAL)[group, field][PlanNodeId 310] => [group:bigint, field:integer, count_19:bigint]
count_19 := "presto.default.count"((field_13))
- GroupId[PlanNodeId 207][[field], [field]] => [field:integer, field_13:integer, group:bigint]
field := field
field_13 := field
- LocalExchange[PlanNodeId 284][ROUND_ROBIN] () => [field:integer]
- TableScan[PlanNodeId 0] => [field:integer]
field = Orders.orderkey

Here the GroupIdNode has two grouping sets that use the same input column,
exposed as the output columns field and field_13. The subsequent aggregations
consume one grouping set at a time to compute the related aggregations, as
sketched below.
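
A minimal sketch of how such a GroupIdNode is constructed with output column
names, mirroring the fragment above (here source stands for the upstream plan
node, and the plan node id and types are illustrative):

.. code-block:: c++

    // Grouping sets are given by output column names; groupingKeyInfos maps
    // both output keys back to the same input column "field".
    auto field =
        std::make_shared<core::FieldAccessTypedExpr>(INTEGER(), "field");

    auto groupIdNode = std::make_shared<core::GroupIdNode>(
        "group-id-0",
        std::vector<std::vector<std::string>>{{"field"}, {"field_13"}},
        std::vector<core::GroupIdNode::GroupingKeyInfo>{
            {"field", field}, // output "field" reads input "field"
            {"field_13", field}}, // output "field_13" also reads input "field"
        std::vector<core::FieldAccessTypedExprPtr>{}, // no aggregation inputs
        "group",
        source);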

HashJoinNode and MergeJoinNode
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

36 changes: 24 additions & 12 deletions velox/exec/GroupId.cpp
@@ -29,27 +29,39 @@ GroupId::GroupId(
"GroupId") {
const auto& inputType = groupIdNode->sources()[0]->outputType();

std::unordered_map<column_index_t, column_index_t>
outputToInputGroupingKeyMapping;
for (const auto& groupingKeyInfo : groupIdNode->groupingKeyInfos()) {
outputToInputGroupingKeyMapping[outputType_->getChildIdx(
groupingKeyInfo.output)] =
inputType->getChildIdx(groupingKeyInfo.input->name());
}

auto useGroupingSetExprs = !groupIdNode->groupingSets().empty();
auto numGroupingSets = useGroupingSetExprs
? groupIdNode->groupingSets().size()
: groupIdNode->groupingSetsWithColumns().size();
groupingKeyMappings_.reserve(numGroupingSets);

auto numGroupingKeys = groupIdNode->numGroupingKeys();

for (auto i = 0; i < numGroupingSets; i++) {
std::vector<column_index_t> mappings(numGroupingKeys, kMissingGroupingKey);
auto numGroupingKeysInSet = useGroupingSetExprs
? groupIdNode->groupingSets()[i].size()
: groupIdNode->groupingSetsWithColumns()[i].size();
for (auto j = 0; j < numGroupingKeysInSet; j++) {
auto outputKeyName = useGroupingSetExprs
? groupIdNode->groupingSets()[i][j]->name()
: groupIdNode->groupingSetsWithColumns()[i][j];

auto outputChannel = outputType_->getChildIdx(outputKeyName);
VELOX_CHECK(
outputToInputGroupingKeyMapping.count(outputChannel) != 0,
"GroupIdNode didn't map grouping key {} to input channel",
outputKeyName);
auto inputChannel = outputToInputGroupingKeyMapping.at(outputChannel);
mappings[outputChannel] = inputChannel;
}

groupingKeyMappings_.emplace_back(std::move(mappings));
}

59 changes: 46 additions & 13 deletions velox/exec/tests/AggregationTest.cpp
@@ -1462,7 +1462,7 @@ TEST_F(AggregationTest, groupingSets) {
auto plan =
PlanBuilder()
.values({data})
.groupId({{"k1"}, {"k2"}}, {"a", "b"})
.groupId({"k1", "k2"}, {{"k1"}, {"k2"}}, {"a", "b"})
.singleAggregation(
{"k1", "k2", "group_id"},
{"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"})
@@ -1477,7 +1477,7 @@
// group_id column.
plan = PlanBuilder()
.values({data})
.groupId({{"k1"}, {"k2"}}, {"a", "b"})
.groupId({"k1", "k2"}, {{"k1"}, {"k2"}}, {"a", "b"})
.project(
{"k1",
"k2",
@@ -1500,14 +1500,15 @@
"SELECT null, k2, count(1), null, max(b) FROM tmp GROUP BY k2");

// Cube.
plan =
PlanBuilder()
.values({data})
.groupId({"k1", "k2"}, {{"k1", "k2"}, {"k1"}, {"k2"}, {}}, {"a", "b"})
.singleAggregation(
{"k1", "k2", "group_id"},
{"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"})
.project({"k1", "k2", "count_1", "sum_a", "max_b"})
.planNode();

assertQuery(
plan,
@@ -1516,7 +1517,7 @@
// Rollup.
plan = PlanBuilder()
.values({data})
.groupId({{"k1", "k2"}, {"k1"}, {}}, {"a", "b"})
.groupId({"k1", "k2"}, {{"k1", "k2"}, {"k1"}, {}}, {"a", "b"})
.singleAggregation(
{"k1", "k2", "group_id"},
{"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"})
@@ -1551,7 +1552,7 @@ TEST_F(AggregationTest, groupingSetsOutput) {
auto reversedOrderPlan =
PlanBuilder()
.values({data})
.groupId({{"k2", "k1"}, {}}, {"a", "b"})
.groupId({"k2", "k1"}, {{"k2", "k1"}, {}}, {"a", "b"})
.capturePlanNode(reversedOrderGroupIdNode)
.singleAggregation(
{"k2", "k1", "group_id"},
Expand All @@ -1562,7 +1563,7 @@ TEST_F(AggregationTest, groupingSetsOutput) {
auto orderPlan =
PlanBuilder()
.values({data})
.groupId({{"k1", "k2"}, {}}, {"a", "b"})
.groupId({"k1", "k2"}, {{"k1", "k2"}, {}}, {"a", "b"})
.capturePlanNode(orderGroupIdNode)
.singleAggregation(
{"k1", "k2", "group_id"},
@@ -1591,6 +1592,38 @@
assertEqualResults(orderResult.second, reversedOrderResult.second);
}

TEST_F(AggregationTest, groupingSetsSameKey) {
vector_size_t size = 5;
auto data = makeRowVector(
{"k1", "a", "b"},
{
makeFlatVector<int64_t>(size, [](auto row) { return row; }),
makeFlatVector<int64_t>(size, [](auto row) { return row; }),
makeFlatVector<StringView>(
size,
[](auto row) {
auto str = std::string(row % 12, 'x');
return StringView(str);
}),
});

createDuckDbTable({data});

auto plan =
PlanBuilder()
.values({data})
.groupId({"k1", "k1 as k2"}, {{"k1"}, {"k2"}}, {"a", "b"})
.singleAggregation(
{"k1", "k2", "group_id"},
{"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"})
.project({"k1", "k2", "count_1", "sum_a", "max_b"})
.planNode();

assertQuery(
plan,
"SELECT k1, k2, count(1), sum(a), max(b) FROM (SELECT k1, k1 as k2, a, b FROM tmp) GROUP BY GROUPING SETS ((k1), (k2))");
}

TEST_F(AggregationTest, outputBatchSizeCheckWithSpill) {
const int vectorSize = 100;
const std::string strValue(1L << 20, 'a');
8 changes: 7 additions & 1 deletion velox/exec/tests/PlanNodeSerdeTest.cpp
@@ -164,9 +164,15 @@ TEST_F(PlanNodeSerdeTest, filter) {
TEST_F(PlanNodeSerdeTest, groupId) {
auto plan = PlanBuilder()
.values({data_})
.groupId({{"c0"}, {"c0", "c1"}}, {"c2"})
.groupId({"c0", "c1"}, {{"c0"}, {"c0", "c1"}}, {"c2"})
.planNode();
testSerde(plan);

plan = PlanBuilder()
.values({data_})
.groupId({"c0", "c0 as c1"}, {{"c0"}, {"c0", "c1"}}, {"c2"})
.planNode();
testSerde(plan);
}

TEST_F(PlanNodeSerdeTest, localPartition) {