From 3cb3c54630de81682fc0497147b6a6758e688c42 Mon Sep 17 00:00:00 2001 From: Vivek Iyer Vaidyanathan Date: Fri, 6 Dec 2024 05:29:27 +0530 Subject: [PATCH] RefreshSegmentTask - Support for scheduling adhoc tasks (#14596) --- .../RefreshSegmentTaskGenerator.java | 137 ++++++++++-------- 1 file changed, 75 insertions(+), 62 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java index 59e85c1b1e8e..cc4f4d5781eb 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java @@ -54,84 +54,97 @@ public String getTaskType() { @Override public List generateTasks(List tableConfigs) { - String taskType = RefreshSegmentTask.TASK_TYPE; List pinotTaskConfigs = new ArrayList<>(); - PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager(); - - int tableNumTasks = 0; - for (TableConfig tableConfig : tableConfigs) { - String tableNameWithType = tableConfig.getTableName(); - LOGGER.info("Start generating RefreshSegment tasks for table: {}", tableNameWithType); - // Get the task configs for the table. This is used to restrict the maximum number of allowed tasks per table at // any given point. Map taskConfigs; TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig(); if (tableTaskConfig == null) { - LOGGER.warn("Failed to find task config for table: {}", tableNameWithType); + LOGGER.warn("Failed to find task config for table: {}", tableConfig.getTableName()); continue; } taskConfigs = tableTaskConfig.getConfigsForTaskType(RefreshSegmentTask.TASK_TYPE); - Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", tableNameWithType); - int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE; - String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY); - if (tableMaxNumTasksConfig != null) { - try { - tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig); - } catch (Exception e) { - tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE; - LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType); - } + pinotTaskConfigs.addAll(generateTasksForTable(tableConfig, taskConfigs)); + } + + return pinotTaskConfigs; + } + + @Override + public List generateTasks(TableConfig tableConfig, Map taskConfigs) + throws Exception { + return generateTasksForTable(tableConfig, taskConfigs); + } + + private List generateTasksForTable(TableConfig tableConfig, Map taskConfigs) { + String tableNameWithType = tableConfig.getTableName(); + Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for Table: %s", tableNameWithType); + + + String taskType = RefreshSegmentTask.TASK_TYPE; + List pinotTaskConfigs = new ArrayList<>(); + PinotHelixResourceManager pinotHelixResourceManager = _clusterInfoAccessor.getPinotHelixResourceManager(); + + LOGGER.info("Start generating RefreshSegment tasks for table: {}", tableNameWithType); + + int tableNumTasks = 0; + int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE; + String tableMaxNumTasksConfig = taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY); + if (tableMaxNumTasksConfig != null) { + try { + tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig); + } catch (Exception e) { + tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE; + LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and task {}", tableNameWithType, taskType); + } + } + + // Get info about table and schema. + Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType); + Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig); + Stat schemaStat = pinotHelixResourceManager.getSchemaStat(schema.getSchemaName()); + + // Get the running segments for a table. + Set runningSegments = + TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE, _clusterInfoAccessor); + + // Make a single ZK call to get the segments. + List allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType); + + for (SegmentZKMetadata segmentZKMetadata : allSegments) { + // Skip if we have reached the maximum number of permissible tasks per iteration. + if (tableNumTasks >= tableMaxNumTasks) { + break; } - // Get info about table and schema. - Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType); - Schema schema = pinotHelixResourceManager.getSchemaForTableConfig(tableConfig); - Stat schemaStat = pinotHelixResourceManager.getSchemaStat(schema.getSchemaName()); - - // Get the running segments for a table. - Set runningSegments = - TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE, _clusterInfoAccessor); - - // Make a single ZK call to get the segments. - List allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType); - - for (SegmentZKMetadata segmentZKMetadata : allSegments) { - // Skip if we have reached the maximum number of permissible tasks per iteration. - if (tableNumTasks >= tableMaxNumTasks) { - break; - } - - // Skip consuming segments. - if (tableConfig.getTableType() == TableType.REALTIME && !segmentZKMetadata.getStatus().isCompleted()) { - continue; - } - - // Skip segments for which a task is already running. - if (runningSegments.contains(new Segment(tableNameWithType, segmentZKMetadata.getSegmentName()))) { - continue; - } - - String segmentName = segmentZKMetadata.getSegmentName(); - - // Skip if the segment is already up-to-date and doesn't have to be refreshed. - if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat, schemaStat)) { - continue; - } - - Map configs = new HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName))); - configs.put(MinionConstants.DOWNLOAD_URL_KEY, segmentZKMetadata.getDownloadUrl()); - configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments"); - configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segmentZKMetadata.getCrc())); - pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs)); - tableNumTasks++; + // Skip consuming segments. + if (tableConfig.getTableType() == TableType.REALTIME && !segmentZKMetadata.getStatus().isCompleted()) { + continue; + } + + // Skip segments for which a task is already running. + if (runningSegments.contains(new Segment(tableNameWithType, segmentZKMetadata.getSegmentName()))) { + continue; + } + + String segmentName = segmentZKMetadata.getSegmentName(); + + // Skip if the segment is already up-to-date and doesn't have to be refreshed. + if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat, schemaStat)) { + continue; } - LOGGER.info("Finished generating {} tasks configs for table: {} " + "for task: {}", tableNumTasks, - tableNameWithType, taskType); + Map configs = new HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName))); + configs.put(MinionConstants.DOWNLOAD_URL_KEY, segmentZKMetadata.getDownloadUrl()); + configs.put(MinionConstants.UPLOAD_URL_KEY, _clusterInfoAccessor.getVipUrl() + "/segments"); + configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, String.valueOf(segmentZKMetadata.getCrc())); + pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs)); + tableNumTasks++; } + LOGGER.info("Finished generating {} tasks configs for table: {} for task: {}", tableNumTasks, tableNameWithType, + taskType); return pinotTaskConfigs; }