Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add broker setting to override default implicit query response limit #14452

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
protected final int _defaultHllLog2m;
protected final boolean _enableQueryLimitOverride;
protected final boolean _enableDistinctCountBitmapOverride;
protected final int _queryResponseLimit;
protected final int _queryResponseLimitOverride;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the variable name unchanged. This is not only for the override case; it is actually the cap on the allowed limit.

// if >= 0, then overrides default limit of 10, otherwise setting is ignored
protected final int _defaultQueryResponseLimit;
bziobrowski marked this conversation as resolved.
Show resolved Hide resolved
protected final Map<Long, QueryServers> _queriesById;
protected final boolean _enableMultistageMigrationMetric;
protected ExecutorService _multistageCompileExecutor;
Expand All @@ -157,8 +159,10 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro
_enableQueryLimitOverride = _config.getProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, false);
_enableDistinctCountBitmapOverride =
_config.getProperty(CommonConstants.Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY, false);
_queryResponseLimit =
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
_queryResponseLimitOverride =
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_LIMIT_OVERRIDE);
_defaultQueryResponseLimit = config.getProperty(Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_RESPONSE_LIMIT,
Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
Expand All @@ -170,9 +174,10 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro
_multistageCompileQueryQueue = new LinkedBlockingQueue<>(1000);
}

LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query response limit: {}, query log max length: {}, "
+ "query log max rate: {}, query cancellation enabled: {}", getClass().getSimpleName(), _brokerId,
_brokerTimeoutMs, _queryResponseLimit, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(),
LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query response limit: {}, "
+ "default query response limit {}, query log max length: {}, query log max rate: {}, query cancellation "
+ "enabled: {}", getClass().getSimpleName(), _brokerId, _brokerTimeoutMs, _queryResponseLimitOverride,
_defaultQueryResponseLimit, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(),
enableQueryCancellation);
}

Expand Down Expand Up @@ -308,6 +313,10 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}
}

if (isDefaultQueryResponseLimitEnabled() && !pinotQuery.isSetLimit()) {
bziobrowski marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we ever hit this? In PinotQuery thrift definition, limit has a default value 10. Will limit always be set?

pinotQuery.setLimit(_defaultQueryResponseLimit);
}

if (isLiteralOnlyQuery(pinotQuery)) {
LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query);
try {
Expand Down Expand Up @@ -391,7 +400,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
handleHLLLog2mOverride(serverPinotQuery, _defaultHllLog2m);
}
if (_enableQueryLimitOverride) {
handleQueryLimitOverride(serverPinotQuery, _queryResponseLimit);
handleQueryLimitOverride(serverPinotQuery, _queryResponseLimitOverride);
}
handleSegmentPartitionedDistinctCountOverride(serverPinotQuery,
getSegmentPartitionedColumns(_tableCache, tableName));
Expand Down Expand Up @@ -514,7 +523,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO

// Validate the request
try {
validateRequest(serverPinotQuery, _queryResponseLimit);
validateRequest(serverPinotQuery, _queryResponseLimitOverride);
} catch (Exception e) {
LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage());
requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
Expand Down Expand Up @@ -855,6 +864,10 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}
}

/**
 * Returns whether a default query response limit is configured for this broker.
 * A non-negative configured value enables the implicit default limit; the
 * negative fallback value disables it.
 */
private boolean isDefaultQueryResponseLimitEnabled() {
  return _defaultQueryResponseLimit >= 0;
}

@VisibleForTesting
static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRoutingPolicy,
String offlineRoutingPolicy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void setUp()
properties.put(Helix.CONFIG_OF_ZOOKEEPR_SERVER, getZkUrl());
properties.put(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, true);
properties.put(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
properties.put(Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_RESPONSE_LIMIT, 1000);

_brokerStarter = new HelixBrokerStarter();
_brokerStarter.init(new PinotConfiguration(properties));
Expand Down Expand Up @@ -145,6 +146,7 @@ public void testClusterConfigOverride() {

// NOTE: It is disabled in cluster config, but enabled in instance config. Instance config should take precedence.
assertTrue(config.getProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, false));
assertEquals(config.getProperty(Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_RESPONSE_LIMIT, 1), 1000);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ public void clear() {
this.groupByList = null;
this.orderByList = null;
this.havingExpression = null;
// if instance gets cleared and reused make sure default limit is applied (if set)
bziobrowski marked this conversation as resolved.
Show resolved Hide resolved
this.limit = 10;

this.offset = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.util.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.File;

import static org.testng.Assert.*;


// this test uses separate cluster because it needs to change broker configuration
// which is only done once per suite
public class BrokerQueryLimitTest extends BaseClusterIntegrationTest {
bziobrowski marked this conversation as resolved.
Show resolved Hide resolved

protected static final Logger LOGGER = LoggerFactory.getLogger(BrokerQueryLimitTest.class);
private static final String LONG_COLUMN = "longCol";
public static final int DEFAULT_LIMIT = 5;

@Test
public void testWhenLimitIsNOTSetExplicitlyThenDefaultLimitIsApplied()
throws Exception {
setUseMultiStageQueryEngine(false);
String query = String.format("SELECT %s FROM %s", LONG_COLUMN, getTableName());

JsonNode result = postQuery(query).get("resultTable");
JsonNode columnDataTypesNode = result.get("dataSchema").get("columnDataTypes");
assertEquals(columnDataTypesNode.get(0).textValue(), "LONG");

JsonNode rows = result.get("rows");
assertEquals(rows.size(), DEFAULT_LIMIT);

for (int rowNum = 0; rowNum < rows.size(); rowNum++) {
JsonNode row = rows.get(rowNum);
assertEquals(row.size(), 1);
assertEquals(row.get(0).asLong(), rowNum);
}
}

@Test
public void testWhenLimitISSetExplicitlyThenDefaultLimitIsNotApplied()
throws Exception {
setUseMultiStageQueryEngine(false);
String query = String.format("SELECT %s FROM %s limit 20", LONG_COLUMN, getTableName());

JsonNode result = postQuery(query).get("resultTable");
JsonNode columnDataTypesNode = result.get("dataSchema").get("columnDataTypes");
assertEquals(columnDataTypesNode.get(0).textValue(), "LONG");

JsonNode rows = result.get("rows");
assertEquals(rows.size(), 20);

for (int rowNum = 0; rowNum < rows.size(); rowNum++) {
JsonNode row = rows.get(rowNum);
assertEquals(row.size(), 1);
assertEquals(row.get(0).asLong(), rowNum);
}
}

@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_RESPONSE_LIMIT, DEFAULT_LIMIT);
}

@Override
public String getTableName() {
return DEFAULT_TABLE_NAME;
}

@Override
public Schema createSchema() {
return new Schema.SchemaBuilder().setSchemaName(getTableName())
.addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG)
.build();
}

public File createAvroFile()
throws Exception {
org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
avroSchema.setFields(ImmutableList.of(
new org.apache.avro.Schema.Field(LONG_COLUMN, org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG),
null, null)));

File avroFile = new File(_tempDir, "data.avro");
try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
writer.create(avroSchema, avroFile);
for (int i = 0; i < getCountStarResult(); i++) {
GenericData.Record record = new GenericData.Record(avroSchema);
record.put(LONG_COLUMN, i);
writer.append(record);
}
}
return avroFile;
}

@BeforeClass
public void setUp()
throws Exception {
LOGGER.warn("Setting up integration test class: {}", getClass().getSimpleName());
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);

// Start the Pinot cluster
startZk();
LOGGER.warn("Start Kafka in the integration test class");
startKafka();
startController();
startBroker();
startServer();

if (_controllerRequestURLBuilder == null) {
_controllerRequestURLBuilder =
ControllerRequestURLBuilder.baseUrl("http://localhost:" + getControllerPort());
}
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
// create & upload schema AND table config
Schema schema = createSchema();
addSchema(schema);

File avroFile = createAvroFile();
// create offline table
TableConfig tableConfig = createOfflineTableConfig();
addTableConfig(tableConfig);

// create & upload segments
ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFile, tableConfig, schema, 0, _segmentDir, _tarDir);
uploadSegments(getTableName(), _tarDir);

waitForAllDocsLoaded(60_000);
LOGGER.warn("Finished setting up integration test class: {}", getClass().getSimpleName());
}

@AfterClass
public void tearDown()
throws Exception {
LOGGER.warn("Tearing down integration test class: {}", getClass().getSimpleName());
dropOfflineTable(getTableName());
FileUtils.deleteDirectory(_tempDir);

// Stop Kafka
LOGGER.warn("Stop Kafka in the integration test class");
stopKafka();
// Shutdown the Pinot cluster
stopServer();
stopBroker();
stopController();
stopZk();
FileUtils.deleteDirectory(_tempDir);
LOGGER.warn("Finished tearing down integration test class: {}", getClass().getSimpleName());
}

@Override
public TableConfig createOfflineTableConfig() {
return new TableConfigBuilder(TableType.OFFLINE).setTableName(getTableName()).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class SortOperator extends MultiStageOperator {

/**
 * Convenience constructor that delegates to the full constructor using the default
 * initial row-holder capacity and the maximum allowed response limit as the cap
 * (i.e. effectively unbounded unless the sort node specifies its own limit).
 */
public SortOperator(OpChainExecutionContext context, MultiStageOperator input, SortNode node) {
  this(context, input, node, SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY,
      CommonConstants.Broker.DEFAULT_BROKER_QUERY_LIMIT_OVERRIDE);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,11 @@ public static class Broker {

public static final String CONFIG_OF_BROKER_QUERY_REWRITER_CLASS_NAMES = "pinot.broker.query.rewriter.class.names";
public static final String CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT = "pinot.broker.query.response.limit";
public static final int DEFAULT_BROKER_QUERY_RESPONSE_LIMIT = Integer.MAX_VALUE;
public static final String CONFIG_OF_BROKER_DEFAULT_QUERY_RESPONSE_LIMIT =
"pinot.broker.default.query.response.limit";
bziobrowski marked this conversation as resolved.
Show resolved Hide resolved

public static final int DEFAULT_BROKER_QUERY_LIMIT_OVERRIDE = Integer.MAX_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the name consistent with config key, and make it next to CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT (i.e. keep line 258 unchanged)

public static final int DEFAULT_BROKER_QUERY_RESPONSE_LIMIT = -1; // -1 means no limit
public static final String CONFIG_OF_BROKER_QUERY_LOG_LENGTH = "pinot.broker.query.log.length";
public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH = Integer.MAX_VALUE;
public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND =
Expand Down
Loading