Skip to content

Commit

Permalink
RefreshSegmentTask - Support for scheduling adhoc tasks (#14596)
Browse files Browse the repository at this point in the history
  • Loading branch information
vvivekiyer authored Dec 5, 2024
1 parent 1c98d43 commit 3cb3c54
Showing 1 changed file with 75 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,84 +54,97 @@ public String getTaskType() {

/**
 * Generates RefreshSegment task configs for every table in {@code tableConfigs} and returns them as a single list.
 *
 * <p>For each table the visible code looks up the table's {@code TableTaskConfig}; a table without one is logged
 * and skipped. Otherwise the configs for {@code RefreshSegmentTask.TASK_TYPE} are fetched and passed to
 * {@code generateTasksForTable}, whose result is appended to the returned list.
 *
 * <p>NOTE(review): this span appears to come from a rendered commit diff, so statements the commit removed sit next
 * to the statements it added (for example the two consecutive "Failed to find task config" warnings, and the
 * max-task parsing that is repeated in {@code generateTasksForTable}). Confirm against the checked-in file before
 * relying on the exact statement order shown here.
 *
 * @param tableConfigs table configs to generate tasks for
 * @return the task configs collected across all tables
 */
@Override
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
String taskType = RefreshSegmentTask.TASK_TYPE;
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager();

int tableNumTasks = 0;

for (TableConfig tableConfig : tableConfigs) {
String tableNameWithType = tableConfig.getTableName();
LOGGER.info("Start generating RefreshSegment tasks for table: {}", tableNameWithType);

// Get the task configs for the table. This is used to restrict the maximum number of allowed tasks per table at
// any given point.
Map<String, String> taskConfigs;
TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
if (tableTaskConfig == null) {
// The two warnings below log the same message; they differ only in how the table name is obtained.
LOGGER.warn("Failed to find task config for table: {}", tableNameWithType);
LOGGER.warn("Failed to find task config for table: {}", tableConfig.getTableName());
continue;
}
taskConfigs = tableTaskConfig.getConfigsForTaskType(RefreshSegmentTask.TASK_TYPE);
Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", tableNameWithType);
// Start from the default per-table cap; a configured value replaces it only if it parses as an int.
int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
if (tableMaxNumTasksConfig != null) {
try {
tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
} catch (Exception e) {
// Unparsable value: fall back to the default cap and log a warning rather than failing generation.
tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType);
}
// Per-table generation is delegated to the shared helper; its tasks are appended to the overall result.
pinotTaskConfigs.addAll(generateTasksForTable(tableConfig, taskConfigs));
}

return pinotTaskConfigs;
}

/**
 * Generates RefreshSegment task configs for one table using caller-supplied task configs.
 *
 * <p>This overload only forwards to {@code generateTasksForTable}; it does not look up the table's own
 * {@code TableTaskConfig}.
 *
 * @param tableConfig table to generate tasks for
 * @param taskConfigs task configuration map to apply to this table
 * @return the task configs produced by {@code generateTasksForTable}
 * @throws Exception declared by the signature; the body itself only delegates
 */
@Override
public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
    throws Exception {
  final List<PinotTaskConfig> tableTasks = generateTasksForTable(tableConfig, taskConfigs);
  return tableTasks;
}

/**
 * Builds the RefreshSegment task configs for a single table.
 *
 * <p>Steps visible in the code: reject a null {@code taskConfigs}; read the per-table task cap from
 * {@code MinionConstants.TABLE_MAX_NUM_TASKS_KEY}, defaulting to {@code RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE};
 * fetch the table stat, schema, schema stat, the set of segments with running tasks, and the table's segment ZK
 * metadata; then walk the segments and emit one task config per segment that passes the skip checks, stopping once
 * the cap is reached.
 *
 * <p>NOTE(review): this span appears to come from a rendered commit diff. The table/schema lookups and the
 * segment-loop body each show up twice (statements the commit removed next to the ones it added), so the text as
 * shown is not a single coherent method body. Confirm against the checked-in file.
 *
 * @param tableConfig table to generate tasks for
 * @param taskConfigs task configuration map for the table; must not be null
 * @return the task configs generated for this table
 */
private List<PinotTaskConfig> generateTasksForTable(TableConfig tableConfig, Map<String, String> taskConfigs) {
String tableNameWithType = tableConfig.getTableName();
Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", tableNameWithType);


String taskType = RefreshSegmentTask.TASK_TYPE;
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager();

LOGGER.info("Start generating RefreshSegment tasks for table: {}", tableNameWithType);

// Count of tasks generated so far for this table; compared against tableMaxNumTasks inside the segment loop.
int tableNumTasks = 0;
int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
if (tableMaxNumTasksConfig != null) {
try {
tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
} catch (Exception e) {
// Unparsable value: keep the default cap and log a warning rather than failing generation.
tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType);
}
}

// Get info about table and schema.
Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType);
Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
Stat schemaStat = pinotHelixResourceManager.getSchemaStat(schema.getSchemaName());

// Get the running segments for a table.
Set<Segment> runningSegments =
TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE, _clusterInfoAccessor);

// Make a single ZK call to get the segments.
List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);

for (SegmentZKMetadata segmentZKMetadata : allSegments) {
// Skip if we have reached the maximum number of permissible tasks per iteration.
if (tableNumTasks >= tableMaxNumTasks) {
break;
}

// NOTE(review): from here through the first tableNumTasks++ the statements repeat the lookups above and the loop
// body below; they look like the removed side of the diff rather than live code — confirm.
// Get info about table and schema.
Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType);
Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
Stat schemaStat = pinotHelixResourceManager.getSchemaStat(schema.getSchemaName());

// Get the running segments for a table.
Set<Segment> runningSegments =
TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE, _clusterInfoAccessor);

// Make a single ZK call to get the segments.
List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);

for (SegmentZKMetadata segmentZKMetadata : allSegments) {
// Skip if we have reached the maximum number of permissible tasks per iteration.
if (tableNumTasks >= tableMaxNumTasks) {
break;
}

// Skip consuming segments.
if (tableConfig.getTableType() == TableType.REALTIME && !segmentZKMetadata.getStatus().isCompleted()) {
continue;
}

// Skip segments for which a task is already running.
if (runningSegments.contains(new Segment(tableNameWithType, segmentZKMetadata.getSegmentName()))) {
continue;
}

String segmentName = segmentZKMetadata.getSegmentName();

// Skip if the segment is already up-to-date and doesn't have to be refreshed.
if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat, schemaStat)) {
continue;
}

Map<String, String> configs = new HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
configs.put(MinionConstants.DOWNLOAD_URL_KEY, segmentZKMetadata.getDownloadUrl());
configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segmentZKMetadata.getCrc()));
pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
tableNumTasks++;
// Skip consuming segments.
if (tableConfig.getTableType() == TableType.REALTIME && !segmentZKMetadata.getStatus().isCompleted()) {
continue;
}

// Skip segments for which a task is already running.
if (runningSegments.contains(new Segment(tableNameWithType, segmentZKMetadata.getSegmentName()))) {
continue;
}

String segmentName = segmentZKMetadata.getSegmentName();

// Skip if the segment is already up-to-date and doesn't have to be refreshed.
if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat, schemaStat)) {
continue;
}

LOGGER.info("Finished generating {} tasks configs for table: {} " + "for task: {}", tableNumTasks,
tableNameWithType, taskType);
// One task per segment: base configs for this single segment plus the download URL, the upload endpoint
// (VIP URL + "/segments"), and the CRC read from the segment's ZK metadata.
Map<String, String> configs = new HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
configs.put(MinionConstants.DOWNLOAD_URL_KEY, segmentZKMetadata.getDownloadUrl());
configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments");
configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segmentZKMetadata.getCrc()));
pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
tableNumTasks++;
}

LOGGER.info("Finished generating {} tasks configs for table: {} for task: {}", tableNumTasks, tableNameWithType,
taskType);
return pinotTaskConfigs;
}

Expand Down

0 comments on commit 3cb3c54

Please sign in to comment.