diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java index ed1d21362960..e94938a6a284 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java @@ -224,10 +224,15 @@ private static InstanceRequest compileInstanceRequest(OpChainExecutionContext ex * Helper method to update query options. */ private static void updateQueryOptions(PinotQuery pinotQuery, OpChainExecutionContext executionContext) { - Map queryOptions = new HashMap<>(executionContext.getOpChainMetadata()); + Map queryOptions = pinotQuery.getQueryOptions(); + if (queryOptions != null) { + queryOptions.putAll(executionContext.getOpChainMetadata()); + } else { + queryOptions = new HashMap<>(executionContext.getOpChainMetadata()); + pinotQuery.setQueryOptions(queryOptions); + } queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, Long.toString(executionContext.getDeadlineMs() - System.currentTimeMillis())); - pinotQuery.setQueryOptions(queryOptions); } /** diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json b/pinot-query-runtime/src/test/resources/queries/QueryHints.json index 81a939c2e142..22e464f28b77 100644 --- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json +++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json @@ -53,25 +53,33 @@ "queries": [ { "description": "Wrong partition key", - "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='name', partition_size='4') */ GROUP BY {tbl1}.num", + "sql": "SELECT {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='name', partition_size='4') */ GROUP BY {tbl1}.num", "expectedException": "Error composing query plan for.*" }, { "description": "Wrong partition size", - "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='2') */ GROUP BY {tbl1}.num", + "sql": "SELECT {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='2') */ GROUP BY {tbl1}.num", "expectedException": "Error composing query plan for.*" }, { "description": "Group by partition column", - "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num" + "sql": "SELECT {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num" }, { "description": "Group by partition column with partition parallelism", - "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4', partition_parallelism='2') */ GROUP BY {tbl1}.num" + "sql": "SELECT {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4', partition_parallelism='2') */ GROUP BY {tbl1}.num" + }, + { + "description": "Group by partition column with GROUP BY hint", + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM {tbl1} GROUP BY {tbl1}.num" + }, + { + "description": "Group by partition column with partition parallelism and GROUP BY hint", + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(*), COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4', partition_parallelism='2') */ GROUP BY {tbl1}.num" }, { "description": "Skip leaf stage aggregation with GROUP BY hint", - "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ {tbl1}.name, COUNT(*), SUM({tbl1}.num), MIN({tbl1}.num) FROM {tbl1} WHERE {tbl1}.num >= 0 GROUP BY {tbl1}.name" + "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ {tbl1}.name, COUNT(*), SUM({tbl1}.num), MIN({tbl1}.num), COUNT(DISTINCT {tbl1}.num) FROM {tbl1} WHERE {tbl1}.num >= 0 GROUP BY {tbl1}.name" }, { "description": "Colocated JOIN with partition column", @@ -87,7 +95,7 @@ }, { "description": "Colocated JOIN with partition column and group by partition column", - "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name" + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num), COUNT(DISTINCT {tbl2}.num) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name" }, { "description": "Colocated JOIN with partition column and group by non-partitioned column", @@ -103,7 +111,7 @@ }, { "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by partition column", - "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT({tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name" + "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name" }, { "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by non-partitioned column", @@ -180,7 +188,7 @@ }, { "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by partition column", - "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT({tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name" + "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(DISTINCT {tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='2') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name" }, { "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by non-partitioned column", @@ -259,7 +267,7 @@ "queries": [ { "description": "Inner join with group by", - "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.name, AVG({tbl2}.data) FROM {tbl1} JOIN {tbl2} ON {tbl1}.name = {tbl2}.id WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' AND {tbl2}.data < 0 GROUP BY {tbl1}.name" + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.name, AVG({tbl2}.data), COUNT(DISTINCT {tbl2}.data) FROM {tbl1} JOIN {tbl2} ON {tbl1}.name = {tbl2}.id WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' AND {tbl2}.data < 0 GROUP BY {tbl1}.name" }, { "description": "semi-join with dynamic_broadcast join strategy", @@ -275,7 +283,7 @@ }, { "description": "semi-join with dynamic_broadcast join strategy then group-by on same key", - "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} WHERE {tbl1}.name IN (SELECT id FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num" + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} WHERE {tbl1}.name IN (SELECT id FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num" }, { "description": "semi-join with dynamic_broadcast join strategy then group-by on different key", @@ -291,7 +299,7 @@ }, { "description": "aggregate with skip intermediate stage hint (via hint option is_partitioned_by_group_by_keys)", - "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(*), SUM({tbl1}.val), SUM({tbl1}.num) FROM {tbl1} WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' GROUP BY {tbl1}.num" + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT(*), SUM({tbl1}.val), SUM({tbl1}.num), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} WHERE {tbl1}.val >= 0 AND {tbl1}.name != 'a' GROUP BY {tbl1}.num" }, { "description": "join with pre-partitioned left and right tables", @@ -385,31 +393,31 @@ }, { "description": "partition agg + semi-join with pre-partitioned main and side tables & agg hint", - "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num" + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num" }, { "description": "partition agg + semi-join with pre-partitioned main and side table & agg hint and colocated hint", - "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num" + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num" }, { "description": "partition agg + semi-join with single table partition & agg hint", - "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num" + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num" }, { "description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join", - "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5" + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5" }, { "description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with having filter on top of semi join; colocated hint", - "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5" + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num HAVING COUNT(*) > 5" }, { "description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join colocated on partition key", - "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num ORDER BY SUM({tbl1}.val) DESC" + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num ORDER BY SUM({tbl1}.val) DESC" }, { "description": "partition agg + semi-join on pre-partitioned main tables with group by on partitioned column with sorting on top of semi join colocated on partition key; colocated hint", - "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num ORDER BY SUM({tbl1}.val) DESC" + "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true'), joinOptions(is_colocated_by_join_keys='true') */ {tbl1}.num, SUM({tbl1}.val), COUNT(DISTINCT {tbl1}.val) FROM {tbl1} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl1}.num IN (SELECT num FROM {tbl2} /*+ tableOptions(partition_function='hashcode', partition_key='num', partition_size='4') */ WHERE {tbl2}.data > 0) GROUP BY {tbl1}.num ORDER BY SUM({tbl1}.val) DESC" }, { "description": "semi-join on pre-partitioned main and side tables with sorting on top of semi join on join key",