Skip to content

Commit

Permalink
Fix the query option override for leaf stage (#14603)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Dec 5, 2024
1 parent 8dadfd2 commit a4f6df4
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,15 @@ private static InstanceRequest compileInstanceRequest(OpChainExecutionContext ex
* Helper method to update query options.
*/
private static void updateQueryOptions(PinotQuery pinotQuery, OpChainExecutionContext executionContext) {
  // Start from the options already attached to the query so they are not discarded; the op-chain
  // metadata is layered on top, so its entries win whenever both define the same key.
  Map<String, String> queryOptions = pinotQuery.getQueryOptions();
  if (queryOptions != null) {
    // Mutates the map held by pinotQuery in place, so no setQueryOptions() call is needed here.
    queryOptions.putAll(executionContext.getOpChainMetadata());
  } else {
    // The query carries no options yet: seed a fresh map from the metadata and install it.
    queryOptions = new HashMap<>(executionContext.getOpChainMetadata());
    pinotQuery.setQueryOptions(queryOptions);
  }
  // Always overwrite the timeout with the time remaining until the execution deadline,
  // computed at the moment this method runs.
  queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
      Long.toString(executionContext.getDeadlineMs() - System.currentTimeMillis()));
}

/**
Expand Down
44 changes: 26 additions & 18 deletions pinot-query-runtime/src/test/resources/queries/QueryHints.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,25 +53,33 @@
"queries": [
{
"description": "Wrong partition key",
"sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='name', partition_size='4') */ GROUP BY {tbl1}.num",
"sql": "SELECT {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='name', partition_size='4') */ GROUP BY {tbl1}.num",
"expectedException": "Error composing query plan for.*"
},
{
"description": "Wrong partition size",
"sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='2') */ GROUP BY {tbl1}.num",
"sql": "SELECT {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='2') */ GROUP BY {tbl1}.num",
"expectedException": "Error composing query plan for.*"
},
{
"description": "Group by partition column",
"sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
"sql": "SELECT {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
},
{
"description": "Group by partition column with partition parallelism",
"sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4', partition_parallelism='2') */ GROUP BY {tbl1}.num"
"sql": "SELECT {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4', partition_parallelism='2') */ GROUP BY {tbl1}.num"
},
{
"description": "Group by partition column with GROUP BY hint",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM {tbl1} GROUP BY {tbl1}.num"
},
{
"description": "Group by partition column with partition parallelism and GROUP BY hint",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4', partition_parallelism='2') */ GROUP BY {tbl1}.num"
},
{
"description": "Skip leaf stage aggregation with GROUP BY hint",
"sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ {tbl1}.name, COUNT(*), SUM({tbl1}.num), MIN({tbl1}.num) FROM {tbl1} WHERE {tbl1}.num >= 0 GROUP BY {tbl1}.name"
"sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ {tbl1}.name, COUNT(*), SUM({tbl1}.num), MIN({tbl1}.num), COUNT(DISTINCT {tbl1}.num) FROM {tbl1} WHERE {tbl1}.num >= 0 GROUP BY {tbl1}.name"
},
{
"description": "Colocated JOIN with partition column",
Expand All @@ -87,7 +95,7 @@
},
{
"description": "Colocated JOIN with partition column and group by partition column",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name"
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num), COUNT(DISTINCT {tbl2}.num) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name"
},
{
"description": "Colocated JOIN with partition column and group by non-partitioned column",
Expand All @@ -103,7 +111,7 @@
},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by partition column",
"sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT({tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
"sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by non-partitioned column",
Expand Down Expand Up @@ -180,7 +188,7 @@
},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by partition column",
"sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT({tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
"sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
},
{
"description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by non-partitioned column",
Expand Down Expand Up @@ -259,7 +267,7 @@
"queries": [
{
"description": "Inner join with group by",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.name, AVG({tbl2}.data) FROM {tbl1} JOIN {tbl2} ON {tbl1}.name = {tbl2}.id WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' AND {tbl2}.data < 0 GROUP BY {tbl1}.name"
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.name, AVG({tbl2}.data), COUNT(DISTINCT {tbl2}.data) FROM {tbl1} JOIN {tbl2} ON {tbl1}.name = {tbl2}.id WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' AND {tbl2}.data < 0 GROUP BY {tbl1}.name"
},
{
"description": "semi-join with dynamic_broadcast join strategy",
Expand All @@ -275,7 +283,7 @@
},
{
"description": "semi-join with dynamic_broadcast join strategy then group-by on same key",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} WHERE {tbl1}.name IN (SELECT id FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} WHERE {tbl1}.name IN (SELECT id FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
},
{
"description": "semi-join with dynamic_broadcast join strategy then group-by on different key",
Expand All @@ -291,7 +299,7 @@
},
{
"description": "aggregate with skip intermediate stage hint (via hint option is_partitioned_by_group_by_keys)",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(*), SUM({tbl1}.val), SUM({tbl1}.num) FROM {tbl1} WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' GROUP BY {tbl1}.num"
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(*), SUM({tbl1}.val), SUM({tbl1}.num), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' GROUP BY {tbl1}.num"
},
{
"description": "join with pre-partitioned left and right tables",
Expand Down Expand Up @@ -385,31 +393,31 @@
},
{
"description": "partition agg + semi-join with pre-partitioned main and side tables & agg hint",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
},
{
"description": "partition agg + semi-join with pre-partitioned main and side table & agg hint and colocated hint",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
},
{
"description": "partition agg + semi-join with single table partition & agg hint",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num"
},
{
"description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
},
{
"description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join; colocated hint",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5"
},
{
"description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join colocated on partition key",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num ORDER BY SUM({tbl1}.val) DESC"
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num ORDER BY SUM({tbl1}.val) DESC"
},
{
"description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join colocated on partition key; colocated hint",
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num ORDER BY SUM({tbl1}.val) DESC"
"sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num ORDER BY SUM({tbl1}.val) DESC"
},
{
"description": "semi-join on pre-partitioned main and side tables with sorting on top of semi join on join key",
Expand Down

0 comments on commit a4f6df4

Please sign in to comment.