Skip to content

Commit

Permalink
cache query plan for AcquireReleaseColumnsSegmentOperator to work wit…
Browse files Browse the repository at this point in the history
…h V2 engine
  • Loading branch information
klsince committed Dec 12, 2024
1 parent 90b437f commit e5aa8c3
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ public AcquireReleaseColumnsSegmentOperator(PlanNode planNode, IndexSegment inde
}

public void materializeChildOperator() {
_childOperator = (Operator<BaseResultsBlock>) _planNode.run();
// V2 query engine can call getNextBlock() methods repetitively to stream result blocks between query stages, but
// the query plan should be created just once, so cache the child operator. And no need to synchronize here as the
// operator object is used by a single thread.
if (_childOperator == null) {
_childOperator = (Operator<BaseResultsBlock>) _planNode.run();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,12 @@ private InstanceResponseBlock executeInternal(ServerQueryRequest queryRequest, E
long queryEndTimeMs = timerContext.getQueryArrivalTimeMs() + queryTimeoutMs;
queryContext.setEndTimeMs(queryEndTimeMs);

queryContext.setEnablePrefetch(_enablePrefetch);
String flag = queryContext.getQueryOptions().get(ENABLE_PREFETCH);
if (flag != null) {
queryContext.setEnablePrefetch(Boolean.parseBoolean(flag));
} else {
queryContext.setEnablePrefetch(_enablePrefetch);
}

// Query scheduler wait time already exceeds query timeout, directly return
long querySchedulingTimeMs = System.currentTimeMillis() - queryArrivalTimeMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,18 @@ public void testHardcodedQueries()
super.testHardcodedQueries();
}

@Test
public void testSingleValueQuery()
@DataProvider(name = "enablePrefetch")
public Object[][] enablePrefetch() {
return new Object[][]{
{false}, {true}
};
}

@Test(dataProvider = "enablePrefetch")
public void testSingleValueQuery(boolean enablePrefetch)
throws Exception {
String query = "select sum(ActualElapsedTime) from mytable WHERE ActualElapsedTime > "
String query = "set \"enable.prefetch\"=" + enablePrefetch
+ "; select sum(ActualElapsedTime) from mytable WHERE ActualElapsedTime > "
+ "(select avg(ActualElapsedTime) as avg_profit from mytable)";
JsonNode jsonNode = postQuery(query);
long joinResult = jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
Expand All @@ -166,7 +174,7 @@ public void testSingleValueQuery()
query = "select sum(ActualElapsedTime) as profit from mytable WHERE ActualElapsedTime > -1412.435033969449";
jsonNode = postQuery(query);
long expectedResult = jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
assertEquals(joinResult, expectedResult);
assertEquals(joinResult, expectedResult, "With enablePrefetch: " + enablePrefetch);
}

@Test
Expand Down

0 comments on commit e5aa8c3

Please sign in to comment.