diff --git a/.github/dependabot.yml b/.github/dependabot.yml
index d135c26b02cc..9719e1a53392 100644
--- a/.github/dependabot.yml
+++ b/.github/dependabot.yml
@@ -24,6 +24,7 @@ updates:
directory: "/"
schedule:
interval: "daily"
+ open-pull-requests-limit: 20
- package-ecosystem: "npm"
directory: "/pinot-controller/src/main/resources"
diff --git a/.github/workflows/build-multi-arch-pinot-docker-image.yml b/.github/workflows/build-multi-arch-pinot-docker-image.yml
index 25460a77bf70..e3314bec9c12 100644
--- a/.github/workflows/build-multi-arch-pinot-docker-image.yml
+++ b/.github/workflows/build-multi-arch-pinot-docker-image.yml
@@ -64,7 +64,7 @@ jobs:
needs: [ generate-build-info ]
steps:
- name: Login to DockerHub
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
@@ -72,7 +72,7 @@ jobs:
name: Set up QEMU
with:
platforms: linux/${{ matrix.arch }}
- - uses: docker/setup-buildx-action@v2
+ - uses: docker/setup-buildx-action@v3
name: Set up Docker Buildx
- uses: actions/checkout@v4
- name: Build and push the Docker image
@@ -91,13 +91,13 @@ jobs:
needs: [ generate-build-info, build-pinot-docker-image ]
steps:
- name: Login to DockerHub
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- uses: docker/setup-qemu-action@v3
name: Set up QEMU
- - uses: docker/setup-buildx-action@v2
+ - uses: docker/setup-buildx-action@v3
name: Set up Docker Buildx
- uses: actions/checkout@v4
- name: Create Multi-Arch Manifest
diff --git a/.github/workflows/build-pinot-base-docker-image.yml b/.github/workflows/build-pinot-base-docker-image.yml
index 36162d49051f..e079fde1c304 100644
--- a/.github/workflows/build-pinot-base-docker-image.yml
+++ b/.github/workflows/build-pinot-base-docker-image.yml
@@ -32,13 +32,13 @@ jobs:
openJdkDist: [ "amazoncorretto", "ms-openjdk" ]
steps:
- name: Login to DockerHub
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- uses: docker/setup-qemu-action@v3
name: Set up QEMU
- - uses: docker/setup-buildx-action@v2
+ - uses: docker/setup-buildx-action@v3
name: Set up Docker Buildx
- uses: actions/checkout@v4
- name: Build and push the Docker image
diff --git a/.github/workflows/build-pinot-docker-image.yml b/.github/workflows/build-pinot-docker-image.yml
index c8652f2775c7..c21e8bbadd6e 100644
--- a/.github/workflows/build-pinot-docker-image.yml
+++ b/.github/workflows/build-pinot-docker-image.yml
@@ -47,13 +47,13 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Login to DockerHub
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- uses: docker/setup-qemu-action@v3
name: Set up QEMU
- - uses: docker/setup-buildx-action@v2
+ - uses: docker/setup-buildx-action@v3
name: Set up Docker Buildx
- uses: actions/checkout@v4
- name: Build and push the Docker image
diff --git a/.github/workflows/build-presto-docker-image.yml b/.github/workflows/build-presto-docker-image.yml
index 6c76671a3e3f..d054074f1551 100644
--- a/.github/workflows/build-presto-docker-image.yml
+++ b/.github/workflows/build-presto-docker-image.yml
@@ -47,13 +47,13 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Login to DockerHub
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- uses: docker/setup-qemu-action@v3
name: Set up QEMU
- - uses: docker/setup-buildx-action@v2
+ - uses: docker/setup-buildx-action@v3
name: Set up Docker Buildx
- uses: actions/checkout@v4
- name: Build and push the Docker image
diff --git a/.github/workflows/build-superset-docker-image.yml b/.github/workflows/build-superset-docker-image.yml
index 19b720905ac3..9cc362d7c502 100644
--- a/.github/workflows/build-superset-docker-image.yml
+++ b/.github/workflows/build-superset-docker-image.yml
@@ -43,13 +43,13 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Login to DockerHub
- uses: docker/login-action@v2
+ uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- uses: docker/setup-qemu-action@v3
name: Set up QEMU
- - uses: docker/setup-buildx-action@v2
+ - uses: docker/setup-buildx-action@v3
name: Set up Docker Buildx
- uses: actions/checkout@v4
- name: Build and push the Docker image
diff --git a/.github/workflows/pinot_compatibility_tests.yml b/.github/workflows/pinot_compatibility_tests.yml
index f2ff0f95f7f8..e9d242fede91 100644
--- a/.github/workflows/pinot_compatibility_tests.yml
+++ b/.github/workflows/pinot_compatibility_tests.yml
@@ -44,7 +44,7 @@ jobs:
distribution: 'temurin'
cache: 'maven'
- name: Setup node
- uses: actions/setup-node@v3
+ uses: actions/setup-node@v4
with:
node-version: v10.16.1
cache: 'npm'
diff --git a/.github/workflows/pinot_tests.yml b/.github/workflows/pinot_tests.yml
index a0c4b4861e10..1d90331719c4 100644
--- a/.github/workflows/pinot_tests.yml
+++ b/.github/workflows/pinot_tests.yml
@@ -271,7 +271,7 @@ jobs:
]
name: Pinot Compatibility Regression Testing against ${{ matrix.old_commit }} on ${{ matrix.test_suite }}
steps:
- - uses: actions/checkout@v2
+ - uses: actions/checkout@v4
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
@@ -279,7 +279,7 @@ jobs:
distribution: 'temurin'
cache: 'maven'
- name: Setup node
- uses: actions/setup-node@v3
+ uses: actions/setup-node@v4
with:
node-version: v10.16.1
cache: 'npm'
diff --git a/.github/workflows/pinot_vuln_check.yml b/.github/workflows/pinot_vuln_check.yml
index 4eebc1fb1074..b4924d3bc848 100644
--- a/.github/workflows/pinot_vuln_check.yml
+++ b/.github/workflows/pinot_vuln_check.yml
@@ -60,6 +60,6 @@ jobs:
severity: 'CRITICAL,HIGH'
timeout: '10m'
- name: Upload Trivy scan results to GitHub Security tab
- uses: github/codeql-action/upload-sarif@v2
+ uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: 'trivy-results.sarif'
diff --git a/.mvn/extensions.xml b/.mvn/extensions.xml
index 009c706a2258..177fc5d6f1e8 100644
--- a/.mvn/extensions.xml
+++ b/.mvn/extensions.xml
@@ -23,11 +23,11 @@
<groupId>com.gradle</groupId>
<artifactId>gradle-enterprise-maven-extension</artifactId>
- <version>1.19.2</version>
+ <version>1.20.1</version>
<groupId>com.gradle</groupId>
<artifactId>common-custom-user-data-maven-extension</artifactId>
- <version>1.13</version>
+ <version>2</version>
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 31bae2215aac..b2d4e24d3f32 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -257,7 +257,7 @@ public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, Htt
@Override
public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
- @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders)
+ @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @Nullable HttpHeaders httpHeaders)
throws Exception {
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
@@ -307,7 +307,7 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
protected BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
- HttpHeaders httpHeaders)
+ @Nullable HttpHeaders httpHeaders)
throws Exception {
LOGGER.debug("SQL query for request {}: {}", requestId, query);
@@ -378,8 +378,9 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
boolean ignoreCase = _tableCache.isIgnoreCase();
String tableName;
try {
- tableName = getActualTableName(
- DatabaseUtils.translateTableName(dataSource.getTableName(), httpHeaders, ignoreCase), _tableCache);
+ tableName =
+ getActualTableName(DatabaseUtils.translateTableName(dataSource.getTableName(), httpHeaders, ignoreCase),
+ _tableCache);
} catch (DatabaseConflictException e) {
LOGGER.info("{}. Request {}: {}", e.getMessage(), requestId, query);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
@@ -912,7 +913,7 @@ private String getServerTenant(String tableNameWithType) {
*
* <p>Currently only supports subquery within the filter.
*/
private void handleSubquery(PinotQuery pinotQuery, long requestId, JsonNode jsonRequest,
- @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders)
+ @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @Nullable HttpHeaders httpHeaders)
throws Exception {
Expression filterExpression = pinotQuery.getFilterExpression();
if (filterExpression != null) {
@@ -928,7 +929,7 @@ private void handleSubquery(PinotQuery pinotQuery, long requestId, JsonNode json
* IN_ID_SET transform function.
*/
private void handleSubquery(Expression expression, long requestId, JsonNode jsonRequest,
- @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders)
+ @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @Nullable HttpHeaders httpHeaders)
throws Exception {
Function function = expression.getFunctionCall();
if (function == null) {
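The `@Nullable HttpHeaders` annotations above ripple through every overload, letting internal callers (such as subquery handling) invoke these methods without an HTTP context. A minimal sketch of the null tolerance this enables; `DatabaseHeaderSketch`, `resolveDatabase`, and the `"database"` header name are illustrative assumptions, not Pinot's actual implementation:

```java
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;

final class DatabaseHeaderSketch {
  private static final String DATABASE_HEADER = "database"; // assumed header name

  static String resolveDatabase(@Nullable HttpHeaders headers) {
    if (headers == null) {
      // No HTTP context, e.g. an internal caller such as subquery handling.
      return "default";
    }
    String database = headers.getHeaderString(DATABASE_HEADER);
    return database != null ? database : "default";
  }
}
```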
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
index 99df35a09ffa..4aa7b26eaab4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -39,11 +39,11 @@ public interface BrokerRequestHandler {
void shutDown();
BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
- @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders)
+ @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @Nullable HttpHeaders httpHeaders)
throws Exception;
default BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity,
- RequestContext requestContext, HttpHeaders httpHeaders)
+ RequestContext requestContext, @Nullable HttpHeaders httpHeaders)
throws Exception {
return handleRequest(request, null, requesterIdentity, requestContext, httpHeaders);
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index fb87f715c185..0da84e6eadc4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -82,7 +82,7 @@ public void shutDown() {
@Override
public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
- @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders)
+ @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @Nullable HttpHeaders httpHeaders)
throws Exception {
requestContext.setBrokerId(_brokerId);
if (sqlNodeAndOptions == null) {
@@ -96,13 +96,13 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
}
}
if (request.has(CommonConstants.Broker.Request.QUERY_OPTIONS)) {
- sqlNodeAndOptions.setExtraOptions(RequestUtils.getOptionsFromJson(request,
- CommonConstants.Broker.Request.QUERY_OPTIONS));
+ sqlNodeAndOptions.setExtraOptions(
+ RequestUtils.getOptionsFromJson(request, CommonConstants.Broker.Request.QUERY_OPTIONS));
}
- if (_multiStageBrokerRequestHandler != null && Boolean.parseBoolean(sqlNodeAndOptions.getOptions().get(
- CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE))) {
- return _multiStageBrokerRequestHandler.handleRequest(request, requesterIdentity, requestContext, httpHeaders);
+ if (_multiStageBrokerRequestHandler != null && Boolean.parseBoolean(
+ sqlNodeAndOptions.getOptions().get(CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE))) {
+ return _multiStageBrokerRequestHandler.handleRequest(request, requesterIdentity, requestContext, httpHeaders);
} else {
return _singleStageBrokerRequestHandler.handleRequest(request, sqlNodeAndOptions, requesterIdentity,
requestContext, httpHeaders);
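For context on the reformatted engine-selection check: a client opts a query into the multi-stage engine through the query option the delegate reads here. A hedged sketch of the request payload; the field names mirror the constants referenced in the diff (`CommonConstants.Broker.Request.QUERY_OPTIONS`, `QueryOptionKey.USE_MULTISTAGE_ENGINE`), but treat the exact JSON shape as an assumption:

```java
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

final class MultiStageOptInSketch {
  static ObjectNode buildRequest() {
    // A request shaped like this would be routed by the delegate above to
    // _multiStageBrokerRequestHandler; field names are assumed values.
    ObjectNode request = JsonNodeFactory.instance.objectNode();
    request.put("sql", "SELECT COUNT(*) FROM myTable");
    request.put("queryOptions", "useMultistageEngine=true");
    return request;
  }
}
```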
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index ee9f6cf19a74..01e4884d6a08 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -31,7 +31,6 @@
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
-import org.apache.calcite.jdbc.CalciteSchemaBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.broker.api.AccessControl;
import org.apache.pinot.broker.api.RequesterIdentity;
@@ -39,6 +38,7 @@
import org.apache.pinot.broker.querylog.QueryLogger;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.calcite.jdbc.CalciteSchemaBuilder;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
index 6135982e185c..423eb527faee 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
@@ -18,8 +18,6 @@
*/
package org.apache.pinot.broker.routing.segmentpruner;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -37,7 +35,6 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +65,7 @@ public static List<SegmentPruner> getSegmentPruners(TableConfig tableConfig,
List<SegmentPruner> configuredSegmentPruners = new ArrayList<>(segmentPrunerTypes.size());
for (String segmentPrunerType : segmentPrunerTypes) {
if (RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType)) {
- SegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore);
+ SegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig);
if (partitionSegmentPruner != null) {
configuredSegmentPruners.add(partitionSegmentPruner);
}
@@ -91,7 +88,7 @@ public static List<SegmentPruner> getSegmentPruners(TableConfig tableConfig,
if ((tableType == TableType.OFFLINE && LEGACY_PARTITION_AWARE_OFFLINE_ROUTING.equalsIgnoreCase(
routingTableBuilderName)) || (tableType == TableType.REALTIME
&& LEGACY_PARTITION_AWARE_REALTIME_ROUTING.equalsIgnoreCase(routingTableBuilderName))) {
- SegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore);
+ SegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig);
if (partitionSegmentPruner != null) {
segmentPruners.add(partitionSegmentPruner);
}
@@ -102,8 +99,7 @@ public static List<SegmentPruner> getSegmentPruners(TableConfig tableConfig,
}
@Nullable
- private static SegmentPruner getPartitionSegmentPruner(TableConfig tableConfig,
- ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ private static SegmentPruner getPartitionSegmentPruner(TableConfig tableConfig) {
String tableNameWithType = tableConfig.getTableName();
SegmentPartitionConfig segmentPartitionConfig = tableConfig.getIndexingConfig().getSegmentPartitionConfig();
if (segmentPartitionConfig == null) {
@@ -137,26 +133,20 @@ private static TimeSegmentPruner getTimeSegmentPruner(TableConfig tableConfig,
LOGGER.warn("Cannot enable time range pruning without time column for table: {}", tableNameWithType);
return null;
}
- return createTimeSegmentPruner(tableConfig, propertyStore);
- }
-
- @VisibleForTesting
- static TimeSegmentPruner createTimeSegmentPruner(TableConfig tableConfig,
- ZkHelixPropertyStore<ZNRecord> propertyStore) {
- String tableNameWithType = tableConfig.getTableName();
- String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
- Preconditions.checkNotNull(timeColumn, "Time column must be configured in table config for table: %s",
- tableNameWithType);
- Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableNameWithType);
- Preconditions.checkNotNull(schema, "Failed to find schema for table: %s", tableNameWithType);
- DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumn);
- Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in schema for time column: %s of table: %s",
- timeColumn, tableNameWithType);
- DateTimeFormatSpec timeFormatSpec = dateTimeSpec.getFormatSpec();
-
- LOGGER.info("Using TimeRangePruner on time column: {} for table: {} with DateTimeFormatSpec: {}",
- timeColumn, tableNameWithType, timeFormatSpec);
- return new TimeSegmentPruner(tableConfig, timeColumn, timeFormatSpec);
+ Schema schema = ZKMetadataProvider.getTableSchema(propertyStore, tableConfig);
+ if (schema == null) {
+ LOGGER.warn("Cannot enable time range pruning without schema for table: {}", tableNameWithType);
+ return null;
+ }
+ DateTimeFieldSpec timeFieldSpec = schema.getSpecForTimeColumn(timeColumn);
+ if (timeFieldSpec == null) {
+ LOGGER.warn("Cannot enable time range pruning without field spec for table: {}, time column: {}",
+ tableNameWithType, timeColumn);
+ return null;
+ }
+ LOGGER.info("Using TimeRangePruner on time column: {} for table: {} with DateTimeFieldSpec: {}", timeColumn,
+ tableNameWithType, timeFieldSpec);
+ return new TimeSegmentPruner(tableConfig, timeFieldSpec);
}
private static List<SegmentPruner> sortSegmentPruners(List<SegmentPruner> pruners) {
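The net effect of this refactor is that a missing schema or time field spec now degrades to "no time pruner" instead of failing the broker with a `Preconditions` exception. A condensed sketch of the new fail-soft flow, not the factory's exact code; names are simplified from the diff:

```java
import javax.annotation.Nullable;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.Schema;

final class TimePrunerFactorySketch {
  // Each missing prerequisite disables time pruning (the real factory logs
  // a warning at each step) rather than throwing, so routing still comes up.
  @Nullable
  static TimeSegmentPruner tryCreateTimePruner(TableConfig tableConfig, @Nullable Schema schema) {
    String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
    if (timeColumn == null || schema == null) {
      return null; // warn-and-skip in the real factory
    }
    DateTimeFieldSpec timeFieldSpec = schema.getSpecForTimeColumn(timeColumn);
    return timeFieldSpec != null ? new TimeSegmentPruner(tableConfig, timeFieldSpec) : null;
  }
}
```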
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
index a7ac4fce4bdf..c2e6b20cce54 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.broker.routing.segmentpruner;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -37,7 +38,9 @@
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.Function;
import org.apache.pinot.common.request.Identifier;
+import org.apache.pinot.common.request.Literal;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Query.Range;
@@ -64,10 +67,10 @@ public class TimeSegmentPruner implements SegmentPruner {
private volatile IntervalTree<String> _intervalTree;
private final Map<String, Interval> _intervalMap = new HashMap<>();
- public TimeSegmentPruner(TableConfig tableConfig, String timeColumn, DateTimeFormatSpec timeFormatSpec) {
+ public TimeSegmentPruner(TableConfig tableConfig, DateTimeFieldSpec timeFieldSpec) {
_tableNameWithType = tableConfig.getTableName();
- _timeColumn = timeColumn;
- _timeFormatSpec = timeFormatSpec;
+ _timeColumn = timeFieldSpec.getName();
+ _timeFormatSpec = timeFieldSpec.getFormatSpec();
}
@Override
@@ -206,97 +209,53 @@ private List<Interval> getFilterTimeIntervals(Expression filterExpression) {
} else {
return getComplementSortedIntervals(childIntervals);
}
- case EQUALS: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
- long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
- return Collections.singletonList(new Interval(timeStamp, timeStamp));
- } else {
- return null;
+ case EQUALS:
+ if (isTimeColumn(operands.get(0))) {
+ long timestamp = toMillisSinceEpoch(operands.get(1));
+ return List.of(new Interval(timestamp, timestamp));
}
- }
- case IN: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
+ return null;
+ case IN:
+ if (isTimeColumn(operands.get(0))) {
int numOperands = operands.size();
List<Interval> intervals = new ArrayList<>(numOperands - 1);
for (int i = 1; i < numOperands; i++) {
- long timeStamp =
- _timeFormatSpec.fromFormatToMillis(operands.get(i).getLiteral().getFieldValue().toString());
- intervals.add(new Interval(timeStamp, timeStamp));
+ long timestamp = toMillisSinceEpoch(operands.get(i));
+ intervals.add(new Interval(timestamp, timestamp));
}
return intervals;
- } else {
- return null;
}
- }
- case GREATER_THAN: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
- long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
- return Collections.singletonList(new Interval(timeStamp + 1, MAX_END_TIME));
- } else {
- return null;
+ return null;
+ case GREATER_THAN:
+ if (isTimeColumn(operands.get(0))) {
+ return getInterval(toMillisSinceEpoch(operands.get(1)) + 1, MAX_END_TIME);
}
- }
- case GREATER_THAN_OR_EQUAL: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
- long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
- return Collections.singletonList(new Interval(timeStamp, MAX_END_TIME));
- } else {
- return null;
+ return null;
+ case GREATER_THAN_OR_EQUAL:
+ if (isTimeColumn(operands.get(0))) {
+ return getInterval(toMillisSinceEpoch(operands.get(1)), MAX_END_TIME);
}
- }
- case LESS_THAN: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
- long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
- if (timeStamp > MIN_START_TIME) {
- return Collections.singletonList(new Interval(MIN_START_TIME, timeStamp - 1));
- } else {
- return Collections.emptyList();
- }
- } else {
- return null;
+ return null;
+ case LESS_THAN:
+ if (isTimeColumn(operands.get(0))) {
+ return getInterval(MIN_START_TIME, toMillisSinceEpoch(operands.get(1)) - 1);
}
- }
- case LESS_THAN_OR_EQUAL: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
- long timeStamp = _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
- if (timeStamp >= MIN_START_TIME) {
- return Collections.singletonList(new Interval(MIN_START_TIME, timeStamp));
- } else {
- return Collections.emptyList();
- }
- } else {
- return null;
+ return null;
+ case LESS_THAN_OR_EQUAL:
+ if (isTimeColumn(operands.get(0))) {
+ return getInterval(MIN_START_TIME, toMillisSinceEpoch(operands.get(1)));
}
- }
- case BETWEEN: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
- long startTimestamp =
- _timeFormatSpec.fromFormatToMillis(operands.get(1).getLiteral().getFieldValue().toString());
- long endTimestamp =
- _timeFormatSpec.fromFormatToMillis(operands.get(2).getLiteral().getFieldValue().toString());
- if (endTimestamp >= startTimestamp) {
- return Collections.singletonList(new Interval(startTimestamp, endTimestamp));
- } else {
- return Collections.emptyList();
- }
- } else {
- return null;
+ return null;
+ case BETWEEN:
+ if (isTimeColumn(operands.get(0))) {
+ return getInterval(toMillisSinceEpoch(operands.get(1)), toMillisSinceEpoch(operands.get(2)));
}
- }
- case RANGE: {
- Identifier identifier = operands.get(0).getIdentifier();
- if (identifier != null && identifier.getName().equals(_timeColumn)) {
+ return null;
+ case RANGE:
+ if (isTimeColumn(operands.get(0))) {
return parseInterval(operands.get(1).getLiteral().getFieldValue().toString());
}
return null;
- }
default:
return null;
}
@@ -408,6 +367,17 @@ private List<Interval> getComplementSortedIntervals(List<Interval> intervals) {
return res;
}
+ private boolean isTimeColumn(Expression expression) {
+ Identifier identifier = expression.getIdentifier();
+ return identifier != null && identifier.getName().equals(_timeColumn);
+ }
+
+ private long toMillisSinceEpoch(Expression expression) {
+ Literal literal = expression.getLiteral();
+ Preconditions.checkArgument(literal != null, "Literal is required for time column filter, got: %s", expression);
+ return _timeFormatSpec.fromFormatToMillis(literal.getFieldValue().toString());
+ }
+
/**
* Parse interval to millisecond as [min, max] with both sides included.
* E.g. '(* 16311]' is parsed as [0, 16311], '(1455 16311)' is parsed as [1456, 16310]
@@ -432,10 +402,10 @@ private List<Interval> parseInterval(String rangeString) {
endTime--;
}
}
+ return getInterval(startTime, endTime);
+ }
- if (startTime > endTime) {
- return Collections.emptyList();
- }
- return Collections.singletonList(new Interval(startTime, endTime));
+ private static List<Interval> getInterval(long inclusiveStart, long inclusiveEnd) {
+ return inclusiveStart <= inclusiveEnd ? List.of(new Interval(inclusiveStart, inclusiveEnd)) : List.of();
}
}
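The new `getInterval` helper centralizes the inverted-range handling that each case previously repeated inline. A usage sketch of its semantics, written as an illustrative method placed as if inside this class (not part of the actual change):

```java
// Illustrative only: shows the helper's behavior as used by the cases above.
private void getIntervalExamples() {
  List<Interval> between = getInterval(20L, 30L);   // [Interval(20, 30)], both ends inclusive
  List<Interval> inverted = getInterval(30L, 20L);  // [] -- start > end matches no segments
  // GREATER_THAN shifts the inclusive start by one, so "timeColumn > 40"
  // maps to getInterval(41L, MAX_END_TIME) in the case above.
}
```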
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
deleted file mode 100644
index b0fee613222a..000000000000
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeoutException;
-import org.apache.pinot.spi.stream.LongMsgOffset;
-import org.apache.pinot.spi.stream.MessageBatch;
-import org.apache.pinot.spi.stream.OffsetCriteria;
-import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-import org.apache.pinot.spi.stream.PartitionLevelConsumer;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamMetadataProvider;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-
-
-public class FakeStreamConsumerFactory extends StreamConsumerFactory {
- @Override
- public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
- return new FakePartitionLevelConsumer();
- }
- @Override
- public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
- return new FakesStreamMetadataProvider();
- }
-
- @Override
- public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
- return new FakesStreamMetadataProvider();
- }
-
- public class FakePartitionLevelConsumer implements PartitionLevelConsumer {
-
- @Override
- public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis)
- throws TimeoutException {
- return null;
- }
-
- @Override
- public void close()
- throws IOException {
- }
- }
-
- public class FakesStreamMetadataProvider implements StreamMetadataProvider {
-
- @Override
- public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientId, StreamConfig streamConfig,
- List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, int timeoutMillis)
- throws IOException, TimeoutException {
- return Collections.singletonList(new PartitionGroupMetadata(0, new LongMsgOffset(0)));
- }
-
- @Override
- public int fetchPartitionCount(long timeoutMillis) {
- return 1;
- }
-
- @Override
- public Set<Integer> fetchPartitionIds(long timeoutMillis) {
- return Collections.singleton(0);
- }
-
- @Override
- public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis)
- throws TimeoutException {
- return new LongMsgOffset(0);
- }
-
- @Override
- public void close()
- throws IOException {
- }
- }
-}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 039d7a42053e..b08feaf87822 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -124,7 +124,7 @@ private Map<String, String> getStreamConfigs() {
streamConfigs.put("stream.kafka.decoder.class.name",
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder");
streamConfigs.put("stream.kafka.consumer.factory.class.name",
- "org.apache.pinot.broker.broker.FakeStreamConsumerFactory");
+ "org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
return streamConfigs;
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index feaad35169ba..5e48a981ccc4 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -18,8 +18,7 @@
*/
package org.apache.pinot.broker.routing.segmentpruner;
-import java.util.Arrays;
-import java.util.Collections;
+import java.sql.Timestamp;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -50,11 +49,11 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfigProperties;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.mockito.Mockito;
@@ -78,29 +77,45 @@ public class SegmentPrunerTest extends ControllerTest {
private static final String SDF_PATTERN = "yyyyMMdd";
private static final String QUERY_1 = "SELECT * FROM testTable";
- private static final String QUERY_2 = "SELECT * FROM testTable where memberId = 0";
- private static final String QUERY_3 = "SELECT * FROM testTable where memberId IN (1, 2)";
- private static final String QUERY_4 = "SELECT * FROM testTable where memberId = 0 AND memberName='xyz'";
-
- private static final String TIME_QUERY_1 = "SELECT * FROM testTable where timeColumn = 40";
- private static final String TIME_QUERY_2 = "SELECT * FROM testTable where timeColumn BETWEEN 20 AND 30";
- private static final String TIME_QUERY_3 = "SELECT * FROM testTable where 30 < timeColumn AND timeColumn <= 50";
- private static final String TIME_QUERY_4 = "SELECT * FROM testTable where timeColumn < 15 OR timeColumn > 45";
+ private static final String QUERY_2 = "SELECT * FROM testTable WHERE memberId = 0";
+ private static final String QUERY_3 = "SELECT * FROM testTable WHERE memberId IN (1, 2)";
+ private static final String QUERY_4 = "SELECT * FROM testTable WHERE memberId = 0 AND memberName = 'xyz'";
+
+ private static final String TIME_QUERY_1 = "SELECT * FROM testTable WHERE timeColumn = 40";
+ private static final String TIME_QUERY_2 = "SELECT * FROM testTable WHERE timeColumn BETWEEN 20 AND 30";
+ private static final String TIME_QUERY_3 = "SELECT * FROM testTable WHERE 30 < timeColumn AND timeColumn <= 50";
+ private static final String TIME_QUERY_4 = "SELECT * FROM testTable WHERE timeColumn < 15 OR timeColumn > 45";
private static final String TIME_QUERY_5 =
- "SELECT * FROM testTable where timeColumn < 15 OR (60 < timeColumn AND timeColumn < 70)";
- private static final String TIME_QUERY_6 = "SELECT * FROM testTable where timeColumn < 0 AND timeColumn > 0";
+ "SELECT * FROM testTable WHERE timeColumn < 15 OR (60 < timeColumn AND timeColumn < 70)";
+ private static final String TIME_QUERY_6 = "SELECT * FROM testTable WHERE timeColumn NOT BETWEEN 20 AND 30";
+ private static final String TIME_QUERY_7 = "SELECT * FROM testTable WHERE NOT timeColumn > 30";
+ private static final String TIME_QUERY_8 = "SELECT * FROM testTable WHERE timeColumn < 0 AND timeColumn > 0";
- private static final String SDF_QUERY_1 = "SELECT * FROM testTable where timeColumn = 20200131";
- private static final String SDF_QUERY_2 = "SELECT * FROM testTable where timeColumn BETWEEN 20200101 AND 20200331";
+ private static final String SDF_QUERY_1 = "SELECT * FROM testTable WHERE timeColumn = 20200131";
+ private static final String SDF_QUERY_2 = "SELECT * FROM testTable WHERE timeColumn BETWEEN 20200101 AND 20200331";
private static final String SDF_QUERY_3 =
- "SELECT * FROM testTable where 20200430 < timeColumn AND timeColumn < 20200630";
+ "SELECT * FROM testTable WHERE 20200430 < timeColumn AND timeColumn < 20200630";
private static final String SDF_QUERY_4 =
- "SELECT * FROM testTable where timeColumn <= 20200101 OR timeColumn in (20200201, 20200401)";
+ "SELECT * FROM testTable WHERE timeColumn <= 20200101 OR timeColumn IN (20200201, 20200401)";
private static final String SDF_QUERY_5 =
- "SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND timeColumn >= 20200530";
-
- private static final String SQL_TIME_QUERY_1 = "SELECT * FROM testTable WHERE timeColumn NOT BETWEEN 20 AND 30";
- private static final String SQL_TIME_QUERY_2 = "SELECT * FROM testTable WHERE NOT timeColumn > 30";
+ "SELECT * FROM testTable WHERE timeColumn IN (20200101, 20200102) AND timeColumn >= 20200530";
+
+ // Timestamp can be passed as string or long
+ private static final String TIMESTAMP_QUERY_1 = "SELECT * FROM testTable WHERE timeColumn = '2020-01-31 00:00:00'";
+ private static final String TIMESTAMP_QUERY_2 = String.format("SELECT * FROM testTable WHERE timeColumn = %d",
+ Timestamp.valueOf("2020-01-31 00:00:00").getTime());
+ private static final String TIMESTAMP_QUERY_3 =
+ "SELECT * FROM testTable WHERE timeColumn BETWEEN '2020-01-01 00:00:00' AND '2020-03-31 00:00:00'";
+ private static final String TIMESTAMP_QUERY_4 =
+ String.format("SELECT * FROM testTable WHERE timeColumn BETWEEN %d AND %d",
+ Timestamp.valueOf("2020-01-01 00:00:00").getTime(), Timestamp.valueOf("2020-03-31 00:00:00").getTime());
+ private static final String TIMESTAMP_QUERY_5 =
+ "SELECT * FROM testTable WHERE timeColumn <= '2020-01-01 00:00:00' OR timeColumn IN ('2020-02-01 00:00:00', "
+ + "'2020-04-01 00:00:00')";
+ private static final String TIMESTAMP_QUERY_6 =
+ String.format("SELECT * FROM testTable WHERE timeColumn <= %d OR timeColumn IN (%d, %d)",
+ Timestamp.valueOf("2020-01-01 00:00:00").getTime(), Timestamp.valueOf("2020-02-01 00:00:00").getTime(),
+ Timestamp.valueOf("2020-04-01 00:00:00").getTime());
// This duplicates KinesisConfig.STREAM_TYPE; we hardcode the value here instead of using KinesisConfig.STREAM_TYPE
// directly to avoid pulling in the entire pinot-kinesis module as a dependency.
@@ -127,6 +142,7 @@ public void tearDown() {
@Test
public void testSegmentPrunerFactoryForPartitionPruner() {
TableConfig tableConfig = mock(TableConfig.class);
+ when(tableConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
IndexingConfig indexingConfig = mock(IndexingConfig.class);
when(tableConfig.getIndexingConfig()).thenReturn(indexingConfig);
@@ -141,8 +157,7 @@ public void testSegmentPrunerFactoryForPartitionPruner() {
assertEquals(segmentPruners.size(), 0);
// Segment partition config is missing
- when(routingConfig.getSegmentPrunerTypes()).thenReturn(
- Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE));
+ when(routingConfig.getSegmentPrunerTypes()).thenReturn(List.of(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE));
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
assertEquals(segmentPruners.size(), 0);
@@ -189,8 +204,7 @@ public void testSegmentPrunerFactoryForPartitionPruner() {
@Test
public void testSegmentPrunerFactoryForTimeRangePruner() {
TableConfig tableConfig = mock(TableConfig.class);
- when(tableConfig.getTableName()).thenReturn(RAW_TABLE_NAME);
- setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.HOURS);
+ when(tableConfig.getTableName()).thenReturn(OFFLINE_TABLE_NAME);
// Routing config is missing
List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
@@ -203,8 +217,7 @@ public void testSegmentPrunerFactoryForTimeRangePruner() {
assertEquals(segmentPruners.size(), 0);
// Validation config is missing
- when(routingConfig.getSegmentPrunerTypes()).thenReturn(
- Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE));
+ when(routingConfig.getSegmentPrunerTypes()).thenReturn(List.of(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE));
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
assertEquals(segmentPruners.size(), 0);
@@ -214,41 +227,54 @@ public void testSegmentPrunerFactoryForTimeRangePruner() {
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
assertEquals(segmentPruners.size(), 0);
- // Time range pruner should be returned
+ // Schema is missing
when(validationConfig.getTimeColumnName()).thenReturn(TIME_COLUMN);
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 0);
+
+ // Field spec is missing
+ Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).build();
+ ZKMetadataProvider.setSchema(_propertyStore, schema);
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 0);
+
+ // Time range pruner should be returned
+ schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+ .addDateTimeField(TIME_COLUMN, DataType.TIMESTAMP, "TIMESTAMP", "1:MILLISECONDS").build();
+ ZKMetadataProvider.setSchema(_propertyStore, schema);
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
assertEquals(segmentPruners.size(), 1);
assertTrue(segmentPruners.get(0) instanceof TimeSegmentPruner);
}
@Test
- public void testEnablingEmptySegmentPruner() {
+ public void testSegmentPrunerFactoryForEmptySegmentPruner() {
TableConfig tableConfig = mock(TableConfig.class);
+ when(tableConfig.getTableName()).thenReturn(REALTIME_TABLE_NAME);
IndexingConfig indexingConfig = mock(IndexingConfig.class);
+ when(tableConfig.getIndexingConfig()).thenReturn(indexingConfig);
RoutingConfig routingConfig = mock(RoutingConfig.class);
- StreamIngestionConfig streamIngestionConfig = mock(StreamIngestionConfig.class);
+ when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
+ ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
// When routingConfig is configured with EmptySegmentPruner, EmptySegmentPruner should be returned.
- when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
- when(routingConfig.getSegmentPrunerTypes()).thenReturn(
- Collections.singletonList(RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE));
- List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ when(routingConfig.getSegmentPrunerTypes()).thenReturn(List.of(RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE));
+ List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, propertyStore);
assertEquals(segmentPruners.size(), 1);
assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// When indexingConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned.
- when(indexingConfig.getStreamConfigs()).thenReturn(
- Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE));
- segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ when(indexingConfig.getStreamConfigs()).thenReturn(Map.of(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE));
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, propertyStore);
assertEquals(segmentPruners.size(), 1);
assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// When streamIngestionConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned.
+ StreamIngestionConfig streamIngestionConfig = mock(StreamIngestionConfig.class);
when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(
- Collections.singletonList(Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)));
- when(indexingConfig.getStreamConfigs()).thenReturn(
- Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE));
- segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ List.of(Map.of(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)));
+ when(indexingConfig.getStreamConfigs()).thenReturn(Map.of(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE));
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, propertyStore);
assertEquals(segmentPruners.size(), 1);
assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
}
@@ -259,95 +285,76 @@ public void testPartitionAwareSegmentPruner() {
BrokerRequest brokerRequest2 = CalciteSqlCompiler.compileToBrokerRequest(QUERY_2);
BrokerRequest brokerRequest3 = CalciteSqlCompiler.compileToBrokerRequest(QUERY_3);
BrokerRequest brokerRequest4 = CalciteSqlCompiler.compileToBrokerRequest(QUERY_4);
+
// NOTE: Ideal state and external view are not used in the current implementation
IdealState idealState = Mockito.mock(IdealState.class);
ExternalView externalView = Mockito.mock(ExternalView.class);
SinglePartitionColumnSegmentPruner singlePartitionColumnSegmentPruner =
new SinglePartitionColumnSegmentPruner(OFFLINE_TABLE_NAME, PARTITION_COLUMN_1);
- SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME,
- _propertyStore);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(singlePartitionColumnSegmentPruner);
Set<String> onlineSegments = new HashSet<>();
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, Collections.emptySet()),
- Collections.emptySet());
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, Collections.emptySet()),
- Collections.emptySet());
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, Collections.emptySet()),
- Collections.emptySet());
+
+ Set<String> input = Set.of();
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, input), input);
// Segments without metadata (not updated yet) should not be pruned
String newSegment = "newSegment";
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
+ onlineSegments.add(newSegment);
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
+ input = Set.of(newSegment);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, input), input);
// Segments without partition metadata should not be pruned
String segmentWithoutPartitionMetadata = "segmentWithoutPartitionMetadata";
- onlineSegments.add(segmentWithoutPartitionMetadata);
- SegmentZKMetadata segmentZKMetadataWithoutPartitionMetadata =
- new SegmentZKMetadata(segmentWithoutPartitionMetadata);
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME,
- segmentZKMetadataWithoutPartitionMetadata);
+ new SegmentZKMetadata(segmentWithoutPartitionMetadata));
+ onlineSegments.add(segmentWithoutPartitionMetadata);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1,
- new HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
- Collections.singletonList(segmentWithoutPartitionMetadata));
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2,
- new HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
- Collections.singletonList(segmentWithoutPartitionMetadata));
- assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3,
- new HashSet<>(Collections.singletonList(segmentWithoutPartitionMetadata))),
- Collections.singletonList(segmentWithoutPartitionMetadata));
+ input = Set.of(segmentWithoutPartitionMetadata);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, input), input);
// Test different partition functions and number of partitions
// 0 % 5 = 0; 1 % 5 = 1; 2 % 5 = 2
String segment0 = "segment0";
- onlineSegments.add(segment0);
setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment0, "Modulo", 5, 0);
+ onlineSegments.add(segment0);
// Murmur(0) % 4 = 0; Murmur(1) % 4 = 3; Murmur(2) % 4 = 0
String segment1 = "segment1";
- onlineSegments.add(segment1);
setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment1, "Murmur", 4, 0);
+ onlineSegments.add(segment1);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment1)));
+ input = Set.of(segment0, segment1);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, input), Set.of(segment1));
// Update partition metadata without refreshing should have no effect
setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment0, "Modulo", 4, 1);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment1)));
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, input), Set.of(segment1));
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, input), input);
// Refresh the changed segment should update the segment pruner
segmentZkMetadataFetcher.refreshSegment(segment0);
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment1)));
- assertEquals(
- singlePartitionColumnSegmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest2, input), Set.of(segment1));
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest3, input), input);
+ assertEquals(singlePartitionColumnSegmentPruner.prune(brokerRequest4, input), Set.of(segment1));
// Multi-column partitioned segment.
MultiPartitionColumnsSegmentPruner multiPartitionColumnsSegmentPruner =
@@ -356,38 +363,25 @@ public void testPartitionAwareSegmentPruner() {
segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(OFFLINE_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(multiPartitionColumnsSegmentPruner);
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1, Collections.emptySet()),
- Collections.emptySet());
- assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2, Collections.emptySet()),
- Collections.emptySet());
- assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest3, Collections.emptySet()),
- Collections.emptySet());
- assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4, Collections.emptySet()),
- Collections.emptySet());
+
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2, input), Set.of(segment1));
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest3, input), input);
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4, input), Set.of(segment1));
String segment2 = "segment2";
- onlineSegments.add(segment2);
Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap = new HashMap<>();
- columnPartitionMetadataMap.put(PARTITION_COLUMN_1,
- new ColumnPartitionMetadata("Modulo", 4, Collections.singleton(0), null));
- Map<String, String> partitionColumn2FunctionConfig = new HashMap<>();
- partitionColumn2FunctionConfig.put("columnValues", "xyz|abc");
- partitionColumn2FunctionConfig.put("columnValuesDelimiter", "|");
- columnPartitionMetadataMap.put(PARTITION_COLUMN_2, new ColumnPartitionMetadata(
- "BoundedColumnValue", 3, Collections.singleton(1), partitionColumn2FunctionConfig));
+ columnPartitionMetadataMap.put(PARTITION_COLUMN_1, new ColumnPartitionMetadata("Modulo", 4, Set.of(0), null));
+ columnPartitionMetadataMap.put(PARTITION_COLUMN_2, new ColumnPartitionMetadata("BoundedColumnValue", 3, Set.of(1),
+ Map.of("columnValues", "xyz|abc", "columnValuesDelimiter", "|")));
setSegmentZKPartitionMetadata(OFFLINE_TABLE_NAME, segment2, columnPartitionMetadataMap);
+ onlineSegments.add(segment2);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
- assertEquals(
- multiPartitionColumnsSegmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(
- multiPartitionColumnsSegmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment1)));
- assertEquals(
- multiPartitionColumnsSegmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4,
- new HashSet<>(Arrays.asList(segment0, segment1, segment2))), new HashSet<>(Arrays.asList(segment1, segment2)));
+ input = Set.of(segment0, segment1, segment2);
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest2, input), Set.of(segment1, segment2));
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest3, input), Set.of(segment0, segment1));
+ assertEquals(multiPartitionColumnsSegmentPruner.prune(brokerRequest4, input), Set.of(segment1, segment2));
}
@Test
@@ -399,143 +393,112 @@ public void testTimeSegmentPruner() {
BrokerRequest brokerRequest5 = CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_4);
BrokerRequest brokerRequest6 = CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_5);
BrokerRequest brokerRequest7 = CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_6);
+ BrokerRequest brokerRequest8 = CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_7);
+ BrokerRequest brokerRequest9 = CalciteSqlCompiler.compileToBrokerRequest(TIME_QUERY_8);
+
// NOTE: Ideal state and external view are not used in the current implementation
IdealState idealState = Mockito.mock(IdealState.class);
ExternalView externalView = Mockito.mock(ExternalView.class);
- TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME);
- setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
- TimeSegmentPruner segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig,
- _propertyStore);
- Set<String> onlineSegments = new HashSet<>();
- SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
- _propertyStore);
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+ DateTimeFieldSpec timeFieldSpec = new DateTimeFieldSpec(TIME_COLUMN, DataType.INT, "EPOCH|DAYS", "1:DAYS");
+ TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, timeFieldSpec);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(segmentPruner);
+ Set<String> onlineSegments = new HashSet<>();
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()), Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()), Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()), Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest4, Collections.emptySet()), Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest5, Collections.emptySet()), Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest6, Collections.emptySet()), Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest7, Collections.emptySet()), Collections.emptySet());
-
- // Initialize with non-empty onlineSegments
+
+ Set<String> input = Set.of();
+ assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest2, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest3, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest4, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest5, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest6, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest7, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest8, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest9, input), input);
+
// Segments without metadata (not updated yet) should not be pruned
- segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore);
- segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
- segmentZkMetadataFetcher.register(segmentPruner);
String newSegment = "newSegment";
onlineSegments.add(newSegment);
- segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest2, Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest3, Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest4, Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest5, Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest6, Collections.singleton(newSegment)),
- Collections.singletonList(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest7, Collections.singleton(newSegment)),
- Collections.emptySet()); // query with invalid range will always have empty filtered result
+ segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
+ input = Set.of(newSegment);
+ assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest2, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest3, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest4, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest5, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest6, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest7, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest8, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range
// Segments without time range metadata should not be pruned
String segmentWithoutTimeRangeMetadata = "segmentWithoutTimeRangeMetadata";
- onlineSegments.add(segmentWithoutTimeRangeMetadata);
SegmentZKMetadata segmentZKMetadataWithoutTimeRangeMetadata =
new SegmentZKMetadata(segmentWithoutTimeRangeMetadata);
- segmentZKMetadataWithoutTimeRangeMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, REALTIME_TABLE_NAME,
segmentZKMetadataWithoutTimeRangeMetadata);
+ onlineSegments.add(segmentWithoutTimeRangeMetadata);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
- assertEquals(
- segmentPruner.prune(brokerRequest1, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.singletonList(segmentWithoutTimeRangeMetadata));
- assertEquals(
- segmentPruner.prune(brokerRequest2, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.singletonList(segmentWithoutTimeRangeMetadata));
- assertEquals(
- segmentPruner.prune(brokerRequest3, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.singletonList(segmentWithoutTimeRangeMetadata));
- assertEquals(
- segmentPruner.prune(brokerRequest4, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.singletonList(segmentWithoutTimeRangeMetadata));
- assertEquals(
- segmentPruner.prune(brokerRequest5, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.singletonList(segmentWithoutTimeRangeMetadata));
- assertEquals(
- segmentPruner.prune(brokerRequest6, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.singletonList(segmentWithoutTimeRangeMetadata));
- assertEquals(
- segmentPruner.prune(brokerRequest7, new HashSet<>(Collections.singletonList(segmentWithoutTimeRangeMetadata))),
- Collections.emptySet());
+ input = Set.of(segmentWithoutTimeRangeMetadata);
+ assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest2, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest3, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest4, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest5, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest6, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest7, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest8, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range
// Test different time range
String segment0 = "segment0";
- onlineSegments.add(segment0);
setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 10, 60, TimeUnit.DAYS);
-
+ onlineSegments.add(segment0);
String segment1 = "segment1";
- onlineSegments.add(segment1);
setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 20, 30, TimeUnit.DAYS);
-
+ onlineSegments.add(segment1);
String segment2 = "segment2";
- onlineSegments.add(segment2);
setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, TimeUnit.DAYS);
-
+ onlineSegments.add(segment2);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(segmentPruner.prune(brokerRequest4, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest5, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest6, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest7, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.emptySet());
+ input = Set.of(segment0, segment1, segment2);
+ assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment0, segment1));
+ assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0, segment2));
+ assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of(segment0, segment2));
+ assertEquals(segmentPruner.prune(brokerRequest6, input), Set.of(segment0, segment2));
+ assertEquals(segmentPruner.prune(brokerRequest7, input), Set.of(segment0, segment2));
+ assertEquals(segmentPruner.prune(brokerRequest8, input), Set.of(segment0, segment1));
+ assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range
// Updating metadata without an external view change or refresh should have no effect
setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 20, 30, TimeUnit.DAYS);
- assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(segmentPruner.prune(brokerRequest4, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest5, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest6, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest7, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.emptySet());
+ assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment0, segment1));
+ assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0, segment2));
+ assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of(segment0, segment2));
+ assertEquals(segmentPruner.prune(brokerRequest6, input), Set.of(segment0, segment2));
+ assertEquals(segmentPruner.prune(brokerRequest7, input), Set.of(segment0, segment2));
+ assertEquals(segmentPruner.prune(brokerRequest8, input), Set.of(segment0, segment1));
+ assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range
// Refreshing the changed segment should update the segment pruner
segmentZkMetadataFetcher.refreshSegment(segment2);
- assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest4, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest5, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest6, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest7, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- Collections.emptySet());
+ assertEquals(segmentPruner.prune(brokerRequest1, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest3, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest6, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest7, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest8, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest9, input), Set.of()); // Query with invalid range
}
@Test
@@ -545,215 +508,175 @@ public void testTimeSegmentPrunerSimpleDateFormat() {
BrokerRequest brokerRequest3 = CalciteSqlCompiler.compileToBrokerRequest(SDF_QUERY_3);
BrokerRequest brokerRequest4 = CalciteSqlCompiler.compileToBrokerRequest(SDF_QUERY_4);
BrokerRequest brokerRequest5 = CalciteSqlCompiler.compileToBrokerRequest(SDF_QUERY_5);
+
// NOTE: Ideal state and external view are not used in the current implementation
IdealState idealState = Mockito.mock(IdealState.class);
ExternalView externalView = Mockito.mock(ExternalView.class);
- TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME);
- setSchemaDateTimeFieldSpecSDF(RAW_TABLE_NAME, SDF_PATTERN);
-
- TimeSegmentPruner segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore);
- SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
- _propertyStore);
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
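+ // Time column stores date strings in SDF_PATTERN, declared via the new-style "SIMPLE_DATE_FORMAT|<pattern>" format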
+ DateTimeFieldSpec timeFieldSpec =
+ new DateTimeFieldSpec(TIME_COLUMN, DataType.STRING, "SIMPLE_DATE_FORMAT|" + SDF_PATTERN, "1:DAYS");
+ TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, timeFieldSpec);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(segmentPruner);
- Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, RAW_TABLE_NAME);
- DateTimeFormatSpec dateTimeFormatSpec = schema.getSpecForTimeColumn(TIME_COLUMN).getFormatSpec();
-
+ DateTimeFormatSpec timeFormatSpec = timeFieldSpec.getFormatSpec();
Set<String> onlineSegments = new HashSet<>();
String segment0 = "segment0";
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, timeFormatSpec.fromFormatToMillis("20200101"),
+ timeFormatSpec.fromFormatToMillis("20200228"), TimeUnit.MILLISECONDS);
onlineSegments.add(segment0);
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, dateTimeFormatSpec.fromFormatToMillis("20200101"),
- dateTimeFormatSpec.fromFormatToMillis("20200228"), TimeUnit.MILLISECONDS);
-
String segment1 = "segment1";
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, timeFormatSpec.fromFormatToMillis("20200201"),
+ timeFormatSpec.fromFormatToMillis("20200530"), TimeUnit.MILLISECONDS);
onlineSegments.add(segment1);
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, dateTimeFormatSpec.fromFormatToMillis("20200201"),
- dateTimeFormatSpec.fromFormatToMillis("20200530"), TimeUnit.MILLISECONDS);
-
String segment2 = "segment2";
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, timeFormatSpec.fromFormatToMillis("20200401"),
+ timeFormatSpec.fromFormatToMillis("20200430"), TimeUnit.MILLISECONDS);
onlineSegments.add(segment2);
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, dateTimeFormatSpec.fromFormatToMillis("20200401"),
- dateTimeFormatSpec.fromFormatToMillis("20200430"), TimeUnit.MILLISECONDS);
-
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), Collections.singleton(segment0));
- assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment1)));
- assertEquals(segmentPruner.prune(brokerRequest3, onlineSegments), Collections.singleton(segment1));
- assertEquals(segmentPruner.prune(brokerRequest4, onlineSegments),
- new HashSet<>(Arrays.asList(segment0, segment1, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest5, onlineSegments), Collections.emptySet());
+
+ Set<String> input = Set.of(segment0, segment1, segment2);
+ assertEquals(segmentPruner.prune(brokerRequest1, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0, segment1));
+ assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment1));
+ assertEquals(segmentPruner.prune(brokerRequest4, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest5, input), Set.of());
}
@Test
- public void testTimeSegmentPrunerSql() {
- BrokerRequest brokerRequest1 = CalciteSqlCompiler.compileToBrokerRequest(SQL_TIME_QUERY_1);
- BrokerRequest brokerRequest2 = CalciteSqlCompiler.compileToBrokerRequest(SQL_TIME_QUERY_2);
+ public void testTimeSegmentPrunerTimestampFormat() {
+ BrokerRequest brokerRequest1 = CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_1);
+ BrokerRequest brokerRequest2 = CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_2);
+ BrokerRequest brokerRequest3 = CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_3);
+ BrokerRequest brokerRequest4 = CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_4);
+ BrokerRequest brokerRequest5 = CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_5);
+ BrokerRequest brokerRequest6 = CalciteSqlCompiler.compileToBrokerRequest(TIMESTAMP_QUERY_6);
+
// NOTE: Ideal state and external view are not used in the current implementation
IdealState idealState = Mockito.mock(IdealState.class);
ExternalView externalView = Mockito.mock(ExternalView.class);
- TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME);
- setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.DAYS);
-
- TimeSegmentPruner segmentPruner = SegmentPrunerFactory.createTimeSegmentPruner(tableConfig, _propertyStore);
- SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
- _propertyStore);
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN).build();
+ // Intentionally use EPOCH as the format, which Pinot should still handle for a TIMESTAMP column
+ DateTimeFieldSpec timeFieldSpec =
+ new DateTimeFieldSpec(TIME_COLUMN, DataType.TIMESTAMP, "EPOCH|MILLISECONDS", "1:DAYS");
+ TimeSegmentPruner segmentPruner = new TimeSegmentPruner(tableConfig, timeFieldSpec);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(segmentPruner);
+ DateTimeFormatSpec timeFormatSpec = timeFieldSpec.getFormatSpec();
Set<String> onlineSegments = new HashSet<>();
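+ // Segment time ranges below are registered in epoch millis, converted from timestamp strings via the format spec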
String segment0 = "segment0";
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0,
+ timeFormatSpec.fromFormatToMillis("2020-01-01 00:00:00"),
+ timeFormatSpec.fromFormatToMillis("2020-02-28 00:00:00"), TimeUnit.MILLISECONDS);
onlineSegments.add(segment0);
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment0, 10, 60, TimeUnit.DAYS);
String segment1 = "segment1";
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1,
+ timeFormatSpec.fromFormatToMillis("2020-02-01 00:00:00"),
+ timeFormatSpec.fromFormatToMillis("2020-05-30 00:00:00"), TimeUnit.MILLISECONDS);
onlineSegments.add(segment1);
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment1, 20, 30, TimeUnit.DAYS);
String segment2 = "segment2";
+ setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2,
+ timeFormatSpec.fromFormatToMillis("2020-04-01 00:00:00"),
+ timeFormatSpec.fromFormatToMillis("2020-04-30 00:00:00"), TimeUnit.MILLISECONDS);
onlineSegments.add(segment2);
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 50, 65, TimeUnit.DAYS);
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest2, onlineSegments), new HashSet<>(Arrays.asList(segment0, segment1)));
+ Set<String> input = Set.of(segment0, segment1, segment2);
+ assertEquals(segmentPruner.prune(brokerRequest1, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest2, input), Set.of(segment0));
+ assertEquals(segmentPruner.prune(brokerRequest3, input), Set.of(segment0, segment1));
+ assertEquals(segmentPruner.prune(brokerRequest4, input), Set.of(segment0, segment1));
+ assertEquals(segmentPruner.prune(brokerRequest5, input), input);
+ assertEquals(segmentPruner.prune(brokerRequest6, input), input);
}
@Test
public void testEmptySegmentPruner() {
BrokerRequest brokerRequest1 = CalciteSqlCompiler.compileToBrokerRequest(QUERY_1);
- BrokerRequest brokerRequest2 = CalciteSqlCompiler.compileToBrokerRequest(QUERY_2);
- BrokerRequest brokerRequest3 = CalciteSqlCompiler.compileToBrokerRequest(QUERY_3);
+
// NOTE: Ideal state and external view are not used in the current implementation
IdealState idealState = Mockito.mock(IdealState.class);
ExternalView externalView = Mockito.mock(ExternalView.class);
- TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME);
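+ // No time column needed; the empty segment pruner only relies on total docs in segment metadata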
+ TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).build();
- // init with list of segments
+ // Init with a list of segments
EmptySegmentPruner segmentPruner = new EmptySegmentPruner(tableConfig);
- SegmentZkMetadataFetcher segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME,
- _propertyStore);
+ SegmentZkMetadataFetcher segmentZkMetadataFetcher =
+ new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(segmentPruner);
Set<String> onlineSegments = new HashSet<>();
String segment0 = "segment0";
- onlineSegments.add(segment0);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment0, 10);
+ onlineSegments.add(segment0);
String segment1 = "segment1";
- onlineSegments.add(segment1);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment1, 0);
+ onlineSegments.add(segment1);
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment0)));
- assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment0)));
- assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1))),
- new HashSet<>(Collections.singletonList(segment0)));
-
- // init with empty list of segments
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), Set.of(segment0));
+
+ // Init with no segments
segmentPruner = new EmptySegmentPruner(tableConfig);
segmentZkMetadataFetcher = new SegmentZkMetadataFetcher(REALTIME_TABLE_NAME, _propertyStore);
segmentZkMetadataFetcher.register(segmentPruner);
onlineSegments.clear();
segmentZkMetadataFetcher.init(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()), Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()), Collections.emptySet());
- assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()), Collections.emptySet());
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), onlineSegments);
// Segments without metadata (not updated yet) should not be pruned
String newSegment = "newSegment";
onlineSegments.add(newSegment);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, Collections.singleton(newSegment)),
- Collections.singleton(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest2, Collections.singleton(newSegment)),
- Collections.singleton(newSegment));
- assertEquals(segmentPruner.prune(brokerRequest3, Collections.singleton(newSegment)),
- Collections.singleton(newSegment));
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), onlineSegments);
// Segments without totalDocs metadata should not be pruned
- onlineSegments.clear();
String segmentWithoutTotalDocsMetadata = "segmentWithoutTotalDocsMetadata";
- onlineSegments.add(segmentWithoutTotalDocsMetadata);
SegmentZKMetadata segmentZKMetadataWithoutTotalDocsMetadata =
new SegmentZKMetadata(segmentWithoutTotalDocsMetadata);
- segmentZKMetadataWithoutTotalDocsMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, REALTIME_TABLE_NAME,
segmentZKMetadataWithoutTotalDocsMetadata);
+ onlineSegments.add(segmentWithoutTotalDocsMetadata);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, Collections.singleton(segmentWithoutTotalDocsMetadata)),
- Collections.singleton(segmentWithoutTotalDocsMetadata));
- assertEquals(segmentPruner.prune(brokerRequest2, Collections.singleton(segmentWithoutTotalDocsMetadata)),
- Collections.singleton(segmentWithoutTotalDocsMetadata));
- assertEquals(segmentPruner.prune(brokerRequest3, Collections.singleton(segmentWithoutTotalDocsMetadata)),
- Collections.singleton(segmentWithoutTotalDocsMetadata));
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), onlineSegments);
// Segments with -1 totalDocs should not be pruned
- onlineSegments.clear();
String segmentWithNegativeTotalDocsMetadata = "segmentWithNegativeTotalDocsMetadata";
- onlineSegments.add(segmentWithNegativeTotalDocsMetadata);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segmentWithNegativeTotalDocsMetadata, -1);
+ onlineSegments.add(segmentWithNegativeTotalDocsMetadata);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, Collections.singleton(segmentWithNegativeTotalDocsMetadata)),
- Collections.singleton(segmentWithNegativeTotalDocsMetadata));
- assertEquals(segmentPruner.prune(brokerRequest2, Collections.singleton(segmentWithNegativeTotalDocsMetadata)),
- Collections.singleton(segmentWithNegativeTotalDocsMetadata));
- assertEquals(segmentPruner.prune(brokerRequest3, Collections.singleton(segmentWithNegativeTotalDocsMetadata)),
- Collections.singleton(segmentWithNegativeTotalDocsMetadata));
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), onlineSegments);
// Prune segments with 0 total docs
onlineSegments.clear();
- onlineSegments.add(segment0);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment0, 10);
- onlineSegments.add(segment1);
+ onlineSegments.add(segment0);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment1, 0);
+ onlineSegments.add(segment1);
String segment2 = "segment2";
- onlineSegments.add(segment2);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment2, -1);
-
+ onlineSegments.add(segment2);
segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, onlineSegments);
- assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), Set.of(segment0, segment2));
// Updating metadata without an external view change or refresh should have no effect
- setSegmentZKTimeRangeMetadata(REALTIME_TABLE_NAME, segment2, 20, 30, TimeUnit.DAYS);
setSegmentZKTotalDocsMetadata(REALTIME_TABLE_NAME, segment2, 0);
- assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
- assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Arrays.asList(segment0, segment2)));
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), Set.of(segment0, segment2));
// Refreshing the changed segment should update the segment pruner
segmentZkMetadataFetcher.refreshSegment(segment2);
- assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Collections.singletonList(segment0)));
- assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Collections.singletonList(segment0)));
- assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
- new HashSet<>(Collections.singletonList(segment0)));
- }
-
- private TableConfig getTableConfig(String rawTableName, TableType type) {
- return new TableConfigBuilder(type).setTableName(rawTableName).setTimeColumnName(TIME_COLUMN).build();
- }
-
- private void setSchemaDateTimeFieldSpec(String rawTableName, TimeUnit timeUnit) {
- ZKMetadataProvider.setSchema(_propertyStore, new Schema.SchemaBuilder().setSchemaName(rawTableName)
- .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:" + timeUnit + ":EPOCH", "1:" + timeUnit).build());
- }
-
- private void setSchemaDateTimeFieldSpecSDF(String rawTableName, String format) {
- ZKMetadataProvider.setSchema(_propertyStore, new Schema.SchemaBuilder().setSchemaName(rawTableName)
- .addDateTime(TIME_COLUMN, FieldSpec.DataType.STRING, "1:DAYS:SIMPLE_DATE_FORMAT:" + format, "1:DAYS").build());
+ assertEquals(segmentPruner.prune(brokerRequest1, onlineSegments), Set.of(segment0));
}
private void setSegmentZKPartitionMetadata(String tableNameWithType, String segment, String partitionFunction,
int numPartitions, int partitionId) {
SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segment);
- segmentZKMetadata.setPartitionMetadata(new SegmentPartitionMetadata(Collections.singletonMap(PARTITION_COLUMN_1,
- new ColumnPartitionMetadata(partitionFunction, numPartitions, Collections.singleton(partitionId), null))));
+ segmentZKMetadata.setPartitionMetadata(new SegmentPartitionMetadata(Map.of(PARTITION_COLUMN_1,
+ new ColumnPartitionMetadata(partitionFunction, numPartitions, Set.of(partitionId), null))));
ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata);
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 45f34803a036..f0a1fdd1365c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -38,6 +38,7 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS("seconds", false),
LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS("seconds", false),
REALTIME_OFFHEAP_MEMORY_USED("bytes", false),
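+ // Average document length (in bytes) in the realtime merged text index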
+ REALTIME_MERGED_TEXT_IDX_DOCUMENT_AVG_LEN("bytes", false),
REALTIME_SEGMENT_NUM_PARTITIONS("realtimeSegmentNumPartitions", false),
LLC_SIMULTANEOUS_SEGMENT_BUILDS("llcSimultaneousSegmentBuilds", true),
// Upsert metrics
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 02005a3814be..ed9769a68e61 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -40,6 +40,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
INVALID_REALTIME_ROWS_DROPPED("rows", false),
INCOMPLETE_REALTIME_ROWS_CONSUMED("rows", false),
REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
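+ // Size (in bytes) of documents truncated by the realtime merged text index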
+ REALTIME_MERGED_TEXT_IDX_TRUNCATED_DOCUMENT_SIZE("bytes", false),
REALTIME_OFFSET_COMMITS("commits", true),
REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
STREAM_CONSUMER_CREATE_EXCEPTIONS("exceptions", false),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
index 3260ced7b362..6e44f55ae168 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
@@ -52,6 +52,9 @@ public class MinionClient {
private static final String ACCEPT = "accept";
private static final String APPLICATION_JSON = "application/json";
private static final String HTTP = "http";
+ private static final TypeReference<Map<String, String>>