
Configure final reduce phase threads for heavy aggregation functions #14662

Open · wants to merge 1 commit into master

Conversation

@xiangfu0 (Contributor) commented Dec 16, 2024

Add a new query option, numThreadsForFinalReduce, to allow customizing the number of threads used per aggregate/reduce call.

This significantly reduces the execution time of aggregation group-by queries where there are many groups and the final reduce for each group is very costly, e.g. for funnel functions.
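As a usage illustration only: the broker address, table, columns, and thread count below are made up, and this sketch assumes the new option can be passed like other Pinot query options through a SET statement prefix.

import org.apache.pinot.client.Connection;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.client.ResultSetGroup;

public class NumThreadsForFinalReduceExample {
  public static void main(String[] args) {
    // Hypothetical broker address; adjust for your cluster
    Connection connection = ConnectionFactory.fromHostList("localhost:8000");
    // Assumes the option is settable as a query option via SET, like other Pinot query options;
    // the value 8 and the table/columns are illustrative only
    ResultSetGroup result = connection.execute(
        "SET numThreadsForFinalReduce = 8; "
            + "SELECT userId, COUNT(*) FROM events GROUP BY userId");
    System.out.println(result.getResultSet(0));
  }
}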

@codecov-commenter commented Dec 16, 2024

Codecov Report

Attention: Patch coverage is 41.02564% with 23 lines in your changes missing coverage. Please review.

Project coverage is 64.01%. Comparing base (59551e4) to head (1f6e6b6).
Report is 1472 commits behind head on master.

Files with missing lines | Patch % | Lines
...org/apache/pinot/core/data/table/IndexedTable.java | 27.58% | 20 Missing and 1 partial ⚠️
...pinot/core/plan/maker/InstancePlanMakerImplV2.java | 50.00% | 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14662      +/-   ##
============================================
+ Coverage     61.75%   64.01%   +2.26%     
- Complexity      207     1602    +1395     
============================================
  Files          2436     2703     +267     
  Lines        133233   148954   +15721     
  Branches      20636    22824    +2188     
============================================
+ Hits          82274    95351   +13077     
- Misses        44911    46612    +1701     
- Partials       6048     6991     +943     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.97% <41.02%> (+2.27%) ⬆️
java-21 63.88% <41.02%> (+2.25%) ⬆️
skip-bytebuffers-false 63.99% <41.02%> (+2.25%) ⬆️
skip-bytebuffers-true 63.85% <41.02%> (+36.12%) ⬆️
temurin 64.01% <41.02%> (+2.26%) ⬆️
unittests 64.00% <41.02%> (+2.26%) ⬆️
unittests1 56.34% <41.02%> (+9.45%) ⬆️
unittests2 34.42% <2.56%> (+6.69%) ⬆️

Flags with carried forward coverage won't be shown.


@xiangfu0 added the enhancement, Configuration (Config changes: addition/deletion/change in behavior), and query labels on Dec 16, 2024
// Wait for all tasks to complete
for (Future<Void> future : futures) {
future.get();
}

Contributor:

Would it be worth cancelling the running tasks if one of them throws an exception, instead of letting them run until completion?
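
For illustration, one possible approach (a hypothetical helper, not code from this PR): wait on the futures and, on the first failure, cancel whatever is still pending before rethrowing.

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public final class FutureCancellationExample {
  private FutureCancellationExample() {
  }

  // Waits for all futures; on the first failure, cancels the remaining ones before rethrowing
  public static void getAllOrCancel(List<Future<Void>> futures)
      throws InterruptedException, ExecutionException {
    try {
      for (Future<Void> future : futures) {
        future.get();
      }
    } catch (InterruptedException | ExecutionException e) {
      // Best-effort cancellation of tasks that are still queued or running
      for (Future<Void> future : futures) {
        future.cancel(true);
      }
      throw e;
    }
  }
}

Whether interrupting in-flight reduce tasks is safe depends on how they react to interruption, so cancel(true) vs. cancel(false) is a real design choice here.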

@@ -37,6 +42,8 @@
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public abstract class IndexedTable extends BaseTable {

Contributor:

I think it'd be good to use the other pool constructor and (see the sketch below):

  • give pool threads reasonable names to make identifying them in thread stack dumps easier
  • make them daemon threads so they don't affect JVM shutdown (especially since the pool is not closed anywhere)
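
For reference, a minimal sketch of such a pool using plain java.util.concurrent; the thread name prefix, sizing, and class name are illustrative, not the PR's actual code.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public final class FinalReducePoolExample {
  private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
    private final AtomicInteger _counter = new AtomicInteger();

    @Override
    public Thread newThread(Runnable r) {
      // A recognizable name makes these threads easy to spot in thread dumps
      Thread thread = new Thread(r, "final-reduce-" + _counter.getAndIncrement());
      // Daemon threads do not block JVM shutdown, which matters if the pool is never closed
      thread.setDaemon(true);
      return thread;
    }
  };

  // Sizing to the number of available processors is just an illustrative default
  public static final ExecutorService EXECUTOR_SERVICE =
      Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), THREAD_FACTORY);

  private FinalReducePoolExample() {
  }
}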

Object[] values = record.getValues();
for (int i = 0; i < numAggregationFunctions; i++) {
int colId = i + _numKeyColumns;
values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);

@bziobrowski (Contributor) commented Dec 16, 2024:

I think it'd make sense to either:

  • put an upper limit on _numThreadsForFinalReduce (e.g. 2 or 3 * Runtime.getRuntime().availableProcessors()), or
  • change the variable to a boolean flag enableParallelFinalReduce and use a sensible number of tasks,
    to prevent using an excessive number of futures or hitting various error modes, e.g.
    if _numThreadsForFinalReduce is Integer.MAX_VALUE then chunkSize is going to be negative (a sketch follows after this comment).

If the shared thread pool is overwhelmed by running tasks, it might be good to use the current thread not only for waiting but also for task processing, stealing tasks until there's nothing left and only then waiting for the futures to finish.
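
A small sketch of the clamping idea from the bullets above; the 2x-cores cap and method names are assumptions for illustration, not the PR's code.

public final class FinalReduceSizingExample {
  private FinalReduceSizingExample() {
  }

  // Caps the requested thread count at a small multiple of the available cores
  public static int resolveNumThreads(int requestedNumThreads) {
    int maxThreads = 2 * Runtime.getRuntime().availableProcessors();
    return Math.max(1, Math.min(requestedNumThreads, maxThreads));
  }

  // Ceiling division in long arithmetic so the chunk size can never overflow to a negative value;
  // expects numThreads >= 1 (e.g. the output of resolveNumThreads)
  public static int resolveChunkSize(int numValues, int numThreads) {
    long chunkSize = ((long) numValues + numThreads - 1) / numThreads;
    return (int) Math.max(1L, chunkSize);
  }
}

With the thread count capped first, even a request of Integer.MAX_VALUE cannot produce a negative chunk size.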

Member:

> If the shared thread pool is overwhelmed by running tasks, it might be good to use the current thread not only for waiting but also for task processing, stealing tasks until there's nothing left and only then waiting for the futures to finish.

Potentially, and this can be done transparently by configuring the executor's rejected execution handler to CallerRunsPolicy. However, beware: if the executor, which does non-blocking work, is sized to the number of available processors, then an overwhelmed thread pool means the available CPUs are overwhelmed too. Performing reductions on the caller thread would only lead to excessive context switching, and it might be better, from a global perspective, for the task to wait for capacity to become available.
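
For illustration, that handler is set when the pool is built; a generic ThreadPoolExecutor sketch (not the executor this PR actually uses), where the bounded queue is what makes rejection, and hence CallerRunsPolicy, kick in.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class CallerRunsPoolExample {
  private CallerRunsPoolExample() {
  }

  public static ThreadPoolExecutor newPool() {
    int numThreads = Runtime.getRuntime().availableProcessors();
    return new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS,
        // A bounded queue so the pool can actually reject work when it is saturated...
        new LinkedBlockingQueue<>(1024),
        // ...and rejected tasks then run on the submitting thread instead of being dropped
        new ThreadPoolExecutor.CallerRunsPolicy());
  }
}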

@@ -232,6 +232,12 @@ public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_THRESHOLD, groupByTrimThreshold);
}

Contributor:

Would it be possible to show that the final reduce is parallelized in the explain output?
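
Separately, for context on the hunk above, a hypothetical accessor mirroring the getGroupTrimThreshold pattern; the method name and the NUM_THREADS_FOR_FINAL_REDUCE key constant are assumptions and may not match what the PR actually adds.

// Fragment in the style of the surrounding QueryOptionsUtils methods
public static Integer getNumThreadsForFinalReduce(Map<String, String> queryOptions) {
  String numThreadsForFinalReduce = queryOptions.get(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE);
  return numThreadsForFinalReduce != null
      ? uncheckedParseInt(QueryOptionKey.NUM_THREADS_FOR_FINAL_REDUCE, numThreadsForFinalReduce)
      : null;
}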


if (startIdx < endIdx) {
// Submit a task for processing a chunk of values
futures.add(EXECUTOR_SERVICE.submit(() -> {

Member:

Is it worth making this a TraceCallable so the request id propagates into the reduction?
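
For illustration, the submitted lambda could be wrapped roughly like this; it assumes Pinot's TraceCallable abstract class with a callJob() override (as used elsewhere in the query engine) and reuses the variables from the fragment above, so it is a sketch rather than the PR's code.

// Hypothetical variant of the submit call above; TraceCallable/callJob() are assumed from the
// pattern other Pinot operators use to propagate the request's trace context
futures.add(EXECUTOR_SERVICE.submit(new TraceCallable<Void>() {
  @Override
  public Void callJob() {
    // same chunk processing as in the lambda above
    return null;
  }
}));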

@kishoreg (Member) commented:

Can we do this automatically when the number of keys exceeds some threshold X and for specific aggregation functions like funnel, etc.?
