diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java index 906356fba9c9..ac92b5ba5684 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotControllerPeriodicTaskRestletResource.java @@ -26,8 +26,10 @@ import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; import java.util.List; +import java.util.Map; import javax.inject.Inject; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; @@ -103,6 +105,43 @@ public Response runPeriodicTask( .build(); } + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("/run") + @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.EXECUTE_TASK) + @ApiOperation(value = "Run periodic task against table with custom properties. If table name is missing, task will " + + "run against all tables.") + public Response runPeriodicTaskWithProperties( + @ApiParam(value = "Periodic task name", required = true) @QueryParam("taskname") String periodicTaskName, + @ApiParam(value = "Name of the table") @QueryParam("tableName") String tableName, + @ApiParam(value = "OFFLINE | REALTIME") @QueryParam("type") String tableType, + @ApiParam(value = "Task properties") Map taskProperties, + @Context HttpHeaders headers) { + + if (!_periodicTaskScheduler.hasTask(periodicTaskName)) { + throw new WebApplicationException("Periodic task '" + periodicTaskName + "' not found.", + Response.Status.NOT_FOUND); + } + + if (tableName != null) { + tableName = DatabaseUtils.translateTableName(tableName, headers); + List matchingTableNamesWithType = + ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, + Constants.validateTableType(tableType), LOGGER); + + if (matchingTableNamesWithType.size() > 1) { + throw new WebApplicationException( + "More than one table matches Table '" + tableName + "'. Matching names: " + matchingTableNamesWithType); + } + + tableName = matchingTableNamesWithType.get(0); + } + + return Response.ok() + .entity(_pinotHelixResourceManager.invokeControllerPeriodicTask(tableName, periodicTaskName, taskProperties)) + .build(); + } + @GET @Produces(MediaType.APPLICATION_JSON) @Path("/names") diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index dbe229ebc9da..72b763824b4e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -60,6 +61,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask= _segmentLevelValidationIntervalInSeconds) { + if (shouldRunSegmentValidation(periodicTaskProperties, currentTimeMs)) { LOGGER.info("Run segment-level validation"); context._runSegmentLevelValidation = true; _lastSegmentLevelValidationRunTimeMs = currentTimeMs; @@ -177,6 +178,24 @@ private void runSegmentLevelValidation(TableConfig tableConfig) { } } + private boolean shouldRunSegmentValidation(Properties periodicTaskProperties, long currentTimeMs) { + boolean runValidation = Optional.ofNullable( + periodicTaskProperties.getProperty(RUN_SEGMENT_LEVEL_VALIDATION)) + .map(value -> { + try { + return Boolean.parseBoolean(value); + } catch (Exception e) { + return false; + } + }) + .orElse(false); + + boolean timeThresholdMet = TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationRunTimeMs) + >= _segmentLevelValidationIntervalInSeconds; + + return runValidation || timeThresholdMet; + } + @Override protected void nonLeaderCleanup(List tableNamesWithType) { for (String tableNameWithType : tableNamesWithType) {