diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 4a1129a0b0d4..d0a522cc3921 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -376,6 +376,32 @@ RowTypePtr getGroupIdOutputType( return ROW(std::move(names), std::move(types)); } + +std::vector> getGroupingSets( + const std::vector>& groupingSetFields, + const std::vector& groupingKeyInfos) { + std::unordered_map inputToOutputGroupingKeyMap; + for (const auto& groupKeyInfo : groupingKeyInfos) { + inputToOutputGroupingKeyMap[groupKeyInfo.input->name()] = + groupKeyInfo.output; + } + + // Prestissimo passes grouping keys with their input column name to Velox. + // But Velox expects the output column name for the grouping key. + std::vector> groupingSets; + groupingSets.reserve(groupingSetFields.size()); + for (const auto& groupFields : groupingSetFields) { + std::vector groupingKeys; + groupingKeys.reserve(groupFields.size()); + for (const auto& groupingField : groupFields) { + groupingKeys.push_back( + inputToOutputGroupingKeyMap[groupingField->name()]); + } + groupingSets.push_back(groupingKeys); + } + return groupingSets; +} + } // namespace GroupIdNode::GroupIdNode( @@ -385,6 +411,29 @@ GroupIdNode::GroupIdNode( std::vector aggregationInputs, std::string groupIdName, PlanNodePtr source) + : PlanNode(std::move(id)), + sources_{source}, + outputType_(getGroupIdOutputType( + groupingKeyInfos, + aggregationInputs, + groupIdName)), + groupingSets_(getGroupingSets(groupingSets, groupingKeyInfos)), + groupingKeyInfos_(std::move(groupingKeyInfos)), + aggregationInputs_(std::move(aggregationInputs)), + groupIdName_(std::move(groupIdName)) { + VELOX_USER_CHECK_GE( + groupingSets_.size(), + 2, + "GroupIdNode requires two or more grouping sets."); +} + +GroupIdNode::GroupIdNode( + PlanNodeId id, + std::vector> groupingSets, + std::vector groupingKeyInfos, + std::vector aggregationInputs, + std::string groupIdName, + PlanNodePtr source) : PlanNode(std::move(id)), sources_{source}, outputType_(getGroupIdOutputType( @@ -395,7 +444,7 @@ GroupIdNode::GroupIdNode( groupingKeyInfos_(std::move(groupingKeyInfos)), aggregationInputs_(std::move(aggregationInputs)), groupIdName_(std::move(groupIdName)) { - VELOX_CHECK_GE( + VELOX_USER_CHECK_GE( groupingSets_.size(), 2, "GroupIdNode requires two or more grouping sets."); @@ -407,7 +456,12 @@ void GroupIdNode::addDetails(std::stringstream& stream) const { stream << ", "; } stream << "["; - addFields(stream, groupingSets_[i]); + for (auto j = 0; j < groupingSets_[i].size(); j++) { + if (j > 0) { + stream << ", "; + } + stream << groupingSets_[i][j]; + } stream << "]"; } } @@ -434,14 +488,16 @@ folly::dynamic GroupIdNode::serialize() const { // static PlanNodePtr GroupIdNode::create(const folly::dynamic& obj, void* context) { auto source = deserializeSingleSource(obj, context); - auto groupingSets = ISerializable::deserialize< - std::vector>>(obj["groupingSets"]); std::vector groupingKeyInfos; for (const auto& info : obj["groupingKeyInfos"]) { groupingKeyInfos.push_back( {info["output"].asString(), ISerializable::deserialize(info["input"])}); } + + auto groupingSets = + ISerializable::deserialize>>( + obj["groupingSets"]); return std::make_shared( deserializePlanNodeId(obj), std::move(groupingSets), diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 08cf25568e46..88c45c4a063a 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -795,6 +795,7 @@ class GroupIdNode : public PlanNode { /// @param groupIdName Name of the column that will contain the grouping set /// ID (a zero based integer). /// @param source Input plan node. + /// NOTE: THIS FUNCTION IS DEPRECATED. PLEASE DO NOT USE. GroupIdNode( PlanNodeId id, std::vector> groupingSets, @@ -803,6 +804,25 @@ class GroupIdNode : public PlanNode { std::string groupIdName, PlanNodePtr source); + /// @param id Plan node ID. + /// @param groupingSets A list of grouping key sets. Grouping keys within the + /// set must be unique, but grouping keys across sets may repeat. + /// Note: groupingSets are specified using output column names. + /// @param groupingKeyInfos The names and order of the grouping keys in the + /// output. + /// @param aggregationInputs Columns that contain inputs to the aggregate + /// functions. + /// @param groupIdName Name of the column that will contain the grouping set + /// ID (a zero based integer). + /// @param source Input plan node. + GroupIdNode( + PlanNodeId id, + std::vector> groupingSets, + std::vector groupingKeyInfos, + std::vector aggregationInputs, + std::string groupIdName, + PlanNodePtr source); + const RowTypePtr& outputType() const override { return outputType_; } @@ -811,8 +831,7 @@ class GroupIdNode : public PlanNode { return sources_; } - const std::vector>& groupingSets() - const { + const std::vector>& groupingSets() const { return groupingSets_; } @@ -845,7 +864,12 @@ class GroupIdNode : public PlanNode { const std::vector sources_; const RowTypePtr outputType_; - const std::vector> groupingSets_; + + // Specifies groupingSets with output column names. + // This allows for the case when a single input column could map + // to multiple output columns which are used in separate grouping sets. + const std::vector> groupingSets_; + const std::vector groupingKeyInfos_; const std::vector aggregationInputs_; const std::string groupIdName_; diff --git a/velox/docs/develop/operators.rst b/velox/docs/develop/operators.rst index 12da781ef925..9f8e39fc2754 100644 --- a/velox/docs/develop/operators.rst +++ b/velox/docs/develop/operators.rst @@ -232,6 +232,7 @@ followed by the group ID column. The type of group ID column is BIGINT. - Description * - groupingSets - List of grouping key sets. Keys within each set must be unique, but keys can repeat across the sets. + - Grouping keys are specified with their output names. * - groupingKeyInfos - The names and order of the grouping key columns in the output. * - aggregationInputs @@ -239,6 +240,75 @@ followed by the group ID column. The type of group ID column is BIGINT. * - groupIdName - The name for the group-id column that identifies the grouping set. Zero-based integer corresponding to the position of the grouping set in the 'groupingSets' list. +GroupIdNode is typically used to compute GROUPING SETS, CUBE and ROLLUP. + +While usually GroupingSets do not repeat with the same grouping key column, there are some use-cases where +they might. To illustrate why GroupingSets might do so lets examine the following SQL query: + +.. code-block:: sql + + SELECT count(orderkey), count(DISTINCT orderkey) FROM orders; + +In this query the user wants to compute global aggregates using the same column, though with +and without the DISTINCT clause. With a particular optimization strategy +`optimize.mixed-distinct-aggregations `_, Presto uses GroupIdNode to compute these. + +First, the optimizer creates a GroupIdNode to duplicate every row assigning one copy +to group 0 and another to group 1. This is achieved using the GroupIdNode with 2 grouping sets +each using orderkey as a grouping key. In order to disambiguate the +groups the orderkey column is aliased as a grouping key for one of the +grouping sets. + +Lets say the orders table has 5 rows: + +.. code-block:: + + orderkey + 1 + 2 + 2 + 3 + 4 + +The GroupIdNode would transform this into: + +.. code-block:: + + orderkey orderkey1 group_id + 1 null 0 + 2 null 0 + 2 null 0 + 3 null 0 + 4 null 0 + null 1 1 + null 2 1 + null 2 1 + null 3 1 + null 4 1 + +Then Presto plans an aggregation using (orderkey, group_id) and count(orderkey1). + +This results in the following 5 rows: + +.. code-block:: + + orderkey group_id count(orderkey1) as c + 1 0 null + 2 0 null + 3 0 null + 4 0 null + null 1 5 + +Then Presto plans a second aggregation with no keys and count(orderkey), arbitrary(c). +Since both aggregations ignore nulls this correctly computes the number of +distinct orderkeys and the count of all orderkeys. + +.. code-block:: + + count(orderkey) arbitrary(c) + 4 5 + + HashJoinNode and MergeJoinNode ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/velox/exec/GroupId.cpp b/velox/exec/GroupId.cpp index ade2f8a36ff6..f64c4e51fc34 100644 --- a/velox/exec/GroupId.cpp +++ b/velox/exec/GroupId.cpp @@ -29,24 +29,27 @@ GroupId::GroupId( "GroupId") { const auto& inputType = groupIdNode->sources()[0]->outputType(); - std::unordered_map - inputToOutputGroupingKeyMapping; + std::unordered_map + outputToInputGroupingKeyMapping; for (const auto& groupingKeyInfo : groupIdNode->groupingKeyInfos()) { - inputToOutputGroupingKeyMapping[groupingKeyInfo.input->name()] = - outputType_->getChildIdx(groupingKeyInfo.output); + outputToInputGroupingKeyMapping[outputType_->getChildIdx( + groupingKeyInfo.output)] = + inputType->getChildIdx(groupingKeyInfo.input->name()); } - auto numGroupingSets = groupIdNode->groupingSets().size(); - groupingKeyMappings_.reserve(numGroupingSets); - auto numGroupingKeys = groupIdNode->numGroupingKeys(); + groupingKeyMappings_.reserve(groupIdNode->groupingSets().size()); for (const auto& groupingSet : groupIdNode->groupingSets()) { std::vector mappings(numGroupingKeys, kMissingGroupingKey); for (const auto& groupingKey : groupingSet) { - auto outputChannel = - inputToOutputGroupingKeyMapping.at(groupingKey->name()); - auto inputChannel = inputType->getChildIdx(groupingKey->name()); + auto outputChannel = outputType_->getChildIdx(groupingKey); + VELOX_USER_CHECK_NE( + outputToInputGroupingKeyMapping.count(outputChannel), + 0, + "GroupIdNode didn't map grouping key {} to input channel", + groupingKey); + auto inputChannel = outputToInputGroupingKeyMapping.at(outputChannel); mappings[outputChannel] = inputChannel; } diff --git a/velox/exec/tests/AggregationTest.cpp b/velox/exec/tests/AggregationTest.cpp index b5712ea9f2a4..6e576bca58d8 100644 --- a/velox/exec/tests/AggregationTest.cpp +++ b/velox/exec/tests/AggregationTest.cpp @@ -1459,7 +1459,7 @@ TEST_F(AggregationTest, groupingSets) { auto plan = PlanBuilder() .values({data}) - .groupId({{"k1"}, {"k2"}}, {"a", "b"}) + .groupId({"k1", "k2"}, {{"k1"}, {"k2"}}, {"a", "b"}) .singleAggregation( {"k1", "k2", "group_id"}, {"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"}) @@ -1474,7 +1474,7 @@ TEST_F(AggregationTest, groupingSets) { // group_id column. plan = PlanBuilder() .values({data}) - .groupId({{"k1"}, {"k2"}}, {"a", "b"}) + .groupId({"k1", "k2"}, {{"k1"}, {"k2"}}, {"a", "b"}) .project( {"k1", "k2", @@ -1497,14 +1497,15 @@ TEST_F(AggregationTest, groupingSets) { "SELECT null, k2, count(1), null, max(b) FROM tmp GROUP BY k2"); // Cube. - plan = PlanBuilder() - .values({data}) - .groupId({{"k1", "k2"}, {"k1"}, {"k2"}, {}}, {"a", "b"}) - .singleAggregation( - {"k1", "k2", "group_id"}, - {"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"}) - .project({"k1", "k2", "count_1", "sum_a", "max_b"}) - .planNode(); + plan = + PlanBuilder() + .values({data}) + .groupId({"k1", "k2"}, {{"k1", "k2"}, {"k1"}, {"k2"}, {}}, {"a", "b"}) + .singleAggregation( + {"k1", "k2", "group_id"}, + {"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"}) + .project({"k1", "k2", "count_1", "sum_a", "max_b"}) + .planNode(); assertQuery( plan, @@ -1513,7 +1514,7 @@ TEST_F(AggregationTest, groupingSets) { // Rollup. plan = PlanBuilder() .values({data}) - .groupId({{"k1", "k2"}, {"k1"}, {}}, {"a", "b"}) + .groupId({"k1", "k2"}, {{"k1", "k2"}, {"k1"}, {}}, {"a", "b"}) .singleAggregation( {"k1", "k2", "group_id"}, {"count(1) as count_1", "sum(a) as sum_a", "max(b) as max_b"}) @@ -1544,7 +1545,7 @@ TEST_F(AggregationTest, groupingSetsOutput) { auto reversedOrderPlan = PlanBuilder() .values({data}) - .groupId({{"k2", "k1"}, {}}, {"a", "b"}) + .groupId({"k2", "k1"}, {{"k2", "k1"}, {}}, {"a", "b"}) .capturePlanNode(reversedOrderGroupIdNode) .singleAggregation( {"k2", "k1", "group_id"}, @@ -1555,7 +1556,7 @@ TEST_F(AggregationTest, groupingSetsOutput) { auto orderPlan = PlanBuilder() .values({data}) - .groupId({{"k1", "k2"}, {}}, {"a", "b"}) + .groupId({"k1", "k2"}, {{"k1", "k2"}, {}}, {"a", "b"}) .capturePlanNode(orderGroupIdNode) .singleAggregation( {"k1", "k2", "group_id"}, @@ -1584,6 +1585,32 @@ TEST_F(AggregationTest, groupingSetsOutput) { assertEqualResults(orderResult.second, reversedOrderResult.second); } +TEST_F(AggregationTest, groupingSetsSameKey) { + auto data = makeRowVector( + {"o_key", "o_status"}, + {makeFlatVector({0, 1, 2, 3, 4}), + makeFlatVector({"", "x", "xx", "xxx", "xxxx"})}); + + createDuckDbTable({data}); + + auto plan = PlanBuilder() + .values({data}) + .groupId( + {"o_key", "o_key as o_key_1"}, + {{"o_key", "o_key_1"}, {"o_key"}, {"o_key_1"}, {}}, + {"o_status"}) + .singleAggregation( + {"o_key", "o_key_1", "group_id"}, + {"max(o_status) as max_o_status"}) + .project({"o_key", "o_key_1", "max_o_status"}) + .planNode(); + + assertQuery( + plan, + "SELECT o_key, o_key_1, max(o_status) as max_o_status FROM (" + "select o_key, o_key as o_key_1, o_status FROM tmp) GROUP BY GROUPING SETS ((o_key, o_key_1), (o_key), (o_key_1), ())"); +} + TEST_F(AggregationTest, outputBatchSizeCheckWithSpill) { const int numVectors = 5; const int vectorSize = 20; diff --git a/velox/exec/tests/PlanNodeSerdeTest.cpp b/velox/exec/tests/PlanNodeSerdeTest.cpp index 553d8c19a688..8ccd9877ad32 100644 --- a/velox/exec/tests/PlanNodeSerdeTest.cpp +++ b/velox/exec/tests/PlanNodeSerdeTest.cpp @@ -164,9 +164,15 @@ TEST_F(PlanNodeSerdeTest, filter) { TEST_F(PlanNodeSerdeTest, groupId) { auto plan = PlanBuilder() .values({data_}) - .groupId({{"c0"}, {"c0", "c1"}}, {"c2"}) + .groupId({"c0", "c1"}, {{"c0"}, {"c0", "c1"}}, {"c2"}) .planNode(); testSerde(plan); + + plan = PlanBuilder() + .values({data_}) + .groupId({"c0", "c0 as c1"}, {{"c0"}, {"c0", "c1"}}, {"c2"}) + .planNode(); + testSerde(plan); } TEST_F(PlanNodeSerdeTest, localPartition) { diff --git a/velox/exec/tests/PlanNodeToStringTest.cpp b/velox/exec/tests/PlanNodeToStringTest.cpp index 8d85fceae320..737f8697d6a8 100644 --- a/velox/exec/tests/PlanNodeToStringTest.cpp +++ b/velox/exec/tests/PlanNodeToStringTest.cpp @@ -254,7 +254,7 @@ TEST_F(PlanNodeToStringTest, aggregation) { TEST_F(PlanNodeToStringTest, groupId) { auto plan = PlanBuilder() .values({data_}) - .groupId({{"c0"}, {"c1"}}, {"c2"}) + .groupId({"c0", "c1"}, {{"c0"}, {"c1"}}, {"c2"}) .planNode(); ASSERT_EQ("-- GroupId\n", plan->toString()); ASSERT_EQ( @@ -263,12 +263,21 @@ TEST_F(PlanNodeToStringTest, groupId) { plan = PlanBuilder() .values({data_}) - .groupId({{"c0", "c1"}, {"c1"}}, {"c2"}, "gid") + .groupId({"c0", "c1"}, {{"c0", "c1"}, {"c1"}}, {"c2"}, "gid") .planNode(); ASSERT_EQ("-- GroupId\n", plan->toString()); ASSERT_EQ( "-- GroupId[[c0, c1], [c1]] -> c0:SMALLINT, c1:INTEGER, c2:BIGINT, gid:BIGINT\n", plan->toString(true, false)); + + plan = PlanBuilder() + .values({data_}) + .groupId({"c0", "c0 as c1"}, {{"c0", "c1"}, {"c1"}}, {"c2"}, "gid") + .planNode(); + ASSERT_EQ("-- GroupId\n", plan->toString()); + ASSERT_EQ( + "-- GroupId[[c0, c1], [c1]] -> c0:SMALLINT, c1:SMALLINT, c2:BIGINT, gid:BIGINT\n", + plan->toString(true, false)); } TEST_F(PlanNodeToStringTest, hashJoin) { diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index cbd3360bc411..e4f3330b26b5 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -727,33 +727,39 @@ PlanBuilder& PlanBuilder::streamingAggregation( } PlanBuilder& PlanBuilder::groupId( + const std::vector& groupingKeys, const std::vector>& groupingSets, const std::vector& aggregationInputs, std::string groupIdName) { - std::vector> groupingSetExprs; - groupingSetExprs.reserve(groupingSets.size()); - for (const auto& groupingSet : groupingSets) { - groupingSetExprs.push_back(fields(groupingSet)); - } - std::vector groupingKeyInfos; - std::set names; - auto index = 0; - for (const auto& groupingSet : groupingSetExprs) { - for (const auto& groupingKey : groupingSet) { - if (names.find(groupingKey->name()) == names.end()) { - core::GroupIdNode::GroupingKeyInfo keyInfos; - keyInfos.output = groupingKey->name(); - keyInfos.input = groupingKey; - groupingKeyInfos.push_back(keyInfos); - } - names.insert(groupingKey->name()); - } + groupingKeyInfos.reserve(groupingKeys.size()); + for (const auto& groupingKey : groupingKeys) { + auto untypedExpr = parse::parseExpr(groupingKey, options_); + const auto* fieldAccessExpr = + dynamic_cast(untypedExpr.get()); + VELOX_USER_CHECK( + fieldAccessExpr, + "Grouping key {} is not valid projection", + groupingKey); + std::string inputField = fieldAccessExpr->getFieldName(); + std::string outputField = untypedExpr->alias().has_value() + ? + // This is a projection with a column alias with the format + // "input_col as output_col". + untypedExpr->alias().value() + : + // This is a projection without a column alias. + fieldAccessExpr->getFieldName(); + + core::GroupIdNode::GroupingKeyInfo keyInfos; + keyInfos.output = outputField; + keyInfos.input = field(inputField); + groupingKeyInfos.push_back(keyInfos); } planNode_ = std::make_shared( nextPlanNodeId(), - groupingSetExprs, + groupingSets, std::move(groupingKeyInfos), fields(aggregationInputs), std::move(groupIdName), diff --git a/velox/exec/tests/utils/PlanBuilder.h b/velox/exec/tests/utils/PlanBuilder.h index ce5546adcf0d..c8badb4615a8 100644 --- a/velox/exec/tests/utils/PlanBuilder.h +++ b/velox/exec/tests/utils/PlanBuilder.h @@ -453,10 +453,18 @@ class PlanBuilder { bool ignoreNullKeys, const std::vector& resultTypes = {}); - /// Add a GroupIdNode using the specified grouping sets, aggregation inputs - /// and a groupId column name. And create GroupIdNode plan node with grouping - /// keys appearing in the output in the order they appear in 'groupingSets'. + /// Add a GroupIdNode using the specified grouping keys, grouping sets, + /// aggregation inputs and a groupId column name. + /// The grouping keys can specify aliases if an input column is mapped + /// to an output column with a different name. + /// e.g. Grouping keys {"k1", "k1 as k2"} means there are 2 grouping keys: + /// the input column k1 and output column k2 which is an alias of column k1. + /// Grouping sets using above grouping keys use the output column aliases. + /// e.g. Grouping sets in the above case could be {{"k1"}, {"k2"}, {}} + /// The GroupIdNode output columns have grouping keys in the order specified + /// in groupingKeys variable. PlanBuilder& groupId( + const std::vector& groupingKeys, const std::vector>& groupingSets, const std::vector& aggregationInputs, std::string groupIdName = "group_id");