Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Map BadQueryRequestException to QueryException.QUERY_VALIDATION_ERROR #14917

Merged
merged 19 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
aa2f562
Reclassify BadQueryRequestException as QUERY_VALIDATION_ERROR
real-mj-song Jan 23, 2025
0114c7a
Use BadQueryRequestException for column type error for sum/max/min ag…
real-mj-song Jan 24, 2025
89de614
Update testQueryExceptions to test correct QueryException code
real-mj-song Jan 24, 2025
1d8ac33
Import style cleanup
real-mj-song Jan 24, 2025
d1257f2
Update query exception type in OfflineClusterIntegrationTest
real-mj-song Jan 24, 2025
781733d
Throws BadQueryRequestException in reducer if QUERY_VALIDATION_ERROR_…
real-mj-song Jan 25, 2025
bf47aac
Throw BadQueryRequestException for IllegalArgumentException in BaseCo…
real-mj-song Jan 25, 2025
ec7cec1
Linter fix
real-mj-song Jan 25, 2025
e0e1b09
Patch all implementations of BaseCombineOperator
real-mj-song Jan 27, 2025
713c197
Increment metrics when QUERY_EXECUTION/VALIDATION_ERROR caught in Mul…
real-mj-song Jan 27, 2025
d51a37e
Increment BrokerMeter.QUERY_VALIDATION_EXCEPTIONS metric instead
real-mj-song Jan 27, 2025
beb88e0
Throw QUERY_VALIDATION_ERROR if its error code found when processing …
real-mj-song Jan 28, 2025
520bf69
Simplify error throw logic in BaseCombineOperator
real-mj-song Jan 29, 2025
c3397d8
Increment QUERY_VALIDATION_EXCEPTIONS broker metric in single stage b…
real-mj-song Jan 29, 2025
1bce9ff
Fix static import style
real-mj-song Jan 29, 2025
4f13fb0
Handle both IllegalArgumentException and BadQueryRequestException in …
real-mj-song Jan 29, 2025
d2e85cb
Refactor using QueryInfoException
real-mj-song Jan 29, 2025
b8a42e8
Missing license header for QueryInfoException
real-mj-song Jan 29, 2025
c3b0b48
Revert QueryRunnerTest
real-mj-song Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
Expand Down Expand Up @@ -845,6 +846,12 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
long totalTimeMs = System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis();
brokerResponse.setTimeUsedMs(totalTimeMs);
augmentStatistics(requestContext, brokerResponse);
// include both broker side exceptions and server side exceptions
List<QueryProcessingException> brokerExceptions = brokerResponse.getExceptions();
brokerExceptions.stream()
.filter(exception -> exception.getErrorCode() == QueryException.QUERY_VALIDATION_ERROR_CODE)
.findFirst()
.ifPresent(exception -> _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1));
if (QueryOptionsUtils.shouldDropResults(pinotQuery.getQueryOptions())) {
brokerResponse.setResultTable(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.exception.QueryInfoException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerQueryPhase;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.common.response.broker.ResultTable;
Expand Down Expand Up @@ -269,11 +271,19 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
requestContext.setErrorCode(QueryException.EXECUTION_TIMEOUT_ERROR_CODE);
return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR);
} catch (Throwable t) {
ProcessingException queryException = QueryException.QUERY_EXECUTION_ERROR;
if (t instanceof QueryInfoException
&& ((QueryInfoException) t).getProcessingException().equals(QueryException.QUERY_VALIDATION_ERROR)) {
// provide more specific error code if available
queryException = QueryException.QUERY_VALIDATION_ERROR;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should update broker metrics - BrokerMeter.QUERY_VALIDATION_EXCEPTIONS ?

We should also update this in SingleStageBrokerRequestHandler?

Copy link
Contributor Author

@real-mj-song real-mj-song Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For SingleStageBrokerRequestHandler, this is not needed because BadQueryRequestException thrown by ServerQueryExecutorV1Impl is stored in BrokerResponseNative object. It's directly parsed in PinotClientRequest::getPinotQueryResponse.

For this MultiStageBrokerRequestHandler, this was needed because query was handled by _queryDispatcher.submitAndReduce which may throw an exception. The Java exception needs to be caught on the broker request handler level.

Copy link
Contributor Author

@real-mj-song real-mj-song Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Diagrams for these two error reporting scenarios. Many middle steps are omitted and only relevant steps are shown 👇🏼

PinotClientRequest::processSqlQueryPost
  - BaseSingleStageBrokerRequestHandler::handleRequest
    - SingleConnectionBrokerRequestHandler::processBrokerRequest // broker metric is updated here
      - ServerQueryExecutorV1Impl::executeInternal // BadQueryRequestException is caught. InstanceResponseBlock is updated here
    - SingleConnectionBrokerRequestHandler::processBrokerRequest // No exception is caught here. Error code is embedded inside brokerResponse
PinotClientRequest::getPinotQueryResponse // 700 error code is parsed

MultiStage query

PinotClientRequest::processSqlQueryPost
  - MultiStageBrokerRequestHandler::handleRequest
    - QueryDispatcher::submitAndReduce // exception is thrown if ANY block errors. I re-throw BadQueryRequestException here
  - MultiStageBrokerRequestHandler::handleRequest // catch BadQueryRequestException. Exceptions in requestContext are added to brokerResponse via augmentStatistics
PinotClientRequest::getPinotQueryResponse // 700 error code is parsed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Metric incremented now. Resolved

_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
}

String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t);
LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage);
requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
requestContext.setErrorCode(queryException.getErrorCode());
return new BrokerResponseNative(
QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, consolidatedMessage));
QueryException.getException(queryException, consolidatedMessage));
} finally {
Tracing.getThreadAccountant().clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@


// TODO: Clean up ProcessingException (thrift) because we don't send it through the wire
// TODO: Rename this class to QueryExceptionUtil because it doesn't extend Exception
public class QueryException {
private QueryException() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.apache.pinot.common.exception;

import org.apache.pinot.common.response.ProcessingException;


/**
* Exception to contain info about QueryException errors.
* Throwable version of {@link org.apache.pinot.common.response.broker.QueryProcessingException}
*/
public class QueryInfoException extends RuntimeException {
Copy link
Contributor Author

@real-mj-song real-mj-song Jan 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this class?

QueryException is more of QueryExceptionUtil. It isn't throwable and rather a collection of static ProcessingException extends org.apache.thrift.TException. We need a throwable class that also contains information about ProcessingException.

Can't you use static QueryException.QUERY_VALIDATION_ERROR (or other from that family) directly?

Because they are basically org.apache.thrift.TException, they aren't suitable for general purpose exception. For instance, the message of QueryException.QUERY_VALIDATION_ERROR is statically set when defined/initialized.

Can't you use QueryProcessingException instead?

QueryProcessingException isn't throwable. It's named exception but it's really a QueryExceptionContainer used on broker side.

private ProcessingException _processingException;

public QueryInfoException(String message) {
super(message);
}

public QueryInfoException(String message, Throwable cause) {
super(message, cause);
}

public QueryInfoException(Throwable cause) {
super(cause);
}

public ProcessingException getProcessingException() {
return _processingException;
}

public void setProcessingException(ProcessingException processingException) {
_processingException = processingException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
Expand Down Expand Up @@ -190,11 +191,15 @@ protected static RuntimeException wrapOperatorException(Operator operator, Runti
// Otherwise, try to get the segment name to help locate the segment when debugging query errors.
// Not all operators have associated segment, so do this at best effort.
IndexSegment segment = operator.getIndexSegment();
if (segment == null) {
return e;
String errorMessage = null;
if (segment != null) {
errorMessage = "Caught exception while doing operator: " + operator.getClass()
+ " on segment: " + segment.getSegmentName();
}

if (e instanceof IllegalArgumentException || e instanceof BadQueryRequestException) {
throw new BadQueryRequestException(errorMessage, e);
}
throw new RuntimeException(
"Caught exception while doing operator: " + operator.getClass() + " on segment: " + segment.getSegmentName(),
e);
throw new RuntimeException(errorMessage, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -111,7 +112,11 @@ protected void processSegments() {
@Override
protected void onProcessSegmentsException(Throwable t) {
_processingException.compareAndSet(null, t);
_blockingQueue.offer(new ExceptionResultsBlock(t));
if (t instanceof BadQueryRequestException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ServerQueryExecutorV1Impl, we are setting QUERY_VALIDATION_ERROR for BadQueryRequestException. Do we require the same changes here?

Either way I see this change only being made in BaseSingleBlockCombineOperator. What about GroupByCombineOperator and BaseStreamingCombineOperator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ServerQueryExecutorV1Impl, we are setting QUERY_VALIDATION_ERROR for BadQueryRequestException. Do we require the same changes here?

It does. Without this change, generic RuntimeException is thrown to QueryDispatcher.java which will bypass that check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way I see this change only being made in BaseSingleBlockCombineOperator. What about GroupByCombineOperator and BaseStreamingCombineOperator?

Yeah just noticed these implement the same interface. Will update them as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All patched.

_blockingQueue.offer(new ExceptionResultsBlock(QueryException.QUERY_VALIDATION_ERROR, t));
} else {
_blockingQueue.offer(new ExceptionResultsBlock(t));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.IntermediateRecord;
Expand All @@ -41,6 +42,7 @@
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.core.util.GroupByUtils;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.Tracing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -203,6 +205,9 @@ public BaseResultsBlock mergeResults()

Throwable processingException = _processingException.get();
if (processingException != null) {
if (processingException instanceof BadQueryRequestException) {
return new ExceptionResultsBlock(QueryException.QUERY_VALIDATION_ERROR, processingException);
}
return new ExceptionResultsBlock(processingException);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pinot.core.operator.combine.merger.ResultsBlockMerger;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.exception.EarlyTerminationException;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
Expand Down Expand Up @@ -171,7 +172,11 @@ protected void onProcessSegmentsException(Throwable t) {
_processingException.compareAndSet(null, t);
// Clear the blocking queue and add the exception results block to terminate the main thread
_blockingQueue.clear();
_blockingQueue.offer(new ExceptionResultsBlock(t));
if (t instanceof BadQueryRequestException) {
_blockingQueue.offer(new ExceptionResultsBlock(QueryException.QUERY_VALIDATION_ERROR, t));
} else {
_blockingQueue.offer(new ExceptionResultsBlock(t));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.exception.BadQueryRequestException;


public class MaxAggregationFunction extends NullableSingleInputAggregationFunction<Double, Double> {
Expand Down Expand Up @@ -143,7 +144,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
break;
}
default:
throw new IllegalStateException("Cannot compute max for non-numeric type: " + blockValSet.getValueType());
throw new BadQueryRequestException("Cannot compute max for non-numeric type: " + blockValSet.getValueType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.exception.BadQueryRequestException;


public class MinAggregationFunction extends NullableSingleInputAggregationFunction<Double, Double> {
Expand Down Expand Up @@ -143,7 +144,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
break;
}
default:
throw new IllegalStateException("Cannot compute min for non-numeric type: " + blockValSet.getValueType());
throw new BadQueryRequestException("Cannot compute min for non-numeric type: " + blockValSet.getValueType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.exception.BadQueryRequestException;


public class SumAggregationFunction extends NullableSingleInputAggregationFunction<Double, Double> {
Expand Down Expand Up @@ -139,7 +140,7 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
break;
}
default:
throw new IllegalStateException("Cannot compute sum for non-numeric type: " + blockValSet.getValueType());
throw new BadQueryRequestException("Cannot compute sum for non-numeric type: " + blockValSet.getValueType());
}
updateAggregationResultHolder(aggregationResultHolder, sum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ private InstanceResponseBlock executeInternal(ServerQueryRequest queryRequest, E
// Do not log verbose error for BadQueryRequestException and QueryCancelledException.
if (e instanceof BadQueryRequestException) {
LOGGER.info("Caught BadQueryRequestException while processing requestId: {}, {}", requestId, e.getMessage());
instanceResponse.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
instanceResponse.addException(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e));
} else if (e instanceof QueryCancelledException) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Cancelled while processing requestId: {}", requestId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,15 @@ public void testQueryExceptions()

testQueryException("SELECT POTATO(ArrTime) FROM mytable",
useMultiStageQueryEngine()
? QueryException.QUERY_PLANNING_ERROR_CODE : QueryException.QUERY_EXECUTION_ERROR_CODE);
? QueryException.QUERY_PLANNING_ERROR_CODE : QueryException.QUERY_VALIDATION_ERROR_CODE);

// ArrTime expects a numeric type
testQueryException("SELECT COUNT(*) FROM mytable where ArrTime = 'potato'",
QueryException.QUERY_EXECUTION_ERROR_CODE);
QueryException.QUERY_VALIDATION_ERROR_CODE);

// Cannot use numeric aggregate function for string column
testQueryException("SELECT MAX(OriginState) FROM mytable where ArrTime > 5",
QueryException.QUERY_VALIDATION_ERROR_CODE);
}

private void testQueryException(String query, int errorCode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ public void testBase64Func(boolean useMultiStageQueryEngine)
testQueryError(sqlQuery, QueryException.QUERY_PLANNING_ERROR_CODE);
} else {
response = postQuery(sqlQuery);
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryValidationError"));
}

// invalid argument
Expand All @@ -1043,7 +1043,7 @@ public void testBase64Func(boolean useMultiStageQueryEngine)
testQueryError(sqlQuery, QueryException.QUERY_PLANNING_ERROR_CODE);
} else {
response = postQuery(sqlQuery);
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));
assertTrue(response.get("exceptions").get(0).get("message").toString().startsWith("\"QueryValidationError"));
}

// invalid argument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.calcite.runtime.PairList;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.exception.QueryInfoException;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
Expand Down Expand Up @@ -459,7 +461,15 @@ public static QueryResult runReducer(long requestId,
}
// TODO: Improve the error handling, e.g. return partial response
if (block.isErrorBlock()) {
throw new RuntimeException("Received error query execution result block: " + block.getExceptions());
Map<Integer, String> queryExceptions = block.getExceptions();
String errorMessage = "Received error query execution result block: " + queryExceptions;
if (queryExceptions.containsKey(QueryException.QUERY_VALIDATION_ERROR_CODE)) {
QueryInfoException queryInfoException = new QueryInfoException(errorMessage);
queryInfoException.setProcessingException(QueryException.QUERY_VALIDATION_ERROR);
throw queryInfoException;
}

throw new RuntimeException(errorMessage);
}
assert block.isSuccessfulEndOfStreamBlock();
MultiStageQueryStats queryStats = block.getQueryStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void testSqlWithExceptionMsgChecker(String sql, String expectedError) {
Assert.assertTrue(
exceptionMessage.startsWith("Received error query execution result block: ") || exceptionMessage.startsWith(
"Error occurred during stage submission") || exceptionMessage.equals(expectedError),
"Exception message didn't start with proper heading: " + exceptionMessage);
"Exception message didn't start with proper heading: " + expectedError);
Assert.assertTrue(exceptionMessage.contains(expectedError),
"Exception should contain: " + expectedError + ", but found: " + exceptionMessage);
}
Expand Down
Loading