From e5aa8c3287f75570c4dc485724ed6d0e7230f85b Mon Sep 17 00:00:00 2001 From: Xiaobing Li Date: Thu, 12 Dec 2024 11:45:26 -0800 Subject: [PATCH] cache query plan for AcquireReleaseColumnsSegmentOperator to work with V2 engine --- .../AcquireReleaseColumnsSegmentOperator.java | 7 ++++++- .../executor/ServerQueryExecutorV1Impl.java | 7 ++++++- .../tests/MultiStageEngineIntegrationTest.java | 16 ++++++++++++---- 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java index ad6da28fdc80..a9cbb654a82c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java @@ -57,7 +57,12 @@ public AcquireReleaseColumnsSegmentOperator(PlanNode planNode, IndexSegment inde } public void materializeChildOperator() { - _childOperator = (Operator) _planNode.run(); + // V2 query engine can call getNextBlock() methods repetitively to stream result blocks between query stages, but + // the query plan should be created just once, so cache the child operator. And no need to synchronize here as the + // operator object is used by a single thread. + if (_childOperator == null) { + _childOperator = (Operator) _planNode.run(); + } } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 16cde432c170..9f9f3057ec07 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -180,7 +180,12 @@ private InstanceResponseBlock executeInternal(ServerQueryRequest queryRequest, E long queryEndTimeMs = timerContext.getQueryArrivalTimeMs() + queryTimeoutMs; queryContext.setEndTimeMs(queryEndTimeMs); - queryContext.setEnablePrefetch(_enablePrefetch); + String flag = queryContext.getQueryOptions().get(ENABLE_PREFETCH); + if (flag != null) { + queryContext.setEnablePrefetch(Boolean.parseBoolean(flag)); + } else { + queryContext.setEnablePrefetch(_enablePrefetch); + } // Query scheduler wait time already exceeds query timeout, directly return long querySchedulingTimeMs = System.currentTimeMillis() - queryArrivalTimeMs; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java index 40ae6920e238..145119261d40 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java @@ -154,10 +154,18 @@ public void testHardcodedQueries() super.testHardcodedQueries(); } - @Test - public void testSingleValueQuery() + @DataProvider(name = "enablePrefetch") + public Object[][] enablePrefetch() { + return new Object[][]{ + {false}, {true} + }; + } + + @Test(dataProvider = "enablePrefetch") + public void testSingleValueQuery(boolean enablePrefetch) throws Exception { - String query = "select sum(ActualElapsedTime) from mytable WHERE ActualElapsedTime > " + String query = "set \"enable.prefetch\"=" + enablePrefetch + + "; select sum(ActualElapsedTime) from mytable WHERE ActualElapsedTime > " + "(select avg(ActualElapsedTime) as avg_profit from mytable)"; JsonNode jsonNode = postQuery(query); long joinResult = jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(); @@ -166,7 +174,7 @@ public void testSingleValueQuery() query = "select sum(ActualElapsedTime) as profit from mytable WHERE ActualElapsedTime > -1412.435033969449"; jsonNode = postQuery(query); long expectedResult = jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(); - assertEquals(joinResult, expectedResult); + assertEquals(joinResult, expectedResult, "With enablePrefetch: " + enablePrefetch); } @Test