Configure final reduce phase threads for heavy aggregation functions #14662

Merged
merged 3 commits into from
Jan 23, 2025
QueryOptionsUtils.java

@@ -248,6 +248,19 @@ public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
     return uncheckedParseInt(QueryOptionKey.GROUP_TRIM_THRESHOLD, groupByTrimThreshold);
   }
 
+  @Nullable
+  public static Integer getNumThreadsExtractFinalResult(Map<String, String> queryOptions) {
+    String numThreadsExtractFinalResultString = queryOptions.get(QueryOptionKey.NUM_THREADS_EXTRACT_FINAL_RESULT);
+    return checkedParseInt(QueryOptionKey.NUM_THREADS_EXTRACT_FINAL_RESULT, numThreadsExtractFinalResultString, 1);
+  }
+
+  @Nullable
+  public static Integer getChunkSizeExtractFinalResult(Map<String, String> queryOptions) {
+    String chunkSizeExtractFinalResultString =
+        queryOptions.get(QueryOptionKey.CHUNK_SIZE_EXTRACT_FINAL_RESULT);
+    return checkedParseInt(QueryOptionKey.CHUNK_SIZE_EXTRACT_FINAL_RESULT, chunkSizeExtractFinalResultString, 1);
+  }
+
   public static boolean isNullHandlingEnabled(Map<String, String> queryOptions) {
     return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ENABLE_NULL_HANDLING));
   }

Review comment from a Collaborator:

Would it be possible to show that final reduce is parallelized in the explain output?
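For reference, a hypothetical sketch of how these getters are consumed. The literal option-key strings below are assumptions; the real constants live in QueryOptionKey and are not shown in this diff:

  // Hypothetical usage; the key strings are assumed to mirror the getter names.
  Map<String, String> queryOptions = new HashMap<>();
  queryOptions.put("numThreadsExtractFinalResult", "4");     // assumed key string
  queryOptions.put("chunkSizeExtractFinalResult", "20000");  // assumed key string
  Integer numThreads = QueryOptionsUtils.getNumThreadsExtractFinalResult(queryOptions); // -> 4
  Integer chunkSize = QueryOptionsUtils.getChunkSizeExtractFinalResult(queryOptions);   // -> 20000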
ConcurrentIndexedTable.java

@@ -19,6 +19,7 @@
 package org.apache.pinot.core.data.table;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.pinot.common.utils.DataSchema;
@@ -33,9 +34,9 @@ public class ConcurrentIndexedTable extends IndexedTable {
   private final ReentrantReadWriteLock _readWriteLock = new ReentrantReadWriteLock();
 
   public ConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
-      int trimSize, int trimThreshold, int initialCapacity) {
+      int trimSize, int trimThreshold, int initialCapacity, ExecutorService executorService) {
     super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold,
-        new ConcurrentHashMap<>(initialCapacity));
+        new ConcurrentHashMap<>(initialCapacity), executorService);
   }
 
   /**
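A hypothetical call site for the widened constructor (argument values illustrative; dataSchema, queryContext, and executorService are assumed to be in scope):

  // The executor service is now threaded through to IndexedTable, where
  // finish() can use it for the parallel final reduce.
  ConcurrentIndexedTable table = new ConcurrentIndexedTable(dataSchema, /* hasFinalInput */ false,
      queryContext, /* resultSize */ 1000, /* trimSize */ 5000, /* trimThreshold */ 100_000,
      /* initialCapacity */ 128, executorService);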
IndexedTable.java

@@ -19,24 +19,32 @@
 package org.apache.pinot.core.data.table;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.core.util.QueryMultiThreadingUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
 
 
 /**
  * Base implementation of Map-based Table for indexed lookup
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public abstract class IndexedTable extends BaseTable {
+  private final ExecutorService _executorService;
   protected final Map<Key, Record> _lookupMap;
   protected final boolean _hasFinalInput;
   protected final int _resultSize;
@@ -46,6 +54,8 @@ public abstract class IndexedTable extends BaseTable {
   protected final TableResizer _tableResizer;
   protected final int _trimSize;
   protected final int _trimThreshold;
+  protected final int _numThreadsExtractFinalResult;
+  protected final int _chunkSizeExtractFinalResult;
 
   protected Collection<Record> _topRecords;
   private int _numResizes;
@@ -63,13 +73,14 @@ public abstract class IndexedTable extends BaseTable {
    * @param lookupMap Map from keys to records
    */
   protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
-      int trimSize, int trimThreshold, Map<Key, Record> lookupMap) {
+      int trimSize, int trimThreshold, Map<Key, Record> lookupMap, ExecutorService executorService) {
     super(dataSchema);
 
     Preconditions.checkArgument(resultSize >= 0, "Result size can't be negative");
     Preconditions.checkArgument(trimSize >= 0, "Trim size can't be negative");
     Preconditions.checkArgument(trimThreshold >= 0, "Trim threshold can't be negative");
 
+    _executorService = executorService;
     _lookupMap = lookupMap;
     _hasFinalInput = hasFinalInput;
     _resultSize = resultSize;
@@ -84,6 +95,10 @@ protected IndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
     assert _hasOrderBy || (trimSize == Integer.MAX_VALUE && trimThreshold == Integer.MAX_VALUE);
     _trimSize = trimSize;
     _trimThreshold = trimThreshold;
+    // NOTE: The upper limit on the number of threads for final reduce defaults to 2 * number of available processors
+    _numThreadsExtractFinalResult = Math.min(queryContext.getNumThreadsExtractFinalResult(),
+        Math.max(1, ResourceManager.DEFAULT_QUERY_RUNNER_THREADS));
+    _chunkSizeExtractFinalResult = queryContext.getChunkSizeExtractFinalResult();
   }
 
   @Override
@@ -157,14 +172,88 @@ public void finish(boolean sort, boolean storeFinalResult) {
       for (int i = 0; i < numAggregationFunctions; i++) {
         columnDataTypes[i + _numKeyColumns] = _aggregationFunctions[i].getFinalResultColumnType();
       }
-      for (Record record : _topRecords) {
-        Object[] values = record.getValues();
-        for (int i = 0; i < numAggregationFunctions; i++) {
-          int colId = i + _numKeyColumns;
-          values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
-        }
-      }
Review comment from bziobrowski (Collaborator), Dec 16, 2024:

I think it'd make sense to either:
  • put an upper limit on _numThreadsForFinalReduce (e.g. 2 or 3 * Runtime.getRuntime().availableProcessors()), or
  • change the variable to a boolean flag enableParallelFinalReduce and use a sensible number of tasks,
to prevent using an excessive number of futures or hitting various error modes; e.g. if _numThreadsForFinalReduce is Integer.MAX_VALUE, then chunkSize is going to be negative.

If the shared thread pool is overwhelmed by running tasks, it might be good to use the current thread not only to wait but also for task processing, stealing tasks until there's nothing left and only then waiting for the futures to finish.

Reply from a Member:

> If the shared thread pool is overwhelmed by running tasks, it might be good to use the current thread not only to wait but also for task processing, stealing tasks until there's nothing left and only then waiting for the futures to finish.

Potentially, and this can be done transparently by configuring the executor's rejected execution handler to CallerRunsPolicy. However, beware that if the executor, which does non-blocking work, is sized to the number of available processors, then an overwhelmed thread pool means the available CPUs are overwhelmed too. Performing reductions on the caller thread would only lead to excessive context switching, and it might be better, from a global perspective, for the task to wait for capacity to become available.
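A minimal sketch of the CallerRunsPolicy idea mentioned above (illustrative only, not part of this PR; pool and queue sizes are assumptions):

  import java.util.concurrent.ArrayBlockingQueue;
  import java.util.concurrent.ThreadPoolExecutor;
  import java.util.concurrent.TimeUnit;

  // Bounded pool whose rejected tasks run on the submitting thread. When the
  // queue fills up, submit() executes the task on the caller, which applies
  // backpressure but can add context switching if the CPUs are already
  // saturated (the trade-off raised in the reply above).
  int numCores = Runtime.getRuntime().availableProcessors();
  ThreadPoolExecutor executor = new ThreadPoolExecutor(numCores, numCores,
      0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue<>(1024),              // bounded work queue (size assumed)
      new ThreadPoolExecutor.CallerRunsPolicy());  // run rejected tasks on the caller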

+      int numThreadsExtractFinalResult = inferNumThreadsExtractFinalResult();
+      // Submit task when the EXECUTOR_SERVICE is not overloaded
+      if (numThreadsExtractFinalResult > 1) {
+        // Multi-threaded final reduce
+        List<Future<Void>> futures = new ArrayList<>(numThreadsExtractFinalResult);
+        try {
+          List<Record> topRecordsList = new ArrayList<>(_topRecords);
+          int chunkSize = (topRecordsList.size() + numThreadsExtractFinalResult - 1) / numThreadsExtractFinalResult;
+          for (int threadId = 0; threadId < numThreadsExtractFinalResult; threadId++) {
+            int startIdx = threadId * chunkSize;
+            int endIdx = Math.min(startIdx + chunkSize, topRecordsList.size());
+            if (startIdx < endIdx) {
Review comment from a Contributor:

Is this always true?

Reply from the author (Contributor):

Not always, e.g. in the test with a very small segment: with 5 records and 4 threads, chunkSize = 2, so the last thread gets startIdx = 6 > endIdx = 5 and would otherwise submit a task with nothing to do.

+              // Submit a task for processing a chunk of values
+              futures.add(_executorService.submit(new TraceCallable<Void>() {
+                @Override
+                public Void callJob() {
+                  for (int recordIdx = startIdx; recordIdx < endIdx; recordIdx++) {
+                    Object[] values = topRecordsList.get(recordIdx).getValues();
+                    for (int i = 0; i < numAggregationFunctions; i++) {
+                      int colId = i + _numKeyColumns;
+                      values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
+                    }
+                  }
+                  return null;
+                }
+              }));
+            }
+          }
+          // Wait for all tasks to complete
+          for (Future<Void> future : futures) {
+            future.get();
+          }
+        } catch (InterruptedException | ExecutionException e) {
+          // Cancel all running tasks
+          for (Future<Void> future : futures) {
+            future.cancel(true);
+          }
+          throw new RuntimeException("Error during multi-threaded final reduce", e);
+        }
+      } else {
+        for (Record record : _topRecords) {
+          Object[] values = record.getValues();
+          for (int i = 0; i < numAggregationFunctions; i++) {
+            int colId = i + _numKeyColumns;
+            values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
+          }
+        }
+      }
     }
   }
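To make the chunked hand-off above concrete, a standalone sketch (values illustrative, not from this PR):

  // 45,000 top records across 8 threads: chunkSize = ceil(45000 / 8) = 5625.
  int numRecords = 45_000;
  int numThreads = 8;
  int chunkSize = (numRecords + numThreads - 1) / numThreads;
  for (int threadId = 0; threadId < numThreads; threadId++) {
    int startIdx = threadId * chunkSize;                      // thread 7: 39375
    int endIdx = Math.min(startIdx + chunkSize, numRecords);  // thread 7: 45000
    // Each record falls into exactly one half-open range [startIdx, endIdx).
  }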

+  private int inferNumThreadsExtractFinalResult() {
+    if (_numThreadsExtractFinalResult > 1) {
+      return _numThreadsExtractFinalResult;
+    }
+    if (containsExpensiveAggregationFunctions()) {
+      int parallelChunkSize = _chunkSizeExtractFinalResult;
+      if (_topRecords != null && _topRecords.size() > parallelChunkSize) {
+        int estimatedThreads = (int) Math.ceil((double) _topRecords.size() / parallelChunkSize);
+        if (estimatedThreads == 0) {
+          return 1;
+        }
+        return Math.min(estimatedThreads, QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY);
+      }
+    }
+    // Default to 1 thread
+    return 1;
+  }
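For example, with the default chunk size of 10,000, a final result of 45,000 groups over a funnel aggregation infers ceil(45,000 / 10,000) = 5 threads, subject to the QueryMultiThreadingUtils.MAX_NUM_THREADS_PER_QUERY cap; 8,000 groups would stay single-threaded, since the record count does not exceed the chunk size.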

+  private boolean containsExpensiveAggregationFunctions() {
+    for (AggregationFunction aggregationFunction : _aggregationFunctions) {
+      switch (aggregationFunction.getType()) {
+        case FUNNELCOMPLETECOUNT:
+        case FUNNELCOUNT:
+        case FUNNELMATCHSTEP:
+        case FUNNELMAXSTEP:
+          return true;
+        default:
+          break;
+      }
+    }
+    return false;
+  }
 
   @Override
SimpleIndexedTable.java

@@ -19,6 +19,7 @@
 package org.apache.pinot.core.data.table;
 
 import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -31,8 +32,9 @@
 public class SimpleIndexedTable extends IndexedTable {
 
   public SimpleIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext, int resultSize,
-      int trimSize, int trimThreshold, int initialCapacity) {
-    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new HashMap<>(initialCapacity));
+      int trimSize, int trimThreshold, int initialCapacity, ExecutorService executorService) {
+    super(dataSchema, hasFinalInput, queryContext, resultSize, trimSize, trimThreshold, new HashMap<>(initialCapacity),
+        executorService);
   }
 
   /**
UnboundedConcurrentIndexedTable.java

@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.data.table;
 
+import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.request.context.QueryContext;
 
@@ -36,8 +37,9 @@
 public class UnboundedConcurrentIndexedTable extends ConcurrentIndexedTable {
 
   public UnboundedConcurrentIndexedTable(DataSchema dataSchema, boolean hasFinalInput, QueryContext queryContext,
-      int resultSize, int initialCapacity) {
-    super(dataSchema, hasFinalInput, queryContext, resultSize, Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity);
+      int resultSize, int initialCapacity, ExecutorService executorService) {
+    super(dataSchema, hasFinalInput, queryContext, resultSize, Integer.MAX_VALUE, Integer.MAX_VALUE, initialCapacity,
+        executorService);
   }
 
   @Override
@@ -111,7 +111,8 @@ protected void processSegments() {
     if (_indexedTable == null) {
       synchronized (this) {
         if (_indexedTable == null) {
-          _indexedTable = GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, _numTasks);
+          _indexedTable = GroupByUtils.createIndexedTableForCombineOperator(resultsBlock, _queryContext, _numTasks,
+              _executorService);
         }
       }
     }
InstancePlanMakerImplV2.java

@@ -94,6 +94,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   // set as pinot.server.query.executor.groupby.trim.threshold
   public static final String GROUPBY_TRIM_THRESHOLD_KEY = "groupby.trim.threshold";
   public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000;
+  public static final int DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT = 1;
+  public static final int DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT = 10_000;
 
   private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class);
 
@@ -268,6 +270,21 @@ private void applyQueryOptions(QueryContext queryContext) {
     } else {
       queryContext.setGroupTrimThreshold(_groupByTrimThreshold);
     }
+    // Set numThreadsExtractFinalResult
+    Integer numThreadsExtractFinalResult = QueryOptionsUtils.getNumThreadsExtractFinalResult(queryOptions);
+    if (numThreadsExtractFinalResult != null) {
+      queryContext.setNumThreadsExtractFinalResult(numThreadsExtractFinalResult);
+    } else {
+      queryContext.setNumThreadsExtractFinalResult(DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT);
+    }
+    // Set chunkSizeExtractFinalResult
+    Integer chunkSizeExtractFinalResult =
+        QueryOptionsUtils.getChunkSizeExtractFinalResult(queryOptions);
+    if (chunkSizeExtractFinalResult != null) {
+      queryContext.setChunkSizeExtractFinalResult(chunkSizeExtractFinalResult);
+    } else {
+      queryContext.setChunkSizeExtractFinalResult(DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT);
+    }
   }
 }
@@ -240,7 +240,7 @@ private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable
     // Create an indexed table to perform the reduce.
     IndexedTable indexedTable =
         GroupByUtils.createIndexedTableForDataTableReducer(dataTables.get(0), _queryContext, reducerContext,
-            numReduceThreadsToUse);
+            numReduceThreadsToUse, reducerContext.getExecutorService());
 
     // Create groups of data tables that each thread can process concurrently.
     // Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
QueryContext.java

@@ -124,6 +124,11 @@ public class QueryContext {
   private int _minServerGroupTrimSize = InstancePlanMakerImplV2.DEFAULT_MIN_SERVER_GROUP_TRIM_SIZE;
   // Trim threshold to use for server combine for SQL GROUP BY
   private int _groupTrimThreshold = InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD;
+  // Number of threads to use for final reduce
+  private int _numThreadsExtractFinalResult = InstancePlanMakerImplV2.DEFAULT_NUM_THREADS_EXTRACT_FINAL_RESULT;
+  // Parallel chunk size for final reduce
+  private int _chunkSizeExtractFinalResult =
+      InstancePlanMakerImplV2.DEFAULT_CHUNK_SIZE_EXTRACT_FINAL_RESULT;
   // Whether null handling is enabled
   private boolean _nullHandlingEnabled;
   // Whether server returns the final result
@@ -411,6 +416,22 @@ public void setGroupTrimThreshold(int groupTrimThreshold) {
     _groupTrimThreshold = groupTrimThreshold;
   }
 
+  public int getNumThreadsExtractFinalResult() {
+    return _numThreadsExtractFinalResult;
+  }
+
+  public void setNumThreadsExtractFinalResult(int numThreadsExtractFinalResult) {
+    _numThreadsExtractFinalResult = numThreadsExtractFinalResult;
+  }
+
+  public int getChunkSizeExtractFinalResult() {
+    return _chunkSizeExtractFinalResult;
+  }
+
+  public void setChunkSizeExtractFinalResult(int chunkSizeExtractFinalResult) {
+    _chunkSizeExtractFinalResult = chunkSizeExtractFinalResult;
+  }
+
   public boolean isNullHandlingEnabled() {
     return _nullHandlingEnabled;
   }