From f7a948854f178cd871cca19e6dd31c2237d4947e Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Wed, 15 Jan 2025 15:05:14 -0800 Subject: [PATCH 01/22] add cancelClientQuery operation for SingleStageBroker (only numerical identifiers so far) --- .../api/resources/PinotClientRequest.java | 39 ++++++++++ .../BaseSingleStageBrokerRequestHandler.java | 75 +++++++++++++++++-- .../requesthandler/BrokerRequestHandler.java | 15 ++++ .../BrokerRequestHandlerDelegate.java | 10 +++ .../MultiStageBrokerRequestHandler.java | 8 ++ .../TimeSeriesRequestHandler.java | 8 ++ .../pinot/common/response/BrokerResponse.java | 10 +++ .../response/broker/BrokerResponseNative.java | 23 ++++-- .../broker/BrokerResponseNativeV2.java | 17 ++++- .../resources/PinotRunningQueryResource.java | 63 ++++++++++++++++ .../pinot/spi/utils/CommonConstants.java | 3 + 11 files changed, 257 insertions(+), 14 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java index 44da5f962d32..3aee5279dc34 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java @@ -415,6 +415,45 @@ public String cancelQuery( .build()); } + @DELETE + @Path("clientQuery/{clientQueryId}") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Cancel a query as identified by the clientQueryId", notes = "No effect if no query exists for" + + "the given clientQueryId on the requested broker. Query may continue to run for a short while after calling" + + "cancel as it's done in a non-blocking manner. The cancel method can be called multiple times.") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 404, message = "Query not found on the requested broker") + }) + public String cancelClientQuery( + @ApiParam(value = "ClientQueryId given by the client", required = true) + @PathParam("clientQueryId") String clientQueryId, + @ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs") + @DefaultValue("3000") int timeoutMs, + @ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false") + boolean verbose) { + try { + Map serverResponses = verbose ? new HashMap<>() : null; + if (_requestHandler.cancelQueryByClientId(clientQueryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) { + String resp = "Cancelled client query: " + clientQueryId; + if (verbose) { + resp += " with responses from servers: " + serverResponses; + } + return resp; + } + } catch (Exception e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(String.format( + "Failed to cancel client query: %s on the broker due to error: %s", clientQueryId, e.getMessage())) + .build()); + } + throw new WebApplicationException( + Response.status(Response.Status.NOT_FOUND).entity( + String.format("Client query: %s not found on the broker", clientQueryId)) + .build()); + } + @GET @Path("queries") @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_RUNNING_QUERY) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 72b69a24fadb..cd804ece2573 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -21,7 +21,10 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import java.net.URI; import java.util.ArrayList; import java.util.Collections; @@ -141,6 +144,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ protected final boolean _enableDistinctCountBitmapOverride; protected final int _queryResponseLimit; protected final Map _queriesById; + protected final BiMap _clientQueryIds; protected final boolean _enableMultistageMigrationMetric; protected ExecutorService _multistageCompileExecutor; protected BlockingQueue> _multistageCompileQueryQueue; @@ -160,7 +164,13 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT); boolean enableQueryCancellation = Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); - _queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null; + if (enableQueryCancellation) { + _queriesById = new ConcurrentHashMap<>(); + _clientQueryIds = Maps.synchronizedBiMap(HashBiMap.create()); + } else { + _queriesById = null; + _clientQueryIds = null; + } _enableMultistageMigrationMetric = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC, Broker.DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC); @@ -210,13 +220,13 @@ public void shutDown() { @Override public Map getRunningQueries() { - Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker"); + Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query)); } @VisibleForTesting Set getRunningServers(long requestId) { - Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker"); + Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); QueryServers queryServers = _queriesById.get(requestId); return queryServers != null ? queryServers._servers : Collections.emptySet(); } @@ -225,7 +235,12 @@ Set getRunningServers(long requestId) { public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, Map serverResponses) throws Exception { - Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker"); + Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); + return cancelQueryByRequestId(requestId, timeoutMs, executor, connMgr, serverResponses); + } + + private boolean cancelQueryByRequestId(long requestId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, + Map serverResponses) throws Exception { QueryServers queryServers = _queriesById.get(requestId); if (queryServers == null) { return false; @@ -275,6 +290,19 @@ public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, Htt return true; } + @Override + public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map serverResponses) + throws Exception { + Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); + Long requestId = this._clientQueryIds.inverse().get(clientQueryId); + if (requestId == null) { + LOGGER.warn("query cancellation cannot be performed due to unknown client query id: {}", clientQueryId); + return false; + } + return this.cancelQueryByRequestId(requestId, timeoutMs, executor, connMgr, serverResponses); + } + @Override protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @@ -797,7 +825,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } } BrokerResponseNative brokerResponse; - if (_queriesById != null) { + if (isQueryCancellationEnabled()) { // Start to track the running query for cancellation just before sending it out to servers to avoid any // potential failures that could happen before sending it out, like failures to calculate the routing table etc. // TODO: Even tracking the query as late as here, a potential race condition between calling cancel API and @@ -807,13 +835,16 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO // condition makes cancel API less reliable. This should be rare as it assumes sending queries out to // servers takes time, but will address later if needed. _queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); + String cqid = maybeSaveClientQueryId(requestId, sqlNodeAndOptions); LOGGER.debug("Keep track of running query: {}", requestId); try { brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, requestContext); + brokerResponse.setClientRequestId(cqid); } finally { _queriesById.remove(requestId); + maybeRemoveClientQueryId(requestId); LOGGER.debug("Remove track of running query: {}", requestId); } } else { @@ -865,6 +896,40 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } } + private String maybeSaveClientQueryId(long requestId, SqlNodeAndOptions sqlNodeAndOptions) { + if (!isQueryCancellationEnabled()) return null; + String clientQueryId = extractClientQueryId(sqlNodeAndOptions); + if (StringUtils.isBlank(clientQueryId)) return null; + String prev = _clientQueryIds.put(requestId, clientQueryId); + if (!clientQueryId.equals(prev)) { + LOGGER.warn("client query id override for id {} (old: {}, new: {})", requestId, prev, clientQueryId); + } else { + LOGGER.info("client query id stored for requestId {}: {}", requestId, clientQueryId); + } + return clientQueryId; + } + + private boolean maybeRemoveClientQueryId(long requestId) { + if (!isQueryCancellationEnabled()) return false; + // we protected insertion with isBlank, so null is enough to assume that no entry exists + String clientQueryId = _clientQueryIds.remove(requestId); + if (clientQueryId != null) { + LOGGER.debug("client query id {} removed for requestId {}", clientQueryId, requestId); + return true; + } else { + return false; + } + } + + private String extractClientQueryId(SqlNodeAndOptions sqlNodeAndOptions) { + if (sqlNodeAndOptions.getOptions() == null) return null; + return sqlNodeAndOptions.getOptions().get(QueryOptionKey.CLIENT_QUERY_ID); + } + + private boolean isQueryCancellationEnabled() { + return _queriesById != null; + } + @VisibleForTesting static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRoutingPolicy, String offlineRoutingPolicy) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java index 277f5f96df63..710cc68b182a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java @@ -83,4 +83,19 @@ default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, Strin boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, Map serverResponses) throws Exception; + + /** + * Cancel a query as identified by the clientQueryId provided externally. This method is non-blocking so the query may + * still run for a while after calling this method. This cancel method can be called multiple times. + * @param clientQueryId the Id assigned to the query by the client + * @param timeoutMs timeout to wait for servers to respond the cancel requests + * @param executor to send cancel requests to servers in parallel + * @param connMgr to provide the http connections + * @param serverResponses to collect cancel responses from all servers if a map is provided + * @return true if there is a running query for the given clientQueryId. + */ + boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, + Map serverResponses) + throws Exception; } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java index 561e79abb4fe..cc64548717e2 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java @@ -148,6 +148,16 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, executor, connMgr, serverResponses); } + @Override + public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map serverResponses) + throws Exception { + // TODO: add support for multiStaged engine, basically try to cancel the query on multiStaged engine firstly; if + // not found, try on the singleStaged engine. + return _singleStageBrokerRequestHandler.cancelQueryByClientId( + clientQueryId, timeoutMs, executor, connMgr, serverResponses); + } + private CursorResponse getCursorResponse(Integer numRows, BrokerResponse response) throws Exception { if (numRows == null) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 2e75b6dd9018..1f47657bc842 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -432,6 +432,14 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC throw new UnsupportedOperationException(); } + @Override + public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map serverResponses) + throws Exception { + // TODO: Support query cancellation for multi-stage engine + throw new UnsupportedOperationException(); + } + /** * Returns the string representation of the Set of Strings with a limit on the number of elements. * @param setOfStrings Set of strings diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java index d14f2860138a..8dc58f522dd1 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java @@ -141,6 +141,14 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC return false; } + @Override + public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map serverResponses) + throws Exception { + // TODO: Implement this. + return false; + } + private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, String queryParamString) throws URISyntaxException { List pairs = URLEncodedUtils.parse( diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java index a1db078a3cca..90cc107b450a 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java @@ -119,6 +119,16 @@ default int getExceptionsSize() { */ void setRequestId(String requestId); + /** + * Returns the client request IF of the query (if any). + */ + String getClientRequestId(); + + /** + * Sets the (optional) client requestID of the query; + */ + void setClientRequestId(String clientRequestId); + /** * Returns the broker ID that handled the query. */ diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java index 6d8cdac1325a..71acb560ce1e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java @@ -45,12 +45,12 @@ */ @JsonPropertyOrder({ "resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached", "timeUsedMs", - "requestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", - "numServersQueried", "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", - "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", - "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker", "numSegmentsPrunedByServer", - "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue", "brokerReduceTimeMs", - "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", + "requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter", + "numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", "numSegmentsQueried", + "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", + "numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker", + "numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue", + "brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs", "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried" @@ -72,6 +72,7 @@ public class BrokerResponseNative implements BrokerResponse { private boolean _numGroupsLimitReached = false; private long _timeUsedMs = 0L; private String _requestId; + private String _clientRequestId; private String _brokerId; private long _numDocsScanned = 0L; private long _totalDocs = 0L; @@ -227,6 +228,16 @@ public void setRequestId(String requestId) { _requestId = requestId; } + @Override + public String getClientRequestId() { + return _clientRequestId; + } + + @Override + public void setClientRequestId(String clientRequestId) { + _clientRequestId = clientRequestId; + } + @Override public String getBrokerId() { return _brokerId; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java index 8e9a4d6b81d7..4642ca200e9a 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java @@ -39,9 +39,9 @@ */ @JsonPropertyOrder({ "resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached", "maxRowsInJoinReached", - "maxRowsInWindowReached", "timeUsedMs", "stageStats", "maxRowsInOperator", "requestId", "brokerId", - "numDocsScanned", "totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried", - "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", + "maxRowsInWindowReached", "timeUsedMs", "stageStats", "maxRowsInOperator", "requestId", "clientRequestId", + "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", + "numServersQueried", "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker", "numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue", "brokerReduceTimeMs", "offlineThreadCpuTimeNs", @@ -182,6 +182,17 @@ public String getRequestId() { return _requestId; } + @Override + public String getClientRequestId() { + // TODO: support cqid for MSQE + return null; + } + + @Override + public void setClientRequestId(String clientRequestId) { + // TODO: support cqid for MSQE + } + @Override public void setRequestId(String requestId) { _requestId = requestId; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java index 08dac9e756dd..4e6a610cab0c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java @@ -160,6 +160,69 @@ public String cancelQuery( } } + @DELETE + @Path("clientQuery/{brokerId}/{clientQueryId}") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Cancel a query as identified by the clientQueryId", notes = "No effect if no query exists for" + + "the given clientQueryId on the requested broker. Query may continue to run for a short while after calling" + + "cancel as it's done in a non-blocking manner. The cancel method can be called multiple times.") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 404, message = "Query not found on the requested broker") + }) + public String cancelClientQuery( + @ApiParam(value = "Broker that's running the query", required = true) @PathParam("brokerId") String brokerId, + @ApiParam(value = "ClientQueryId provided by the client", required = true) + @PathParam("clientQueryId") long clientQueryId, + @ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs") + @DefaultValue("3000") int timeoutMs, + @ApiParam(value = "Return verbose responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false") + boolean verbose, @Context HttpHeaders httpHeaders) { + InstanceConfig broker = _pinotHelixResourceManager.getHelixInstanceConfig(brokerId); + if (broker == null) { + throw new WebApplicationException( + Response.status(Response.Status.BAD_REQUEST).entity("Unknown broker: " + brokerId).build()); + } + try { + Timeout timeout = Timeout.of(timeoutMs, TimeUnit.MILLISECONDS); + RequestConfig defaultRequestConfig = + RequestConfig.custom().setConnectionRequestTimeout(timeout).setResponseTimeout(timeout).build(); + + CloseableHttpClient client = + HttpClients.custom().setConnectionManager(_httpConnMgr).setDefaultRequestConfig(defaultRequestConfig).build(); + + String protocol = _controllerConf.getControllerBrokerProtocol(); + int portOverride = _controllerConf.getControllerBrokerPortOverride(); + int port = portOverride > 0 ? portOverride : Integer.parseInt(broker.getPort()); + HttpDelete deleteMethod = new HttpDelete(String.format( + "%s://%s:%d/clientQuery/%d?verbose=%b", + protocol, broker.getHostName(), port, clientQueryId, verbose)); + Map requestHeaders = createRequestHeaders(httpHeaders); + requestHeaders.forEach(deleteMethod::setHeader); + try (CloseableHttpResponse response = client.execute(deleteMethod)) { + int status = response.getCode(); + String responseContent = EntityUtils.toString(response.getEntity()); + if (status == 200) { + return responseContent; + } + if (status == 404) { + throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND) + .entity(String.format("Client query: %s not found on the broker: %s", clientQueryId, brokerId)).build()); + } + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity( + String.format("Failed to cancel client query: %s on the broker: %s with unexpected status=%d and resp='%s'", + clientQueryId, brokerId, status, responseContent)).build()); + } + } catch (WebApplicationException e) { + throw e; + } catch (Exception e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity( + String.format("Failed to cancel client query: %s on the broker: %s due to error: %s", clientQueryId, brokerId, + e.getMessage())).build()); + } + } + @GET @Path("/queries") @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_RUNNING_QUERY) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index c44bd246a0f5..ca3550c8b0d0 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -527,6 +527,9 @@ public static class QueryOptionKey { public static final String GET_CURSOR = "getCursor"; // Number of rows that the cursor should contain public static final String CURSOR_NUM_ROWS = "cursorNumRows"; + + // Custom Query ID provided by the client + public static final String CLIENT_QUERY_ID = "clientQueryId"; } public static class QueryOptionValue { From 39a4f9426098ad24299f3e37f2c3b814b92a6778 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Wed, 15 Jan 2025 16:08:30 -0800 Subject: [PATCH 02/22] avoid synchronized BiMap and checkstyle --- .../BaseSingleStageBrokerRequestHandler.java | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index cd804ece2573..1ceea86ca1cd 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -21,16 +21,14 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletionService; @@ -144,7 +142,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ protected final boolean _enableDistinctCountBitmapOverride; protected final int _queryResponseLimit; protected final Map _queriesById; - protected final BiMap _clientQueryIds; + protected final Map _clientQueryIds; protected final boolean _enableMultistageMigrationMetric; protected ExecutorService _multistageCompileExecutor; protected BlockingQueue> _multistageCompileQueryQueue; @@ -166,7 +164,7 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); if (enableQueryCancellation) { _queriesById = new ConcurrentHashMap<>(); - _clientQueryIds = Maps.synchronizedBiMap(HashBiMap.create()); + _clientQueryIds = new ConcurrentHashMap<>(); } else { _queriesById = null; _clientQueryIds = null; @@ -239,8 +237,8 @@ public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, Htt return cancelQueryByRequestId(requestId, timeoutMs, executor, connMgr, serverResponses); } - private boolean cancelQueryByRequestId(long requestId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, - Map serverResponses) throws Exception { + private boolean cancelQueryByRequestId(long requestId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map serverResponses) throws Exception { QueryServers queryServers = _queriesById.get(requestId); if (queryServers == null) { return false; @@ -295,12 +293,14 @@ public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Execut HttpClientConnectionManager connMgr, Map serverResponses) throws Exception { Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); - Long requestId = this._clientQueryIds.inverse().get(clientQueryId); - if (requestId == null) { + Optional requestId = _clientQueryIds.entrySet().stream() + .filter(e -> clientQueryId.equals(e.getValue())).map(Map.Entry::getKey).findFirst(); + if (requestId.isPresent()) { + return cancelQueryByRequestId(requestId.get(), timeoutMs, executor, connMgr, serverResponses); + } else { LOGGER.warn("query cancellation cannot be performed due to unknown client query id: {}", clientQueryId); return false; } - return this.cancelQueryByRequestId(requestId, timeoutMs, executor, connMgr, serverResponses); } @Override @@ -897,9 +897,13 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } private String maybeSaveClientQueryId(long requestId, SqlNodeAndOptions sqlNodeAndOptions) { - if (!isQueryCancellationEnabled()) return null; + if (!isQueryCancellationEnabled()) { + return null; + } String clientQueryId = extractClientQueryId(sqlNodeAndOptions); - if (StringUtils.isBlank(clientQueryId)) return null; + if (StringUtils.isBlank(clientQueryId)) { + return null; + } String prev = _clientQueryIds.put(requestId, clientQueryId); if (!clientQueryId.equals(prev)) { LOGGER.warn("client query id override for id {} (old: {}, new: {})", requestId, prev, clientQueryId); @@ -910,7 +914,9 @@ private String maybeSaveClientQueryId(long requestId, SqlNodeAndOptions sqlNodeA } private boolean maybeRemoveClientQueryId(long requestId) { - if (!isQueryCancellationEnabled()) return false; + if (!isQueryCancellationEnabled()) { + return false; + } // we protected insertion with isBlank, so null is enough to assume that no entry exists String clientQueryId = _clientQueryIds.remove(requestId); if (clientQueryId != null) { @@ -922,7 +928,9 @@ private boolean maybeRemoveClientQueryId(long requestId) { } private String extractClientQueryId(SqlNodeAndOptions sqlNodeAndOptions) { - if (sqlNodeAndOptions.getOptions() == null) return null; + if (sqlNodeAndOptions.getOptions() == null) { + return null; + } return sqlNodeAndOptions.getOptions().get(QueryOptionKey.CLIENT_QUERY_ID); } From 7a5f713f3af2db64dd126f5484c843b68f2bd493 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Mon, 27 Jan 2025 13:52:37 +0100 Subject: [PATCH 03/22] add cancel feature (with queryId and clientQueryId) to MSQE, some refactor for code reusal --- .../BaseBrokerRequestHandler.java | 83 ++++++++++++++ .../BaseSingleStageBrokerRequestHandler.java | 103 +++++------------- .../BrokerRequestHandlerDelegate.java | 22 ++-- .../MultiStageBrokerRequestHandler.java | 29 ++--- .../broker/BrokerResponseNativeV2.java | 6 +- .../service/dispatch/QueryDispatcher.java | 69 +++++++++--- .../tools/MultistageEngineQuickStart.java | 2 + 7 files changed, 187 insertions(+), 127 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 9a5e0e94a487..d4e5d6508042 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -19,11 +19,16 @@ package org.apache.pinot.broker.requesthandler; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import javax.ws.rs.WebApplicationException; @@ -31,6 +36,7 @@ import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.pinot.broker.api.AccessControl; import org.apache.pinot.broker.api.RequesterIdentity; import org.apache.pinot.broker.broker.AccessControlFactory; @@ -51,6 +57,7 @@ import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory; import org.apache.pinot.spi.exception.BadQueryRequestException; import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Broker; import org.apache.pinot.sql.parsers.SqlNodeAndOptions; import org.slf4j.Logger; @@ -74,6 +81,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { protected final QueryLogger _queryLogger; @Nullable protected final String _enableNullHandling; + protected final Map _queriesById; + protected final Map _clientQueryIds; public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) { @@ -90,6 +99,16 @@ public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, Brok _brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS); _queryLogger = new QueryLogger(config); _enableNullHandling = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING); + + boolean enableQueryCancellation = + Boolean.parseBoolean(config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); + if (enableQueryCancellation) { + _queriesById = new ConcurrentHashMap<>(); + _clientQueryIds = new ConcurrentHashMap<>(); + } else { + _queriesById = null; + _clientQueryIds = null; + } } @Override @@ -179,6 +198,9 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq @Nullable HttpHeaders httpHeaders, AccessControl accessControl) throws Exception; + protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map serverResponses) throws Exception; + protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) { statistics.setNumRowsResultSet(response.getNumRowsResultSet()); // TODO: Add partial result flag to RequestContext @@ -223,4 +245,65 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments()); statistics.setTraceInfo(response.getTraceInfo()); } + + @Override + public Map getRunningQueries() { + Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); + return new HashMap<>(_queriesById); + } + + @Override + public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, + Map serverResponses) + throws Exception { + Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); + try { + return handleCancel(queryId, timeoutMs, executor, connMgr, serverResponses); + } finally { + maybeRemoveQuery(queryId); + } + } + + @Override + public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, + HttpClientConnectionManager connMgr, Map serverResponses) + throws Exception { + Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); + Optional requestId = _clientQueryIds.entrySet().stream() + .filter(e -> clientQueryId.equals(e.getValue())).map(Map.Entry::getKey).findFirst(); + if (requestId.isPresent()) { + return cancelQuery(requestId.get(), timeoutMs, executor, connMgr, serverResponses); + } else { + LOGGER.warn("Query cancellation cannot be performed due to unknown client query id: {}", clientQueryId); + return false; + } + } + + protected String maybeSaveQuery(long requestId, SqlNodeAndOptions sqlNodeAndOptions, String query) { + if (isQueryCancellationEnabled()) { + String clientRequestId = sqlNodeAndOptions.getOptions() != null + ? sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.CLIENT_QUERY_ID) : null; + _queriesById.put(requestId, query); + if (StringUtils.isNotBlank(clientRequestId)) { + _clientQueryIds.put(requestId, clientRequestId); + LOGGER.debug("Keep track of running query: {} (with client id {})", requestId, clientRequestId); + } else { + LOGGER.debug("Keep track of running query: {}", requestId); + } + return clientRequestId; + } + return null; + } + + protected void maybeRemoveQuery(long requestId) { + if (isQueryCancellationEnabled()) { + _queriesById.remove(requestId); + _clientQueryIds.remove(requestId); + LOGGER.debug("Remove track of running query: {}", requestId); + } + } + + protected boolean isQueryCancellationEnabled() { + return _queriesById != null; + } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 79c5c07a754d..592b5042c67d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -28,7 +28,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletionService; @@ -39,7 +38,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import javax.ws.rs.WebApplicationException; @@ -141,8 +139,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ protected final boolean _enableQueryLimitOverride; protected final boolean _enableDistinctCountBitmapOverride; protected final int _queryResponseLimit; - protected final Map _queriesById; - protected final Map _clientQueryIds; + protected final Map _serversById; protected final boolean _enableMultistageMigrationMetric; protected ExecutorService _multistageCompileExecutor; protected BlockingQueue> _multistageCompileQueryQueue; @@ -160,14 +157,10 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro _config.getProperty(CommonConstants.Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY, false); _queryResponseLimit = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT); - boolean enableQueryCancellation = - Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); - if (enableQueryCancellation) { - _queriesById = new ConcurrentHashMap<>(); - _clientQueryIds = new ConcurrentHashMap<>(); + if (this.isQueryCancellationEnabled()) { + _serversById = new ConcurrentHashMap<>(); } else { - _queriesById = null; - _clientQueryIds = null; + _serversById = null; } _enableMultistageMigrationMetric = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC, @@ -178,9 +171,9 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro } LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query response limit: {}, query log max length: {}, " - + "query log max rate: {}, query cancellation enabled: {}", getClass().getSimpleName(), _brokerId, + + "query log max rate: {}", getClass().getSimpleName(), _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(), - enableQueryCancellation); + this.isQueryCancellationEnabled()); } @Override @@ -216,36 +209,33 @@ public void shutDown() { } } - @Override - public Map getRunningQueries() { - Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); - return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query)); - } - @VisibleForTesting Set getRunningServers(long requestId) { Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); - QueryServers queryServers = _queriesById.get(requestId); + QueryServers queryServers = _serversById.get(requestId); return queryServers != null ? queryServers._servers : Collections.emptySet(); } @Override - public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, - Map serverResponses) - throws Exception { - Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); - return cancelQueryByRequestId(requestId, timeoutMs, executor, connMgr, serverResponses); + protected void maybeRemoveQuery(long requestId) { + super.maybeRemoveQuery(requestId); + if (isQueryCancellationEnabled()) { + _serversById.remove(requestId); + } } - private boolean cancelQueryByRequestId(long requestId, int timeoutMs, Executor executor, - HttpClientConnectionManager connMgr, Map serverResponses) throws Exception { - QueryServers queryServers = _queriesById.get(requestId); + @Override + protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, + Map serverResponses) + throws Exception { + QueryServers queryServers = _serversById.get(queryId); if (queryServers == null) { return false; } + // TODO: Use different global query id for OFFLINE and REALTIME table after releasing 0.12.0. See QueryIdUtils for // details - String globalQueryId = getGlobalQueryId(requestId); + String globalQueryId = getGlobalQueryId(queryId); List> serverUrls = new ArrayList<>(); for (ServerInstance serverInstance : queryServers._servers) { serverUrls.add(Pair.of(String.format("%s/query/%s", serverInstance.getAdminEndpoint(), globalQueryId), null)); @@ -288,21 +278,6 @@ private boolean cancelQueryByRequestId(long requestId, int timeoutMs, Executor e return true; } - @Override - public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, - HttpClientConnectionManager connMgr, Map serverResponses) - throws Exception { - Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); - Optional requestId = _clientQueryIds.entrySet().stream() - .filter(e -> clientQueryId.equals(e.getValue())).map(Map.Entry::getKey).findFirst(); - if (requestId.isPresent()) { - return cancelQueryByRequestId(requestId.get(), timeoutMs, executor, connMgr, serverResponses); - } else { - LOGGER.warn("query cancellation cannot be performed due to unknown client query id: {}", clientQueryId); - return false; - } - } - @Override protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @@ -836,17 +811,17 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO // can always list the running queries and cancel query again until it ends. Just that such race // condition makes cancel API less reliable. This should be rare as it assumes sending queries out to // servers takes time, but will address later if needed. - _queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); - String cqid = maybeSaveClientQueryId(requestId, sqlNodeAndOptions); - LOGGER.debug("Keep track of running query: {}", requestId); + String clientRequestId = maybeSaveQuery(requestId, sqlNodeAndOptions, query); + if (isQueryCancellationEnabled()) { + _serversById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); + } try { brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, requestContext); - brokerResponse.setClientRequestId(cqid); + brokerResponse.setClientRequestId(clientRequestId); } finally { - _queriesById.remove(requestId); - maybeRemoveClientQueryId(requestId); + maybeRemoveQuery(requestId); LOGGER.debug("Remove track of running query: {}", requestId); } } else { @@ -902,23 +877,6 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } } - private String maybeSaveClientQueryId(long requestId, SqlNodeAndOptions sqlNodeAndOptions) { - if (!isQueryCancellationEnabled()) { - return null; - } - String clientQueryId = extractClientQueryId(sqlNodeAndOptions); - if (StringUtils.isBlank(clientQueryId)) { - return null; - } - String prev = _clientQueryIds.put(requestId, clientQueryId); - if (!clientQueryId.equals(prev)) { - LOGGER.warn("client query id override for id {} (old: {}, new: {})", requestId, prev, clientQueryId); - } else { - LOGGER.info("client query id stored for requestId {}: {}", requestId, clientQueryId); - } - return clientQueryId; - } - private boolean maybeRemoveClientQueryId(long requestId) { if (!isQueryCancellationEnabled()) { return false; @@ -933,17 +891,6 @@ private boolean maybeRemoveClientQueryId(long requestId) { } } - private String extractClientQueryId(SqlNodeAndOptions sqlNodeAndOptions) { - if (sqlNodeAndOptions.getOptions() == null) { - return null; - } - return sqlNodeAndOptions.getOptions().get(QueryOptionKey.CLIENT_QUERY_ID); - } - - private boolean isQueryCancellationEnabled() { - return _queriesById != null; - } - @VisibleForTesting static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRoutingPolicy, String offlineRoutingPolicy) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java index cc64548717e2..03965b6fef69 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java @@ -133,18 +133,22 @@ public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String @Override public Map getRunningQueries() { - // TODO: add support for multiStaged engine: track running queries for multiStaged engine and combine its - // running queries with those from singleStaged engine. Both engines share the same request Id generator, so - // the query will have unique ids across the two engines. - return _singleStageBrokerRequestHandler.getRunningQueries(); + // Both engines share the same request Id generator, so the query will have unique ids across the two engines. + Map queries = _singleStageBrokerRequestHandler.getRunningQueries(); + if (_multiStageBrokerRequestHandler != null) { + queries.putAll(_multiStageBrokerRequestHandler.getRunningQueries()); + } + return queries; } @Override public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, Map serverResponses) throws Exception { - // TODO: add support for multiStaged engine, basically try to cancel the query on multiStaged engine firstly; if - // not found, try on the singleStaged engine. + if (_multiStageBrokerRequestHandler != null && _multiStageBrokerRequestHandler.cancelQuery( + queryId, timeoutMs, executor, connMgr, serverResponses)) { + return true; + } return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, executor, connMgr, serverResponses); } @@ -152,8 +156,10 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, Map serverResponses) throws Exception { - // TODO: add support for multiStaged engine, basically try to cancel the query on multiStaged engine firstly; if - // not found, try on the singleStaged engine. + if (_multiStageBrokerRequestHandler != null && _multiStageBrokerRequestHandler.cancelQueryByClientId( + clientQueryId, timeoutMs, executor, connMgr, serverResponses)) { + return true; + } return _singleStageBrokerRequestHandler.cancelQueryByClientId( clientQueryId, timeoutMs, executor, connMgr, serverResponses); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 600f3f29c830..02bd47101d25 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -104,10 +104,12 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED, CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ? TlsUtils.extractTlsConfig(config, CommonConstants.Broker.BROKER_TLS_PREFIX) : null; - _queryDispatcher = new QueryDispatcher(new MailboxService(hostname, port, config, tlsConfig), tlsConfig); + _queryDispatcher = new QueryDispatcher( + new MailboxService(hostname, port, config, tlsConfig), tlsConfig, this.isQueryCancellationEnabled()); LOGGER.info("Initialized MultiStageBrokerRequestHandler on host: {}, port: {} with broker id: {}, timeout: {}ms, " - + "query log max length: {}, query log max rate: {}", hostname, port, _brokerId, _brokerTimeoutMs, - _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit()); + + "query log max length: {}, query log max rate: {}, query cancellation enabled: {}", hostname, port, + _brokerId, _brokerTimeoutMs, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(), + this.isQueryCancellationEnabled()); _explainAskingServerDefault = _config.getProperty( CommonConstants.MultiStageQueryRunner.KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN, CommonConstants.MultiStageQueryRunner.DEFAULT_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN); @@ -258,6 +260,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO long executionStartTimeNs = System.nanoTime(); QueryDispatcher.QueryResult queryResults; + String clientRequestId = null; try { queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimer.getRemainingTime(), @@ -283,6 +286,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO executionEndTimeNs - executionStartTimeNs); BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); + brokerResponse.setClientRequestId(clientRequestId); brokerResponse.setResultTable(queryResults.getResultTable()); brokerResponse.setTablesQueried(tableNames); brokerResponse.setBrokerReduceTimeMs(queryResults.getBrokerReduceTimeMs()); @@ -432,24 +436,9 @@ private BrokerResponse constructMultistageExplainPlan(String sql, String plan) { } @Override - public Map getRunningQueries() { - // TODO: Support running query tracking for multi-stage engine - throw new UnsupportedOperationException(); - } - - @Override - public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, + protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, Map serverResponses) { - // TODO: Support query cancellation for multi-stage engine - throw new UnsupportedOperationException(); - } - - @Override - public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, - HttpClientConnectionManager connMgr, Map serverResponses) - throws Exception { - // TODO: Support query cancellation for multi-stage engine - throw new UnsupportedOperationException(); + return _queryDispatcher.cancel(queryId); } /** diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java index 4642ca200e9a..ce9a95bf0eeb 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java @@ -72,6 +72,7 @@ public class BrokerResponseNativeV2 implements BrokerResponse { */ private long _maxRowsInOperator; private String _requestId; + private String _clientRequestId; private String _brokerId; private int _numServersQueried; private int _numServersResponded; @@ -184,13 +185,12 @@ public String getRequestId() { @Override public String getClientRequestId() { - // TODO: support cqid for MSQE - return null; + return _clientRequestId; } @Override public void setClientRequestId(String clientRequestId) { - // TODO: support cqid for MSQE + _clientRequestId = clientRequestId; } @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 253f800d5d04..9ddb820aa0ce 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -105,18 +105,24 @@ public class QueryDispatcher { private final Map _timeSeriesDispatchClientMap = new ConcurrentHashMap<>(); @Nullable private final TlsConfig _tlsConfig; + private final Map> _serversByQuery; private final PhysicalTimeSeriesBrokerPlanVisitor _timeSeriesBrokerPlanVisitor = new PhysicalTimeSeriesBrokerPlanVisitor(); public QueryDispatcher(MailboxService mailboxService) { - this(mailboxService, null); + this(mailboxService, null, false); } - public QueryDispatcher(MailboxService mailboxService, @Nullable TlsConfig tlsConfig) { + public QueryDispatcher(MailboxService mailboxService, @Nullable TlsConfig tlsConfig, boolean enableCancellation) { _mailboxService = mailboxService; _executorService = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors(), new TracedThreadFactory(Thread.NORM_PRIORITY, false, PINOT_BROKER_QUERY_DISPATCHER_FORMAT)); _tlsConfig = tlsConfig; + if (enableCancellation) { + _serversByQuery = new ConcurrentHashMap<>(); + } else { + _serversByQuery = null; + } } public void start() { @@ -127,13 +133,13 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d Map queryOptions) throws Exception { long requestId = context.getRequestId(); - List plans = dispatchableSubPlan.getQueryStageList(); + Set servers = null; try { - submit(requestId, dispatchableSubPlan, timeoutMs, queryOptions); + servers = submit(requestId, dispatchableSubPlan, timeoutMs, queryOptions); return runReducer(requestId, dispatchableSubPlan, timeoutMs, queryOptions, _mailboxService); } catch (Throwable e) { // TODO: Consider always cancel when it returns (early terminate) - cancel(requestId, plans); + cancel(requestId, servers); throw e; } } @@ -145,11 +151,13 @@ public List explain(RequestContext context, DispatchablePlanFragment f List planNodes = new ArrayList<>(); List plans = Collections.singletonList(fragment); + final Set[] servers = new Set[1]; try { SendRequest> requestSender = DispatchClient::explain; - execute(requestId, plans, timeoutMs, queryOptions, requestSender, (responses, serverInstance) -> { + servers[0] = execute(requestId, plans, timeoutMs, queryOptions, requestSender, (responses, serverInstance) -> { for (Worker.ExplainResponse response : responses) { if (response.containsMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR)) { + cancel(requestId, servers[0]); throw new RuntimeException( String.format("Unable to explain query plan for request: %d on server: %s, ERROR: %s", requestId, serverInstance, @@ -162,6 +170,7 @@ public List explain(RequestContext context, DispatchablePlanFragment f Plan.PlanNode planNode = Plan.PlanNode.parseFrom(rootNode); planNodes.add(PlanNodeDeserializer.process(planNode)); } catch (InvalidProtocolBufferException e) { + cancel(requestId, servers[0]); throw new RuntimeException("Failed to parse explain plan node for request " + requestId + " from server " + serverInstance, e); } @@ -170,20 +179,24 @@ public List explain(RequestContext context, DispatchablePlanFragment f }); } catch (Throwable e) { // TODO: Consider always cancel when it returns (early terminate) - cancel(requestId, plans); + cancel(requestId, servers[0]); throw e; } return planNodes; } @VisibleForTesting - void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, Map queryOptions) + Set submit( + long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, Map queryOptions) throws Exception { SendRequest requestSender = DispatchClient::submit; List stagePlans = dispatchableSubPlan.getQueryStageList(); List plansWithoutRoot = stagePlans.subList(1, stagePlans.size()); - execute(requestId, plansWithoutRoot, timeoutMs, queryOptions, requestSender, (response, serverInstance) -> { + final Set[] servers = new Set[1]; + servers[0] = execute(requestId, plansWithoutRoot, timeoutMs, queryOptions, requestSender, + (response, serverInstance) -> { if (response.containsMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR)) { + cancel(requestId, servers[0]); throw new RuntimeException( String.format("Unable to execute query plan for request: %d on server: %s, ERROR: %s", requestId, serverInstance, @@ -191,10 +204,19 @@ void submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeou "null"))); } }); + if (isQueryCancellationEnabled()) { + _serversByQuery.put(requestId, servers[0]); + } + return servers[0]; } - private void execute(long requestId, List stagePlans, long timeoutMs, - Map queryOptions, SendRequest sendRequest, BiConsumer resultConsumer) + private boolean isQueryCancellationEnabled() { + return _serversByQuery != null; + } + + private Set execute(long requestId, List stagePlans, + long timeoutMs, Map queryOptions, + SendRequest sendRequest, BiConsumer resultConsumer) throws ExecutionException, InterruptedException, TimeoutException { Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS); @@ -253,6 +275,8 @@ private void execute(long requestId, List stagePla if (deadline.isExpired()) { throw new TimeoutException("Timed out waiting for response of async query-dispatch"); } + + return serverInstances; } Map initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan dispatchablePlan, long deadlineMs, @@ -359,20 +383,29 @@ private StageInfo(ByteString rootNode, ByteString customProperty) { } } - private void cancel(long requestId, List stagePlans) { - int numStages = stagePlans.size(); - // Skip the reduce stage (stage 0) - Set serversToCancel = new HashSet<>(); - for (int stageId = 1; stageId < numStages; stageId++) { - serversToCancel.addAll(stagePlans.get(stageId).getServerInstanceToWorkerIdMap().keySet()); + public boolean cancel(long requestId) { + if (isQueryCancellationEnabled()) { + return cancel(requestId, _serversByQuery.remove(requestId)); + } else { + return false; } - for (QueryServerInstance queryServerInstance : serversToCancel) { + } + + private boolean cancel(long requestId, @Nullable Set servers) { + if (servers == null) { + return false; + } + for (QueryServerInstance queryServerInstance : servers) { try { getOrCreateDispatchClient(queryServerInstance).cancel(requestId); } catch (Throwable t) { LOGGER.warn("Caught exception while cancelling query: {} on server: {}", requestId, queryServerInstance, t); } } + if (isQueryCancellationEnabled()) { + _serversByQuery.remove(requestId); + } + return true; } private DispatchClient getOrCreateDispatchClient(QueryServerInstance queryServerInstance) { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java index e249c37c0b70..92f771a70f89 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/MultistageEngineQuickStart.java @@ -294,6 +294,8 @@ public String[] getDefaultBatchTableDirectories() { protected Map getConfigOverrides() { Map configOverrides = new HashMap<>(); configOverrides.put(CommonConstants.Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true); + configOverrides.put(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION, true); + configOverrides.put(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_CANCELLATION, true); return configOverrides; } From a9d1e49f9ba462cc70b617f9d7980a5c80d26311 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Mon, 27 Jan 2025 14:11:41 +0100 Subject: [PATCH 04/22] set and delete clientRequestId on MSQE --- .../broker/requesthandler/MultiStageBrokerRequestHandler.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 02bd47101d25..310f8faed526 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -255,12 +255,13 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR); } + String clientRequestId = maybeSaveQuery(requestId, sqlNodeAndOptions, query); + try { Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId), ThreadExecutionContext.TaskType.MSE); long executionStartTimeNs = System.nanoTime(); QueryDispatcher.QueryResult queryResults; - String clientRequestId = null; try { queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimer.getRemainingTime(), @@ -280,6 +281,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, consolidatedMessage)); } finally { Tracing.getThreadAccountant().clear(); + maybeRemoveQuery(requestId); } long executionEndTimeNs = System.nanoTime(); updatePhaseTimingForTables(tableNames, BrokerQueryPhase.QUERY_EXECUTION, From 97e7b5df3277706f76034fd14c355ee22e4134e2 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Mon, 27 Jan 2025 14:22:43 +0100 Subject: [PATCH 05/22] fix unimplemented method --- .../broker/requesthandler/TimeSeriesRequestHandler.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java index 8dc58f522dd1..b486b7c0a18d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java @@ -82,6 +82,13 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S throw new IllegalArgumentException("Not supported yet"); } + @Override + protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, + Map serverResponses) + throws Exception { + throw new IllegalArgumentException("Not supported yet"); + } + @Override public void start() { LOGGER.info("Starting time-series request handler"); From 65f73a06d82cb0e0924ce78a0f4d66f54fa6b526 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Mon, 27 Jan 2025 17:14:06 +0100 Subject: [PATCH 06/22] fix I/O parameter and related tests --- .../service/dispatch/QueryDispatcher.java | 44 +++++++++---------- .../service/dispatch/QueryDispatcherTest.java | 19 +++++--- 2 files changed, 33 insertions(+), 30 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 9ddb820aa0ce..a810305c114e 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -133,9 +133,9 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d Map queryOptions) throws Exception { long requestId = context.getRequestId(); - Set servers = null; + Set servers = new HashSet<>(); try { - servers = submit(requestId, dispatchableSubPlan, timeoutMs, queryOptions); + submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions); return runReducer(requestId, dispatchableSubPlan, timeoutMs, queryOptions, _mailboxService); } catch (Throwable e) { // TODO: Consider always cancel when it returns (early terminate) @@ -151,13 +151,13 @@ public List explain(RequestContext context, DispatchablePlanFragment f List planNodes = new ArrayList<>(); List plans = Collections.singletonList(fragment); - final Set[] servers = new Set[1]; + Set servers = new HashSet<>(); try { SendRequest> requestSender = DispatchClient::explain; - servers[0] = execute(requestId, plans, timeoutMs, queryOptions, requestSender, (responses, serverInstance) -> { + execute(requestId, plans, timeoutMs, queryOptions, requestSender, servers, (responses, serverInstance) -> { for (Worker.ExplainResponse response : responses) { if (response.containsMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR)) { - cancel(requestId, servers[0]); + cancel(requestId, servers); throw new RuntimeException( String.format("Unable to explain query plan for request: %d on server: %s, ERROR: %s", requestId, serverInstance, @@ -170,7 +170,7 @@ public List explain(RequestContext context, DispatchablePlanFragment f Plan.PlanNode planNode = Plan.PlanNode.parseFrom(rootNode); planNodes.add(PlanNodeDeserializer.process(planNode)); } catch (InvalidProtocolBufferException e) { - cancel(requestId, servers[0]); + cancel(requestId, servers); throw new RuntimeException("Failed to parse explain plan node for request " + requestId + " from server " + serverInstance, e); } @@ -179,24 +179,24 @@ public List explain(RequestContext context, DispatchablePlanFragment f }); } catch (Throwable e) { // TODO: Consider always cancel when it returns (early terminate) - cancel(requestId, servers[0]); + cancel(requestId, servers); throw e; } return planNodes; } @VisibleForTesting - Set submit( - long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, Map queryOptions) + void submit( + long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, Set serversOut, + Map queryOptions) throws Exception { SendRequest requestSender = DispatchClient::submit; List stagePlans = dispatchableSubPlan.getQueryStageList(); List plansWithoutRoot = stagePlans.subList(1, stagePlans.size()); - final Set[] servers = new Set[1]; - servers[0] = execute(requestId, plansWithoutRoot, timeoutMs, queryOptions, requestSender, + execute(requestId, plansWithoutRoot, timeoutMs, queryOptions, requestSender, serversOut, (response, serverInstance) -> { if (response.containsMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR)) { - cancel(requestId, servers[0]); + cancel(requestId, serversOut); throw new RuntimeException( String.format("Unable to execute query plan for request: %d on server: %s, ERROR: %s", requestId, serverInstance, @@ -205,27 +205,25 @@ Set submit( } }); if (isQueryCancellationEnabled()) { - _serversByQuery.put(requestId, servers[0]); + _serversByQuery.put(requestId, serversOut); } - return servers[0]; } private boolean isQueryCancellationEnabled() { return _serversByQuery != null; } - private Set execute(long requestId, List stagePlans, + private void execute(long requestId, List stagePlans, long timeoutMs, Map queryOptions, - SendRequest sendRequest, BiConsumer resultConsumer) + SendRequest sendRequest, Set serverInstancesOut, + BiConsumer resultConsumer) throws ExecutionException, InterruptedException, TimeoutException { Deadline deadline = Deadline.after(timeoutMs, TimeUnit.MILLISECONDS); - Set serverInstances = new HashSet<>(); + List stageInfos = serializePlanFragments(stagePlans, serverInstancesOut, deadline); - List stageInfos = serializePlanFragments(stagePlans, serverInstances, deadline); - - if (serverInstances.isEmpty()) { + if (serverInstancesOut.isEmpty()) { throw new RuntimeException("No server instances to dispatch query to"); } @@ -233,10 +231,10 @@ private Set execute(long requestId, List> dispatchCallbacks = new ArrayBlockingQueue<>(numServers); - for (QueryServerInstance serverInstance : serverInstances) { + for (QueryServerInstance serverInstance : serverInstancesOut) { Consumer> callbackConsumer = response -> { if (!dispatchCallbacks.offer(response)) { LOGGER.warn("Failed to offer response to dispatchCallbacks queue for query: {} on server: {}", requestId, @@ -275,8 +273,6 @@ private Set execute(long requestId, List initializeTimeSeriesMetadataMap(TimeSeriesDispatchablePlan dispatchablePlan, long deadlineMs, diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java index c21c40b2d9db..6f64791ec69f 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java @@ -22,8 +22,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @@ -85,7 +87,8 @@ public void tearDown() { public void testQueryDispatcherCanSendCorrectPayload(String sql) throws Exception { DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql); - _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, Collections.emptyMap()); + _queryDispatcher.submit( + REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, new HashSet<>(), Collections.emptyMap()); } @Test @@ -95,7 +98,8 @@ public void testQueryDispatcherThrowsWhenQueryServerThrows() { Mockito.doThrow(new RuntimeException("foo")).when(failingQueryServer).submit(Mockito.any(), Mockito.any()); DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql); try { - _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, Collections.emptyMap()); + _queryDispatcher.submit( + REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, new HashSet<>(), Collections.emptyMap()); Assert.fail("Method call above should have failed"); } catch (Exception e) { Assert.assertTrue(e.getMessage().contains("Error dispatching query")); @@ -111,7 +115,7 @@ public void testQueryDispatcherCancelWhenQueryServerCallsOnError() Mockito.doAnswer(invocationOnMock -> { StreamObserver observer = invocationOnMock.getArgument(1); observer.onError(new RuntimeException("foo")); - return null; + return Set.of(); }).when(failingQueryServer).submit(Mockito.any(), Mockito.any()); long requestId = REQUEST_ID_GEN.getAndIncrement(); RequestContext context = new DefaultRequestContext(); @@ -166,7 +170,8 @@ public void testQueryDispatcherThrowsWhenQueryServerCallsOnError() { }).when(failingQueryServer).submit(Mockito.any(), Mockito.any()); DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql); try { - _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, Collections.emptyMap()); + _queryDispatcher.submit( + REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 10_000L, new HashSet<>(), Collections.emptyMap()); Assert.fail("Method call above should have failed"); } catch (Exception e) { Assert.assertTrue(e.getMessage().contains("Error dispatching query")); @@ -187,7 +192,8 @@ public void testQueryDispatcherThrowsWhenQueryServerTimesOut() { }).when(failingQueryServer).submit(Mockito.any(), Mockito.any()); DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql); try { - _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 200L, Collections.emptyMap()); + _queryDispatcher.submit( + REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 200L, new HashSet<>(), Collections.emptyMap()); Assert.fail("Method call above should have failed"); } catch (Exception e) { String message = e.getMessage(); @@ -203,6 +209,7 @@ public void testQueryDispatcherThrowsWhenDeadlinePreExpiredAndAsyncResponseNotPo throws Exception { String sql = "SELECT * FROM a WHERE col1 = 'foo'"; DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql); - _queryDispatcher.submit(REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 0L, Collections.emptyMap()); + _queryDispatcher.submit( + REQUEST_ID_GEN.getAndIncrement(), dispatchableSubPlan, 0L, new HashSet<>(), Collections.emptyMap()); } } From fe5c8467e375cf30148eb9c6f749d9083a767373 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Tue, 28 Jan 2025 07:57:31 +0100 Subject: [PATCH 07/22] add clientRequestId on response test --- .../tests/OfflineClusterIntegrationTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index d67a8fb18838..9ecb99df160f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.Properties; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; @@ -79,6 +80,7 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.MetricFieldSpec; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.InstanceTypeUtils; import org.apache.pinot.spi.utils.JsonUtils; @@ -180,6 +182,12 @@ protected List getFieldConfigs() { CompressionCodec.MV_ENTRY_DICT, null)); } + @Override + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + super.overrideBrokerConf(brokerConf); + brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION, "true"); + } + @BeforeClass public void setUp() throws Exception { @@ -3753,4 +3761,18 @@ public void testFastFilteredCountWithOrFilterOnBitmapWithExclusiveBitmap(boolean "SELECT COUNT(*) FROM mytable WHERE Origin BETWEEN 'ALB' AND 'LMT' OR DayofMonth <> 2" ); } + + @Test(dataProvider = "useBothQueryEngines") + public void testResponseWithClientRequestId(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String clientRequestId = UUID.randomUUID().toString(); + String sqlQuery = + "SET " + CommonConstants.Broker.Request.QueryOptionKey.CLIENT_QUERY_ID + "='" + clientRequestId + "'; " + + "SELECT AirlineID FROM mytable LIMIT 1"; + JsonNode result = postQuery(sqlQuery); + assertNoError(result); + + assertEquals(result.get("clientRequestId").asText(), clientRequestId); + } } From 2eb506ee5aedfd58f7dfcba50f0ede57b9ee631d Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Tue, 28 Jan 2025 18:01:51 +0100 Subject: [PATCH 08/22] add sleep and random functions for further tests --- .../function/scalar/ArithmeticFunctions.java | 8 ++++++++ .../function/scalar/DateTimeFunctions.java | 11 +++++++++++ .../function/ArithmeticFunctionsTest.java | 18 ++++++++++++++++++ .../data/function/DateTimeFunctionsTest.java | 19 +++++++++++++++++++ 4 files changed, 56 insertions(+) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java index d27a3fa6cccd..5d8b3d75233a 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java @@ -20,6 +20,7 @@ import java.math.BigDecimal; import java.math.RoundingMode; +import java.util.Random; import org.apache.pinot.spi.annotations.ScalarFunction; @@ -219,4 +220,11 @@ public static long byteswapLong(long a) { // Skip the heading 0s in the long value return Long.reverseBytes(a); } + + @ScalarFunction(names = {"rand", "random"}) + public static long random(long min, long max) { + assert max >= min; + Random rnd = new Random(); + return min + (long) (rnd.nextDouble() * (max - min)); + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java index ebb31ab2cb23..bc31372d9ca5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java @@ -1260,4 +1260,15 @@ public static long[] timestampDiffMVReverse(String unit, long timestamp1, long[] public static int extract(String interval, long timestamp) { return DateTimeUtils.extract(DateTimeUtils.ExtractFieldType.valueOf(interval), timestamp); } + + @ScalarFunction + public static long sleep(long ms) { + assert ms >= 0; + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return ms; + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/ArithmeticFunctionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/ArithmeticFunctionsTest.java index 61d62e45318e..28e272be4b96 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/ArithmeticFunctionsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/ArithmeticFunctionsTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; import org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator; import org.apache.pinot.spi.data.readers.GenericRow; import org.testng.Assert; @@ -40,12 +41,29 @@ private void testFunction(String functionExpression, List expectedArgume Assert.assertEquals(evaluator.evaluate(row), expectedResult); } + private void testFunction(String functionExpression, List expectedArguments, GenericRow row, + Consumer assertResult) { + InbuiltFunctionEvaluator evaluator = new InbuiltFunctionEvaluator(functionExpression); + Assert.assertEquals(evaluator.getArguments(), expectedArguments); + assertResult.accept(evaluator.evaluate(row)); + } + @Test(dataProvider = "arithmeticFunctionsDataProvider") public void testArithmeticFunctions(String functionExpression, List expectedArguments, GenericRow row, Object expectedResult) { testFunction(functionExpression, expectedArguments, row, expectedResult); } + @Test + public void testRandomFunction() { + GenericRow row = new GenericRow(); + row.putValue("a", 1000L); + row.putValue("b", 2000L); + testFunction("rand(a,b)", Lists.newArrayList("a", "b"), row, result -> { + Assert.assertTrue((Long) result >= 1000L && (Long) result <= 2000L); + }); + } + @DataProvider(name = "arithmeticFunctionsDataProvider") public Object[][] arithmeticFunctionsDataProvider() { List inputs = new ArrayList<>(); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionsTest.java index 190142e65d3f..adae37b814fe 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionsTest.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.function.Consumer; import org.apache.pinot.common.function.scalar.DateTimeFunctions; import org.apache.pinot.segment.local.function.InbuiltFunctionEvaluator; import org.apache.pinot.spi.data.readers.GenericRow; @@ -36,6 +37,7 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.AssertJUnit.assertTrue; /** @@ -54,6 +56,13 @@ private void testFunction(String functionExpression, List expectedArgume assertEquals(evaluator.evaluate(row), expectedResult); } + private void testFunction(String functionExpression, List expectedArguments, GenericRow row, + Consumer assertResult) { + InbuiltFunctionEvaluator evaluator = new InbuiltFunctionEvaluator(functionExpression); + assertEquals(evaluator.getArguments(), expectedArguments); + assertResult.accept(evaluator.evaluate(row)); + } + private void testDateFunction(String functionExpression, List expectedArguments, GenericRow row, Object expectedResult) { InbuiltFunctionEvaluator evaluator = new InbuiltFunctionEvaluator(functionExpression); @@ -778,4 +787,14 @@ public void testDateTimeConvertMultipleInvocations() { testMultipleInvocations(String.format("dateTimeConvert(timeCol, '%s', '%s', '%s')", inputFormatStr, outputFormatStr, outputGranularityStr), rows, expectedResults); } + + @Test + public void testSleepFunction() { + long startTime = System.currentTimeMillis(); + testFunction("sleep(500)", Collections.emptyList(), new GenericRow(), result -> { + assertTrue((long) result >= 500); + }); + long endTime = System.currentTimeMillis(); + assertTrue(endTime - startTime >= 500); + } } From e9bbdac8371fc2c48ba4da0b1c911fe0e40b5da7 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Wed, 29 Jan 2025 08:32:22 +0100 Subject: [PATCH 09/22] override test server conf --- .../integration/tests/OfflineClusterIntegrationTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 02f326773d2c..0a8829095221 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -187,6 +187,12 @@ protected void overrideBrokerConf(PinotConfiguration brokerConf) { brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION, "true"); } + @Override + protected void overrideServerConf(PinotConfiguration serverConf) { + super.overrideServerConf(serverConf); + serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_CANCELLATION, "true"); + } + @BeforeClass public void setUp() throws Exception { From 9dcc3939fb675fe795808f6ea3045d6bbf9aeac8 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Wed, 29 Jan 2025 10:04:12 +0100 Subject: [PATCH 10/22] add missing superclass call --- .../tests/MultiNodesOfflineClusterIntegrationTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java index a981a930ce41..24349eb78756 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java @@ -66,6 +66,7 @@ protected int getNumReplicas() { @Override protected void overrideBrokerConf(PinotConfiguration brokerConf) { + super.overrideBrokerConf(brokerConf); brokerConf.setProperty(FailureDetector.CONFIG_OF_TYPE, FailureDetector.Type.CONNECTION.name()); } From 5ac7d9ee5f869d01a31d9502ed568e4ad887c1e5 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Wed, 29 Jan 2025 14:22:30 +0100 Subject: [PATCH 11/22] add some cancel query test using internal sleep function with a trick --- .../BaseSingleStageBrokerRequestHandler.java | 14 -- .../function/scalar/ArithmeticFunctions.java | 8 - .../function/scalar/DateTimeFunctions.java | 22 +- .../resources/PinotRunningQueryResource.java | 92 +++++++- .../function/ArithmeticFunctionsTest.java | 10 - .../tests/ClusterIntegrationTestUtils.java | 4 + .../pinot/integration/tests/ClusterTest.java | 7 + .../tests/CancelQueryIntegrationTests.java | 222 ++++++++++++++++++ .../builder/ControllerRequestURLBuilder.java | 4 + 9 files changed, 334 insertions(+), 49 deletions(-) create mode 100644 pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 5ada7c9f3d5e..09348b74b547 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -880,20 +880,6 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } } - private boolean maybeRemoveClientQueryId(long requestId) { - if (!isQueryCancellationEnabled()) { - return false; - } - // we protected insertion with isBlank, so null is enough to assume that no entry exists - String clientQueryId = _clientQueryIds.remove(requestId); - if (clientQueryId != null) { - LOGGER.debug("client query id {} removed for requestId {}", clientQueryId, requestId); - return true; - } else { - return false; - } - } - @VisibleForTesting static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRoutingPolicy, String offlineRoutingPolicy) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java index 5d8b3d75233a..d27a3fa6cccd 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArithmeticFunctions.java @@ -20,7 +20,6 @@ import java.math.BigDecimal; import java.math.RoundingMode; -import java.util.Random; import org.apache.pinot.spi.annotations.ScalarFunction; @@ -220,11 +219,4 @@ public static long byteswapLong(long a) { // Skip the heading 0s in the long value return Long.reverseBytes(a); } - - @ScalarFunction(names = {"rand", "random"}) - public static long random(long min, long max) { - assert max >= min; - Random rnd = new Random(); - return min + (long) (rnd.nextDouble() * (max - min)); - } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java index bc31372d9ca5..1de85f92022e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java @@ -547,6 +547,17 @@ public static long now() { return System.currentTimeMillis(); } + @ScalarFunction + public static long sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + //TODO: handle interruption + //Thread.currentThread().interrupt(); + } + return millis; + } + /** * Return time as epoch millis before the given period (in ISO-8601 duration format). * Examples: @@ -1260,15 +1271,4 @@ public static long[] timestampDiffMVReverse(String unit, long timestamp1, long[] public static int extract(String interval, long timestamp) { return DateTimeUtils.extract(DateTimeUtils.ExtractFieldType.valueOf(interval), timestamp); } - - @ScalarFunction - public static long sleep(long ms) { - assert ms >= 0; - try { - Thread.sleep(ms); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - return ms; - } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java index 4e6a610cab0c..653e9a433202 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java @@ -30,8 +30,10 @@ import java.net.URI; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletionService; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -164,14 +166,14 @@ public String cancelQuery( @Path("clientQuery/{brokerId}/{clientQueryId}") @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY) @Produces(MediaType.APPLICATION_JSON) - @ApiOperation(value = "Cancel a query as identified by the clientQueryId", notes = "No effect if no query exists for" + @ApiOperation(value = "Cancel a query as identified by the clientQueryId", notes = "No effect if no query exists for " + "the given clientQueryId on the requested broker. Query may continue to run for a short while after calling" + "cancel as it's done in a non-blocking manner. The cancel method can be called multiple times.") @ApiResponses(value = { @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 404, message = "Query not found on the requested broker") }) - public String cancelClientQuery( + public String cancelClientQueryInBroker( @ApiParam(value = "Broker that's running the query", required = true) @PathParam("brokerId") String brokerId, @ApiParam(value = "ClientQueryId provided by the client", required = true) @PathParam("clientQueryId") long clientQueryId, @@ -223,6 +225,79 @@ public String cancelClientQuery( } } + @DELETE + @Path("clientQuery/{clientQueryId}") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY) + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Cancel a query as identified by the clientQueryId", notes = "No effect if no query exists for" + + "the given clientQueryId on any broker. Query may continue to run for a short while after calling" + + "cancel as it's done in a non-blocking manner. The cancel method can be called multiple times.") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 404, message = "Query not found on any broker") + }) + public String cancelClientQuery( + @ApiParam(value = "ClientQueryId provided by the client", required = true) + @PathParam("clientQueryId") String clientQueryId, + @ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs") + @DefaultValue("3000") int timeoutMs, + @ApiParam(value = "Return verbose responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false") + boolean verbose, @Context HttpHeaders httpHeaders) { + try { + Timeout timeout = Timeout.of(timeoutMs, TimeUnit.MILLISECONDS); + RequestConfig defaultRequestConfig = + RequestConfig.custom().setConnectionRequestTimeout(timeout).setResponseTimeout(timeout).build(); + CloseableHttpClient client = + HttpClients.custom().setConnectionManager(_httpConnMgr).setDefaultRequestConfig(defaultRequestConfig).build(); + + String protocol = _controllerConf.getControllerBrokerProtocol(); + int portOverride = _controllerConf.getControllerBrokerPortOverride(); + + Map requestHeaders = createRequestHeaders(httpHeaders); + List brokerDeletes = new ArrayList<>(); + for (InstanceInfo broker: getBrokers(httpHeaders.getHeaderString(DATABASE)).values()) { + int port = portOverride > 0 ? portOverride : broker.getPort(); + HttpDelete delete = new HttpDelete(String.format( + "%s://%s:%d/clientQuery/%s?verbose=%b", protocol, broker.getHost(), port, clientQueryId, verbose)); + requestHeaders.forEach(delete::setHeader); + brokerDeletes.add(delete); + } + + if (brokerDeletes.isEmpty()) { + throw new WebApplicationException( + Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity("No available brokers").build()); + } + + Set statusCodes = new HashSet<>(); + for (HttpDelete delete: brokerDeletes) { + try (CloseableHttpResponse response = client.execute(delete)) { + int status = response.getCode(); + String responseContent = EntityUtils.toString(response.getEntity()); + if (status == 200) { + return responseContent; + } else { + statusCodes.add(status); + } + } + } + + if (statusCodes.size() == 1 && statusCodes.iterator().next() == 404) { + throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND) + .entity(String.format("Client query: %s not found on any broker", clientQueryId)).build()); + } + + statusCodes.remove(404); + int status = statusCodes.iterator().next(); + throw new Exception( + String.format("Unexpected status=%d", status)); + } catch (WebApplicationException e) { + throw e; + } catch (Exception e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity( + String.format("Failed to cancel client query: %s due to error: %s", clientQueryId, e.getMessage())).build()); + } + } + @GET @Path("/queries") @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_RUNNING_QUERY) @@ -236,10 +311,7 @@ public Map> getRunningQueries( @ApiParam(value = "Timeout for brokers to return running queries") @QueryParam("timeoutMs") @DefaultValue("3000") int timeoutMs, @Context HttpHeaders httpHeaders) { try { - Map> tableBrokers = - _pinotHelixResourceManager.getTableToLiveBrokersMapping(httpHeaders.getHeaderString(DATABASE)); - Map brokers = new HashMap<>(); - tableBrokers.values().forEach(list -> list.forEach(info -> brokers.putIfAbsent(getInstanceKey(info), info))); + Map brokers = getBrokers(httpHeaders.getHeaderString(DATABASE)); return getRunningQueries(brokers, timeoutMs, createRequestHeaders(httpHeaders)); } catch (Exception e) { throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) @@ -310,4 +382,12 @@ private static Map createRequestHeaders(HttpHeaders httpHeaders) }); return requestHeaders; } + + private Map getBrokers(String database) { + Map> tableBrokers = + _pinotHelixResourceManager.getTableToLiveBrokersMapping(database); + Map brokers = new HashMap<>(); + tableBrokers.values().forEach(list -> list.forEach(info -> brokers.putIfAbsent(getInstanceKey(info), info))); + return brokers; + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/ArithmeticFunctionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/ArithmeticFunctionsTest.java index 28e272be4b96..7d8d022d20d9 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/ArithmeticFunctionsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/ArithmeticFunctionsTest.java @@ -54,16 +54,6 @@ public void testArithmeticFunctions(String functionExpression, List expe testFunction(functionExpression, expectedArguments, row, expectedResult); } - @Test - public void testRandomFunction() { - GenericRow row = new GenericRow(); - row.putValue("a", 1000L); - row.putValue("b", 2000L); - testFunction("rand(a,b)", Lists.newArrayList("a", "b"), row, result -> { - Assert.assertTrue((Long) result >= 1000L && (Long) result <= 2000L); - }); - } - @DataProvider(name = "arithmeticFunctionsDataProvider") public Object[][] arithmeticFunctionsDataProvider() { List inputs = new ArrayList<>(); diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java index 299d60c75496..a54fcd2ba0bf 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java @@ -873,6 +873,10 @@ public static String getBrokerQueryApiUrl(String brokerBaseApiUrl, boolean useMu return useMultiStageQueryEngine ? brokerBaseApiUrl + "/query" : brokerBaseApiUrl + "/query/sql"; } + public static String getBrokerQueryCancelUrl(String brokerBaseApiUrl, String brokerId, String clientQueryId) { + return brokerBaseApiUrl + "/clientQuery/" + brokerId + "/" + clientQueryId; + } + private static int getH2ExpectedValues(Set expectedValues, List expectedOrderByValues, ResultSet h2ResultSet, ResultSetMetaData h2MetaData, Collection orderByColumns) throws SQLException { diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index d2b4db8a1eca..85b1383aaade 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -565,6 +565,13 @@ public JsonNode postQueryToController(String query) return postQueryToController(query, getControllerBaseApiUrl(), null, getExtraQueryPropertiesForController()); } + public JsonNode cancelQuery(String clientQueryId) + throws Exception { + URI cancelURI = URI.create(getControllerRequestURLBuilder().forCancelQueryByClientId(clientQueryId)); + Object o = _httpClient.sendDeleteRequest(cancelURI); + return null; // TODO + } + private Map getExtraQueryPropertiesForController() { if (!useMultiStageQueryEngine()) { return Map.of(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java new file mode 100644 index 000000000000..1ce1a13f2157 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Timer; +import java.util.UUID; +import org.apache.commons.io.FileUtils; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.pinot.common.utils.ServiceStatus; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.InstanceTypeUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +/** + * Integration test that checks the query cancellation feature. + */ +public class CancelQueryIntegrationTests extends BaseClusterIntegrationTestSet { + private static final int NUM_BROKERS = 1; + private static final int NUM_SERVERS = 4; + + private final List _serviceStatusCallbacks = + new ArrayList<>(getNumBrokers() + getNumServers()); + + protected int getNumBrokers() { + return NUM_BROKERS; + } + + protected int getNumServers() { + return NUM_SERVERS; + } + + @Override + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + super.overrideBrokerConf(brokerConf); + brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION, "true"); + } + + @Override + protected void overrideServerConf(PinotConfiguration serverConf) { + super.overrideServerConf(serverConf); + serverConf.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_CANCELLATION, "true"); + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + // Set hyperloglog log2m value to 12. + HelixConfigScope scope = + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName()) + .build(); + _helixManager.getConfigAccessor() + .set(scope, CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, Integer.toString(12)); + startBrokers(getNumBrokers()); + startServers(getNumServers()); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments. For exhaustive testing, concurrently upload multiple segments with the same name + // and validate correctness with parallel push protection enabled. + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + // Create a copy of _tarDir to create multiple segments with the same name. + File tarDir2 = new File(_tempDir, "tarDir2"); + FileUtils.copyDirectory(_tarDir, tarDir2); + + List tarDirs = new ArrayList<>(); + tarDirs.add(_tarDir); + tarDirs.add(tarDir2); + try { + uploadSegments(getTableName(), TableType.OFFLINE, tarDirs); + } catch (Exception e) { + // If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one + // of the three exception: + // - 409 conflict of the second call enters ProcessExistingSegment + // - segmentZkMetadata creation failure if both calls entered ProcessNewSegment + // - Failed to copy segment tar file to final location due to the same segment pushed twice concurrently + // In such cases we upload all the segments again to ensure that the data is set up correctly. + assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage() + .contains("Failed to create ZK metadata for segment") || e.getMessage() + .contains("java.nio.file.FileAlreadyExistsException"), e.getMessage()); + uploadSegments(getTableName(), _tarDir); + } + + // Set up the H2 connection + setUpH2Connection(avroFiles); + + // Initialize the query generator + setUpQueryGenerator(avroFiles); + + // Set up service status callbacks + // NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find the + // resources to monitor + registerCallbackHandlers(); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + private void registerCallbackHandlers() { + List instances = _helixAdmin.getInstancesInCluster(getHelixClusterName()); + instances.removeIf( + instanceId -> !InstanceTypeUtils.isBroker(instanceId) && !InstanceTypeUtils.isServer(instanceId)); + List resourcesInCluster = _helixAdmin.getResourcesInCluster(getHelixClusterName()); + resourcesInCluster.removeIf(resource -> (!TableNameBuilder.isTableResource(resource) + && !CommonConstants.Helix.BROKER_RESOURCE_INSTANCE.equals(resource))); + for (String instance : instances) { + List resourcesToMonitor = new ArrayList<>(); + for (String resourceName : resourcesInCluster) { + IdealState idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), resourceName); + for (String partitionName : idealState.getPartitionSet()) { + if (idealState.getInstanceSet(partitionName).contains(instance)) { + resourcesToMonitor.add(resourceName); + break; + } + } + } + _serviceStatusCallbacks.add(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList.of( + new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(_helixManager, getHelixClusterName(), + instance, resourcesToMonitor, 100.0), + new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(_helixManager, getHelixClusterName(), + instance, resourcesToMonitor, 100.0)))); + } + } + + @Test + public void testInstancesStarted() { + assertEquals(_serviceStatusCallbacks.size(), getNumBrokers() + getNumServers()); + for (ServiceStatus.ServiceStatusCallback serviceStatusCallback : _serviceStatusCallbacks) { + assertEquals(serviceStatusCallback.getServiceStatus(), ServiceStatus.Status.GOOD); + } + } + + @Test(dataProvider = "useBothQueryEngines") + public void testCancelByClientQueryId(boolean useMultiStageQueryEngine) + throws Exception { + setUseMultiStageQueryEngine(useMultiStageQueryEngine); + String clientRequestId = UUID.randomUUID().toString(); + // tricky query: use sleep with some column data to avoid Calcite from optimizing it on compile time + String sqlQuery = + "SET " + CommonConstants.Broker.Request.QueryOptionKey.CLIENT_QUERY_ID + "='" + clientRequestId + "'; " + + "SELECT sleep(ActualElapsedTime+60000) FROM mytable WHERE ActualElapsedTime > 0 limit 1"; + + new Timer().schedule(new java.util.TimerTask() { + @Override + public void run() { + try { + sendCancel(clientRequestId); + } catch (Exception e) { + fail("No exception should be thrown", e); + } + } + }, 1000); + + JsonNode result = postQuery(sqlQuery); + // ugly: error message differs from SSQE to MSQE + assertQueryCancellation(result, useMultiStageQueryEngine ? "InterruptedException" : "QueryCancellationError"); + } + + private void sendCancel(String clientRequestId) + throws Exception { + cancelQuery(clientRequestId); + } + + private void assertQueryCancellation(JsonNode result, String errorText) { + assertNotNull(result); + JsonNode exceptions = result.get("exceptions"); + assertNotNull(exceptions); + assertTrue(exceptions.isArray()); + assertFalse(exceptions.isEmpty()); + for (JsonNode exception: exceptions) { + JsonNode message = exception.get("message"); + if (message != null && message.asText().contains(errorText)) { + return; + } + } + fail("At least one QueryCancellationError expected."); + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index 25415c7b5671..9c0ede26839f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -614,4 +614,8 @@ private static String encode(String s) { public String forSegmentUpload() { return StringUtil.join("/", _baseUrl, "v2/segments"); } + + public String forCancelQueryByClientId(String clientRequestId) { + return StringUtil.join("/", _baseUrl, "clientQuery", clientRequestId); + } } From 47fb9bb786577e884b103a93cd2c23442fa6cb69 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Fri, 31 Jan 2025 10:09:10 +0100 Subject: [PATCH 12/22] reuse same broker endpoint for internal and client-based cancellation --- .../api/resources/PinotClientRequest.java | 65 +++++-------------- .../resources/PinotRunningQueryResource.java | 2 +- .../tests/CancelQueryIntegrationTests.java | 2 +- 3 files changed, 20 insertions(+), 49 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java index 3aee5279dc34..cec17c240eec 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java @@ -380,77 +380,48 @@ public void processSqlQueryWithBothEnginesAndCompareResults(String query, @Suspe } @DELETE - @Path("query/{queryId}") + @Path("query/{id}") @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY) @Produces(MediaType.APPLICATION_JSON) - @ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the " - + "given queryId on the requested broker. Query may continue to run for a short while after calling cancel as " + @ApiOperation(value = "Cancel a query as identified by the id", notes = "No effect if no query exists for the " + + "given id on the requested broker. Query may continue to run for a short while after calling cancel as " + "it's done in a non-blocking manner. The cancel method can be called multiple times.") @ApiResponses(value = { @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), @ApiResponse(code = 404, message = "Query not found on the requested broker") }) public String cancelQuery( - @ApiParam(value = "QueryId as assigned by the broker", required = true) @PathParam("queryId") long queryId, + @ApiParam(value = "Query id", required = true) @PathParam("id") String id, + @ApiParam(value = "Determines is query id is internal or provided by the client") @QueryParam("client") + @DefaultValue("false") boolean isClient, @ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs") @DefaultValue("3000") int timeoutMs, @ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false") boolean verbose) { try { Map serverResponses = verbose ? new HashMap<>() : null; - if (_requestHandler.cancelQuery(queryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) { - String resp = "Cancelled query: " + queryId; + if (isClient && _requestHandler.cancelQueryByClientId(id, timeoutMs, _executor, _httpConnMgr, serverResponses)) { + String resp = "Cancelled client query: " + id; if (verbose) { resp += " with responses from servers: " + serverResponses; } return resp; + } else if (_requestHandler.cancelQuery(Long.parseLong(id), timeoutMs, _executor, _httpConnMgr, serverResponses)) { + String resp = "Cancelled query: " + id; + if (verbose) { + resp += " with responses from servers: " + serverResponses; + } + return resp; } + } catch (NumberFormatException e) { + Response.status(Response.Status.BAD_REQUEST).entity(String.format("Invalid internal query id: %s", id)); } catch (Exception e) { throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) - .entity(String.format("Failed to cancel query: %s on the broker due to error: %s", queryId, e.getMessage())) + .entity(String.format("Failed to cancel query: %s on the broker due to error: %s", id, e.getMessage())) .build()); } throw new WebApplicationException( - Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker", queryId)) - .build()); - } - - @DELETE - @Path("clientQuery/{clientQueryId}") - @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY) - @Produces(MediaType.APPLICATION_JSON) - @ApiOperation(value = "Cancel a query as identified by the clientQueryId", notes = "No effect if no query exists for" - + "the given clientQueryId on the requested broker. Query may continue to run for a short while after calling" - + "cancel as it's done in a non-blocking manner. The cancel method can be called multiple times.") - @ApiResponses(value = { - @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), - @ApiResponse(code = 404, message = "Query not found on the requested broker") - }) - public String cancelClientQuery( - @ApiParam(value = "ClientQueryId given by the client", required = true) - @PathParam("clientQueryId") String clientQueryId, - @ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs") - @DefaultValue("3000") int timeoutMs, - @ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false") - boolean verbose) { - try { - Map serverResponses = verbose ? new HashMap<>() : null; - if (_requestHandler.cancelQueryByClientId(clientQueryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) { - String resp = "Cancelled client query: " + clientQueryId; - if (verbose) { - resp += " with responses from servers: " + serverResponses; - } - return resp; - } - } catch (Exception e) { - throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) - .entity(String.format( - "Failed to cancel client query: %s on the broker due to error: %s", clientQueryId, e.getMessage())) - .build()); - } - throw new WebApplicationException( - Response.status(Response.Status.NOT_FOUND).entity( - String.format("Client query: %s not found on the broker", clientQueryId)) + Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker", id)) .build()); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java index 653e9a433202..c2a9f3adb2a1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java @@ -258,7 +258,7 @@ public String cancelClientQuery( for (InstanceInfo broker: getBrokers(httpHeaders.getHeaderString(DATABASE)).values()) { int port = portOverride > 0 ? portOverride : broker.getPort(); HttpDelete delete = new HttpDelete(String.format( - "%s://%s:%d/clientQuery/%s?verbose=%b", protocol, broker.getHost(), port, clientQueryId, verbose)); + "%s://%s:%d/query/%s?client=true&verbose=%b", protocol, broker.getHost(), port, clientQueryId, verbose)); requestHeaders.forEach(delete::setHeader); brokerDeletes.add(delete); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java index 1ce1a13f2157..99f31adcbc8c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java @@ -193,7 +193,7 @@ public void run() { fail("No exception should be thrown", e); } } - }, 1000); + }, 500); JsonNode result = postQuery(sqlQuery); // ugly: error message differs from SSQE to MSQE From 52998d3e7007010030b0a287512c422cef7990fc Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Fri, 31 Jan 2025 10:43:49 +0100 Subject: [PATCH 13/22] add javadoc --- .../pinot/broker/requesthandler/BaseBrokerRequestHandler.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index d4e5d6508042..7bcb7a56c91c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -198,6 +198,10 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq @Nullable HttpHeaders httpHeaders, AccessControl accessControl) throws Exception; + /** + * Attemps to cancel an ongoing query identified by its broker-generated id. + * @return true if the query was successfully cancelled, false otherwise. + */ protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, Map serverResponses) throws Exception; From e2678af9c9d7f2b749165ba78e5ec3ef37c1495c Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Fri, 31 Jan 2025 11:04:58 +0100 Subject: [PATCH 14/22] add mapping comments --- .../pinot/broker/requesthandler/BaseBrokerRequestHandler.java | 2 ++ .../requesthandler/BaseSingleStageBrokerRequestHandler.java | 1 + .../apache/pinot/query/service/dispatch/QueryDispatcher.java | 1 + 3 files changed, 4 insertions(+) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 7bcb7a56c91c..bd424b7fedd3 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -81,7 +81,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { protected final QueryLogger _queryLogger; @Nullable protected final String _enableNullHandling; + // maps broker-generated query id to the query string protected final Map _queriesById; + // maps broker-generated query id to client-provided query id protected final Map _clientQueryIds; public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index e24b8e50d381..4769965da452 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -141,6 +141,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ protected final boolean _enableQueryLimitOverride; protected final boolean _enableDistinctCountBitmapOverride; protected final int _queryResponseLimit; + // maps broker-generated query id with the servers that are running the query protected final Map _serversById; // if >= 0, then overrides default limit of 10, otherwise setting is ignored protected final int _defaultQueryLimit; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 40e18118bf9a..e3c8d07ef4a1 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -107,6 +107,7 @@ public class QueryDispatcher { private final Map _timeSeriesDispatchClientMap = new ConcurrentHashMap<>(); @Nullable private final TlsConfig _tlsConfig; + // maps broker-generated query id to the set of servers that the query was dispatched to private final Map> _serversByQuery; private final PhysicalTimeSeriesBrokerPlanVisitor _timeSeriesBrokerPlanVisitor = new PhysicalTimeSeriesBrokerPlanVisitor(); From c201cf4c59f520207bf567e42c9e6d2af0a4d420 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Fri, 31 Jan 2025 12:11:07 +0100 Subject: [PATCH 15/22] refactor base broker methods --- .../BaseBrokerRequestHandler.java | 29 +++++++++---------- .../BaseSingleStageBrokerRequestHandler.java | 21 +++++++++----- .../MultiStageBrokerRequestHandler.java | 5 ++-- 3 files changed, 31 insertions(+), 24 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index bd424b7fedd3..683bebe71c6a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -266,7 +266,7 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC try { return handleCancel(queryId, timeoutMs, executor, connMgr, serverResponses); } finally { - maybeRemoveQuery(queryId); + onQueryFinish(queryId); } } @@ -285,23 +285,22 @@ public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Execut } } - protected String maybeSaveQuery(long requestId, SqlNodeAndOptions sqlNodeAndOptions, String query) { - if (isQueryCancellationEnabled()) { - String clientRequestId = sqlNodeAndOptions.getOptions() != null - ? sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.CLIENT_QUERY_ID) : null; - _queriesById.put(requestId, query); - if (StringUtils.isNotBlank(clientRequestId)) { - _clientQueryIds.put(requestId, clientRequestId); - LOGGER.debug("Keep track of running query: {} (with client id {})", requestId, clientRequestId); - } else { - LOGGER.debug("Keep track of running query: {}", requestId); - } - return clientRequestId; + protected String extractClientRequestId(SqlNodeAndOptions sqlNodeAndOptions) { + return sqlNodeAndOptions.getOptions() != null + ? sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.CLIENT_QUERY_ID) : null; + } + + protected void onQueryStart(long requestId, String clientRequestId, String query, Object... extras) { + _queriesById.put(requestId, query); + if (isQueryCancellationEnabled() && StringUtils.isNotBlank(clientRequestId)) { + _clientQueryIds.put(requestId, clientRequestId); + LOGGER.debug("Keep track of running query: {} (with client id {})", requestId, clientRequestId); + } else { + LOGGER.debug("Keep track of running query: {}", requestId); } - return null; } - protected void maybeRemoveQuery(long requestId) { + protected void onQueryFinish(long requestId) { if (isQueryCancellationEnabled()) { _queriesById.remove(requestId); _clientQueryIds.remove(requestId); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 4769965da452..9d3431622201 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -227,8 +227,8 @@ Set getRunningServers(long requestId) { } @Override - protected void maybeRemoveQuery(long requestId) { - super.maybeRemoveQuery(requestId); + protected void onQueryFinish(long requestId) { + super.onQueryFinish(requestId); if (isQueryCancellationEnabled()) { _serversById.remove(requestId); } @@ -825,17 +825,16 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO // can always list the running queries and cancel query again until it ends. Just that such race // condition makes cancel API less reliable. This should be rare as it assumes sending queries out to // servers takes time, but will address later if needed. - String clientRequestId = maybeSaveQuery(requestId, sqlNodeAndOptions, query); - if (isQueryCancellationEnabled()) { - _serversById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); - } + String clientRequestId = extractClientRequestId(sqlNodeAndOptions); + onQueryStart( + requestId, clientRequestId, query, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); try { brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, requestContext); brokerResponse.setClientRequestId(clientRequestId); } finally { - maybeRemoveQuery(requestId); + onQueryFinish(requestId); LOGGER.debug("Remove track of running query: {}", requestId); } } else { @@ -919,6 +918,14 @@ static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRouti return errorMessage; } + @Override + protected void onQueryStart(long requestId, String clientRequestId, String query, Object... extras) { + super.onQueryStart(requestId, clientRequestId, query, extras); + if (isQueryCancellationEnabled() && extras.length > 0 && extras[0] instanceof QueryServers) { + _serversById.put(requestId, (QueryServers) extras[0]); + } + } + private static String getRoutingPolicy(TableConfig tableConfig) { RoutingConfig routingConfig = tableConfig.getRoutingConfig(); if (routingConfig == null) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 7e275bc28fa5..bcb1b329e5a9 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -257,7 +257,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO return new BrokerResponseNative(QueryException.EXECUTION_TIMEOUT_ERROR); } - String clientRequestId = maybeSaveQuery(requestId, sqlNodeAndOptions, query); + String clientRequestId = extractClientRequestId(sqlNodeAndOptions); + onQueryStart(requestId, clientRequestId, query); try { Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId), ThreadExecutionContext.TaskType.MSE); @@ -291,7 +292,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO QueryException.getException(queryException, consolidatedMessage)); } finally { Tracing.getThreadAccountant().clear(); - maybeRemoveQuery(requestId); + onQueryFinish(requestId); } long executionEndTimeNs = System.nanoTime(); updatePhaseTimingForTables(tableNames, BrokerQueryPhase.QUERY_EXECUTION, From c60b9538a6410bf716a216357737b41349f10c33 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Fri, 31 Jan 2025 12:28:13 +0100 Subject: [PATCH 16/22] return immutable view instead of copy --- .../pinot/broker/requesthandler/BaseBrokerRequestHandler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 683bebe71c6a..75b664c1e9a9 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -255,7 +256,7 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons @Override public Map getRunningQueries() { Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); - return new HashMap<>(_queriesById); + return Collections.unmodifiableMap(_queriesById); } @Override From 6042ad2f1461a10b42117e026ee819609a0167d1 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Fri, 31 Jan 2025 12:43:33 +0100 Subject: [PATCH 17/22] enable sleep(ms) function only during testing --- .../org/apache/pinot/common/function/FunctionUtils.java | 6 ++++++ .../pinot/common/function/scalar/DateTimeFunctions.java | 5 ++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java index 60b6991733c8..10745ae6e010 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java @@ -216,4 +216,10 @@ public static RelDataType getRelDataType(RelDataTypeFactory typeFactory, Class Date: Fri, 31 Jan 2025 12:47:06 +0100 Subject: [PATCH 18/22] reduce unit test wait time --- .../pinot/core/data/function/DateTimeFunctionsTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionsTest.java index adae37b814fe..0f9c4d8761f6 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionsTest.java @@ -791,10 +791,10 @@ public void testDateTimeConvertMultipleInvocations() { @Test public void testSleepFunction() { long startTime = System.currentTimeMillis(); - testFunction("sleep(500)", Collections.emptyList(), new GenericRow(), result -> { - assertTrue((long) result >= 500); + testFunction("sleep(50)", Collections.emptyList(), new GenericRow(), result -> { + assertTrue((long) result >= 50); }); long endTime = System.currentTimeMillis(); - assertTrue(endTime - startTime >= 500); + assertTrue(endTime - startTime >= 50); } } From 9d0f335f54cdf62a1a868e598dd43d17828ff127 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Fri, 31 Jan 2025 12:51:08 +0100 Subject: [PATCH 19/22] replace constant with literal on test --- .../pinot/integration/tests/CancelQueryIntegrationTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java index 99f31adcbc8c..913ad2984be3 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CancelQueryIntegrationTests.java @@ -181,7 +181,7 @@ public void testCancelByClientQueryId(boolean useMultiStageQueryEngine) String clientRequestId = UUID.randomUUID().toString(); // tricky query: use sleep with some column data to avoid Calcite from optimizing it on compile time String sqlQuery = - "SET " + CommonConstants.Broker.Request.QueryOptionKey.CLIENT_QUERY_ID + "='" + clientRequestId + "'; " + "SET clientQueryId='" + clientRequestId + "'; " + "SELECT sleep(ActualElapsedTime+60000) FROM mytable WHERE ActualElapsedTime > 0 limit 1"; new Timer().schedule(new java.util.TimerTask() { From 1e00bf6b393036c4b6ab32781398a8963429c483 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Fri, 31 Jan 2025 13:13:51 +0100 Subject: [PATCH 20/22] linter --- .../pinot/broker/requesthandler/BaseBrokerRequestHandler.java | 1 - .../java/org/apache/pinot/common/function/FunctionUtils.java | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 75b664c1e9a9..00e93abd4c92 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -23,7 +23,6 @@ import com.google.common.collect.Maps; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java index 10745ae6e010..c1445e98bde7 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/FunctionUtils.java @@ -219,7 +219,9 @@ public static RelDataType getRelDataType(RelDataTypeFactory typeFactory, Class Date: Fri, 31 Jan 2025 16:45:26 +0100 Subject: [PATCH 21/22] remove embarassing npe --- .../requesthandler/BaseBrokerRequestHandler.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 00e93abd4c92..0261574394c1 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -291,12 +291,14 @@ protected String extractClientRequestId(SqlNodeAndOptions sqlNodeAndOptions) { } protected void onQueryStart(long requestId, String clientRequestId, String query, Object... extras) { - _queriesById.put(requestId, query); - if (isQueryCancellationEnabled() && StringUtils.isNotBlank(clientRequestId)) { - _clientQueryIds.put(requestId, clientRequestId); - LOGGER.debug("Keep track of running query: {} (with client id {})", requestId, clientRequestId); - } else { - LOGGER.debug("Keep track of running query: {}", requestId); + if (isQueryCancellationEnabled()) { + _queriesById.put(requestId, query); + if (StringUtils.isNotBlank(clientRequestId)) { + _clientQueryIds.put(requestId, clientRequestId); + LOGGER.debug("Keep track of running query: {} (with client id {})", requestId, clientRequestId); + } else { + LOGGER.debug("Keep track of running query: {}", requestId); + } } } From f8d23cde30d24a5a2a314629efc9f4d3a4b5e097 Mon Sep 17 00:00:00 2001 From: Alberto Bastos Date: Fri, 7 Feb 2025 16:32:02 +0100 Subject: [PATCH 22/22] minor pr suggestions --- .../broker/requesthandler/BaseBrokerRequestHandler.java | 8 ++++++-- .../pinot/common/function/scalar/DateTimeFunctions.java | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 0261574394c1..11c63c699510 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -81,9 +81,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { protected final QueryLogger _queryLogger; @Nullable protected final String _enableNullHandling; - // maps broker-generated query id to the query string + /** + * Maps broker-generated query id to the query string. + */ protected final Map _queriesById; - // maps broker-generated query id to client-provided query id + /** + * Maps broker-generated query id to client-provided query id. + */ protected final Map _clientQueryIds; public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java index 18a052b0a42a..e9c0fc3aec37 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java @@ -555,8 +555,8 @@ public static long sleep(long millis) { Thread.sleep(millis); } } catch (InterruptedException e) { - //TODO: handle interruption - //Thread.currentThread().interrupt(); + Thread.currentThread().interrupt(); + throw new RuntimeException(e); } return millis; }